#####################################################################################################
# This implementation of the parser is highly inspired by the arpeggio project (https://github.com/textX/Arpeggio)
# I don't directly use the project, but it helped me figure out
# what to do.
# Dejanović I., Milosavljević G., Vaderna R.:
# Arpeggio: A flexible PEG parser for Python,
# Knowledge-Based Systems, 2016, 95, 71 - 74, doi:10.1016/j.knosys.2015.12.004
#####################################################################################################
from collections import namedtuple
from dataclasses import dataclass
from collections import defaultdict

from core.builtin_concepts import BuiltinConcepts, ParserResultConcept
from core.concept import Concept, ConceptParts, DoNotResolve
from core.tokenizer import TokenKind, Tokenizer, Token
from parsers.BaseNodeParser import LexerNode, GrammarErrorNode, ConceptNode, UnrecognizedTokensNode
from parsers.BaseParser import BaseParser, ErrorNode
import core.utils


class NonTerminalNode(LexerNode):
    """ Returned by the BnfNodeParser.

    A non-terminal parse-tree node: it wraps the ParsingExpression that
    produced it, the [start, end] token span and the child nodes.
    """

    def __init__(self, parsing_expression, start, end, tokens, children=None):
        super().__init__(start, end, tokens)
        self.parsing_expression = parsing_expression
        self.children = children

    def __repr__(self):
        name = self.parsing_expression.rule_name or self.parsing_expression.__class__.__name__
        if len(self.children) > 0:
            sub_names = "(" + ",".join([repr(child) for child in self.children]) + ")"
        else:
            sub_names = ""
        return name + sub_names

    def __eq__(self, other):
        if not isinstance(other, NonTerminalNode):
            return False
        return self.parsing_expression == other.parsing_expression and \
               self.start == other.start and \
               self.end == other.end and \
               self.children == other.children

    def __hash__(self):
        # BUGFIX: self.children is a list (or None); hashing the raw list raised
        # TypeError. Convert to a tuple so the node is actually hashable,
        # consistent with the __eq__ defined above.
        children = tuple(self.children) if self.children is not None else None
        return hash((self.parsing_expression, self.start, self.end, children))


class TerminalNode(LexerNode):
    """ Returned by the BnfNodeParser.

    A terminal parse-tree node: one matched token value at a single position.
    """

    def __init__(self, parsing_expression, start, end, value):
        super().__init__(start, end, source=value)
        self.parsing_expression = parsing_expression
        self.value = value

    def __repr__(self):
        name = self.parsing_expression.rule_name or ""
        return name + f"'{self.value}'"

    def __eq__(self, other):
        if not isinstance(other, TerminalNode):
            return False
        return self.parsing_expression == other.parsing_expression and \
               self.start == other.start and \
               self.end == other.end and \
               self.value == other.value

    def __hash__(self):
        return hash((self.parsing_expression, self.start, self.end, self.value))


@dataclass()
class UnknownConceptNode(ErrorNode):
    # Key of the concept that could not be resolved.
    concept_key: str


@dataclass()
class TooManyConceptNode(ErrorNode):
    # Key of the concept that matched ambiguously.
    concept_key: str


class ParsingExpression:
    """ Base class of all grammar expressions.

    ``elements`` holds the raw, user-provided sub-expressions; ``nodes`` holds
    the resolved sub-expressions actually used when parsing (filled in by
    BnfNodeParser.get_model). ``rule_name`` is an optional label used to
    attach matched values to concept properties.
    """

    def __init__(self, *args, **kwargs):
        self.elements = args
        nodes = kwargs.get('nodes', [])
        if not hasattr(nodes, '__iter__'):
            # allow a single node to be passed without wrapping it in a list
            nodes = [nodes]
        self.nodes = nodes
        self.rule_name = kwargs.get('rule_name', '')

    def __eq__(self, other):
        if not isinstance(other, ParsingExpression):
            return False
        return self.rule_name == other.rule_name and self.elements == other.elements

    def __hash__(self):
        return hash((self.rule_name, self.elements))

    def parse(self, parser):
        return self._parse(parser)

    def add_rule_name_if_needed(self, text):
        # Used by the __repr__ implementations: append "=rule_name" when set.
        return text + "=" + self.rule_name if self.rule_name else text


