#####################################################################################################
# This implementation of the parser is highly inspired by the arpeggio project (https://github.com/textX/Arpeggio)
# I don't directly use the project, but it helped me figure out
# what to do.
# Dejanović I., Milosavljević G., Vaderna R.:
# Arpeggio: A flexible PEG parser for Python,
# Knowledge-Based Systems, 2016, 95, 71 - 74, doi:10.1016/j.knosys.2015.12.004
#####################################################################################################
from collections import namedtuple
from dataclasses import dataclass
from collections import defaultdict

from core.builtin_concepts import BuiltinConcepts
from core.concept import Concept, ConceptParts, DoNotResolve
from core.tokenizer import TokenKind, Tokenizer, Token
from parsers.BaseParser import BaseParser, Node, ErrorNode
import core.tokenizer  # explicit: reset_parser refers to core.tokenizer.LexerError
import core.utils


@dataclass()
class LexerNode(Node):
    """
    Base class of every node produced by the lexer parser.
    A node covers the tokens located between ``start`` and ``end`` (both included).
    """
    start: int  # starting index in the tokens list
    end: int  # ending index in the tokens list
    tokens: list = None  # tokens
    source: str = None  # string representation of what was parsed

    def __post_init__(self):
        # when the source is not given, it is rebuilt from the tokens
        if self.source is None:
            self.source = BaseParser.get_text_from_tokens(self.tokens)

    def __eq__(self, other):
        if not isinstance(other, LexerNode):
            return False

        return self.start == other.start and \
               self.end == other.end and \
               self.source == other.source and \
               self.tokens == other.tokens


class UnrecognizedTokensNode(LexerNode):
    """
    Gathers consecutive tokens that no grammar was able to recognize.
    Can be compared with the lightweight ``utnode`` tuple.
    """

    def __init__(self, start, end, tokens):
        super().__init__(start, end, tokens)

    def add_token(self, token, pos):
        """
        Appends a token and moves the end of the node to ``pos``.
        The source is not refreshed, call fix_source() when all tokens are added.
        """
        self.tokens.append(token)
        self.end = pos

    def fix_source(self):
        """ Recomputes the source from the current tokens """
        self.source = BaseParser.get_text_from_tokens(self.tokens)

    def not_whitespace(self):
        """ Returns False when the node is made of one single whitespace / newline token """
        return not (len(self.tokens) == 1 and self.tokens[0].type in (TokenKind.WHITESPACE, TokenKind.NEWLINE))

    def __eq__(self, other):
        if isinstance(other, utnode):
            return self.start == other.start and \
                   self.end == other.end and \
                   self.source == other.source

        if not isinstance(other, UnrecognizedTokensNode):
            return False

        return self.start == other.start and \
               self.end == other.end and \
               self.source == other.source

    def __hash__(self):
        return hash((self.start, self.end, self.source))

    def __repr__(self):
        return f"UnrecognizedTokensNode(start={self.start}, end={self.end}, source='{self.source}')"


class ConceptNode(LexerNode):
    """
    Returned by the ConceptLexerParser
    It represents a recognized concept
    Can be compared with the lightweight ``cnode`` and ``short_cnode`` tuples.
    """

    def __init__(self, concept, start, end, tokens=None, source=None, underlying=None):
        super().__init__(start, end, tokens, source)
        self.concept = concept
        self.underlying = underlying  # parse tree node the concept was built from

        if self.source is None:
            self.source = BaseParser.get_text_from_tokens(self.tokens)

    def __eq__(self, other):
        if isinstance(other, cnode):
            return self.concept.key == other.concept_key and \
                   self.start == other.start and \
                   self.end == other.end and \
                   self.source == other.source

        if isinstance(other, short_cnode):
            return self.concept.key == other.concept_key and self.source == other.source

        if not isinstance(other, ConceptNode):
            return False

        return self.concept == other.concept and \
               self.start == other.start and \
               self.end == other.end and \
               self.source == other.source and \
               self.underlying == other.underlying

    def __hash__(self):
        return hash((self.concept, self.start, self.end, self.source, self.underlying))

    def __repr__(self):
        return f"ConceptNode(concept='{self.concept}', start={self.start}, end={self.end}, source='{self.source}')"


