#####################################################################################################
# This part of code is highly inspired by the arpeggio project (https://github.com/textX/Arpeggio)
# I don't directly use the project, but it helped me figure out
# what to do.
# Dejanović I., Milosavljević G., Vaderna R.:
# Arpeggio: A flexible PEG parser for Python,
# Knowledge-Based Systems, 2016, 95, 71 - 74, doi:10.1016/j.knosys.2015.12.004
#####################################################################################################
from dataclasses import field, dataclass
from collections import defaultdict
import logging

from core.builtin_concepts import BuiltinConcepts
from core.concept import Concept
from core.tokenizer import TokenKind, Tokenizer, Token
from parsers.BaseParser import BaseParser, Node, ErrorNode
import core.utils

log = logging.getLogger(__name__)


def flatten(iterable):
    """
    Collapses a tree of parse nodes so that only named nodes keep their own level.

    A node whose parsing expression has a non-empty ``rule_name`` is kept (its children,
    when it has some, are flattened in place).
    An anonymous node that has children is replaced by its flattened children.
    An anonymous node without a ``children`` attribute is kept as is.

    :param iterable: nodes to flatten (every node must expose ``parsing_expression``), or None
    :return: a new list of nodes; empty list when iterable is None
    """
    if iterable is None:
        return []

    result = []
    for e in iterable:
        rule_name = e.parsing_expression.rule_name
        if rule_name is not None and rule_name != "":
            if hasattr(e, "children"):
                e.children = flatten(e.children)
            result.append(e)
        elif hasattr(e, "children"):
            result.extend(flatten(e.children))
        else:
            result.append(e)

    return result


@dataclass()
class LexerNode(Node):
    """
    Base node produced by the lexer: positions (token indexes) of the first and last matched tokens
    """
    start: int
    end: int


class ConceptNode(LexerNode):
    """
    Node that states that a concept was recognized between the tokens 'start' and 'end'
    """

    def __init__(self, concept, start, end, tokens=None, source=None, children=None):
        """
        :param concept: concept that was recognized
        :param start: index of the first token
        :param end: index of the last token
        :param tokens: tokens that were matched
        :param source: matched text; when None, it is computed from tokens
                       using BaseParser.get_text_from_tokens()
        :param children: sub nodes
        """
        super().__init__(start, end)
        self.concept = concept
        self.tokens = tokens
        self.source = source
        self.children = children

        if self.source is None:
            self.source = BaseParser.get_text_from_tokens(self.tokens)

    def __eq__(self, other):
        # checks the type first : the generated dataclass __eq__ returns NotImplemented
        # when the classes differ, and NotImplemented must not be used as a boolean
        if not isinstance(other, ConceptNode):
            return False

        same_position = super().__eq__(other)
        if same_position is NotImplemented:
            # different concrete classes (a subclass for example), compares positions by hand
            same_position = (self.start, self.end) == (other.start, other.end)
        if not same_position:
            return False

        return self.concept == other.concept and self.source == other.source

    def __hash__(self):
        return hash((self.concept, self.start, self.end, self.source))


class NonTerminalNode(LexerNode):
    """
    Node created by a composite parsing expression (Sequence, OrderedChoice, Optional)
    """

    def __init__(self, parsing_expression, start, end, children=None):
        super().__init__(start, end)
        self.parsing_expression = parsing_expression
        self.children = children

    def __repr__(self):
        name = self.parsing_expression.rule_name or self.parsing_expression.__class__.__name__
        # children may be None (default value of the constructor)
        if self.children:
            sub_names = "(" + ",".join([repr(child) for child in self.children]) + ")"
        else:
            sub_names = ""
        return name + sub_names


class TerminalNode(LexerNode):
    """
    Node created when a token is matched (leaf of the parse tree)
    """

    def __init__(self, parsing_expression, start, end, value):
        super().__init__(start, end)
        self.parsing_expression = parsing_expression
        self.value = value

    def __repr__(self):
        name = self.parsing_expression.rule_name or ""
        return name + f"'{self.value}'"


