#####################################################################################################
# This part of code is highly inspired by the arpeggio project (https://github.com/textX/Arpeggio)
# I don't directly use the project, but it helped me figure out
# what to do.
# Dejanović I., Milosavljević G., Vaderna R.:
# Arpeggio: A flexible PEG parser for Python,
# Knowledge-Based Systems, 2016, 95, 71 - 74, doi:10.1016/j.knosys.2015.12.004
#####################################################################################################
from dataclasses import field, dataclass
from collections import defaultdict
from core.builtin_concepts import BuiltinConcepts
from core.concept import Concept
from core.sheerka import ExecutionContext
from core.tokenizer import TokenKind, Tokenizer, Token
from parsers.BaseParser import BaseParser, Node, ErrorNode
import core.utils
import logging

log = logging.getLogger(__name__)


def flatten(iterable):
    """
    Flattens a tree of parse nodes.

    Nodes whose parsing expression has a non-empty rule_name are kept (their own children
    are flattened in place). Unnamed nodes that have children are replaced by their
    flattened children. Unnamed nodes without a 'children' attribute are kept as is.

    :param iterable: list of nodes (or None)
    :return: a new flat list (empty list when iterable is None)
    """
    if iterable is None:
        return []

    result = []
    for e in iterable:
        rule_name = e.parsing_expression.rule_name
        if rule_name is not None and rule_name != "":
            if hasattr(e, "children"):
                e.children = flatten(e.children)
            result.append(e)
        elif hasattr(e, "children"):
            result.extend(flatten(e.children))
        else:
            result.append(e)

    return result


@dataclass()
class LexerNode(Node):
    start: int
    end: int


class ConceptNode(LexerNode):
    """
    Returned by the ConceptLexerParser
    It represents a recognized concept
    """

    def __init__(self, concept, start, end, tokens=None, source=None, children=None):
        super().__init__(start, end)
        self.concept = concept
        self.tokens = tokens
        self.source = source
        self.children = children

        # when the source is not given, it is rebuilt from the tokens
        if self.source is None:
            self.source = BaseParser.get_text_from_tokens(self.tokens)

    def __eq__(self, other):
        if not isinstance(other, ConceptNode):
            return False

        # The dataclass __eq__ may return NotImplemented (when the classes differ).
        # Only an explicit False is a mismatch; NotImplemented must not be truth-tested.
        if super().__eq__(other) is False:
            return False

        return self.concept == other.concept and \
               self.source == other.source

    def __hash__(self):
        return hash((self.concept, self.start, self.end, self.source))


class NonTerminalNode(LexerNode):
    """
    Returned by the ConceptLexerParser
    """

    def __init__(self, parsing_expression, start, end, children=None):
        super().__init__(start, end)
        self.parsing_expression = parsing_expression
        self.children = children

    def __repr__(self):
        name = self.parsing_expression.rule_name or self.parsing_expression.__class__.__name__
        # children defaults to None, so test for truthiness rather than calling len()
        if self.children:
            sub_names = "(" + ",".join(repr(child) for child in self.children) + ")"
        else:
            sub_names = ""
        return name + sub_names


class TerminalNode(LexerNode):
    """
    Returned by the ConceptLexerParser
    """

    def __init__(self, parsing_expression, start, end, value):
        super().__init__(start, end)
        self.parsing_expression = parsing_expression
        self.value = value

    def __repr__(self):
        name = self.parsing_expression.rule_name or ""
        return name + f"'{self.value}'"


@dataclass()
class GrammarErrorNode(ErrorNode):
    message: str


@dataclass()
class UnexpectedTokenErrorNode(ErrorNode):
    message: str
    expected_tokens: list


@dataclass()
class UnexpectedEndOfFileError(ErrorNode):
    pass


@dataclass()
class UnknownConceptNode(ErrorNode):
    concept_key: str


@dataclass()
class TooManyConceptNode(ErrorNode):
    concept_key: str


