from dataclasses import dataclass

from core import builtin_helpers
from core.builtin_concepts import BuiltinConcepts
from core.concept import DEFINITION_TYPE_BNF
from core.tokenizer import Tokenizer
from parsers.BaseNodeParser import BaseNodeParser, ConceptNode, UnrecognizedTokensNode, SourceCodeNode
from parsers.BaseParser import UnexpectedTokenErrorNode, ErrorNode

# Names of the parsers used to try to recognize unrecognized token runs.
PARSERS = ["BnfNode", "SyaNode", "Python"]


@dataclass()
class TokensNodeFound(ErrorNode):
    """Error raised when input ends while a concept still expects tokens.

    `expected_tokens` is the list of token string values that were still
    pending when parsing stopped.
    """
    expected_tokens: list

    def __eq__(self, other):
        # NOTE(review): the isinstance check targets UnexpectedTokenErrorNode,
        # not TokensNodeFound — presumably intentional since both carry
        # message/token/expected_tokens, but confirm.
        if id(other) == id(self):
            return True
        if not isinstance(other, UnexpectedTokenErrorNode):
            return False
        if self.message != other.message:
            return False
        if self.token.type != other.token.type or self.token.value != other.token.value:
            return False
        if len(self.expected_tokens) != len(other.expected_tokens):
            return False
        for i, t in enumerate(self.expected_tokens):
            if t != other.expected_tokens[i]:
                return False
        return True

    def __hash__(self):
        # BUG FIX: expected_tokens is a list, which is unhashable — hashing it
        # directly raised TypeError. Convert to a tuple first.
        return hash((self.message, self.token, tuple(self.expected_tokens)))