@dataclass()
class GrammarErrorNode(ErrorNode):
    """
    Error found while building the grammar of a concept
    """
    message: str


class ParsingExpression:
    """
    Base class of all the elements of a grammar.

    elements : raw definition (positional arguments), as given by the user
    nodes    : resolved parsing expressions (filled by ConceptLexerParser.get_model())
    rule_name: name of the rule, empty string when the expression is anonymous
    """

    def __init__(self, *args, **kwargs):
        self.elements = args

        nodes = kwargs.get('nodes', [])
        if not hasattr(nodes, '__iter__'):
            nodes = [nodes]
        # always keeps a private list : get_model() extends it,
        # which must work for tuples and must not modify the list of the caller
        self.nodes = list(nodes)

        self.rule_name = kwargs.get('rule_name', '')

    def parse(self, parser):
        """
        Tries to match the expression at the current position of the parser
        :param parser: object that exposes pos, get_token(), next_token() and seek()
        :return: a node when it matches, None otherwise
        """
        return self._parse(parser)


class Sequence(ParsingExpression):
    """
    Will match sequence of parser expressions in exact order they are defined.
    """

    def _parse(self, parser):
        init_pos = parser.pos
        end_pos = parser.pos

        children = []
        for e in self.nodes:
            node = e.parse(parser)
            if node is None:
                # no backtrack here, the caller is in charge of restoring the position
                return None

            if node.end != -1:  # because Optional returns -1 when no match
                children.append(node)
                end_pos = node.end

        return NonTerminalNode(self, init_pos, end_pos, children)


class OrderedChoice(ParsingExpression):
    """
    Will match one among multiple
    It will stop at the first match (so the order of definition is important)
    """

    def _parse(self, parser):
        init_pos = parser.pos

        for e in self.nodes:
            node = e.parse(parser)
            if node:
                return NonTerminalNode(self, init_pos, node.end, [node])

            parser.seek(init_pos)  # backtrack

        return None


class Optional(ParsingExpression):
    """
    Will match or not the elements
    if many matches, will choose longest one
    If you need order, use Optional(OrderedChoice)
    """

    def _parse(self, parser):
        init_pos = parser.pos

        # end == -1 means 'nothing matched' (see Sequence)
        selected_node = NonTerminalNode(self, parser.pos, -1, [])

        for e in self.nodes:
            node = e.parse(parser)
            if node:
                if node.end > selected_node.end:
                    selected_node = node

            parser.seek(init_pos)  # backtrack, every alternative is tried from the same position

        if selected_node.end != -1:
            parser.seek(selected_node.end)
            parser.next_token()  # eat the tokens found

        return selected_node


class Match(ParsingExpression):
    """
    Base class for all classes that will try to match something from the input.
    """

    def __init__(self, rule_name, root=False):
        super(Match, self).__init__(rule_name=rule_name, root=root)

    def parse(self, parser):
        result = self._parse(parser)
        return result


class StrMatch(Match):
    """
    Matches a literal
    """

    def __init__(self, to_match, rule_name="", root=False, ignore_case=None):
        """
        :param to_match: literal to recognize
        :param rule_name: name of the rule
        :param root:
        :param ignore_case: True to ignore the case. When None, get_model() replaces it
                            with the default value of the parser
        """
        super().__init__(rule_name, root)
        self.to_match = to_match
        self.ignore_case = ignore_case

    def __repr__(self):
        return f"StrMatch('{self.to_match}')"

    def _parse(self, parser):
        token = parser.get_token()

        if self.ignore_case:
            m = str(token.value).lower() == self.to_match.lower()
        else:
            m = token.value == self.to_match

        if m:
            node = TerminalNode(self, parser.pos, parser.pos, token.value)
            parser.next_token()
            return node

        return None