class SourceCodeNode(LexerNode):
    """
    Returned when some source code (like Python source code is recognized)
    Can be compared with the lightweight ``scnode`` tuple.
    """

    def __init__(self, node, start, end, tokens=None, source=None):
        super().__init__(start, end, tokens, source)
        self.node = node  # The PythonNode (or whatever language node) that is found

    def __eq__(self, other):
        if isinstance(other, scnode):
            return self.start == other.start and \
                   self.end == other.end and \
                   self.source == other.source

        if not isinstance(other, SourceCodeNode):
            return False

        return self.node == other.node and \
               self.start == other.start and \
               self.end == other.end and \
               self.source == other.source

    def __hash__(self):
        return hash((self.start, self.end, self.source))

    def __repr__(self):
        return f"SourceCodeNode(start={self.start}, end={self.end}, source='{self.source}')"


# Lightweight tuples that the nodes above accept in their __eq__
cnode = namedtuple("ConceptNode", "concept_key start end source")
short_cnode = namedtuple("ConceptNode", "concept_key source")
utnode = namedtuple("UnrecognizedTokensNode", "start end source")
scnode = namedtuple("SourceCodeNode", "start end source")


class NonTerminalNode(LexerNode):
    """
    Returned by the ConceptLexerParser
    Node of the parse tree produced by a composite parsing expression.
    """

    def __init__(self, parsing_expression, start, end, tokens, children=None):
        super().__init__(start, end, tokens)
        self.parsing_expression = parsing_expression
        self.children = children

    def __repr__(self):
        name = self.parsing_expression.rule_name or self.parsing_expression.__class__.__name__
        # children defaults to None, so test the truthiness rather than the length
        if self.children:
            sub_names = "(" + ",".join([repr(child) for child in self.children]) + ")"
        else:
            sub_names = ""
        return name + sub_names

    def __eq__(self, other):
        if not isinstance(other, NonTerminalNode):
            return False

        return self.parsing_expression == other.parsing_expression and \
               self.start == other.start and \
               self.end == other.end and \
               self.children == other.children

    def __hash__(self):
        # children is a list (or None), which is not hashable as is
        return hash((self.parsing_expression, self.start, self.end, tuple(self.children or ())))


class TerminalNode(LexerNode):
    """
    Returned by the ConceptLexerParser
    Leaf of the parse tree: one matched token value.
    """

    def __init__(self, parsing_expression, start, end, value):
        super().__init__(start, end, source=value)
        self.parsing_expression = parsing_expression
        self.value = value

    def __repr__(self):
        name = self.parsing_expression.rule_name or ""
        return name + f"'{self.value}'"

    def __eq__(self, other):
        if not isinstance(other, TerminalNode):
            return False

        return self.parsing_expression == other.parsing_expression and \
               self.start == other.start and \
               self.end == other.end and \
               self.value == other.value

    def __hash__(self):
        return hash((self.parsing_expression, self.start, self.end, self.value))


@dataclass()
class GrammarErrorNode(ErrorNode):
    message: str


@dataclass()
class UnknownConceptNode(ErrorNode):
    concept_key: str


@dataclass()
class TooManyConceptNode(ErrorNode):
    concept_key: str


class ParsingExpression:
    """
    Base class of the grammar elements.

    ``elements`` are the raw arguments given by the user (str, Concept, ParsingExpression...)
    ``nodes`` are the resolved sub expressions, the ones that are actually used when parsing
    ``rule_name`` is the optional name of the rule
    """

    def __init__(self, *args, **kwargs):
        self.elements = args

        nodes = kwargs.get('nodes', [])
        if not hasattr(nodes, '__iter__'):
            nodes = [nodes]
        self.nodes = nodes

        self.rule_name = kwargs.get('rule_name', '')

    def __eq__(self, other):
        if not isinstance(other, ParsingExpression):
            return False

        return self.rule_name == other.rule_name and self.elements == other.elements

    def __hash__(self):
        return hash((self.rule_name, self.elements))

    def parse(self, parser):
        """
        Tries to match the expression at the current position of the parser
        :return: the node found, or None when there is no match
        """
        return self._parse(parser)