class ConceptExpression(ParsingExpression):
    """ Will match a concept
    It used only for rule definition
    When the grammar is created, it is replaced by the actual concept
    """

    def __init__(self, concept, rule_name=""):
        super().__init__(rule_name=rule_name)
        # Either a Concept instance or the concept's name (str); resolved lazily.
        self.concept = concept

    def __repr__(self):
        return self.add_rule_name_if_needed(f"{self.concept}")

    def __eq__(self, other):
        if not super().__eq__(other):
            return False
        if not isinstance(other, ConceptExpression):
            return False
        if isinstance(self.concept, Concept):
            return self.concept.name == other.concept.name
        # when it's only the name of the concept
        return self.concept == other.concept

    def __hash__(self):
        return hash((self.concept, self.rule_name))

    @staticmethod
    def get_parsing_expression_from_name(name):
        """ Builds a fallback expression that matches the concept's own name,
        token by token (the last token of the tokenizer output is dropped —
        presumably the EOF token; TODO confirm against core.tokenizer). """
        tokens = Tokenizer(name)
        nodes = [StrMatch(core.utils.strip_quotes(token.value)) for token in list(tokens)[:-1]]
        if len(nodes) == 1:
            return nodes[0]
        else:
            sequence = Sequence(nodes)
            sequence.nodes = nodes
            return sequence

    def _parse(self, parser):
        to_match = parser.get_concept(self.concept) if isinstance(self.concept, str) else self.concept
        if parser.sheerka.isinstance(to_match, BuiltinConcepts.UNKNOWN_CONCEPT):
            return None
        self.concept = to_match  # Memoize
        if to_match not in parser.concepts_grammars:
            # Try to match the concept using its name
            expr = self.get_parsing_expression_from_name(to_match.name)
            node = expr.parse(parser)
        else:
            node = parser.concepts_grammars[to_match].parse(parser)
        if node is None:
            return None
        return NonTerminalNode(self, node.start, node.end,
                               parser.tokens[node.start: node.end + 1], [node])


class ConceptGroupExpression(ConceptExpression):
    """ Matches any one element of a concept that is a set: the set elements
    are expanded into an OrderedChoice of ConceptExpressions. """

    def _parse(self, parser):
        to_match = parser.get_concept(self.concept) if isinstance(self.concept, str) else self.concept
        if parser.sheerka.isinstance(to_match, BuiltinConcepts.UNKNOWN_CONCEPT):
            return None
        self.concept = to_match  # Memoize
        if to_match not in parser.concepts_grammars:
            concepts_in_group = parser.sheerka.get_set_elements(parser.context, self.concept)
            nodes = [ConceptExpression(c, rule_name=c.name) for c in concepts_in_group]
            expr = OrderedChoice(nodes)
            expr.nodes = nodes
            node = expr.parse(parser)
        else:
            node = parser.concepts_grammars[to_match].parse(parser)
        if node is None:
            return None
        return NonTerminalNode(self, node.start, node.end,
                               parser.tokens[node.start: node.end + 1], [node])


class Sequence(ParsingExpression):
    """ Will match sequence of parser expressions in exact order they are defined. """

    def _parse(self, parser):
        init_pos = parser.pos
        end_pos = parser.pos
        children = []
        for e in self.nodes:
            node = e.parse(parser)
            if node is None:
                # one element failed -> the whole sequence fails
                return None
            else:
                if node.end != -1:  # because returns -1 when no match
                    children.append(node)
                    end_pos = node.end
        return NonTerminalNode(self, init_pos, end_pos,
                               parser.tokens[init_pos: end_pos + 1], children)

    def __repr__(self):
        to_str = ", ".join(repr(n) for n in self.elements)
        return self.add_rule_name_if_needed(f"({to_str})")


class OrderedChoice(ParsingExpression):
    """ Will match one among multiple
    It will stop at the first match (so the order of definition is important)
    """

    def _parse(self, parser):
        init_pos = parser.pos
        for e in self.nodes:
            node = e.parse(parser)
            if node:
                return NonTerminalNode(self, init_pos, node.end,
                                       parser.tokens[init_pos: node.end + 1], [node])
        parser.seek(init_pos)  # backtrack
        return None

    def __repr__(self):
        to_str = "| ".join(repr(n) for n in self.elements)
        return self.add_rule_name_if_needed(f"({to_str})")