class AtomConceptParserHelper:
    """Accumulates a sequence of atom ConceptNodes as tokens are fed in.

    Instances may be cloned ("forked") when an unrecognized token run turns
    out to have several possible interpretations; see manage_unrecognized().
    """

    def __init__(self, context):
        self.context = context
        self.debug = []
        self.sequence = []  # sequence of concepts already found
        self.current_concept: ConceptNode = None  # concept being parsed
        # buffer that keeps track of tokens and their positions
        self.unrecognized_tokens = UnrecognizedTokensNode(-1, -1, [])
        self.expected_tokens = None  # tokens still expected for the current concept
        self.is_locked = False
        self.errors = []
        self.has_unrecognized = False
        # used to duplicate AtomConceptParserHelper; see manage_unrecognized()
        self.forked = []

    def __eq__(self, other):
        # Two helpers are equal when their recognized sequences are equal.
        if id(other) == id(self):
            return True
        if not isinstance(other, AtomConceptParserHelper):
            return False
        if len(self.sequence) != len(other.sequence):
            return False
        for item_self, item_other in zip(self.sequence, other.sequence):
            if item_self != item_other:
                return False
        return True

    def __hash__(self):
        # Weak but consistent with __eq__: equal sequences have equal lengths.
        return hash(len(self.sequence))

    def __repr__(self):
        return f"{self.sequence}"

    def lock(self):
        """Prevent this helper from consuming further concepts/tokens."""
        self.is_locked = True

    def reset(self):
        """Unlock the helper for the next token iteration."""
        self.is_locked = False

    def has_error(self):
        return len(self.errors) > 0

    def eat_token(self, token, pos):
        """Try to consume `token` as part of the concept currently being matched.

        Returns True only when the token completes the current concept;
        False otherwise (no pending concept, mismatch, or partial match).
        """
        if not self.expected_tokens:
            return False
        self.debug.append(token)
        if self.expected_tokens[0] != token.str_value:
            self.errors.append(UnexpectedTokenErrorNode(
                f"Found '{token}' while expecting '{self.expected_tokens[0]}'",
                token, [self.expected_tokens[0]]))
            return False
        self.current_concept.end = pos
        del self.expected_tokens[0]
        if not self.expected_tokens:
            # the concept is fully matched
            self.sequence.append(self.current_concept)
            self.expected_tokens = None
            return True
        # FIX: partial match previously fell through and returned None
        # implicitly; made explicit (still falsy, so callers are unaffected).
        return False

    def eat_concept(self, concept, pos):
        """Start (or complete) matching `concept` at position `pos`."""
        if self.is_locked:
            return
        self.debug.append(concept)
        self.manage_unrecognized()
        for forked in self.forked:  # manage that some clones may have been forked
            forked.eat_concept(concept, pos)
        concept_node = ConceptNode(concept, pos, pos)
        # Tokenizer wraps the name with sentinel tokens; strip them with [1:-1].
        expected = [t.str_value for t in Tokenizer(concept.name)][1:-1]
        if not expected:
            # the concept is already matched
            self.sequence.append(concept_node)
        else:
            self.current_concept = concept_node
            self.expected_tokens = expected

    def manage_unrecognized(self):
        """Flush the unrecognized-token buffer into the sequence.

        Tries to re-parse the buffered tokens with PARSERS; when several
        interpretations exist, clones of this helper are created (one per
        extra interpretation) and recorded in self.forked.
        """
        if self.unrecognized_tokens.is_empty():
            return
        # do not put empty UnrecognizedToken in out
        if self.unrecognized_tokens.is_whitespace():
            self.unrecognized_tokens.reset()
            return
        self.unrecognized_tokens.fix_source()
        # try to recognize concepts
        nodes_sequences = builtin_helpers.get_lexer_nodes_from_unrecognized(
            self.context, self.unrecognized_tokens, PARSERS)
        if nodes_sequences:
            instances = [self]
            for i in range(len(nodes_sequences) - 1):
                clone = self.clone()
                instances.append(clone)
                self.forked.append(clone)
            for instance, node_sequence in zip(instances, nodes_sequences):
                for node in node_sequence:
                    instance.sequence.append(node)
                    if isinstance(node, (UnrecognizedTokensNode, SourceCodeNode)) or \
                            hasattr(node, "unrecognized_tokens") and node.unrecognized_tokens:
                        instance.has_unrecognized = True
                instance.unrecognized_tokens = UnrecognizedTokensNode(-1, -1, [])
        else:
            self.sequence.append(self.unrecognized_tokens)
            self.has_unrecognized = True
            # create another instance
            self.unrecognized_tokens = UnrecognizedTokensNode(-1, -1, [])

    def eat_unrecognized(self, token, pos):
        """Buffer a token that matched no concept."""
        if self.is_locked:
            return
        self.debug.append(token)
        self.unrecognized_tokens.add_token(token, pos)

    def finalize(self):
        """Flush remaining buffered tokens and record a pending-match error."""
        if len(self.sequence) > 0:
            self.manage_unrecognized()
        for forked in self.forked:  # manage that some clones may have been forked
            forked.finalize()
        if self.expected_tokens:
            # NOTE(review): TokensNodeFound extends ErrorNode, whose __eq__
            # reads self.message and self.token — this single-argument call
            # looks like it would raise TypeError (missing fields). Confirm
            # ErrorNode's field list before relying on this error path.
            self.errors.append(TokensNodeFound(self.expected_tokens))

    def clone(self):
        """Return a shallow-ish copy (forked list intentionally not copied)."""
        clone = AtomConceptParserHelper(self.context)
        clone.debug = self.debug[:]
        clone.sequence = self.sequence[:]
        clone.current_concept = self.current_concept.clone() if self.current_concept else None
        clone.unrecognized_tokens = self.unrecognized_tokens.clone()
        clone.expected_tokens = self.expected_tokens[:] if self.expected_tokens else None
        clone.is_locked = self.is_locked
        clone.errors = self.errors[:]
        clone.has_unrecognized = self.has_unrecognized
        return clone

    # def _get_lexer_nodes_from_unrecognized(self):
    #     """
    #     Use the source of self.unrecognized_tokens to find concepts or source code
    #     :return:
    #     """
    #     res = builtin_helpers.parse_unrecognized(self.context, self.unrecognized_tokens.source, PARSERS)
    #     only_parsers_results = builtin_helpers.only_parsers_results(self.context, res)
    #
    #     if not only_parsers_results.status:
    #         return None
    #
    #     return builtin_helpers.get_lexer_nodes(
    #         only_parsers_results.body.body,
    #         self.unrecognized_tokens.start,
    #         self.unrecognized_tokens.tokens)


