I can finally chain Processor calls
This commit is contained in:
189
src/core/preprocessor.py
Normal file
189
src/core/preprocessor.py
Normal file
@@ -0,0 +1,189 @@
|
||||
from arpeggio import RegExMatch, ZeroOrMore, OneOrMore, ParserPython, EOF, NoMatch
|
||||
|
||||
|
||||
class VariableParsingError(Exception):
    """Error raised when variable syntax in a text cannot be parsed.

    Attributes:
        message: The bare description passed by the raiser.
        position: The position passed by the raiser; it is also embedded
            in the formatted exception text.
    """

    def __init__(self, message, position):
        # The formatted text becomes str(exc); the raw parts stay available
        # as attributes for callers that want to build their own report.
        super().__init__(f"Variable parsing error at position {position}: {message}")
        self.position = position
        self.message = message
|
||||
|
||||
|
||||
class VariableProcessingError(Exception):
    """Error raised when a parsed variable cannot be processed.

    Attributes:
        message: The bare description passed by the raiser.
        position: The position passed by the raiser; it is also embedded
            in the formatted exception text.
    """

    def __init__(self, message, position):
        # The formatted text becomes str(exc); the raw parts stay available
        # as attributes for callers that want to build their own report.
        super().__init__(f"Variable processing error at position {position}: {message}")
        self.position = position
        self.message = message
|
||||
|
||||
|
||||
def variable_name():
    """Grammar rule: the identifier of a variable.

    Matches an ASCII letter or underscore followed by any number of ASCII
    letters, digits or underscores.
    """
    identifier = RegExMatch(r'[a-zA-Z_][a-zA-Z0-9_]*')
    return identifier
|
||||
|
||||
|
||||
def property_name():
    """Grammar rule: the identifier of a property.

    Uses the same pattern as a variable name: an ASCII letter or underscore
    followed by any number of ASCII letters, digits or underscores.
    """
    identifier = RegExMatch(r'[a-zA-Z_][a-zA-Z0-9_]*')
    return identifier
|
||||
|
||||
|
||||
def variable_property():
    """Grammar rule: one property access, a literal dot then a property name."""
    dot = "."
    return (dot, property_name)
|
||||
|
||||
|
||||
def variable():
    """Grammar rule: ``$`` + variable name + zero or more ``.property`` accesses."""
    property_chain = ZeroOrMore(variable_property)
    return ("$", variable_name, property_chain)
|
||||
|
||||
|
||||
def text_char():
    """Grammar rule: a single character other than ``$``."""
    not_dollar = RegExMatch(r'[^$]')
    return not_dollar
|
||||
|
||||
|
||||
def text_segment():
    """Grammar rule: a run of one or more ``text_char`` matches."""
    run = OneOrMore(text_char)
    return run
|
||||
|
||||
|
||||
def element():
    """Grammar rule: a variable, or failing that a text segment."""
    # The alternatives are listed with `variable` first, `text_segment` second.
    alternatives = [variable, text_segment]
    return alternatives
|
||||
|
||||
|
||||
def expression():
    """Grammar root: zero or more elements followed by end of input."""
    elements = ZeroOrMore(element)
    return (elements, EOF)