class ParsingExpression:
    """
    Base class of the grammar expressions.

    'elements' is the definition given by the user (positional arguments).
    'nodes' is the list of sub expressions that are actually used when parsing
    (it is filled by ConceptLexerParser.get_model).
    """

    def __init__(self, *args, **kwargs):
        self.elements = args

        nodes = kwargs.get('nodes', [])
        if not hasattr(nodes, '__iter__'):
            nodes = [nodes]
        self.nodes = nodes

        self.rule_name = kwargs.get('rule_name', '')

    # NOTE(review): equality does not compare the concrete class, so a Sequence and an
    # OrderedChoice defined with the same elements compare equal - confirm this is intended.
    def __eq__(self, other):
        if not isinstance(other, ParsingExpression):
            return False

        return self.rule_name == other.rule_name and self.elements == other.elements

    def __hash__(self):
        return hash((self.rule_name, self.elements))

    def parse(self, parser):
        return self._parse(parser)


class Sequence(ParsingExpression):
    """
    Will match sequence of parser expressions in exact order they are defined.
    """

    def _parse(self, parser):
        init_pos = parser.pos
        end_pos = parser.pos

        children = []
        for e in self.nodes:
            node = e.parse(parser)
            if node is None:
                return None
            if node.end != -1:  # because Optional returns -1 when no match
                children.append(node)
                end_pos = node.end

        return NonTerminalNode(self, init_pos, end_pos, children)

    def __repr__(self):
        to_str = ", ".join(repr(n) for n in self.elements)
        return f"({to_str})"


class OrderedChoice(ParsingExpression):
    """
    Will match one among multiple
    It will stop at the first match (so the order of definition is important)
    """

    def _parse(self, parser):
        init_pos = parser.pos

        for e in self.nodes:
            node = e.parse(parser)
            if node:
                return NonTerminalNode(self, init_pos, node.end, [node])
            parser.seek(init_pos)  # backtrack

        return None

    def __repr__(self):
        to_str = "| ".join(repr(n) for n in self.elements)
        return f"({to_str})"


class Optional(ParsingExpression):
    """
    Will match or not the elements
    if many matches, will choose longest one
    If you need order, use Optional(OrderedChoice)
    """

    def _parse(self, parser):
        init_pos = parser.pos
        # end == -1 is the marker for 'nothing matched' (see Sequence._parse)
        selected_node = NonTerminalNode(self, parser.pos, -1, [])

        for e in self.nodes:
            node = e.parse(parser)
            if node:
                if node.end > selected_node.end:
                    selected_node = node
            parser.seek(init_pos)  # backtrack

        if selected_node.end != -1:
            parser.seek(selected_node.end)
            parser.next_token()  # eat the tokens found

        return selected_node

    def __repr__(self):
        if len(self.elements) == 1:
            return f"{self.elements[0]}?"
        else:
            to_str = ", ".join(repr(n) for n in self.elements)
            return f"({to_str})?"


class ZeroOrMore(ParsingExpression):
    """
    ZeroOrMore will try to match parser expression specified zero or more
    times. It will never fail.
    """

    def _parse(self, parser):
        raise NotImplementedError()

    # Uncomment when _parse is implemented
    # def __repr__(self):
    #     to_str = ", ".join(repr(n) for n in self.elements)
    #     return f"({to_str})*"


class OneOrMore(ParsingExpression):
    """
    OneOrMore will try to match parser expression specified one or more times.
    """

    def _parse(self, parser):
        raise NotImplementedError()


class UnorderedGroup(ParsingExpression):
    """
    Will try to match all of the parsing expression in any order.
    """

    def _parse(self, parser):
        raise NotImplementedError()


class Match(ParsingExpression):
    """
    Base class for all classes that will try to match something from the input.
    """

    def __init__(self, rule_name, root=False):
        super().__init__(rule_name=rule_name, root=root)

    def parse(self, parser):
        result = self._parse(parser)
        return result