class Optional(ParsingExpression):
    """ Will match or not the elements
    if many matches, will choose longest one
    If you need order, use Optional(OrderedChoice)
    """

    def _parse(self, parser):
        init_pos = parser.pos
        # end == -1 means that nothing is found; Optional never fails.
        selected_node = NonTerminalNode(self, parser.pos, -1, [], [])
        for e in self.nodes:
            node = e.parse(parser)
            if node:
                if node.end > selected_node.end:
                    # keep the longest match seen so far
                    selected_node = NonTerminalNode(
                        self, node.start, node.end,
                        parser.tokens[node.start: node.end + 1], [node])
            parser.seek(init_pos)  # backtrack
        if selected_node.end != -1:
            parser.seek(selected_node.end)
            parser.next_token()  # eat the tokens found
        return selected_node

    def __repr__(self):
        if len(self.elements) == 1:
            return f"{self.elements[0]}?"
        else:
            to_str = ", ".join(repr(n) for n in self.elements)
            return self.add_rule_name_if_needed(f"({to_str})?")


class Repetition(ParsingExpression):
    """ Base class for all repetition-like parser expressions (?,*,+)
    Args:
        eolterm(bool): Flag that indicates that end of line should
            terminate repetition match.
    """

    def __init__(self, *elements, **kwargs):
        super(Repetition, self).__init__(*elements, **kwargs)
        # Optional separator expression matched between repetitions.
        self.sep = kwargs.get('sep', None)


class ZeroOrMore(Repetition):
    """ ZeroOrMore will try to match parser expression specified zero or more times.
    It will never fail.
    """

    def _parse(self, parser):
        init_pos = parser.pos
        end_pos = -1
        children = []
        while True:
            current_pos = parser.pos
            # maybe eat the separator if needed
            if self.sep and children:
                sep_result = self.sep.parse(parser)
                if sep_result is None:
                    parser.seek(current_pos)
                    break
            # eat the repeated expression
            node = self.nodes[0].parse(parser)
            if node is None:
                parser.seek(current_pos)
                break
            else:
                if node.end != -1:  # because returns -1 when no match
                    children.append(node)
                    end_pos = node.end
        if len(children) == 0:
            # zero matches is still a success: return an empty node
            return NonTerminalNode(self, init_pos, -1, [], [])
        return NonTerminalNode(self, init_pos, end_pos,
                               parser.tokens[init_pos: end_pos + 1], children)

    def __repr__(self):
        to_str = ", ".join(repr(n) for n in self.elements)
        return self.add_rule_name_if_needed(f"({to_str})*")


class OneOrMore(Repetition):
    """ OneOrMore will try to match parser expression specified one or more times. """

    def _parse(self, parser):
        init_pos = parser.pos
        end_pos = -1
        children = []
        while True:
            current_pos = parser.pos
            # maybe eat the separator if needed
            if self.sep and children:
                sep_result = self.sep.parse(parser)
                if sep_result is None:
                    parser.seek(current_pos)
                    break
            # eat the repeated expression (was mislabelled "ZeroOrMore" before)
            node = self.nodes[0].parse(parser)
            if node is None:
                parser.seek(current_pos)
                break
            else:
                if node.end != -1:  # because returns -1 when no match
                    children.append(node)
                    end_pos = node.end
        if len(children) == 0:
            # if nothing is found, it's an error
            return None
        return NonTerminalNode(self, init_pos, end_pos,
                               parser.tokens[init_pos: end_pos + 1], children)

    def __repr__(self):
        to_str = ", ".join(repr(n) for n in self.elements)
        return self.add_rule_name_if_needed(f"({to_str})+")


class UnorderedGroup(Repetition):
    """ Will try to match all of the parsing expression in any order. """

    def _parse(self, parser):
        raise NotImplementedError()

    # def __repr__(self):
    #     to_str = ", ".join(repr(n) for n in self.elements)
    #     return f"({to_str})#"


class Match(ParsingExpression):
    """ Base class for all classes that will try to match something from the input. """

    def __init__(self, rule_name, root=False):
        # NOTE(review): ``root`` is forwarded to ParsingExpression.__init__ which
        # only reads 'nodes' and 'rule_name' from kwargs, so it is silently
        # dropped — confirm whether it is still needed.
        super(Match, self).__init__(rule_name=rule_name, root=root)

    def parse(self, parser):
        result = self._parse(parser)
        return result