|
||||
|
||||
|
||||
class PlainTextPreprocessor:
    """Parse plain text containing ``$name`` / ``$name.prop`` references and
    substitute them with values looked up in a namespace.

    The parser is built from the ``expression`` grammar rule with
    ``skipws=False``, so whitespace in the input is kept as text.
    """

    def __init__(self):
        self.parser = ParserPython(expression, debug=False, skipws=False)

    @staticmethod
    def _post_validation(elements):
        """Reject two variables that directly follow each other.

        Args:
            elements: Element dicts (see ``parse``) in text order.

        Raises:
            VariableParsingError: Positioned at the start of the second
                variable of the first adjacent pair found.
        """
        # zip() yields nothing for fewer than two elements, so no explicit
        # length guard is needed.
        for current, following in zip(elements, elements[1:]):
            if current['type'] == 'variable' and following['type'] == 'variable':
                raise VariableParsingError("Invalid syntax.", following['start'])

    @staticmethod
    def _extract_elements_from_tree(parse_tree, original_text):
        """Flatten a parse tree into a list of text / variable element dicts.

        Args:
            parse_tree: Root node to walk. Nodes are recognised through their
                ``rule_name`` attribute; ``position`` and ``position_end``
                give the slice of ``original_text`` they cover.
            original_text: The text that was parsed; contents are sliced
                from it.

        Returns:
            A list of element dicts in traversal order, shaped as documented
            in ``parse``.

        Raises:
            VariableParsingError: If the text right after a variable starts
                (ignoring leading whitespace) with ``.``, which means a
                property access that the grammar could not match.
        """
        elements = []

        def process_node(node):
            rule_name = getattr(node, 'rule_name', None)

            if rule_name == 'variable':
                start, end = node.position, node.position_end
                # Drop the leading '$', then split "name.prop1.prop2".
                name, *properties = original_text[start:end][1:].split('.')
                elements.append({
                    "type": "variable",
                    "name": name,
                    "properties": properties,
                    "start": start,
                    "end": end
                })

            elif rule_name == 'text_segment':
                start, end = node.position, node.position_end
                content = original_text[start:end]

                # A leading '.' is only a broken property access when the
                # text follows a variable; plain text such as ".env" at the
                # start of the input is legitimate.
                follows_variable = bool(elements) and elements[-1]['type'] == 'variable'
                if follows_variable and content.lstrip().startswith('.'):
                    raise VariableParsingError("Invalid syntax in property name.", start)

                elements.append({
                    "type": "text",
                    "content": content,
                    "start": start,
                    "end": end
                })

            elif rule_name in ('expression', 'element'):
                for child in node:
                    process_node(child)

            # NOTE(review): kept from the original for backward compatibility;
            # it is unclear which node type exposes `_tx_children` - confirm
            # and remove if none does.
            extra_children = getattr(node, '_tx_children', None)
            if extra_children:
                for child in extra_children:
                    process_node(child)

        process_node(parse_tree)
        return elements

    def parse(self, text):
        """
        Parse text and return structure with text segments and variables with positions

        Args:
            text: The text to parse. An empty (falsy) value yields ``[]``.

        Returns:
            A list sorted by ``start``:
            [
                {"type": "text", "content": "...", "start": int, "end": int},
                {"type": "variable", "name": "...", "properties": [...], "start": int, "end": int}
            ]

        Raises:
            VariableParsingError: If the grammar does not match, if a
                validation step rejects the elements, or (with position 0)
                if any other exception occurs while parsing.
        """
        if not text:
            return []

        try:
            parse_tree = self.parser.parse(text)
            elements = self._extract_elements_from_tree(parse_tree, text)

            # Sort before validating: the adjacency check relies on text order.
            elements.sort(key=lambda item: item['start'])
            self._post_validation(elements)

            return elements

        except VariableParsingError:
            # Already in our own error type; let it through untouched.
            raise
        except NoMatch as e:
            # Convert Arpeggio parsing errors to our custom error
            raise VariableParsingError("Invalid syntax", e.position) from e
        except Exception as e:
            raise VariableParsingError(f"Parsing failed: {str(e)}", 0) from e

    def preprocess(self, text, namepace):
        """Return ``text`` with every variable replaced by ``str(value)``.

        Args:
            text: Text that may contain ``$name`` / ``$name.prop`` references.
            namepace: Object whose ``get(name)`` returns the value bound to a
                variable name. (The parameter name is misspelled; it is kept
                so existing keyword callers keep working.)

        Returns:
            The text with all variables substituted.

        Raises:
            VariableParsingError: If ``text`` cannot be parsed.
            VariableProcessingError: If ``namepace.get`` returns ``None`` for
                a variable (a variable bound to ``None`` is treated as
                undefined), or if a property cannot be read with ``getattr``.
        """
        pieces = []

        for element in self.parse(text):
            if element['type'] == 'text':
                pieces.append(element['content'])

            elif element['type'] == 'variable':
                name = element['name']
                value = namepace.get(name)
                if value is None:
                    raise VariableProcessingError(f"Variable '{name}' is not defined.", element['start'])

                # Offset just past "$name" (+1 for the starting '$'). It is
                # advanced past each ".prop" that resolves, so on failure it
                # points at the dot introducing the offending property.
                pos = element['start'] + len(name) + 1
                for prop in element['properties']:
                    try:
                        value = getattr(value, prop)
                    except AttributeError as e:
                        raise VariableProcessingError(
                            f"Invalid property '{prop}' for variable '{name}'.",
                            pos) from e
                    pos += len(prop) + 1  # +1 for the dot '.'

                pieces.append(str(value))

        return "".join(pieces)
|
||||
Reference in New Issue
Block a user