class ConceptExpression(ParsingExpression):
    """
    Will match a concept
    It used only for rule definition

    When the grammar is created, it is replaced by the actual concept
    """

    def __init__(self, concept, rule_name=""):
        super().__init__(rule_name=rule_name)
        self.concept = concept  # either the Concept, or only its name

    def __repr__(self):
        return f"{self.concept}"

    def __eq__(self, other):
        if not super().__eq__(other):
            return False

        if not isinstance(other, ConceptExpression):
            return False

        # names can only be compared when both sides hold a real concept
        if isinstance(self.concept, Concept) and isinstance(other.concept, Concept):
            return self.concept.name == other.concept.name

        # when it's only the name of the concept
        return self.concept == other.concept

    def __hash__(self):
        return hash((self.concept, self.rule_name))

    @staticmethod
    def get_parsing_expression_from_name(name):
        """
        Creates the expression that recognizes the name of a concept
        :return: StrMatch if the name is made of one token, a Sequence of StrMatch otherwise
        """
        tokens = Tokenizer(name)
        nodes = [StrMatch(core.utils.strip_quotes(token.value)) for token in list(tokens)[:-1]]  # drops last token
        if len(nodes) == 1:
            return nodes[0]

        sequence = Sequence(nodes)
        sequence.nodes = nodes
        return sequence

    def _fallback_expression(self, parser, concept):
        """ Expression to use when the concept has no grammar: tries to match the concept using its name """
        return self.get_parsing_expression_from_name(concept.name)

    def _parse(self, parser):
        to_match = parser.get_concept(self.concept) if isinstance(self.concept, str) else self.concept
        if parser.sheerka.isinstance(to_match, BuiltinConcepts.UNKNOWN_CONCEPT):
            return None

        self.concept = to_match  # Memoize

        if to_match not in parser.concepts_grammars:
            expr = self._fallback_expression(parser, to_match)
        else:
            expr = parser.concepts_grammars[to_match]

        node = expr.parse(parser)
        if node is None:
            return None

        return NonTerminalNode(self, node.start, node.end, parser.tokens[node.start: node.end + 1], [node])


class ConceptGroupExpression(ConceptExpression):
    """
    Will match any concept of a group (set) of concepts
    """

    def _fallback_expression(self, parser, concept):
        """ Expression to use when the group has no grammar: first matching element of the group """
        concepts_in_group = parser.sheerka.get_set_elements(concept)
        nodes = [ConceptExpression(c, rule_name=c.name) for c in concepts_in_group]
        expr = OrderedChoice(nodes)
        expr.nodes = nodes
        return expr


class Sequence(ParsingExpression):
    """
    Will match sequence of parser expressions in exact order they are defined.
    """

    def _parse(self, parser):
        init_pos = parser.pos
        end_pos = parser.pos

        children = []
        for e in self.nodes:
            node = e.parse(parser)
            if node is None:
                return None

            if node.end != -1:  # because returns -1 when no match
                children.append(node)
                end_pos = node.end

        return NonTerminalNode(self, init_pos, end_pos, parser.tokens[init_pos: end_pos + 1], children)

    def __repr__(self):
        to_str = ", ".join(repr(n) for n in self.elements)
        return f"({to_str})"


class OrderedChoice(ParsingExpression):
    """
    Will match one among multiple
    It will stop at the first match (so the order of definition is important)
    """

    def _parse(self, parser):
        init_pos = parser.pos

        for e in self.nodes:
            node = e.parse(parser)
            if node:
                return NonTerminalNode(self, init_pos, node.end, parser.tokens[init_pos: node.end + 1], [node])

            parser.seek(init_pos)  # backtrack

        return None

    def __repr__(self):
        to_str = "| ".join(repr(n) for n in self.elements)
        return f"({to_str})"