class StrMatch(Match):
    """
    Matches a literal
    """

    def __init__(self, to_match, rule_name="", root=False, ignore_case=True):
        super().__init__(rule_name, root)
        self.to_match = to_match
        self.ignore_case = ignore_case

    def __repr__(self):
        return f"'{self.to_match}'"

    def __eq__(self, other):
        if not super().__eq__(other):
            return False

        if not isinstance(other, StrMatch):
            return False

        return self.to_match == other.to_match and self.ignore_case == other.ignore_case

    def __hash__(self):
        # Defining __eq__ removes the inherited __hash__, which would make every composite
        # expression holding a StrMatch unhashable. ignore_case is deliberately left out:
        # it can be modified by ConceptLexerParser.get_model.
        return hash((self.rule_name, self.elements, self.to_match))

    def _parse(self, parser):
        token = parser.get_token()
        if self.ignore_case:
            m = str(token.value).lower() == self.to_match.lower()
        else:
            m = token.value == self.to_match

        if m:
            node = TerminalNode(self, parser.pos, parser.pos, token.value)
            parser.next_token()
            return node

        return None


class ConceptMatch(Match):
    """
    Will match a concept
    It used only for rule definition

    When the grammar is created, it is replaced by the actual concept
    """

    def __init__(self, concept_name):
        super().__init__("")
        self.concept_name = concept_name

    def __repr__(self):
        return f"{self.concept_name}"

    def __eq__(self, other):
        if not super().__eq__(other):
            return False

        if not isinstance(other, ConceptMatch):
            return False

        return self.concept_name == other.concept_name

    def __hash__(self):
        # kept consistent with __eq__ (which would otherwise make the class unhashable)
        return hash((self.rule_name, self.elements, self.concept_name))


class CrossRef:
    """
    During the creation of the model,
    Creates reference to a concept, as it may not be resolved yet
    """

    def __init__(self, concept):
        self.concept = concept

    def __repr__(self):
        return f"ref({self.concept.key})"

    def __eq__(self, other):
        if not isinstance(other, CrossRef):
            return False

        return self.concept == other.concept

    def __hash__(self):
        # concepts are already used as dictionary keys by ConceptLexerParser
        return hash(self.concept)


