##################################################################################################### # This implementation of the parser is highly inspired by the arpeggio project (https://github.com/textX/Arpeggio) # I don't directly use the project, but it helped me figure out # what to do. # Dejanović I., Milosavljević G., Vaderna R.: # Arpeggio: A flexible PEG parser for Python, # Knowledge-Based Systems, 2016, 95, 71 - 74, doi:10.1016/j.knosys.2015.12.004 ##################################################################################################### from collections import defaultdict from dataclasses import dataclass import core.utils from cache.Cache import Cache from core import builtin_helpers from core.builtin_concepts import BuiltinConcepts from core.concept import Concept, DEFINITION_TYPE_BNF, DoNotResolve, ConceptParts from core.sheerka.services.SheerkaExecute import ParserInput from core.tokenizer import Tokenizer, Token, TokenKind from parsers.BaseNodeParser import BaseNodeParser, LexerNode, UnrecognizedTokensNode, ConceptNode, GrammarErrorNode from parsers.BaseParser import ErrorNode PARSERS = ["AtomNode", "SyaNode", "Python"] @dataclass class ConceptParsingError(ErrorNode): concept: Concept class NonTerminalNode(LexerNode): """ Returned by the BnfNodeParser """ def __init__(self, parsing_expression, start, end, tokens, children=None): super().__init__(start, end, tokens) self.parsing_expression = parsing_expression self.children = children def __repr__(self): name = "Node:" + (self.parsing_expression.rule_name or self.parsing_expression.__class__.__name__) if len(self.children) > 0: sub_names = "(" + ",".join([repr(child) for child in self.children]) + ")" else: sub_names = "" return name + sub_names def __eq__(self, other): if not isinstance(other, NonTerminalNode): return False return self.parsing_expression == other.parsing_expression and \ self.start == other.start and \ self.end == other.end and \ self.children == other.children def __hash__(self): return hash((self.parsing_expression, self.start, self.end, self.children)) class TerminalNode(LexerNode): """ Returned by the BnfNodeParser """ def __init__(self, parsing_expression, start, end, value): super().__init__(start, end, source=value) self.parsing_expression = parsing_expression self.value = value def __repr__(self): name = "Node:" + (self.parsing_expression.rule_name or "") return name + f"'{self.value}'" def __eq__(self, other): if not isinstance(other, TerminalNode): return False return self.parsing_expression == other.parsing_expression and \ self.start == other.start and \ self.end == other.end and \ self.value == other.value def __hash__(self): return hash((self.parsing_expression, self.start, self.end, self.value)) class ParsingExpression: def __init__(self, *args, **kwargs): self.elements = args nodes = kwargs.get('nodes', []) if not hasattr(nodes, '__iter__'): nodes = [nodes] self.nodes = nodes self.rule_name = kwargs.get('rule_name', '') def __eq__(self, other): if not isinstance(other, ParsingExpression): return False return self.rule_name == other.rule_name and self.elements == other.elements def __hash__(self): return hash((self.rule_name, self.elements)) def parse(self, parser): return self._parse(parser) def add_rule_name_if_needed(self, text): return text + "=" + self.rule_name if self.rule_name else text class ConceptExpression(ParsingExpression): """ Will match a concept It used only for rule definition When the grammar is created, it is replaced by the actual concept """ def __init__(self, concept, rule_name=""): super().__init__(rule_name=rule_name) self.concept = concept def __repr__(self): return self.add_rule_name_if_needed(f"{self.concept}") def __eq__(self, other): if not super().__eq__(other): return False if not isinstance(other, ConceptExpression): return False if isinstance(self.concept, Concept): return self.concept.name == other.concept.name # when it's only the name of the concept return self.concept == other.concept def __hash__(self): return hash((self.concept, self.rule_name)) def _parse(self, parser_helper): node = self.nodes[0].parse(parser_helper) if node is None: return None return NonTerminalNode(self, node.start, node.end, parser_helper.parser.parser_input.tokens[node.start: node.end + 1], [node]) class Sequence(ParsingExpression): """ Will match sequence of parser expressions in exact order they are defined. """ def _parse(self, parser_helper): init_pos = parser_helper.pos end_pos = parser_helper.pos children = [] for e in self.nodes: node = e.parse(parser_helper) if node is None: return None else: if node.end != -1: # because returns -1 when no match children.append(node) end_pos = node.end return NonTerminalNode(self, init_pos, end_pos, parser_helper.parser.parser_input.tokens[init_pos: end_pos + 1], children) def __repr__(self): to_str = ", ".join(repr(n) for n in self.elements) return self.add_rule_name_if_needed(f"({to_str})") class OrderedChoice(ParsingExpression): """ Will match the first one among multiple It will stop at the first match (so the order of definition is important) """ def _parse(self, parser_helper): init_pos = parser_helper.pos for e in self.nodes: node = e.parse(parser_helper) if node: return NonTerminalNode(self, init_pos, node.end, parser_helper.parser.parser_input.tokens[init_pos: node.end + 1], [node]) parser_helper.seek(init_pos) # backtrack return None def __repr__(self): to_str = "| ".join(repr(n) for n in self.elements) return self.add_rule_name_if_needed(f"({to_str})") class LongestChoice(ParsingExpression): """ Will match the longest one among multiple All elements will be tested, so the order is not important The behaviour when multiple candidate is found is not defined yet """ def _parse(self, parser_helper): init_pos = parser_helper.pos longest_node = None end_pos = -1 for e in self.nodes: node = e.parse(parser_helper) if node: if longest_node is None or node.end > longest_node.end: longest_node = node end_pos = parser_helper.pos parser_helper.seek(init_pos) # backtrack if longest_node is None: return None parser_helper.seek(end_pos) return NonTerminalNode(self, init_pos, longest_node.end, parser_helper.parser.parser_input.tokens[init_pos: longest_node.end + 1], [longest_node]) def __repr__(self): to_str = "# ".join(repr(n) for n in self.elements) return self.add_rule_name_if_needed(f"({to_str})") class Optional(ParsingExpression): """ Will match or not the elements if many matches, will choose longest one If you need order, use Optional(OrderedChoice) """ def _parse(self, parser_helper): init_pos = parser_helper.pos selected_node = NonTerminalNode(self, parser_helper.pos, -1, [], []) # means that nothing is found for e in self.nodes: node = e.parse(parser_helper) if node: if node.end > selected_node.end: selected_node = NonTerminalNode( self, node.start, node.end, parser_helper.parser.parser_input.tokens[node.start: node.end + 1], [node]) parser_helper.seek(init_pos) # backtrack if selected_node.end != -1: parser_helper.seek(selected_node.end) parser_helper.next_token() # eat the tokens found return selected_node def __repr__(self): if len(self.elements) == 1: return f"{self.elements[0]}?" else: to_str = ", ".join(repr(n) for n in self.elements) return self.add_rule_name_if_needed(f"({to_str})?") class Repetition(ParsingExpression): """ Base class for all repetition-like parser expressions (?,*,+) Args: eolterm(bool): Flag that indicates that end of line should terminate repetition match. """ def __init__(self, *elements, **kwargs): super(Repetition, self).__init__(*elements, **kwargs) self.sep = kwargs.get('sep', None) class ZeroOrMore(Repetition): """ ZeroOrMore will try to match parser expression specified zero or more times. It will never fail. """ def _parse(self, parser_helper): init_pos = parser_helper.pos end_pos = -1 children = [] while True: current_pos = parser_helper.pos # maybe eat the separator if needed if self.sep and children: sep_result = self.sep.parse(parser_helper) if sep_result is None: parser_helper.seek(current_pos) break # eat the ZeroOrMore node = self.nodes[0].parse(parser_helper) if node is None: parser_helper.seek(current_pos) break else: if node.end != -1: # because returns -1 when no match children.append(node) end_pos = node.end if len(children) == 0: return NonTerminalNode(self, init_pos, -1, [], []) return NonTerminalNode(self, init_pos, end_pos, parser_helper.parser.parser_input.tokens[init_pos: end_pos + 1], children) def __repr__(self): to_str = ", ".join(repr(n) for n in self.elements) return self.add_rule_name_if_needed(f"({to_str})*") class OneOrMore(Repetition): """ OneOrMore will try to match parser expression specified one or more times. """ def _parse(self, parser_helper): init_pos = parser_helper.pos end_pos = -1 children = [] while True: current_pos = parser_helper.pos # maybe eat the separator if needed if self.sep and children: sep_result = self.sep.parse(parser_helper) if sep_result is None: parser_helper.seek(current_pos) break # eat the ZeroOrMore node = self.nodes[0].parse(parser_helper) if node is None: parser_helper.seek(current_pos) break else: if node.end != -1: # because returns -1 when no match children.append(node) end_pos = node.end if len(children) == 0: # if nothing is found, it's an error return None return NonTerminalNode(self, init_pos, end_pos, parser_helper.parser.parser_input.tokens[init_pos: end_pos + 1], children) def __repr__(self): to_str = ", ".join(repr(n) for n in self.elements) return self.add_rule_name_if_needed(f"({to_str})+") class UnorderedGroup(Repetition): """ Will try to match all of the parsing expression in any order. """ def _parse(self, parser): raise NotImplementedError() # def __repr__(self): # to_str = ", ".join(repr(n) for n in self.elements) # return f"({to_str})#" class Match(ParsingExpression): """ Base class for all classes that will try to match something from the input. """ def __init__(self, rule_name, root=False): super(Match, self).__init__(rule_name=rule_name, root=root) def parse(self, parser): result = self._parse(parser) return result class StrMatch(Match): """ Matches a literal """ def __init__(self, to_match, rule_name="", ignore_case=True, skip_whitespace=True): super(Match, self).__init__(rule_name=rule_name) self.to_match = to_match self.ignore_case = ignore_case self.skip_white_space = skip_whitespace def __repr__(self): text = self.to_match if not self.ignore_case: text += "#!ic" if not self.skip_white_space: text += "#!sw" return self.add_rule_name_if_needed(f"'{text}'") def __eq__(self, other): if not super().__eq__(other): return False if not isinstance(other, StrMatch): return False return self.to_match == other.to_match and \ self.ignore_case == other.ignore_case and \ self.skip_white_space == other.skip_white_space def _parse(self, parser_helper): token = parser_helper.get_token() m = token.str_value.lower() == self.to_match.lower() if self.ignore_case \ else token.strip_quote == self.to_match if m: node = TerminalNode(self, parser_helper.pos, parser_helper.pos, token.str_value) parser_helper.next_token(self.skip_white_space) return node return None # class RegExMatch(Match): # ''' # This Match class will perform input matching based on Regular Expressions. # # Args: # to_match (regex string): A regular expression string to match. # It will be used to create regular expression using re.compile. # ignore_case(bool): If case insensitive match is needed. # Default is None to support propagation from global parser setting. # multiline(bool): allow regex to works on multiple lines # (re.DOTALL flag). Default is None to support propagation from # global parser setting. # str_repr(str): A string that is used to represent this regex. # re_flags: flags parameter for re.compile if neither ignore_case # or multiple are set. # # ''' # def __init__(self, to_match, rule_name='', root=False, ignore_case=None, # multiline=None, str_repr=None, re_flags=re.MULTILINE): # super(RegExMatch, self).__init__(rule_name, root) # self.to_match_regex = to_match # self.ignore_case = ignore_case # self.multiline = multiline # self.explicit_flags = re_flags # # self.to_match = str_repr if str_repr is not None else to_match # # def compile(self): # flags = self.explicit_flags # if self.multiline is True: # flags |= re.DOTALL # if self.multiline is False and flags & re.DOTALL: # flags -= re.DOTALL # if self.ignore_case is True: # flags |= re.IGNORECASE # if self.ignore_case is False and flags & re.IGNORECASE: # flags -= re.IGNORECASE # self.regex = re.compile(self.to_match_regex, flags) # # def __str__(self): # return self.to_match # # def __unicode__(self): # return self.__str__() # # def _parse(self, parser): # c_pos = parser.position # m = self.regex.match(parser.input, c_pos) # if m: # matched = m.group() # if parser.debug: # parser.dprint( # "++ Match '%s' at %d => '%s'" % # (matched, c_pos, parser.context(len(matched)))) # parser.position += len(matched) # if matched: # return Terminal(self, c_pos, matched, extra_info=m) # else: # if parser.debug: # parser.dprint("-- NoMatch at {}".format(c_pos)) # parser._nm_raise(self, c_pos, parser) class ParsingExpressionVisitor: """ visit ParsingExpression """ STOP = "##_Stop_##" def visit(self, parsing_expression): name = parsing_expression.__class__.__name__ method = 'visit_' + name visitor = getattr(self, method, self.generic_visit) return visitor(parsing_expression) def generic_visit(self, parsing_expression): if hasattr(self, "visit_all"): self.visit_all(parsing_expression) for node in parsing_expression.elements: if isinstance(node, Concept): res = self.visit(ConceptExpression(node.key or node.name)) elif isinstance(node, str): res = self.visit(StrMatch(node)) else: res = self.visit(node) if res == self.STOP: return class BnfNodeFirstTokenVisitor(ParsingExpressionVisitor): def __init__(self, sheerka): self.sheerka = sheerka self.first_tokens = None def add_first_token(self, first_token): if not self.first_tokens: self.first_tokens = [first_token] else: self.first_tokens.append(first_token) def visit_ConceptExpression(self, pe): concept = self.sheerka.get_by_key(pe.concept) if isinstance(pe.concept, str) else pe.concept if self.sheerka.is_known(concept): self.add_first_token(core.utils.str_concept(concept, drop_name=True)) return self.STOP def visit_StrMatch(self, pe): if not pe.to_match: return self.add_first_token(pe.to_match) return self.STOP def visit_OrderedChoice(self, parsing_expression): for node in parsing_expression.elements: self.visit(node) return self.STOP class BnfConceptParserHelper: def __init__(self, parser): self.parser = parser self.debug = [] self.errors = [] self.sequence = [] self.unrecognized_tokens = UnrecognizedTokensNode(-1, -1, []) self.has_unrecognized = False self.bnf_parsed = False self.forked = [] self.token = None self.pos = -1 def __repr__(self): return f"BnfConceptParserHelper({self.sequence})" def __eq__(self, other): if id(self) == id(other): return True if not isinstance(other, BnfConceptParserHelper): return False return self.sequence == other.sequence and self.errors == other.errors def __hash__(self): return len(self.sequence) + len(self.errors) def get_token(self) -> Token: return self.token def next_token(self, skip_whitespace=True): if self.token and self.token.type == TokenKind.EOF: return False self.pos += 1 self.token = self.parser.parser_input.tokens[self.pos] if skip_whitespace: while self.token.type == TokenKind.WHITESPACE or self.token.type == TokenKind.NEWLINE: self.pos += 1 self.token = self.parser.parser_input.tokens[self.pos] return self.token.type != TokenKind.EOF def seek(self, pos): self.pos = pos self.token = self.parser.parser_input.tokens[self.pos] def has_error(self): return len(self.errors) > 0 def is_locked(self): return self.parser.parser_input.pos <= self.pos or self.has_error() def eat_concept(self, concept, token): if self.is_locked(): return self.debug.append(concept) self.manage_unrecognized() for forked in self.forked: # manage the fact that some clone may have been forked forked.eat_concept(concept, token) # init parsing_expression = self.parser.get_parsing_expression(self.parser.context, concept) if not isinstance(parsing_expression, ParsingExpression): self.debug.append(concept) error_msg = f"Failed to parse concept '{concept}'" if parsing_expression is not None: error_msg += f". Reason: '{parsing_expression}'" self.errors.append(GrammarErrorNode(error_msg)) return self.pos = self.parser.parser_input.pos self.token = self.parser.parser_input.tokens[self.pos] # parse node = parsing_expression.parse(self) if node is not None and node.end != -1: self.sequence.append(self.create_concept_node(concept, node)) self.pos = node.end self.bnf_parsed = True else: self.debug.append(("Rewind", token)) self.unrecognized_tokens.add_token(token, self.parser.parser_input.pos) self.pos = self.parser.parser_input.pos # reset position def eat_unrecognized(self, token): if self.is_locked(): return self.debug.append(token) self.unrecognized_tokens.add_token(token, self.parser.parser_input.pos) def manage_unrecognized(self): if self.unrecognized_tokens.is_empty(): return # do not put empty UnrecognizedToken in out if self.unrecognized_tokens.is_whitespace(): self.unrecognized_tokens.reset() return self.unrecognized_tokens.fix_source() # try to recognize concepts nodes_sequences = builtin_helpers.get_lexer_nodes_from_unrecognized( self.parser.context, self.unrecognized_tokens, PARSERS) if nodes_sequences: instances = [self] for i in range(len(nodes_sequences) - 1): clone = self.clone() instances.append(clone) self.forked.append(clone) for instance, node_sequence in zip(instances, nodes_sequences): for node in node_sequence: instance.sequence.append(node) if isinstance(node, UnrecognizedTokensNode) or \ hasattr(node, "unrecognized_tokens") and node.unrecognized_tokens: instance.has_unrecognized = True instance.unrecognized_tokens = UnrecognizedTokensNode(-1, -1, []) else: self.sequence.append(self.unrecognized_tokens) self.has_unrecognized = True # create another instance self.unrecognized_tokens = UnrecognizedTokensNode(-1, -1, []) def clone(self): clone = BnfConceptParserHelper(self.parser) clone.debug = self.debug[:] self.errors = self.errors[:] clone.sequence = self.sequence[:] clone.pos = self.pos clone.unrecognized_tokens = self.unrecognized_tokens.clone() return clone def finalize(self): if self.bnf_parsed > 0: self.manage_unrecognized() for forked in self.forked: # manage that some clones may have been forked forked.finalize() def create_concept_node(self, template, underlying): sheerka = self.parser.context.sheerka key = (template.key, template.id) if template.id else template.key concept = sheerka.new(key) concept = self.finalize_concept(sheerka, concept, underlying) concept_node = ConceptNode( concept, underlying.start, underlying.end, self.parser.parser_input.tokens[underlying.start: underlying.end + 1], None, underlying) return concept_node def finalize_concept(self, sheerka, concept, underlying, init_empty_body=True): """ Updates the properties of the concept Goes in recursion if the property is a concept """ # this cache is to make sure that we return the same concept for the same ConceptExpression _underlying_value_cache = {} def _add_prop(_concept, prop_name, value): """ Adds a new entry, makes a list if the property already exists """ if prop_name not in _concept.compiled or _concept.compiled[prop_name] is None: # new entry _concept.compiled[prop_name] = value else: # make a list if there was a value previous_value = _concept.compiled[prop_name] if isinstance(previous_value, list): previous_value.append(value) else: new_value = [previous_value, value] _concept.compiled[prop_name] = new_value def _look_for_concept_match(_underlying): """ At some point, there is either an StrMatch or a ConceptMatch, that allowed the recognition. Look for the ConceptMatch, with recursion if needed """ if isinstance(_underlying.parsing_expression, ConceptExpression): return _underlying if not isinstance(_underlying, NonTerminalNode): return None if len(_underlying.children) != 1: return None return _look_for_concept_match(_underlying.children[0]) def _get_underlying_value(_underlying): concept_match_node = _look_for_concept_match(_underlying) if concept_match_node: # the value is a concept if id(concept_match_node) in _underlying_value_cache: result = _underlying_value_cache[id(concept_match_node)] else: ref_tpl = concept_match_node.parsing_expression.concept new = sheerka.new_from_template(ref_tpl, ref_tpl.key) result = self.finalize_concept(sheerka, new, concept_match_node.children[0], init_empty_body) _underlying_value_cache[id(concept_match_node)] = result else: # the value is a string result = DoNotResolve(_underlying.source) return result def _process_rule_name(_concept, _underlying): if _underlying.parsing_expression.rule_name: value = _get_underlying_value(_underlying) _add_prop(_concept, _underlying.parsing_expression.rule_name, value) _concept.metadata.need_validation = True elif isinstance(_underlying, NonTerminalNode): for child in _underlying.children: _process_rule_name(_concept, child) if init_empty_body and concept.metadata.body is None: value = _get_underlying_value(underlying) concept.compiled[ConceptParts.BODY] = value if underlying.parsing_expression.rule_name: _add_prop(concept, underlying.parsing_expression.rule_name, value) # KSI : Why don't we set concept.metadata.need_validation to True ? if isinstance(underlying, NonTerminalNode): for node in underlying.children: _process_rule_name(concept, node) return concept @dataclass class UnderConstruction: concept_id: str @dataclass() class ToUpdate: parent_id: int parsing_expression: ParsingExpression def __hash__(self): return hash(self.parent_id) class BnfNodeParser(BaseNodeParser): def __init__(self, **kwargs): super().__init__("BnfNode", 50, **kwargs) if 'sheerka' in kwargs: sheerka = kwargs.get("sheerka") self.concepts_grammars = sheerka.concepts_grammars else: self.concepts_grammars = Cache() self.ignore_case = True @staticmethod def _is_eligible(concept): """ Predicate that select concepts that must handled by AtomNodeParser :param concept: :return: """ return concept.metadata.definition_type == DEFINITION_TYPE_BNF @staticmethod def get_valid(parsers_helpers): valid_parser_helpers = [] for parser_helper in parsers_helpers: if not parser_helper.bnf_parsed or parser_helper.has_error(): continue if parser_helper in valid_parser_helpers: continue valid_parser_helpers.append(parser_helper) return valid_parser_helpers @staticmethod def get_expression_from_concept_name(name): """ Create the parsing expression from the name This function differs from BNFParser.parse() as it does not try to resolve identifiers into concepts >>> assert get_expression_from_concept_name('one hundred') == Sequence(StrMatch("one"), StrMatch("hundred")) while BNFParser.parse("one hundred") will look for concept 'one' and concept 'hundred' :param name: :return: """ if name is None or name.strip() == "": return [] res = [] tokens = Tokenizer(name, yield_eof=False) for token in tokens: if token.type == TokenKind.WHITESPACE: continue elif token.type == TokenKind.STRING: sub_tokens = list(Tokenizer(token.strip_quote, yield_eof=False)) for sub_token in sub_tokens[:-1]: res.append(StrMatch(sub_token.str_value, skip_whitespace=False)) res.append(StrMatch(sub_tokens[-1].str_value)) else: res.append(StrMatch(token.str_value)) return res[0] if len(res) == 1 else Sequence(*res) def get_concepts_sequences(self): """ Main method that parses the tokens and extract the concepts :return: """ def _add_forked_to_concept_parser_helpers(): # check that if some new InfixToPostfix are created for parser in concept_parser_helpers: if len(parser.forked) > 0: forked.extend(parser.forked) parser.forked.clear() if len(forked) > 0: concept_parser_helpers.extend(forked) forked.clear() def _get_longest(parser_helpers): # when there is a match with several concepts # on keep the ones that eat the more tokens by_end_pos = defaultdict(list) for helper in parser_helpers: by_end_pos[helper.pos].append(helper) return by_end_pos[max(by_end_pos)] forked = [] concept_parser_helpers = [BnfConceptParserHelper(self)] while self.parser_input.next_token(False): token = self.parser_input.token try: concepts = self.get_concepts(token, self._is_eligible, strip_quotes=False) if not concepts: for concept_parser in concept_parser_helpers: concept_parser.eat_unrecognized(token) continue if len(concepts) == 1: for concept_parser in concept_parser_helpers: concept_parser.eat_concept(concepts[0], token) continue # make the cartesian product temp_res = [] for concept_parser in concept_parser_helpers: if concept_parser.is_locked(): # It means that it already eat the token # so simply add it, do not clone temp_res.append(concept_parser) continue for concept in concepts: clone = concept_parser.clone() temp_res.append(clone) clone.eat_concept(concept, token) # only keep the longest concept_parser_helpers = _get_longest(temp_res) finally: _add_forked_to_concept_parser_helpers() # make sure that remaining items in stack are moved to out for concept_parser in concept_parser_helpers: concept_parser.finalize() _add_forked_to_concept_parser_helpers() return concept_parser_helpers def check_for_infinite_recursion(self, parsing_expression, already_found, only_first=False): if isinstance(parsing_expression, ConceptExpression): if parsing_expression.concept.id in already_found: return True already_found.add(parsing_expression.concept.id) return self.check_for_infinite_recursion(parsing_expression.nodes[0], already_found, only_first) if isinstance(parsing_expression, Sequence): # for sequence, we need to check all nodes if only_first: nodes = [] if len(parsing_expression.nodes) == 0 else [parsing_expression.nodes[0]] else: nodes = parsing_expression.nodes for node in nodes: already_found_for_current_node = already_found.copy() if self.check_for_infinite_recursion(node, already_found_for_current_node, False): already_found.update(already_found_for_current_node) return True return False if isinstance(parsing_expression, OrderedChoice): # for ordered choice, if there is at least one node that does not resolved to a recursion # we are safe for node in parsing_expression.nodes: already_found_for_current_node = already_found.copy() if self.check_for_infinite_recursion(node, already_found, True): already_found.update(already_found_for_current_node) return True else: return False return False if isinstance(parsing_expression, LongestChoice): for node in parsing_expression.nodes: already_found_for_current_node = already_found.copy() if self.check_for_infinite_recursion(node, already_found_for_current_node, True): already_found.update(already_found_for_current_node) return True return False if isinstance(parsing_expression, UnderConstruction): if parsing_expression.concept_id in already_found: return True already_found.add(parsing_expression.concept_id) return False def get_parsing_expression(self, context, concept): """ Compute the parsing expression for a given concept :param context: :param concept: :return: """ if concept.id in self.concepts_grammars: return self.concepts_grammars.get(concept.id) # internal cache of already computed parsing expression to use during the recursion grammar = {} # concept that are not totally resolved, because they reference parsing expression under construction to_update = set() # the key is the instance id of the parsing expression # during the parsing of concept, we will resolve other concepts # keep the track of the concepts that can safely be added to self.concept_grammars to_keep = {concept.id} desc = f"Get parsing expression for concept {concept}" with context.push(BuiltinConcepts.INIT_BNF, concept, who=self.name, obj=concept, root_concept=concept, desc=desc) as sub_context: # get the parsing expression ret = self.resolve_concept_parsing_expression(sub_context, concept, grammar, to_update, to_keep) # check and update parsing expression that are still under construction # Note that we only update the concept that will update concepts_grammars # because pe.node may be large for item in to_update: if item.parent_id in to_keep: pe = item.parsing_expression for i, node in enumerate(pe.nodes): if isinstance(node, UnderConstruction): pe.nodes[i] = grammar.get(node.concept_id) # check for infinite recursion. # We are adding a new concept. Does it create an infinite recursion ? concepts_in_recursion = set() if self.check_for_infinite_recursion(ret, concepts_in_recursion): cycle = context.sheerka.new(BuiltinConcepts.CHICKEN_AND_EGG, body=concepts_in_recursion) for concept_id in concepts_in_recursion: grammar[concept_id] = cycle # update, in case of infinite circular recursion ret = grammar[concept.id] # finally, update concept grammar for k, v in grammar.items(): if k in to_keep: self.concepts_grammars.put(k, v) # not quite sure that it is a good idea. # Why do we want to corrupt previous valid entries ? if context.sheerka.isinstance(v, BuiltinConcepts.CHICKEN_AND_EGG): self.concepts_grammars.put(k, v) sub_context.add_values(return_values=ret) return ret def resolve_concept_parsing_expression(self, context, concept, grammar, to_update, to_keep): if concept.id in self.concepts_grammars: # validated entry return self.concepts_grammars.get(concept.id) if concept.id in grammar: # under construction entry return grammar.get(concept.id) desc = f"Resolve concept parsing expression for '{concept}'" with context.push(BuiltinConcepts.INIT_BNF, concept, who=self.name, obj=concept, desc=desc) as sub_context: if not concept.bnf: # to save a function call. Not sure it worth it. BaseNodeParser.ensure_bnf(sub_context, concept, self.name) grammar[concept.id] = UnderConstruction(concept.id) sheerka = context.sheerka if concept.metadata.definition_type == DEFINITION_TYPE_BNF: expression = concept.bnf desc = f"Bnf concept detected. Resolving parsing expression '{expression}'" with sub_context.push(BuiltinConcepts.INIT_BNF, concept, who=self.name, obj=concept, desc=desc) as ssc: ssc.add_inputs(expression=expression) resolved = self.resolve_parsing_expression(ssc, expression, grammar, to_update, to_keep) ssc.add_values(return_values=resolved) elif sheerka.isaset(context, concept): desc = f"Concept is a group. Resolving parsing expression using 'isa'" with sub_context.push(BuiltinConcepts.INIT_BNF, concept, who=self.name, obj=concept, desc=desc) as ssc: ssc.add_inputs(concept=concept) concepts_in_group = self.sheerka.get_set_elements(ssc, concept) valid_concepts = [] for c in concepts_in_group: if c.id == context.root_concept.id: continue c_pe = self.resolve_concept_parsing_expression(context, c, grammar, to_update, to_keep) if self.check_for_infinite_recursion(c_pe, {concept.id}, True): continue valid_concepts.append(c) nodes = [ConceptExpression(c, rule_name=c.name) for c in valid_concepts] resolved = self.resolve_parsing_expression(ssc, LongestChoice(*nodes), grammar, to_update, to_keep) ssc.add_values(concepts_in_group=concepts_in_group) ssc.add_values(return_values=resolved) else: desc = f"Concept is a simple concept." with sub_context.push(BuiltinConcepts.INIT_BNF, concept, who=self.name, obj=concept, desc=desc) as ssc: to_keep.add(concept.id) expression = self.get_expression_from_concept_name(concept.name) resolved = self.resolve_parsing_expression(ssc, expression, grammar, to_update, to_keep) grammar[concept.id] = resolved if self.has_error: sub_context.add_values(errors=self.error_sink) return None sub_context.add_values(return_values=resolved) return resolved def resolve_parsing_expression(self, context, expression, grammar, to_update, to_keep): if isinstance(expression, str): ret = StrMatch(expression, ignore_case=self.ignore_case) elif not isinstance(expression, ParsingExpression): return expression # escalate the error elif isinstance(expression, ConceptExpression): concept = self.get_concept(context, expression.concept) expression.concept = concept if not self.sheerka.is_known(concept): unknown_concept = self.sheerka.new(BuiltinConcepts.UNKNOWN_CONCEPT, body=concept) return self.add_error(unknown_concept) pe = self.resolve_concept_parsing_expression(context, concept, grammar, to_update, to_keep) if not isinstance(pe, (ParsingExpression, UnderConstruction)): return pe # an error is detected, escalate it # # if isinstance(pe, UnderConstruction) and expression.concept.id == pe.concept_id: # return pe # we are looking for ourself, just return it if isinstance(pe, UnderConstruction): to_update.add(ToUpdate(context.obj.id, expression)) expression.nodes = [pe] expression.rule_name = expression.rule_name or concept.name ret = expression elif isinstance(expression, StrMatch): ret = expression if ret.ignore_case is None: ret.ignore_case = self.ignore_case elif isinstance(expression, Sequence) or \ isinstance(expression, OrderedChoice) or \ isinstance(expression, LongestChoice) or \ isinstance(expression, ZeroOrMore) or \ isinstance(expression, OneOrMore) or \ isinstance(expression, Optional): ret = expression ret.nodes = [] for e in ret.elements: pe = self.resolve_parsing_expression(context, e, grammar, to_update, to_keep) if not isinstance(pe, (ParsingExpression, UnderConstruction)): return pe # an error is detected, escalate it if isinstance(pe, UnderConstruction): to_update.add(ToUpdate(context.obj.id, ret)) ret.nodes.append(pe) else: ret = self.add_error(GrammarErrorNode(f"Unrecognized grammar element '{expression}'."), False) # Translate separator expression. if isinstance(ret, Repetition) and expression.sep: expression.sep = self.resolve_parsing_expression(context, expression.sep, grammar, to_update, to_keep) return ret def get_concept(self, context, concept): if isinstance(concept, Concept): return concept if concept in context.concepts: return context.concepts[concept] return self.sheerka.get_by_key(concept) def parse(self, context, parser_input: ParserInput): """ parser_input can be string, but text can also be an list of tokens :param context: :param parser_input: :return: """ if not isinstance(parser_input, ParserInput): return None context.log(f"Parsing '{parser_input}' with BnfNode", self.name) sheerka = context.sheerka if parser_input.is_empty(): return sheerka.ret(self.name, False, sheerka.new(BuiltinConcepts.NOT_FOR_ME, body=parser_input.as_text(), reason=BuiltinConcepts.IS_EMPTY)) if not self.reset_parser(context, parser_input): return self.sheerka.ret( self.name, False, context.sheerka.new(BuiltinConcepts.ERROR, body=self.error_sink)) sequences = self.get_concepts_sequences() valid_parser_helpers = self.get_valid(sequences) if valid_parser_helpers is None: # token error return self.sheerka.ret( self.name, False, context.sheerka.new(BuiltinConcepts.ERROR, body=self.error_sink)) if len(valid_parser_helpers) == 0: return self.sheerka.ret( self.name, False, context.sheerka.new(BuiltinConcepts.NOT_FOR_ME, body=parser_input.as_text())) ret = [] for parser_helper in valid_parser_helpers: ret.append( self.sheerka.ret( self.name, not parser_helper.has_unrecognized, self.sheerka.new( BuiltinConcepts.PARSER_RESULT, parser=self, source=parser_input.as_text(), body=parser_helper.sequence, try_parsed=parser_helper.sequence))) if len(ret) == 1: self.log_result(context, parser_input.as_text(), ret[0]) return ret[0] else: self.log_multiple_results(context, parser_input.as_text(), ret) return ret