class CrossRef:
    """
    During the creation of the model,
    Creates reference to a concept, as it may not be resolved yet
    """

    def __init__(self, concept):
        self.concept = concept


class ConceptLexerParser(BaseParser):
    """
    Recognizes concepts in a text (or a list of tokens),
    using the grammars registered with initialize()
    """

    def __init__(self):
        super().__init__("ConceptLexer")
        self.concepts_dict = {}  # concept -> grammar (ParsingExpression)
        self.ignore_case = True  # default value for StrMatch that do not define it

        self.token = None  # current token
        self.pos = -1  # index of the current token in self.tokens
        self.tokens = None
        self.context = None
        self.text = None
        self.sheerka = None

    def add_error(self, error, next_token=True):
        """
        Records an error
        :param error: error to record
        :param next_token: True to move to the next token once the error is recorded.
                           Must be False when there is no input being parsed
        :return: the error
        """
        self.has_error = True
        self.error_sink.append(error)
        if next_token:
            self.next_token()
        return error

    def reset_parser(self, context, text):
        """
        Prepares the parser for a new input and moves to the first significant token
        :param context: execution context (must expose 'sheerka')
        :param text: string to tokenize, or iterable of tokens
        :return:
        """
        self.context = context
        self.sheerka = context.sheerka
        self.text = text

        if isinstance(text, str):
            self.tokens = list(Tokenizer(text))
        else:
            self.tokens = list(text)

        # make sure to finish with end of file token (and only one)
        if not self.tokens or self.tokens[-1].type != TokenKind.EOF:
            self.tokens.append(Token(TokenKind.EOF, "", -1, -1, -1))

        self.token = None
        self.pos = -1
        self.next_token()

    def get_token(self) -> Token:
        return self.token

    def next_token(self, skip_whitespace=True):
        """
        Moves to the next token
        :param skip_whitespace: True to also skip the whitespaces and the newlines
        :return: False when the end of file is reached (or was already reached), True otherwise
        """
        if self.token and self.token.type == TokenKind.EOF:
            return False

        self.pos += 1
        self.token = self.tokens[self.pos]

        if skip_whitespace:
            # the EOF token is neither a whitespace nor a newline, so the loop stops on it
            while self.token.type == TokenKind.WHITESPACE or self.token.type == TokenKind.NEWLINE:
                self.pos += 1
                self.token = self.tokens[self.pos]

        return self.token.type != TokenKind.EOF

    def seek(self, pos):
        """
        Moves to the token at the index pos
        :param pos: index in self.tokens
        :return: True
        """
        self.pos = pos
        self.token = self.tokens[self.pos]
        return True

    def rewind(self, offset, skip_whitespace=True):
        """
        Moves the current position by offset
        :param offset: number of tokens to add to the current position
        :param skip_whitespace: True to keep moving backward while the current token
                                is a whitespace or a newline
        :return:
        """
        self.pos += offset
        self.token = self.tokens[self.pos]

        if skip_whitespace:
            while self.pos > 0 and (self.token.type == TokenKind.WHITESPACE or self.token.type == TokenKind.NEWLINE):
                self.pos -= 1
                self.token = self.tokens[self.pos]

    def initialize(self, dict):
        """
        Adds a bunch of concepts, and how they can be recognized
        :param dict: dictionary of concept; concept_definition
        :return:
        """
        # note : the parameter hides the builtin 'dict',
        # but it is kept as is because callers may pass it by name

        nodes_to_resolve = []
        concepts_to_resolve = set()

        # ## Gets the grammars
        for concept, concept_def in dict.items():
            concept.init_key()  # make sure that the key is initialized
            grammar = self.get_model(concept, concept_def, nodes_to_resolve, concepts_to_resolve)
            self.concepts_dict[concept] = grammar

        # ## Removes concepts with infinite recursions
        concepts_to_remove = self.detect_infinite_recursion(concepts_to_resolve)
        for concept in concepts_to_remove:
            concepts_to_resolve.remove(concept)
            del self.concepts_dict[concept]

        # ## Resolves cross references and remove grammar with unresolved references
        self.resolve_cross_references(concepts_to_resolve, nodes_to_resolve)

    def get_model(self, concept, concept_def, nodes_to_resolve, concepts_to_resolve):
        """
        Transforms the definition of a concept into a tree of parsing expressions
        :param concept: concept being defined
        :param concept_def: Concept, str, StrMatch, Sequence, OrderedChoice or Optional
        :param nodes_to_resolve: (out) receives the CrossRef and the expressions that contain some
        :param concepts_to_resolve: (out) receives the concept when its definition uses a CrossRef
        :return: the grammar (or the GrammarErrorNode when the definition is not recognized)
        """

        def inner_get_model(expression):
            if isinstance(expression, Concept):
                ret = CrossRef(expression)
                concepts_to_resolve.add(concept)
                nodes_to_resolve.append(ret)

            elif isinstance(expression, str):
                ret = StrMatch(expression, ignore_case=self.ignore_case)

            elif isinstance(expression, StrMatch):
                ret = expression
                if ret.ignore_case is None:
                    ret.ignore_case = self.ignore_case

            elif isinstance(expression, (Sequence, OrderedChoice, Optional)):
                ret = expression
                ret.nodes.extend([inner_get_model(e) for e in ret.elements])
                if any(isinstance(x, CrossRef) for x in ret.nodes):
                    concepts_to_resolve.add(concept)
                    nodes_to_resolve.append(ret)

            else:
                # next_token=False : no input is being parsed while the grammars are built,
                # so there is no token to move to
                ret = self.add_error(
                    GrammarErrorNode(f"Unrecognized grammar element '{expression}'."),
                    next_token=False)

            return ret

        model = inner_get_model(concept_def)
        if isinstance(model, CrossRef):
            concepts_to_resolve.add(concept)

        model.rule_name = concept.key
        return model

    def detect_infinite_recursion(self, concepts_to_resolve):
        """
        Finds the concepts whose grammar ends up referencing themselves
        :param concepts_to_resolve: concepts whose grammar contains references to other concepts
        :return: list of the concepts to remove
        """

        # infinite recursion matcher
        def _is_infinite_recursion(ref_concept, node, visited):
            if isinstance(node, CrossRef):
                if node.concept == ref_concept:
                    return True

                # already visited : it is a cycle that does not go through ref_concept,
                # it will be reported when the concepts of this cycle are checked
                # unknown concept : nothing to follow
                if node.concept in visited or node.concept not in self.concepts_dict:
                    return False

                visited.add(node.concept)
                return _is_infinite_recursion(ref_concept, self.concepts_dict[node.concept], visited)

            if isinstance(node, OrderedChoice):
                # only the first alternative is checked
                return bool(node.nodes) and _is_infinite_recursion(ref_concept, node.nodes[0], visited)

            if isinstance(node, Sequence):
                return any(_is_infinite_recursion(ref_concept, child, visited) for child in node.nodes)

            return False

        removed_concepts = []
        for e in concepts_to_resolve:
            to_resolve = self.concepts_dict[e]
            if _is_infinite_recursion(e, to_resolve, set()):
                removed_concepts.append(e)

        return removed_concepts

    # Cross-ref resolving
    def resolve_cross_references(self, concepts_to_resolve, nodes_to_resolve):
        """
        Replaces the CrossRef by the grammar of the concept they reference
        :param concepts_to_resolve: concepts whose grammar may be a CrossRef
        :param nodes_to_resolve: CrossRef and parsing expressions that contain CrossRef
        :return:
        """

        # cases when a concept directly references another concept
        # repeats until there is no more alias (an alias can reference another alias)
        repeat = True
        while repeat:
            repeat = False
            for e in concepts_to_resolve:
                to_resolve = self.concepts_dict.get(e)
                if not isinstance(to_resolve, CrossRef):
                    continue

                repeat = True
                if to_resolve.concept in self.concepts_dict:
                    self.concepts_dict[e] = self.concepts_dict[to_resolve.concept]
                else:
                    # the referenced concept is unknown (or was removed), the grammar cannot be used
                    log.warning("Removing the grammar of '%s': unresolved reference to '%s'",
                                e, to_resolve.concept)
                    del self.concepts_dict[e]

        for e in nodes_to_resolve:
            if not isinstance(e, ParsingExpression):
                continue  # cases when a concept directly references another concept

            for i, node in enumerate(e.nodes):
                if isinstance(node, CrossRef):
                    if node.concept in self.concepts_dict:
                        e.nodes[i] = self.concepts_dict[node.concept]

    def parse(self, context, text):
        """
        Recognizes the concepts of the input
        :param context: execution context (must expose 'sheerka')
        :param text: string, or iterable of tokens
        :return: one return value when there is an error or only one possibility,
                 a list of return values (one per possibility) otherwise
        """
        if text == "":
            return context.sheerka.ret(
                self.name,
                False,
                context.sheerka.new(BuiltinConcepts.IS_EMPTY)
            )

        self.reset_parser(context, text)

        concepts_found = [[]]
        # actually list of list
        # The first dimension is the number of possibilities found
        # The second dimension is the number of concepts found, under one possibility
        #
        # Example 1
        # concept foo : 'one' 'two'
        # concept bar : 'one' 'two'
        # input 'one two' -> will produce two possibilities (foo and bar).
        #
        # Example 2
        # concept foo : 'one'
        # concept bar : 'two'
        # input 'one two' -> will produce one possibility which is (foo, bar) (foo then bar)

        while True:
            init_pos = self.pos
            res = []

            # tries all the grammars from the same position
            for concept, grammar in self.concepts_dict.items():
                self.seek(init_pos)
                node = grammar.parse(self)
                if node is not None:
                    concept_node = ConceptNode(concept, node.start, node.end, self.tokens[node.start: node.end + 1])
                    if hasattr(node, "children"):
                        concept_node.children = node.children
                    res.append(concept_node)

            if not res:  # not recognized
                self.seek(init_pos)
                not_recognized = self.get_text_from_tokens(self.get_token())
                self.add_error(self.sheerka.new(BuiltinConcepts.UNKNOWN_CONCEPT, body=not_recognized))
                break

            res = self.get_bests(res)  # only keep the concept that eat the more tokens
            for r in res:
                r.children = flatten(r.children)

            concepts_found = core.utils.product(concepts_found, res)

            # loop
            # all the bests end at the same position, so the first one can be used
            self.seek(res[0].end)
            if not self.next_token():
                break

        # manage when nothing is recognized (or other error)
        if self.has_error:
            return self.sheerka.ret(
                self.name,
                False,
                self.sheerka.new(
                    BuiltinConcepts.PARSER_RESULT,
                    parser=self,
                    source=text,
                    body=self.error_sink,
                    try_parsed=concepts_found[0] if len(concepts_found) == 1 else concepts_found))

        # else
        # returns as many ReturnValue than choices found
        ret = []
        for choice in concepts_found:
            ret.append(
                self.sheerka.ret(
                    self.name,
                    True,
                    self.sheerka.new(
                        BuiltinConcepts.PARSER_RESULT,
                        parser=self,
                        source=text,
                        body=choice,
                        try_parsed=choice)))

        return ret[0] if len(ret) == 1 else ret

    @staticmethod
    def get_bests(results):
        """
        Returns the result that is the longest
        :param results: nodes to compare (must expose 'end')
        :return: list of the results that share the highest 'end'; empty list when results is empty
        """
        if not results:
            return []

        by_end_pos = defaultdict(list)
        for result in results:
            by_end_pos[result.end].append(result)

        return by_end_pos[max(by_end_pos)]