class ConceptLexerParser(BaseParser):
    """
    Recognizes concepts in a text (or a list of tokens), using the grammar
    registered for each concept (see initialize).
    """

    def __init__(self, concepts_dict=None):
        super().__init__("ConceptLexer")
        self.concepts_dict = concepts_dict or {}  # dict of concept, grammar
        self.ignore_case = True

        self.token = None
        self.pos = -1
        self.tokens = None

        self.context = None
        self.text = None
        self.sheerka = None

    def add_error(self, error, next_token=True):
        """
        Records an error
        :param error: error to record
        :param next_token: when True, moves to the next token after recording
        :return: the error
        """
        self.has_error = True
        self.error_sink.append(error)
        if next_token:
            self.next_token()
        return error

    def reset_parser(self, context, text):
        """
        Prepares the parser for a new input
        :param context: execution context
        :param text: a string (it will be tokenized) or an iterable of tokens
        :return:
        """
        self.context = context
        self.sheerka = context.sheerka
        self.text = text

        if isinstance(text, str):
            self.tokens = list(Tokenizer(text))
        else:
            self.tokens = list(text)

        # make sure to finish with end of file token (next_token relies on it to stop)
        if not self.tokens or self.tokens[-1].type != TokenKind.EOF:
            self.tokens.append(Token(TokenKind.EOF, "", -1, -1, -1))

        self.token = None
        self.pos = -1
        self.next_token()

    def get_token(self) -> Token:
        return self.token

    def next_token(self, skip_whitespace=True):
        """
        Moves to the next token
        :param skip_whitespace: when True, whitespaces and newlines are skipped
        :return: False when the end of file is reached, True otherwise
        """
        if self.token and self.token.type == TokenKind.EOF:
            return False

        self.pos += 1
        self.token = self.tokens[self.pos]

        if skip_whitespace:
            while self.token.type == TokenKind.WHITESPACE or self.token.type == TokenKind.NEWLINE:
                self.pos += 1
                self.token = self.tokens[self.pos]

        return self.token.type != TokenKind.EOF

    def seek(self, pos):
        """
        Moves to the token at the given position
        """
        self.pos = pos
        self.token = self.tokens[self.pos]
        return True

    def rewind(self, offset, skip_whitespace=True):
        """
        Moves the position by 'offset', then (when skip_whitespace is set)
        walks backward while the current token is a whitespace or a newline
        """
        self.pos += offset
        self.token = self.tokens[self.pos]

        if skip_whitespace:
            while self.pos > 0 and (self.token.type == TokenKind.WHITESPACE or self.token.type == TokenKind.NEWLINE):
                self.pos -= 1
                self.token = self.tokens[self.pos]

    def initialize(self, context, grammars):
        """
        Adds a bunch of concepts, and how they can be recognized
        :param context: execution context
        :param grammars: dictionary of concept, concept_definition
        :return: a failed return value (with the errors) when something went wrong,
                 a successful return value holding the concepts dictionary otherwise
        """

        self.context = context
        self.sheerka = context.sheerka

        nodes_to_resolve = []
        concepts_to_resolve = set()

        # ## Gets the grammars
        for concept, concept_def in grammars.items():
            concept.init_key()  # make sure that the key is initialized
            grammar = self.get_model(concept, concept_def, nodes_to_resolve, concepts_to_resolve)
            self.concepts_dict[concept] = grammar

        if self.has_error:
            return self.sheerka.ret(self.name, False, self.error_sink)

        # ## Removes concepts with infinite recursions
        concepts_to_remove = self.detect_infinite_recursion(concepts_to_resolve)
        for concept in concepts_to_remove:
            concepts_to_resolve.remove(concept)
            del self.concepts_dict[concept]

        # ## Resolves cross references and remove grammar with unresolved references
        self.resolve_cross_references(concepts_to_resolve, nodes_to_resolve)

        if self.has_error:
            return self.sheerka.ret(self.name, False, self.error_sink)
        else:
            return self.sheerka.ret(self.name, True, self.concepts_dict)

    def get_model(self, concept, concept_def, nodes_to_resolve, concepts_to_resolve):
        """
        Transforms the definition of a concept into a grammar that can be parsed
        :param concept: concept being defined
        :param concept_def: its definition (Concept, str or ParsingExpression)
        :param nodes_to_resolve: (out) expressions that hold at least one CrossRef
        :param concepts_to_resolve: (out) concepts whose grammar holds at least one CrossRef
        :return: the grammar (or the recorded error when the definition is not valid)
        """

        def get_concept(concept_name):
            if concept_name in self.context.concepts_cache:
                return self.context.concepts_cache[concept_name]
            return self.sheerka.get(concept_name)

        # TODO
        # inner_get_model must not modify the initial ParsingExpression
        # A copy must be created
        def inner_get_model(expression):
            if isinstance(expression, Concept):
                ret = CrossRef(expression)
                concepts_to_resolve.add(concept)
                nodes_to_resolve.append(ret)
            elif isinstance(expression, str):
                ret = StrMatch(expression, ignore_case=self.ignore_case)
            elif isinstance(expression, StrMatch):
                ret = expression
                if ret.ignore_case is None:
                    ret.ignore_case = self.ignore_case
            elif isinstance(expression, ConceptMatch):
                to_match = get_concept(expression.concept_name)
                if hasattr(to_match, "__iter__"):
                    ret = self.add_error(TooManyConceptNode(expression.concept_name), False)
                elif self.sheerka.isinstance(to_match, BuiltinConcepts.UNKNOWN_CONCEPT):
                    ret = self.add_error(UnknownConceptNode(expression.concept_name), False)
                else:
                    ret = CrossRef(to_match)
                    concepts_to_resolve.add(concept)
                    nodes_to_resolve.append(ret)
            elif isinstance(expression, (Sequence, OrderedChoice, Optional)):
                ret = expression
                resolved = [inner_get_model(e) for e in ret.elements]
                if ret.elements:
                    # The nodes are rebuilt (not extended), otherwise modelling the same
                    # expression twice would duplicate its nodes.
                    ret.nodes = resolved
                if any(isinstance(x, CrossRef) for x in ret.nodes):
                    concepts_to_resolve.add(concept)
                    nodes_to_resolve.append(ret)
            else:
                ret = self.add_error(GrammarErrorNode(f"Unrecognized grammar element '{expression}'."), False)

            return ret

        model = inner_get_model(concept_def)
        if isinstance(model, CrossRef):
            concepts_to_resolve.add(concept)

        model.rule_name = concept.key
        return model

    def detect_infinite_recursion(self, concepts_to_resolve):
        """
        Finds the concepts whose grammar leads back to themselves
        :param concepts_to_resolve: concepts to check
        :return: list of the concepts to remove
        """

        # infinite recursion matcher
        def _is_infinite_recursion(ref_concept, node, visited):
            if isinstance(node, CrossRef):
                if node.concept == ref_concept:
                    return True

                # A concept already explored cannot bring anything new.
                # Without this guard, a cycle that does not go through ref_concept
                # (A -> B, B -> B) would recurse without end.
                if node.concept in visited:
                    return False
                visited.add(node.concept)

                # a referenced concept without grammar simply yields None (no recursion)
                return _is_infinite_recursion(ref_concept, self.concepts_dict.get(node.concept), visited)

            if isinstance(node, OrderedChoice):
                # NOTE(review): an OrderedChoice with no node raises IndexError here
                return _is_infinite_recursion(ref_concept, node.nodes[0], visited)

            if isinstance(node, Sequence):
                for child in node.nodes:
                    if _is_infinite_recursion(ref_concept, child, visited):
                        return True
                return False

            return False

        removed_concepts = []
        for e in concepts_to_resolve:
            to_resolve = self.concepts_dict[e]
            if _is_infinite_recursion(e, to_resolve, set()):
                removed_concepts.append(e)
        return removed_concepts

    # Cross-ref resolving
    def resolve_cross_references(self, concepts_to_resolve, nodes_to_resolve):
        """
        Replaces the CrossRef by the grammar of the concept they reference

        When a concept directly references a concept that has no grammar, an error is
        recorded and the grammar of the referencing concept is removed.

        :param concepts_to_resolve: concepts whose grammar holds at least one CrossRef
        :param nodes_to_resolve: expressions that hold at least one CrossRef
        :return:
        """
        repeat = True
        while repeat:
            repeat = False
            for e in concepts_to_resolve:
                to_resolve = self.concepts_dict.get(e)  # may have been removed by a previous pass
                if not isinstance(to_resolve, CrossRef):
                    continue

                repeat = True
                if to_resolve.concept in self.concepts_dict:
                    self.concepts_dict[e] = self.concepts_dict[to_resolve.concept]
                else:
                    # the target has no grammar (it was never defined, or it was removed)
                    message = f"Cannot resolve reference to concept '{to_resolve.concept.key}'."
                    self.add_error(GrammarErrorNode(message), False)
                    del self.concepts_dict[e]

        for e in nodes_to_resolve:
            if not isinstance(e, ParsingExpression):
                continue  # cases when a concept directly references another concept

            for i, node in enumerate(e.nodes):
                # NOTE(review): a CrossRef to a concept without grammar is left in place
                if isinstance(node, CrossRef) and node.concept in self.concepts_dict:
                    e.nodes[i] = self.concepts_dict[node.concept]

    def parse(self, context, text):
        """
        Recognizes the concepts of the input
        :param context: execution context
        :param text: string or iterable of tokens
        :return: one return value per possibility found
                 (a single return value when there is only one, or when there is an error)
        """
        if text == "":
            return context.sheerka.ret(
                self.name,
                False,
                context.sheerka.new(BuiltinConcepts.IS_EMPTY)
            )

        self.reset_parser(context, text)

        concepts_found = [[]]
        # actually list of list
        # The first dimension is the number of possibilities found
        # The second dimension is the number of concepts found, under one possibility
        #
        # Example 1
        # concept foo : 'one' 'two'
        # concept bar : 'one' 'two'
        # input 'one two' -> will produce two possibilities (foo and bar).
        #
        # Example 2
        # concept foo : 'one'
        # concept bar : 'two'
        # input 'one two' -> will produce one possibility which is (foo, bar) (foo then bar)

        while True:
            init_pos = self.pos
            res = []

            for concept, grammar in self.concepts_dict.items():
                self.seek(init_pos)
                node = grammar.parse(self)
                if node is not None:
                    concept_node = ConceptNode(concept, node.start, node.end, self.tokens[node.start: node.end + 1])
                    if hasattr(node, "children"):
                        concept_node.children = node.children
                    res.append(concept_node)

            if len(res) == 0:  # not recognized
                self.seek(init_pos)
                not_recognized = self.get_text_from_tokens(self.get_token())
                self.add_error(self.sheerka.new(BuiltinConcepts.UNKNOWN_CONCEPT, body=not_recognized))
                break

            res = self.get_bests(res)  # only keep the concept that eat the more tokens
            for r in res:
                r.children = flatten(r.children)
            concepts_found = core.utils.product(concepts_found, res)

            # loop
            self.seek(res[0].end)
            if not self.next_token():
                break

        # manage when nothing is recognized (or other error)
        if self.has_error:
            return self.sheerka.ret(
                self.name,
                False,
                self.sheerka.new(
                    BuiltinConcepts.PARSER_RESULT,
                    parser=self,
                    source=text,
                    body=self.error_sink,
                    try_parsed=concepts_found[0] if len(concepts_found) == 1 else concepts_found))

        # else
        # returns as many ReturnValue than choices found
        ret = []
        for choice in concepts_found:
            ret.append(
                self.sheerka.ret(
                    self.name,
                    True,
                    self.sheerka.new(
                        BuiltinConcepts.PARSER_RESULT,
                        parser=self,
                        source=text,
                        body=choice,
                        try_parsed=choice)))

        return ret[0] if len(ret) == 1 else ret

    @staticmethod
    def get_bests(results):
        """
        Returns the results that are the longest (the ones with the greatest end position)
        :param results: list of nodes (must not be empty)
        :return: list of the nodes that share the greatest end position
        """
        by_end_pos = defaultdict(list)
        for result in results:
            by_end_pos[result.end].append(result)

        return by_end_pos[max(by_end_pos)]