class StrMatch(Match):
    """ Matches a literal """

    def __init__(self, to_match, rule_name="", ignore_case=True):
        # Deliberately skips Match.__init__ (which requires/forwards ``root``)
        # and calls ParsingExpression.__init__ directly.
        super(Match, self).__init__(rule_name=rule_name)
        self.to_match = to_match
        self.ignore_case = ignore_case

    def __repr__(self):
        return self.add_rule_name_if_needed(f"'{self.to_match}'")

    def __eq__(self, other):
        if not super().__eq__(other):
            return False
        if not isinstance(other, StrMatch):
            return False
        return self.to_match == other.to_match and self.ignore_case == other.ignore_case

    def _parse(self, parser):
        token = parser.get_token()
        m = str(token.value).lower() == self.to_match.lower() if self.ignore_case \
            else token.value == self.to_match
        if m:
            node = TerminalNode(self, parser.pos, parser.pos, token.value)
            parser.next_token()
            return node
        return None


class BnfNodeParser(BaseParser):
    """ Parser that recognizes concepts from BNF-like grammar definitions.

    Holds the tokenized input, a cursor (``pos``/``token``) and the mapping
    concept -> ParsingExpression (``concepts_grammars``).
    """

    def __init__(self, **kwargs):
        super().__init__("BnfNode", 50)
        if 'grammars' in kwargs:
            self.concepts_grammars = kwargs.get("grammars")
        elif 'sheerka' in kwargs:
            self.concepts_grammars = kwargs.get("sheerka").concepts_grammars
        else:
            self.concepts_grammars = {}
        self.ignore_case = True
        self.token = None
        self.pos = -1
        self.tokens = None
        self.context = None
        self.text = None
        self.sheerka = None

    def add_error(self, error, next_token=True):
        """ Records an error; optionally advances past the offending token. """
        self.error_sink.append(error)
        if next_token:
            self.next_token()
        return error

    def reset_parser(self, context, text):
        """ Re-initializes the cursor and tokenizes ``text``.
        Returns False (after recording an error) when tokenization fails. """
        self.context = context
        self.sheerka = context.sheerka
        self.text = text
        try:
            self.tokens = list(self.get_input_as_tokens(text))
        except core.tokenizer.LexerError as e:
            self.add_error(self.sheerka.new(BuiltinConcepts.ERROR, body=e), False)
            return False
        self.token = None
        self.pos = -1
        self.next_token(False)
        return True

    def get_token(self) -> Token:
        return self.token

    def next_token(self, skip_whitespace=True):
        """ Advances the cursor; returns False when EOF is reached. """
        if self.token and self.token.type == TokenKind.EOF:
            return False
        self.pos += 1
        self.token = self.tokens[self.pos]
        if skip_whitespace:
            while self.token.type == TokenKind.WHITESPACE or self.token.type == TokenKind.NEWLINE:
                self.pos += 1
                self.token = self.tokens[self.pos]
        return self.token.type != TokenKind.EOF

    def seek(self, pos):
        """ Moves the cursor to an absolute token position. """
        self.pos = pos
        self.token = self.tokens[self.pos]
        return True

    def rewind(self, offset, skip_whitespace=True):
        """ Moves the cursor by ``offset`` tokens, then (optionally) skips
        backwards over whitespace/newlines. """
        self.pos += offset
        self.token = self.tokens[self.pos]
        if skip_whitespace:
            while self.pos > 0 and (self.token.type == TokenKind.WHITESPACE or
                                    self.token.type == TokenKind.NEWLINE):
                self.pos -= 1
                self.token = self.tokens[self.pos]

    def initialize(self, context, concepts_definitions):
        """
        Adds a bunch of concepts, and how they can be recognized
        :param context: execution context
        :param concepts_definitions: dictionary of concept, concept_definition
        :return:
        """
        self.context = context
        self.sheerka = context.sheerka
        concepts_to_resolve = set()
        for concept, concept_def in concepts_definitions.items():
            # ## Gets the grammars
            context.log(f"Resolving grammar for '{concept}'", context.who)
            concept.init_key()  # make sure that the key is initialized
            grammar = self.get_model(concept_def, concepts_to_resolve)
            self.concepts_grammars[concept] = grammar
        if self.has_error:
            return self.sheerka.ret(self.name, False, self.error_sink)
        # ## Removes concepts with infinite recursions
        concepts_to_remove = self.detect_infinite_recursion(concepts_to_resolve)
        for concept in concepts_to_remove:
            # NOTE(review): assumes every removed concept is present in the set
            # under the same identity — confirm string vs Concept entries.
            concepts_to_resolve.remove(concept)
            del self.concepts_grammars[concept]
        if self.has_error:
            return self.sheerka.ret(self.name, False, self.error_sink)
        else:
            return self.sheerka.ret(self.name, True, self.concepts_grammars)

    def get_concept(self, concept_name):
        """ Resolves a concept by name, looking in the context first. """
        if concept_name in self.context.concepts:
            return self.context.concepts[concept_name]
        return self.sheerka.get(concept_name)

    def get_model(self, concept_def, concepts_to_resolve):
        """ Normalizes a raw concept definition into a ParsingExpression tree,
        collecting referenced concepts into ``concepts_to_resolve``. """
        # TODO
        # inner_get_model must not modify the initial ParsingExpression
        # A copy must be created
        def inner_get_model(expression):
            if isinstance(expression, Concept):
                if self.sheerka.isaset(self.context, expression):
                    ret = ConceptGroupExpression(expression, rule_name=expression.name)
                else:
                    ret = ConceptExpression(expression, rule_name=expression.name)
                concepts_to_resolve.add(expression)
            elif isinstance(expression, ConceptExpression):
                # it includes ConceptGroupExpression
                if expression.rule_name is None or expression.rule_name == "":
                    expression.rule_name = expression.concept.name if isinstance(expression.concept, Concept) \
                        else expression.concept
                if isinstance(expression.concept, str):
                    concept = self.get_concept(expression.concept)
                    if self.sheerka.is_known(concept):
                        expression.concept = concept
                concepts_to_resolve.add(expression.concept)
                ret = expression
            elif isinstance(expression, str):
                ret = StrMatch(expression, ignore_case=self.ignore_case)
            elif isinstance(expression, StrMatch):
                ret = expression
                if ret.ignore_case is None:
                    # inherit the parser-wide case sensitivity
                    ret.ignore_case = self.ignore_case
            elif isinstance(expression, (Sequence, OrderedChoice, ZeroOrMore,
                                         OneOrMore, Optional)):
                ret = expression
                ret.nodes = [inner_get_model(e) for e in ret.elements]
            else:
                ret = self.add_error(GrammarErrorNode(f"Unrecognized grammar element '{expression}'."), False)
            # Translate separator expression.
            if isinstance(expression, Repetition) and expression.sep:
                expression.sep = inner_get_model(expression.sep)
            return ret

        model = inner_get_model(concept_def)
        return model

    def detect_infinite_recursion(self, concepts_to_resolve):
        """ Returns the concepts whose grammar is left-recursive (would loop
        forever at parse time) so the caller can discard them. """
        # infinite recursion matcher
        def _is_infinite_recursion(ref_concept, node):
            if isinstance(node, ConceptExpression):
                if node.concept == ref_concept:
                    return True
                if isinstance(node.concept, str):
                    to_match = self.get_concept(node.concept)
                    if self.sheerka.isinstance(to_match, BuiltinConcepts.UNKNOWN_CONCEPT):
                        return False
                else:
                    to_match = node.concept
                if to_match not in self.concepts_grammars:
                    return False
                return _is_infinite_recursion(ref_concept, self.concepts_grammars[to_match])
            if isinstance(node, OrderedChoice):
                # only the first alternative can be reached without consuming input
                return _is_infinite_recursion(ref_concept, node.nodes[0])
            if isinstance(node, Sequence):
                for child in node.nodes:
                    if _is_infinite_recursion(ref_concept, child):
                        return True
                return False
            return False

        removed_concepts = []
        for e in concepts_to_resolve:
            if isinstance(e, str):
                e = self.get_concept(e)
                if self.sheerka.isinstance(e, BuiltinConcepts.UNKNOWN_CONCEPT):
                    continue
            if e not in self.concepts_grammars:
                continue
            to_resolve = self.concepts_grammars[e]
            if _is_infinite_recursion(e, to_resolve):
                removed_concepts.append(e)
        return removed_concepts

    def parse(self, context, parser_input):
        """ Parses ``parser_input`` against every known grammar and returns one
        ReturnValue per combination of recognized concepts. """
        if parser_input == "":
            return context.sheerka.ret(
                self.name, False, context.sheerka.new(BuiltinConcepts.IS_EMPTY)
            )
        if not self.reset_parser(context, parser_input):
            return self.sheerka.ret(
                self.name, False,
                context.sheerka.new(BuiltinConcepts.ERROR, body=self.error_sink))
        concepts_found = [[]]
        unrecognized_tokens = None
        has_unrecognized = False
        # actually list of list
        # The first dimension is the number of possibilities found
        # The second dimension is the number of concepts found, under one possibility
        #
        # Example 1
        # concept foo : 'one' 'two'
        # concept bar : 'one' 'two'
        # input 'one two' -> will produce two possibilities (foo and bar).
        #
        # Example 2
        # concept foo : 'one'
        # concept bar : 'two'
        # input 'one two' -> will produce one possibility which is (foo, bar) (foo then bar)
        while True:
            init_pos = self.pos
            res = []
            for concept, grammar in self.concepts_grammars.items():
                self.seek(init_pos)
                node = grammar.parse(self)
                # a node is TerminalNode or NonTerminalNode
                if node is not None and node.end != -1:
                    updated_concept = self.finalize_concept(context.sheerka, concept, node)
                    concept_node = ConceptNode(
                        updated_concept, node.start, node.end,
                        self.tokens[node.start: node.end + 1], None, node)
                    res.append(concept_node)
            if len(res) == 0:
                # not recognized
                self.seek(init_pos)
                if unrecognized_tokens:
                    unrecognized_tokens.add_token(self.get_token(), init_pos)
                else:
                    unrecognized_tokens = UnrecognizedTokensNode(init_pos, init_pos, [self.get_token()])
                if not self.next_token(False):
                    break
            else:
                # some concepts are recognized
                if unrecognized_tokens and unrecognized_tokens.not_whitespace():
                    unrecognized_tokens.fix_source()
                    concepts_found = core.utils.product(concepts_found, [unrecognized_tokens])
                    has_unrecognized = True
                unrecognized_tokens = None
                res = self.get_bests(res)  # only keep the concepts that eat the more tokens
                concepts_found = core.utils.product(concepts_found, res)
                # loop
                self.seek(res[0].end)
                if not self.next_token(False):
                    break
        # Fix the source for unrecognized tokens
        if unrecognized_tokens and unrecognized_tokens.not_whitespace():
            unrecognized_tokens.fix_source()
            concepts_found = core.utils.product(concepts_found, [unrecognized_tokens])
            has_unrecognized = True
        # else
        # returns as many ReturnValue than choices found
        ret = []
        for choice in concepts_found:
            ret.append(
                self.sheerka.ret(
                    self.name, not has_unrecognized,
                    self.sheerka.new(
                        BuiltinConcepts.PARSER_RESULT,
                        parser=self,
                        source=parser_input,
                        body=choice,
                        try_parsed=choice)))
        if len(ret) == 1:
            self.log_result(context, parser_input, ret[0])
            return ret[0]
        else:
            self.log_multiple_results(context, parser_input, ret)
            return ret

    def finalize_concept(self, sheerka, template, underlying, init_empty_body=True):
        """ Updates the properties of the concept
        Goes in recursion if the property is a concept
        """
        # this cache is to make sure that we return the same concept for the same ConceptExpression
        _underlying_value_cache = {}

        def _add_prop(_concept, prop_name, value):
            """ Adds a new entry, makes a list if the property already exists """
            if prop_name not in _concept.compiled or _concept.compiled[prop_name] is None:
                # new entry
                _concept.compiled[prop_name] = value
            else:
                # make a list if there was a value
                previous_value = _concept.compiled[prop_name]
                if isinstance(previous_value, list):
                    previous_value.append(value)
                else:
                    new_value = [previous_value, value]
                    _concept.compiled[prop_name] = new_value

        def _look_for_concept_match(_underlying):
            """ At some point, there is either an StrMatch or a ConceptMatch,
            that allowed the recognition.
            Look for the ConceptMatch, with recursion if needed """
            if isinstance(_underlying.parsing_expression, ConceptExpression):
                return _underlying
            if not isinstance(_underlying, NonTerminalNode):
                return None
            if len(_underlying.children) != 1:
                return None
            return _look_for_concept_match(_underlying.children[0])

        def _get_underlying_value(_underlying):
            concept_match_node = _look_for_concept_match(_underlying)
            if concept_match_node:
                # the value is a concept
                if id(concept_match_node) in _underlying_value_cache:
                    result = _underlying_value_cache[id(concept_match_node)]
                else:
                    ref_tpl = concept_match_node.parsing_expression.concept
                    result = self.finalize_concept(sheerka, ref_tpl,
                                                   concept_match_node.children[0], init_empty_body)
                    _underlying_value_cache[id(concept_match_node)] = result
            else:
                # the value is a string
                result = DoNotResolve(_underlying.source)
            return result

        def _process_rule_name(_concept, _underlying):
            if _underlying.parsing_expression.rule_name:
                value = _get_underlying_value(_underlying)
                _add_prop(_concept, _underlying.parsing_expression.rule_name, value)
                _concept.metadata.need_validation = True
            if isinstance(_underlying, NonTerminalNode):
                for child in _underlying.children:
                    _process_rule_name(_concept, child)

        key = (template.key, template.id) if template.id else template.key
        concept = sheerka.new(key)
        if init_empty_body and concept.metadata.body is None:
            value = _get_underlying_value(underlying)
            concept.compiled[ConceptParts.BODY] = value
            if underlying.parsing_expression.rule_name:
                _add_prop(concept, underlying.parsing_expression.rule_name, value)
                # KSI : Why don't we set concept.metadata.need_validation to True ?
        if isinstance(underlying, NonTerminalNode):
            for node in underlying.children:
                _process_rule_name(concept, node)
        return concept

    def encode_grammar(self, grammar):
        """ Transform the grammar into something that can easily can be serialized
        :param grammar:
        :return:
        """
        def _encode(expression):
            # NOTE(review): an expression type outside the branches below leaves
            # ``res`` unbound (UnboundLocalError) — confirm all types are covered.
            if isinstance(expression, StrMatch):
                res = f"'{expression.to_match}'"
            elif isinstance(expression, ConceptExpression):
                res = core.utils.str_concept(expression.concept)
            elif isinstance(expression, Sequence):
                res = "(" + " ".join(_encode(c) for c in expression.nodes) + ")"
            elif isinstance(expression, OrderedChoice):
                res = "(" + "|".join(_encode(c) for c in expression.nodes) + ")"
            elif isinstance(expression, Optional):
                res = _encode(expression.nodes[0]) + "?"
            elif isinstance(expression, ZeroOrMore):
                res = _encode(expression.nodes[0]) + "*"
            elif isinstance(expression, OneOrMore):
                res = _encode(expression.nodes[0]) + "+"
            if expression.rule_name:
                res += "=" + expression.rule_name
            return res

        result = {}
        for k, v in grammar.items():
            key = core.utils.str_concept(k)
            value = _encode(v)
            result[key] = value
        return result

    @staticmethod
    def get_bests(results):
        """ Returns the result that is the longest
        :param results:
        :return:
        """
        by_end_pos = defaultdict(list)
        for result in results:
            by_end_pos[result.end].append(result)
        return by_end_pos[max(by_end_pos)]


class ParsingExpressionVisitor:
    """ visit ParsingExpression """

    def visit(self, parsing_expression):
        # dispatch to visit_<ClassName> when defined, else generic_visit
        name = parsing_expression.__class__.__name__
        method = 'visit_' + name
        visitor = getattr(self, method, self.generic_visit)
        return visitor(parsing_expression)

    def generic_visit(self, parsing_expression):
        if hasattr(self, "visit_all"):
            self.visit_all(parsing_expression)
        for node in parsing_expression.elements:
            if isinstance(node, Concept):
                self.visit(ConceptExpression(node.key or node.name))
            elif isinstance(node, str):
                self.visit(StrMatch(node))
            else:
                self.visit(node)