class Optional(ParsingExpression):
    """
    Will match or not the elements
    if many matches, will choose longest one
    If you need order, use Optional(OrderedChoice)
    """

    def _parse(self, parser):
        init_pos = parser.pos
        selected_node = NonTerminalNode(self, parser.pos, -1, [], [])  # means that nothing is found

        for e in self.nodes:
            node = e.parse(parser)
            if node:
                if node.end > selected_node.end:
                    selected_node = NonTerminalNode(
                        self,
                        node.start,
                        node.end,
                        parser.tokens[node.start: node.end + 1],
                        [node])

            parser.seek(init_pos)  # backtrack

        if selected_node.end != -1:
            parser.seek(selected_node.end)
            parser.next_token()  # eat the tokens found

        return selected_node

    def __repr__(self):
        if len(self.elements) == 1:
            return f"{self.elements[0]}?"
        else:
            to_str = ", ".join(repr(n) for n in self.elements)
            return f"({to_str})?"


class Repetition(ParsingExpression):
    """
    Base class for all repetition-like parser expressions (?,*,+)

    Keyword args:
        sep: optional parsing expression that must be matched between two repetitions
    """

    def __init__(self, *elements, **kwargs):
        super(Repetition, self).__init__(*elements, **kwargs)
        self.sep = kwargs.get('sep', None)

    def _parse_repetitions(self, parser):
        """
        Matches the first sub expression as many times as possible (with the separator in between)
        :return: tuple (initial position, end of the last match or -1, matched nodes)
        """
        init_pos = parser.pos
        end_pos = -1
        children = []

        while True:
            current_pos = parser.pos

            # maybe eat the separator if needed
            if self.sep and children:
                if self.sep.parse(parser) is None:
                    parser.seek(current_pos)
                    break

            # eat the repeated expression
            before_node = parser.pos
            node = self.nodes[0].parse(parser)
            if node is None:
                parser.seek(current_pos)
                break

            # A match that eats nothing would be found again and again: stop here.
            if node.end == -1 or parser.pos == before_node:
                parser.seek(current_pos)
                break

            children.append(node)
            end_pos = node.end

        return init_pos, end_pos, children


class ZeroOrMore(Repetition):
    """
    ZeroOrMore will try to match parser expression specified zero or more
    times. It will never fail.
    """

    def _parse(self, parser):
        init_pos, end_pos, children = self._parse_repetitions(parser)

        if not children:
            return NonTerminalNode(self, init_pos, -1, [], [])  # end == -1 means that nothing is found

        return NonTerminalNode(self, init_pos, end_pos, parser.tokens[init_pos: end_pos + 1], children)

    def __repr__(self):
        to_str = ", ".join(repr(n) for n in self.elements)
        return f"({to_str})*"


class OneOrMore(Repetition):
    """
    OneOrMore will try to match parser expression specified one or more times.
    """

    def _parse(self, parser):
        init_pos, end_pos, children = self._parse_repetitions(parser)

        if not children:  # if nothing is found, it's an error
            return None

        return NonTerminalNode(self, init_pos, end_pos, parser.tokens[init_pos: end_pos + 1], children)

    def __repr__(self):
        to_str = ", ".join(repr(n) for n in self.elements)
        return f"({to_str})+"


class UnorderedGroup(Repetition):
    """
    Will try to match all of the parsing expression in any order.
    Not implemented yet.
    """

    def _parse(self, parser):
        raise NotImplementedError()

    # def __repr__(self):
    #     to_str = ", ".join(repr(n) for n in self.elements)
    #     return f"({to_str})#"


class Match(ParsingExpression):
    """
    Base class for all classes that will try to match something from the input.
    """

    def __init__(self, rule_name, root=False):
        super(Match, self).__init__(rule_name=rule_name, root=root)

    def parse(self, parser):
        result = self._parse(parser)
        return result