class RegexParser:
    """
    Parser used to transform literal into ParsingExpression

    example :
    a | b, c -> Sequence(OrderedChoice(a, b) ,c)

    '|' (pipe) is used for OrderedChoice
    ',' (comma) is used for Sequence
    '?' (question mark) is used for Optional
    '*' (star) is used for ZeroOrMore
    '+' (plus) is used for OneOrMore
    """

    def __init__(self):
        self.has_error = False
        self.error_sink = []
        self.name = BaseParser.PREFIX + "RegexParser"

        self.lexer_iter = None
        self._current = None
        self.after_current = None
        self.nb_open_par = 0

        self.context = None
        self.source = ""
        self.sheerka = None

    def __eq__(self, other):
        if not isinstance(other, RegexParser):
            return False

        return True

    def __hash__(self):
        # all the RegexParser are equal, so they must share the same hash
        return hash(RegexParser)

    def reset_parser(self, context, text):
        """
        Prepares the parser for a new input
        :param context: execution context
        :param text: a string (it is stripped, then tokenized) or an iterable of tokens
        :return:
        """
        self.context = context
        self.sheerka = context.sheerka

        # Starts from a clean state, otherwise a parser used twice would report the source
        # and the errors of the previous parsing. A new list is created, so the results
        # already returned are not modified.
        self.has_error = False
        self.error_sink = []
        self.source = ""

        self.lexer_iter = iter(Tokenizer(text.strip())) if isinstance(text, str) else iter(text)
        self._current = None
        self.after_current = None
        self.nb_open_par = 0
        self.next_token()
        self.eat_white_space()

    def add_error(self, error, next_token=True):
        """
        Records an error
        :param error: error to record
        :param next_token: when True, moves to the next token after recording
        :return: the error
        """
        self.has_error = True
        self.error_sink.append(error)
        if next_token:
            self.next_token()
        return error

    def get_token(self) -> Token:
        return self._current

    def next_token(self, skip_whitespace=False):
        """
        Moves to the next token (the token already peeked by next_after is used first).
        The value of every consumed token is appended to self.source
        :param skip_whitespace: when True, whitespaces and newlines are skipped
        :return:
        """
        if self._current and self._current.type == TokenKind.EOF:
            return

        try:
            self._current = self.after_current or next(self.lexer_iter)
            self.source += str(self._current.value)
            self.after_current = None

            if skip_whitespace:
                while self._current.type == TokenKind.WHITESPACE or self._current.type == TokenKind.NEWLINE:
                    self._current = next(self.lexer_iter)
                    self.source += str(self._current.value)
        except StopIteration:
            self._current = Token(TokenKind.EOF, "", -1, -1, -1)

    def next_after(self):
        """
        Peeks the token that follows the current one, without consuming it
        :return: the peeked token (an end of file token when there is nothing left)
        """
        if self.after_current is not None:
            return self.after_current

        try:
            self.after_current = next(self.lexer_iter)
            # self.source += str(self.after_current.value)
            return self.after_current
        except StopIteration:
            self.after_current = Token(TokenKind.EOF, "", -1, -1, -1)
            return self.after_current

    def eat_white_space(self):
        """
        Skips the whitespaces and the newlines.
        When a token was peeked (next_after), it first becomes the current token.
        """
        if self.after_current is not None:
            self._current = self.after_current
            self.source += str(self._current.value)
            self.after_current = None

        try:
            while self._current.type == TokenKind.WHITESPACE or self._current.type == TokenKind.NEWLINE:
                self._current = next(self.lexer_iter)
                self.source += str(self._current.value)
        except StopIteration:
            # same convention as next_token: an exhausted input is an end of file token
            # (None would break the callers that directly read token.type)
            self._current = Token(TokenKind.EOF, "", -1, -1, -1)

    def maybe_sequence(self, first, second):
        """
        Returns True when the current token is 'second',
        or when it is 'first' and directly followed by 'second'
        """
        token = self.get_token()
        return token.type == second or token.type == first and self.next_after().type == second

    def parse(self, context: ExecutionContext, text):
        """
        Transforms the input into a ParsingExpression
        :param context: execution context
        :param text: string or iterable of tokens
        :return: return value holding the expression found (or the errors)
        """
        self.reset_parser(context, text)
        tree = self.parse_choice()

        ret = self.sheerka.ret(
            self.name,
            not self.has_error,
            self.sheerka.new(
                BuiltinConcepts.PARSER_RESULT,
                parser=self,
                source=self.source,
                body=self.error_sink if self.has_error else tree,
                try_parsed=tree))

        return ret

    def parse_choice(self):
        """
        choice := sequence ('|' sequence)*
        """
        sequence = self.parse_sequence()

        self.eat_white_space()
        token = self.get_token()
        if token is None or token.type == TokenKind.EOF or token.type != TokenKind.VBAR:
            return sequence

        elements = [sequence]
        while True:
            # maybe eat the vertical bar
            self.eat_white_space()
            token = self.get_token()
            if token is None or token.type == TokenKind.EOF or token.type != TokenKind.VBAR:
                break
            self.next_token(skip_whitespace=True)

            sequence = self.parse_sequence()
            elements.append(sequence)

        return OrderedChoice(*elements)

    def parse_sequence(self):
        """
        sequence := expression_and_modifier+
        The sequence stops at the end of file, before a '|',
        or before a ')' when a parenthesis is opened
        """
        expr_and_modifier = self.parse_expression_and_modifier()

        token = self.get_token()
        if token is None or token.type == TokenKind.EOF or \
                self.maybe_sequence(TokenKind.WHITESPACE, TokenKind.VBAR) or \
                self.nb_open_par > 0 and self.maybe_sequence(TokenKind.WHITESPACE, TokenKind.RPAR):
            return expr_and_modifier

        elements = [expr_and_modifier]
        while True:
            # maybe eat the comma
            token = self.get_token()
            if token is None or token.type == TokenKind.EOF or \
                    self.maybe_sequence(TokenKind.WHITESPACE, TokenKind.VBAR) or \
                    self.nb_open_par > 0 and self.maybe_sequence(TokenKind.WHITESPACE, TokenKind.RPAR):
                break
            self.eat_white_space()

            sequence = self.parse_expression_and_modifier()
            elements.append(sequence)

        return Sequence(*elements)

    def parse_expression_and_modifier(self):
        """
        expression_and_modifier := expression ('?' | '*' | '+')?
        """
        expression = self.parse_expression()

        token = self.get_token()

        if token.type == TokenKind.QMARK:
            self.next_token()
            return Optional(expression)

        if token.type == TokenKind.STAR:
            self.next_token()
            return ZeroOrMore(expression)

        if token.type == TokenKind.PLUS:
            self.next_token()
            return OneOrMore(expression)

        return expression

    def parse_expression(self):
        """
        expression := '(' choice ')' | identifier | literal
        """
        token = self.get_token()
        if token.type == TokenKind.EOF:
            # the error is recorded, but the end of file token still goes through
            # the branches below
            self.add_error(UnexpectedEndOfFileError(), False)

        if token.type == TokenKind.LPAR:
            self.nb_open_par += 1
            self.next_token()
            expression = self.parse_choice()
            token = self.get_token()
            if token.type == TokenKind.RPAR:
                self.nb_open_par -= 1
                self.next_token()
                return expression
            else:
                self.add_error(UnexpectedTokenErrorNode(f"Unexpected token '{token.type}'", [TokenKind.RPAR]))
                return expression

        if token.type == TokenKind.IDENTIFIER:
            self.next_token()
            return ConceptMatch(token.value)
            # concept = self.sheerka.get(str(token.value))
            # if hasattr(concept, "__iter__") or self.sheerka.isinstance(concept, BuiltinConcepts.UNKNOWN_CONCEPT):
            #     self.add_error(CannotResolveConceptNode(str(token.value)))
            #     self.next_token()
            #     return None
            # else:
            #     self.next_token()
            #     return concept

        ret = StrMatch(core.utils.strip_quotes(token.value))
        self.next_token()
        return ret


class ParsingExpressionVisitor:
    """
    visit ParsingExpression
    """

    def visit(self, parsing_expression):
        """
        Calls the method 'visit_<ClassName>' when it exists, generic_visit otherwise
        """
        name = parsing_expression.__class__.__name__
        method = 'visit_' + name
        visitor = getattr(self, method, self.generic_visit)
        return visitor(parsing_expression)

    def generic_visit(self, parsing_expression):
        """
        Calls 'visit_all' (when it is defined), then visits every element of the expression.
        Concepts and strings are wrapped into ConceptMatch and StrMatch before being visited
        """
        if hasattr(self, "visit_all"):
            self.visit_all(parsing_expression)

        for node in parsing_expression.elements:
            if isinstance(node, Concept):
                self.visit(ConceptMatch(node.key or node.name))
            elif isinstance(node, str):
                self.visit(StrMatch(node))
            else:
                self.visit(node)