class AtomNodeParser(BaseNodeParser):
    """
    Parser used to recognize atom concepts or sequences of atom concepts.

    An atom concept is a concept that does not have any property, though it
    may have a body. So, if 'one', 'two', 'three' are defined as atom concepts
    (with no property/parameter), this parser can recognize the sequence
    'one two three' as
        [ConceptNode(one), ConceptNode(two), ConceptNode(three)]
    It can partly recognize 'one x$1!! two three' as
        [ConceptNode(one), UnrecognizedTokensNode(x$1!!), ConceptNode(two), ConceptNode(three)]
    It cannot recognize concepts with parameters (non atom), e.g.
    'one plus two' won't be recognized as ConceptNode(plus, one, two);
    it will be [ConceptNode(one), UnrecognizedTokensNode(plus), ConceptNode(two)].
    Note: 'one plus two' will be recognized by the SyaParser.
    """

    def __init__(self, **kwargs):
        # "AtomNode" is the parser name; 50 is presumably its priority —
        # TODO confirm against BaseNodeParser.__init__.
        super().__init__("AtomNode", 50, **kwargs)

    @staticmethod
    def _is_eligible(concept):
        """
        Predicate that selects concepts that must be handled by AtomNodeParser:
        no variables and not defined via BNF.
        :param concept:
        :return:
        """
        # return len(concept.metadata.props) == 0 or concept.metadata.definition_type == DEFINITION_TYPE_BNF
        return len(concept.metadata.variables) == 0 and concept.metadata.definition_type != DEFINITION_TYPE_BNF

    def get_concepts_sequences(self):
        """Feed every token to a pool of AtomConceptParserHelper instances.

        The pool grows in two ways:
        - cartesian product when a token maps to several eligible concepts
          (each non-locked helper is cloned once per candidate concept);
        - forks created inside manage_unrecognized(), collected after each
          token via _add_forked_to_concept_parser_helpers().

        Returns the final list of helpers (each one is a candidate parse).
        """
        forked = []

        def _add_forked_to_concept_parser_helpers():
            # check that if some new InfixToPostfix are created
            for parser in concept_parser_helpers:
                if len(parser.forked) > 0:
                    forked.extend(parser.forked)
                    parser.forked.clear()
            if len(forked) > 0:
                concept_parser_helpers.extend(forked)
                forked.clear()

        def _get_concepts_by_name(name):
            # Normalize the lookup result to a list; drop unknown singletons.
            other_concepts = self.sheerka.get_by_name(name)
            if isinstance(other_concepts, list):
                return other_concepts
            return [other_concepts] if self.sheerka.is_known(other_concepts) else []

        concept_parser_helpers = [AtomConceptParserHelper(self.context)]
        while self.next_token(False):
            for concept_parser in concept_parser_helpers:
                concept_parser.reset()
            token = self.token
            try:
                # Helpers in the middle of a multi-token concept consume the
                # token first; a full match locks the helper for this round.
                for concept_parser in concept_parser_helpers:
                    if concept_parser.eat_token(self.token, self.pos):
                        concept_parser.lock()
                concepts = self.get_concepts(token, self._is_eligible, custom=_get_concepts_by_name)
                if not concepts:
                    for concept_parser in concept_parser_helpers:
                        concept_parser.eat_unrecognized(token, self.pos)
                    continue
                if len(concepts) == 1:
                    for concept_parser in concept_parser_helpers:
                        concept_parser.eat_concept(concepts[0], self.pos)
                    continue
                # make the cartesian product
                temp_res = []
                for concept_parser in concept_parser_helpers:
                    if concept_parser.is_locked:
                        # It means that it already eat the token
                        # so simply add it, do not clone
                        temp_res.append(concept_parser)
                        continue
                    for concept in concepts:
                        clone = concept_parser.clone()
                        temp_res.append(clone)
                        clone.eat_concept(concept, self.pos)
                concept_parser_helpers = temp_res
            finally:
                # Absorb any forks produced during this token, even on error.
                _add_forked_to_concept_parser_helpers()
        # make sure that remaining items in stack are moved to out
        for concept_parser in concept_parser_helpers:
            concept_parser.reset()
            concept_parser.finalize()
        _add_forked_to_concept_parser_helpers()
        return concept_parser_helpers

    def get_valid(self, concept_parser_helpers):
        """Filter out erroneous/empty helpers and deduplicate the rest.

        Also back-fills each node's `tokens` slice and source text from the
        parser's token stream. Returns a list of helpers (each wrapping a
        node sequence).
        """
        valid_parser_helpers = []
        # be careful, it will be a list of list
        for parser_helper in concept_parser_helpers:
            if parser_helper.has_error():
                continue
            if len(parser_helper.sequence) == 0:
                continue
            for node in parser_helper.sequence:
                # end is inclusive, hence the +1 on the slice.
                node.tokens = self.tokens[node.start:node.end + 1]
                node.fix_source()
            # Dedup via AtomConceptParserHelper.__eq__ (sequence equality).
            if parser_helper in valid_parser_helpers:
                continue
            valid_parser_helpers.append(parser_helper)
        return valid_parser_helpers

    def parse(self, context, parser_input):
        """Parse `parser_input` into atom-concept sequences.

        Returns a single result when exactly one valid interpretation exists,
        a list of results when there are several (callers must handle both
        shapes), or a NOT_FOR_ME / ERROR / IS_EMPTY result otherwise.
        """
        if parser_input == "":
            return context.sheerka.ret(
                self.name, False,
                context.sheerka.new(BuiltinConcepts.IS_EMPTY)
            )
        if not self.reset_parser(context, parser_input):
            return self.sheerka.ret(
                self.name, False,
                context.sheerka.new(BuiltinConcepts.ERROR, body=self.error_sink))
        parser_helpers = self.get_valid(self.get_concepts_sequences())
        if len(parser_helpers):
            ret = []
            for parser_helper in parser_helpers:
                # A result is a success only when nothing was left unrecognized.
                ret.append(
                    self.sheerka.ret(
                        self.name,
                        not parser_helper.has_unrecognized,
                        self.sheerka.new(
                            BuiltinConcepts.PARSER_RESULT,
                            parser=self,
                            source=parser_input,
                            body=parser_helper.sequence,
                            try_parsed=parser_helper.sequence)))
            if len(ret) == 1:
                self.log_result(context, parser_input, ret[0])
                return ret[0]
            else:
                self.log_multiple_results(context, parser_input, ret)
                return ret
        else:
            return self.sheerka.ret(
                self.name, False,
                context.sheerka.new(BuiltinConcepts.NOT_FOR_ME, body=parser_input))