class StrMatch(Match):
    """
    Matches a literal
    """

    def __init__(self, to_match, rule_name="", root=False, ignore_case=True):
        super().__init__(rule_name=rule_name, root=root)
        self.to_match = to_match
        self.ignore_case = ignore_case

    def __repr__(self):
        return f"'{self.to_match}'"

    def __eq__(self, other):
        if not super().__eq__(other):
            return False

        if not isinstance(other, StrMatch):
            return False

        return self.to_match == other.to_match and self.ignore_case == other.ignore_case

    def __hash__(self):
        # __eq__ is overridden, so __hash__ must be defined too, otherwise the class is not hashable
        return hash((self.rule_name, self.to_match, self.ignore_case))

    def _parse(self, parser):
        token = parser.get_token()
        if self.ignore_case:
            matched = str(token.value).lower() == self.to_match.lower()
        else:
            matched = token.value == self.to_match

        if not matched:
            return None

        node = TerminalNode(self, parser.pos, parser.pos, token.value)
        parser.next_token()
        return node


class ConceptLexerParser(BaseParser):
    """
    Recognizes the concepts found in a text (or in a list of tokens),
    using the grammars of the concepts.
    """

    def __init__(self, **kwargs):
        super().__init__("ConceptLexer", 50)
        if 'grammars' in kwargs:
            self.concepts_grammars = kwargs.get("grammars")
        elif 'sheerka' in kwargs:
            self.concepts_grammars = kwargs.get("sheerka").concepts_grammars
        else:
            self.concepts_grammars = {}

        self.ignore_case = True

        self.token = None  # current token
        self.pos = -1  # index of the current token
        self.tokens = None

        self.context = None
        self.text = None
        self.sheerka = None

    def add_error(self, error, next_token=True):
        """
        Records an error
        :param error: error to record
        :param next_token: True to move to the next token
        :return: the error
        """
        self.has_error = True
        self.error_sink.append(error)
        if next_token:
            self.next_token()
        return error

    def reset_parser(self, context, text):
        """
        Prepares the parser for a new input
        :param context: execution context
        :param text: string to tokenize, or iterable of tokens
        :return: False if the text cannot be tokenized
        """
        self.context = context
        self.sheerka = context.sheerka
        self.text = text

        if isinstance(text, str):
            try:
                self.tokens = list(Tokenizer(text))
            except core.tokenizer.LexerError as e:
                self.add_error(self.sheerka.new(BuiltinConcepts.ERROR, body=e), False)
                return False
        else:
            self.tokens = list(text)
            self.tokens.append(Token(TokenKind.EOF, "", -1, -1, -1))  # make sure to finish with end of file token

        self.token = None
        self.pos = -1
        self.next_token(False)
        return True

    def get_token(self) -> Token:
        return self.token

    def next_token(self, skip_whitespace=True):
        """
        Moves to the next token
        :param skip_whitespace: True to also skip the whitespaces and the newlines
        :return: False when the end of file is reached
        """
        if self.token and self.token.type == TokenKind.EOF:
            return False

        self.pos += 1
        self.token = self.tokens[self.pos]

        if skip_whitespace:
            while self.token.type in (TokenKind.WHITESPACE, TokenKind.NEWLINE):
                self.pos += 1
                self.token = self.tokens[self.pos]

        return self.token.type != TokenKind.EOF

    def seek(self, pos):
        """ Moves to the token located at the index ``pos`` """
        self.pos = pos
        self.token = self.tokens[self.pos]
        return True

    def rewind(self, offset, skip_whitespace=True):
        """
        Moves the current position by ``offset``
        When skip_whitespace is set, goes backward while the token is a whitespace or a newline
        """
        self.pos += offset
        self.token = self.tokens[self.pos]

        if skip_whitespace:
            while self.pos > 0 and self.token.type in (TokenKind.WHITESPACE, TokenKind.NEWLINE):
                self.pos -= 1
                self.token = self.tokens[self.pos]

    def initialize(self, context, concepts_definitions):
        """
        Adds a bunch of concepts, and how they can be recognized
        :param context: execution context
        :param concepts_definitions: dictionary of concept, concept_definition
        :return:
        """
        self.context = context
        self.sheerka = context.sheerka
        concepts_to_resolve = set()

        # ## Gets the grammars
        for concept, concept_def in concepts_definitions.items():
            concept.init_key()  # make sure that the key is initialized
            grammar = self.get_model(concept_def, concepts_to_resolve)
            self.concepts_grammars[concept] = grammar

        if self.has_error:
            return self.sheerka.ret(self.name, False, self.error_sink)

        # ## Removes concepts with infinite recursions
        # concepts_to_resolve may hold the name of a concept while the resolved concept is reported
        # (and the same concept can be reported twice), so the removals must not raise.
        concepts_to_remove = self.detect_infinite_recursion(concepts_to_resolve)
        for concept in concepts_to_remove:
            concepts_to_resolve.discard(concept)
            self.concepts_grammars.pop(concept, None)

        if self.has_error:
            return self.sheerka.ret(self.name, False, self.error_sink)
        else:
            return self.sheerka.ret(self.name, True, self.concepts_grammars)

    def get_concept(self, concept_name):
        """ Looks for the concept in the context first, then in sheerka """
        if concept_name in self.context.concepts:
            return self.context.concepts[concept_name]
        return self.sheerka.get(concept_name)

    def get_model(self, concept_def, concepts_to_resolve):
        """
        Translates the definition of a concept into parsing expressions
        :param concept_def: definition (str, Concept or ParsingExpression)
        :param concepts_to_resolve: set updated with all the concepts referenced by the definition
        :return: the grammar
        """

        # TODO
        # inner_get_model must not modify the initial ParsingExpression
        # A copy must be created
        def inner_get_model(expression):
            if isinstance(expression, Concept):
                if self.sheerka.isagroup(expression):
                    ret = ConceptGroupExpression(expression, rule_name=expression.name)
                else:
                    ret = ConceptExpression(expression, rule_name=expression.name)
                concepts_to_resolve.add(expression)
            elif isinstance(expression, ConceptExpression):
                if expression.rule_name is None or expression.rule_name == "":
                    expression.rule_name = expression.concept.name if isinstance(expression.concept, Concept) \
                        else expression.concept
                concepts_to_resolve.add(expression.concept)
                ret = expression
            elif isinstance(expression, str):
                ret = StrMatch(expression, ignore_case=self.ignore_case)
            elif isinstance(expression, StrMatch):
                ret = expression
                if ret.ignore_case is None:
                    ret.ignore_case = self.ignore_case
            elif isinstance(expression, (Sequence, OrderedChoice, ZeroOrMore, OneOrMore, Optional)):
                ret = expression
                ret.nodes = [inner_get_model(e) for e in ret.elements]
            else:
                ret = self.add_error(GrammarErrorNode(f"Unrecognized grammar element '{expression}'."), False)

            # Translate separator expression.
            if isinstance(expression, Repetition) and expression.sep:
                expression.sep = inner_get_model(expression.sep)

            return ret

        model = inner_get_model(concept_def)
        return model

    def detect_infinite_recursion(self, concepts_to_resolve):
        """
        Looks for the concepts whose grammar references themselves
        :param concepts_to_resolve: concepts (or names of concepts) to check
        :return: list of the concepts that must be removed
        """

        # infinite recursion matcher
        # 'visited' holds the concepts whose grammar is already explored, so cycles that
        # do not go through ref_concept do not make the detection itself recurse forever
        def _is_infinite_recursion(ref_concept, node, visited):
            if isinstance(node, ConceptExpression):
                if node.concept == ref_concept:
                    return True

                if isinstance(node.concept, str):
                    to_match = self.get_concept(node.concept)
                    if self.sheerka.isinstance(to_match, BuiltinConcepts.UNKNOWN_CONCEPT):
                        return False
                    if to_match == ref_concept:  # the concept was referenced by its name
                        return True
                else:
                    to_match = node.concept

                if to_match not in self.concepts_grammars:
                    return False

                if to_match in visited:
                    return False
                visited.add(to_match)

                return _is_infinite_recursion(ref_concept, self.concepts_grammars[to_match], visited)

            if isinstance(node, OrderedChoice):
                return _is_infinite_recursion(ref_concept, node.nodes[0], visited)

            if isinstance(node, Sequence):
                return any(_is_infinite_recursion(ref_concept, sub_node, visited) for sub_node in node.nodes)

            return False

        removed_concepts = []
        for e in concepts_to_resolve:
            if isinstance(e, str):
                e = self.get_concept(e)
                if self.sheerka.isinstance(e, BuiltinConcepts.UNKNOWN_CONCEPT):
                    continue

            if e not in self.concepts_grammars:
                continue

            to_resolve = self.concepts_grammars[e]
            if _is_infinite_recursion(e, to_resolve, set()):
                removed_concepts.append(e)

        return removed_concepts

    def parse(self, context, text):
        """
        Recognizes the concepts of the text
        :param context: execution context
        :param text: string, or iterable of tokens
        :return: one ReturnValue, or a list of ReturnValue when there are several possibilities
        """
        if text == "":
            return context.sheerka.ret(
                self.name,
                False,
                context.sheerka.new(BuiltinConcepts.IS_EMPTY)
            )

        if not self.reset_parser(context, text):
            return self.sheerka.ret(
                self.name,
                False,
                context.sheerka.new(BuiltinConcepts.ERROR, body=self.error_sink))

        concepts_found = [[]]
        unrecognized_tokens = None
        has_unrecognized = False

        # actually list of list
        # The first dimension is the number of possibilities found
        # The second dimension is the number of concepts found, under one possibility
        #
        # Example 1
        # concept foo : 'one' 'two'
        # concept bar : 'one' 'two'
        # input 'one two' -> will produce two possibilities (foo and bar).
        #
        # Example 2
        # concept foo : 'one'
        # concept bar : 'two'
        # input 'one two' -> will produce one possibility which is (foo, bar) (foo then bar)

        while True:
            init_pos = self.pos
            res = []

            for concept, grammar in self.concepts_grammars.items():
                self.seek(init_pos)
                node = grammar.parse(self)  # a node is TerminalNode or NonTerminalNode
                if node is not None and node.end != -1:
                    updated_concept = self.finalize_concept(context.sheerka, concept, node)
                    concept_node = ConceptNode(
                        updated_concept,
                        node.start,
                        node.end,
                        self.tokens[node.start: node.end + 1],
                        None,
                        node)
                    res.append(concept_node)

            if len(res) == 0:  # not recognized
                self.seek(init_pos)
                if unrecognized_tokens:
                    unrecognized_tokens.add_token(self.get_token(), init_pos)
                else:
                    unrecognized_tokens = UnrecognizedTokensNode(init_pos, init_pos, [self.get_token()])

                if not self.next_token(False):
                    break

            else:  # some concepts are recognized
                if unrecognized_tokens and unrecognized_tokens.not_whitespace():
                    unrecognized_tokens.fix_source()
                    concepts_found = core.utils.product(concepts_found, [unrecognized_tokens])
                    has_unrecognized = True
                unrecognized_tokens = None  # pending tokens (even lone whitespaces) are now consumed

                res = self.get_bests(res)  # only keep the concepts that eat the more tokens
                concepts_found = core.utils.product(concepts_found, res)

                # loop
                self.seek(res[0].end)
                if not self.next_token(False):
                    break

        # Fix the source for unrecognized tokens
        if unrecognized_tokens and unrecognized_tokens.not_whitespace():
            unrecognized_tokens.fix_source()
            concepts_found = core.utils.product(concepts_found, [unrecognized_tokens])
            has_unrecognized = True

        # else
        # returns as many ReturnValue than choices found
        ret = []
        for choice in concepts_found:
            ret.append(
                self.sheerka.ret(
                    self.name,
                    not has_unrecognized,
                    self.sheerka.new(
                        BuiltinConcepts.PARSER_RESULT,
                        parser=self,
                        source=text,
                        body=choice,
                        try_parsed=choice)))

        if len(ret) == 1:
            self.log_result(context, text, ret[0])
            return ret[0]
        else:
            self.log_multiple_results(context, text, ret)
            return ret

    def finalize_concept(self, sheerka, template, underlying, init_empty_body=True):
        """
        Updates the properties of the concept
        Goes in recursion if the property is a concept
        :param sheerka: used to create the new concept
        :param template: concept that was recognized
        :param underlying: parse tree node produced by the grammar of the concept
        :param init_empty_body: True to fill the body when the concept has none
        :return: new concept, with its compiled properties set
        """

        # this cache is to make sure that we return the same concept for the same ConceptExpression
        _underlying_value_cache = {}

        def _add_prop(_concept, prop_name, value):
            """
            Adds a new entry,
            makes a list if the property already exists
            """
            if prop_name not in _concept.compiled or _concept.compiled[prop_name] is None:
                # new entry
                _concept.compiled[prop_name] = value
            else:
                # make a list if there was a value
                previous_value = _concept.compiled[prop_name]
                if isinstance(previous_value, list):
                    previous_value.append(value)
                else:
                    _concept.compiled[prop_name] = [previous_value, value]

        def _look_for_concept_match(_underlying):
            """ Follows the single-child chain until a node produced by a ConceptExpression is found """
            if isinstance(_underlying.parsing_expression, ConceptExpression):
                return _underlying

            if not isinstance(_underlying, NonTerminalNode):
                return None

            if len(_underlying.children) != 1:
                return None

            return _look_for_concept_match(_underlying.children[0])

        def _get_underlying_value(_underlying):
            """ Returns the concept behind the node if any, the raw source (not to be resolved) otherwise """
            concept_match_node = _look_for_concept_match(_underlying)
            if not concept_match_node:
                return DoNotResolve(_underlying.source)

            cache_key = id(concept_match_node)
            if cache_key not in _underlying_value_cache:
                ref_tpl = concept_match_node.parsing_expression.concept
                _underlying_value_cache[cache_key] = self.finalize_concept(
                    sheerka,
                    ref_tpl,
                    concept_match_node.children[0],
                    init_empty_body)

            return _underlying_value_cache[cache_key]

        def _process_rule_name(_concept, _underlying):
            """ Adds one property per named rule found in the parse tree """
            if _underlying.parsing_expression.rule_name:
                value = _get_underlying_value(_underlying)
                _add_prop(_concept, _underlying.parsing_expression.rule_name, value)

            if isinstance(_underlying, NonTerminalNode):
                for child in _underlying.children:
                    _process_rule_name(_concept, child)

        key = (template.key, template.id) if template.id else template.key
        concept = sheerka.new(key)

        if init_empty_body and concept.metadata.body is None:
            value = _get_underlying_value(underlying)
            concept.compiled[ConceptParts.BODY] = value
            # kept under the same condition: 'value' only exists when the body is initialized
            if underlying.parsing_expression.rule_name:
                _add_prop(concept, underlying.parsing_expression.rule_name, value)

        if isinstance(underlying, NonTerminalNode):
            for node in underlying.children:
                _process_rule_name(concept, node)

        return concept

    @staticmethod
    def get_bests(results):
        """
        Returns the result that is the longest
        :param results: non empty list of nodes
        :return: the nodes that have the highest end position
        """
        by_end_pos = defaultdict(list)
        for result in results:
            by_end_pos[result.end].append(result)

        return by_end_pos[max(by_end_pos)]


class ParsingExpressionVisitor:
    """
    visit ParsingExpression
    Dispatches to visit_<ClassName> when it exists, to generic_visit otherwise
    """

    def visit(self, parsing_expression):
        name = parsing_expression.__class__.__name__
        method = 'visit_' + name
        visitor = getattr(self, method, self.generic_visit)
        return visitor(parsing_expression)

    def generic_visit(self, parsing_expression):
        if hasattr(self, "visit_all"):
            self.visit_all(parsing_expression)

        # raw elements are wrapped into the matching expression before being visited
        for node in parsing_expression.elements:
            if isinstance(node, Concept):
                self.visit(ConceptExpression(node.key or node.name))
            elif isinstance(node, str):
                self.visit(StrMatch(node))
            else:
                self.visit(node)