from collections import namedtuple
from dataclasses import dataclass, field
from operator import attrgetter
from typing import List

from core import builtin_helpers
from core.builtin_concepts import BuiltinConcepts
from core.concept import Concept, DEFINITION_TYPE_BNF
from core.sheerka.ExecutionContext import ExecutionContext
from core.tokenizer import Token, TokenKind, Tokenizer
from parsers.BaseNodeParser import UnrecognizedTokensNode, ConceptNode, SourceCodeNode, SyaAssociativity, \
    SourceCodeWithConceptNode, BaseNodeParser
from parsers.BaseParser import ErrorNode

PARSERS = ["BnfNode", "AtomNode", "Python"]

function_parser_res = namedtuple("FunctionParserRes", 'to_out function')


class ParenthesisMismatchErrorNode(ErrorNode):
    def __init__(self, error_int):
        if isinstance(error_int, tuple):
            self.token = error_int[0]
            self.pos = error_int[1]
        elif isinstance(error_int, Token):
            self.token = error_int
            self.pos = -1
        else:  # isinstance(error_int, UnrecognizedTokensNode)
            for i, t in reversed(list(enumerate(error_int.tokens))):
                if t.type == TokenKind.LPAR:
                    self.token = t
                    self.pos = i + error_int.start

    def __eq__(self, other):
        if id(self) == id(other):
            return True
        if isinstance(other, tuple):
            return other[0] == self.token.value and other[1] == self.pos
        if not isinstance(other, ParenthesisMismatchErrorNode):
            return False
        return self.token == other.token and self.pos == other.pos

    def __hash__(self):
        return hash(self.pos)

    def __repr__(self):
        return f"ParenthesisMismatchErrorNode('{self.token.value}', {self.pos})"


@dataclass()
class NoneAssociativeSequenceErrorNode(ErrorNode):
    concept: Concept
    first: int
    second: int
    tokens: List[Token] = None


@dataclass()
class TooManyParametersFound(ErrorNode):
    concept: Concept
    pos: int  # position of the concept
    token: Token  # token of the concept where the error was noticed
    parameters: list  # list of unmatched parameters

    def __repr__(self):
        return f"Too many parameters found for '{self.concept}' before token '{self.token}'"


@dataclass()
class SyaConceptDef:
    """
    Wrapper around a concept.
    It carries the precedence and the associativity for the concept.
    """
    concept: Concept
    precedence: int = 0
    associativity: SyaAssociativity = SyaAssociativity.Right
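

# Background sketch (not used by the parser): the textbook shunting-yard
# conversion that InFixToPostFix below generalizes to multi-token concepts.
# This toy version only knows single-token binary operators whose precedence
# and associativity are given up front, the same data SyaConceptDef carries.
def _shunting_yard_sketch(tokens, precedence, right_associative=()):
    """_shunting_yard_sketch("a + b * c".split(), {"+": 1, "*": 2})
    returns ['a', 'b', 'c', '*', '+'].
    """
    out, stack = [], []
    for token in tokens:
        if token in precedence:
            # pop while the operator on the stack binds at least as tightly
            while stack and stack[-1] != "(" and (
                    precedence[stack[-1]] > precedence[token]
                    or (precedence[stack[-1]] == precedence[token]
                        and token not in right_associative)):
                out.append(stack.pop())
            stack.append(token)
        elif token == "(":
            stack.append(token)
        elif token == ")":
            while stack and stack[-1] != "(":
                out.append(stack.pop())
            stack.pop()  # discard the '('
        else:
            out.append(token)  # operand: straight to the output
    while stack:
        out.append(stack.pop())
    return out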


@dataclass()
class SyaConceptParserHelper:
    """
    Used because there is not enough information yet to create the final ConceptNode
    """
    concept: Concept
    start: int  # position of the token in the tokenizer (Caution, it is not token.index)
    end: int = field(default=-1, repr=False, compare=False, hash=None)
    expected: List[Token] = field(default_factory=list, repr=False, compare=False, hash=None)
    expected_parameters_before_first_token: int = field(default=0, repr=False, compare=False, hash=None)
    last_token_before_first_token: Token = field(default=None, repr=False, compare=False, hash=None)
    potential_pos: int = field(default=-1, repr=False, compare=False, hash=None)
    parameters_list_at_init: list = field(default_factory=list, repr=False, compare=False, hash=None)
    tokens: List[Token] = field(default_factory=list, repr=False, compare=False, hash=None)  # tokens eaten
    remember_whitespace: Token = field(default=None, repr=False, compare=False, hash=None)
    error: str = None

    def __post_init__(self):
        concept = self.concept.concept if isinstance(self.concept, SyaConceptDef) else self.concept
        if self.end == -1:
            self.end = self.start
        first_keyword_found = None
        for token in Tokenizer(concept.key, yield_eof=False):
            if not first_keyword_found and token.type != TokenKind.WHITESPACE and token.type != TokenKind.VAR_DEF:
                first_keyword_found = token
            if first_keyword_found:
                self.expected.append(token)
            else:
                self.last_token_before_first_token = token
                if token.type != TokenKind.WHITESPACE:
                    self.expected_parameters_before_first_token += 1
        self.eat_token(first_keyword_found)  # remove the first token
        self.tokens.append(first_keyword_found)

    def is_matched(self):
        return len(self.expected) == 0

    def is_atom(self):
        return len(self.concept.concept.metadata.variables) == 0 and len(self.expected) == 0

    def is_next(self, token):
        if self.is_matched() or len(self.expected) == 0:
            return False
        # True if the next token is the one that is expected
        # Or if the next token is a whitespace and the expected one is the one after
        # (whitespaces are sometimes not mandatory)
        return token.str_value == self.expected[0].str_value or \
            self.expected[0].type == TokenKind.WHITESPACE and token.str_value == self.expected[1].str_value

    def is_expected(self, token):
        if self.is_matched() or token.type == TokenKind.WHITESPACE:
            return False
        for expected in self.expected:
            if expected.type != TokenKind.VAR_DEF and expected.str_value == token.str_value:
                return True
        return False

    def expected_parameters(self):
        return sum(map(lambda e: e.type == TokenKind.VAR_DEF, self.expected))

    def eat_token(self, until_token):
        """
        Eat the expected tokens until the token 'until_token' (included)
        :param until_token:
        :return:
        """
        # No check, as it is used only after is_expected() or is_next()
        while self.expected[0].str_value != until_token.str_value:
            del self.expected[0]
        del self.expected[0]
        # return True if a whole sequence of keywords has been eaten
        # example
        # Concept("foo a bar baz qux b").def_var("a").def_var("b")
        # 'bar' is just eaten. We will return False because 'baz' and 'qux' are still waiting
        if len(self.expected) == 0:
            return True
        # also return True at the end of a name sequence
        # ... bar baz qux
        # return True after 'qux', to indicate that all the pending parameters must be processed
        return self.expected[0].type == TokenKind.VAR_DEF

    def eat_parameter(self, parameter):
        if self.is_matched() and parameter == self:
            return  # not an error
        if self.is_matched():
            self.error = "No more parameter expected"
            return
        if self.expected[0].type != TokenKind.VAR_DEF:
            self.error = "Parameter was not expected"
            return
        del self.expected[0]

    def fix_concept(self):
        """
        When the SYA is done, we only need the concept, not the sya concept
        :return:
        """
        if isinstance(self.concept, SyaConceptDef):
            self.concept = self.concept.concept
        return self

    # @staticmethod
    # def _get_token_value(token):
    #     if token.type == TokenKind.STRING:
    #         return token.value[1:-1]
    #     elif token.type == TokenKind.KEYWORD:
    #         return token.value.value
    #     else:
    #         return token.value

    def clone(self):
        clone = SyaConceptParserHelper(self.concept, self.start, self.end)
        clone.expected = self.expected[:]
        clone.expected_parameters_before_first_token = self.expected_parameters_before_first_token
        clone.potential_pos = self.potential_pos
        clone.parameters_list_at_init = self.parameters_list_at_init
        clone.error = self.error
        return clone
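

# Illustrative sketch (not used by the parser) of what
# SyaConceptParserHelper.__post_init__ computes from a concept key, using
# str.split() as a stand-in for the project Tokenizer and a plain set of
# variable names instead of VAR_DEF tokens.
def _split_concept_key_sketch(key, variables):
    """For Concept("foo a bar b") with variables {"a", "b"}, return
    (prefix_parameter_count, first_keyword, remaining_expected_tokens),
    i.e. (0, 'foo', ['a', 'bar', 'b']); "a plus b" gives (1, 'plus', ['b']).
    """
    words = key.split()
    for i, word in enumerate(words):
        if word not in variables:  # the first real keyword
            # variables seen before the first keyword are prefix parameters
            return i, word, words[i + 1:]
    return len(words), None, []  # no keyword at all: only parameters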


class InFixToPostFix:
    def __init__(self, context):
        self.context = context
        self.is_locked = False  # when locked, cannot process input
        self.out = []  # shunting yard algo out
        self.stack = []  # shunting yard algo stack
        self.unrecognized_tokens = UnrecognizedTokensNode(-1, -1, [])  # buffer that keeps track of token positions
        self.parameters_list = []  # list of the parameters that need to be associated to a concept
        self.errors = []  # Not quite sure that I can handle more than one error
        self.debug = []
        self.false_positives = []  # concepts that look like known ones, but are not (for debug purposes)
        self.forked = []  # used to fork InFixToPostFix when multiple parsers recognize the unrecognized_tokens

    def __repr__(self):
        return f"InFixToPostFix({self.debug})"

    def __eq__(self, other):
        if id(self) == id(other):
            return True
        if not isinstance(other, InFixToPostFix):
            return False
        return self.out == other.out and self.errors == other.errors

    def __hash__(self):
        # hash on what __eq__ compares
        return len(self.out) + len(self.errors)

    def _add_error(self, error):
        self.errors.append(error)

    def _is_lpar(self, token):
        """
        True if the token is a left parenthesis '('
        Note that while we are parsing unrecognized tokens, the parenthesis is
        considered part of the unrecognized sequence
        :param token:
        :return:
        """
        if isinstance(token, Token) and token.type == TokenKind.LPAR:
            return True
        if isinstance(token, tuple) and token[0].type == TokenKind.LPAR:
            return True
        if isinstance(token, UnrecognizedTokensNode) and token.parenthesis_count > 0:
            return True
        return False

    def _is_rpar(self, token):
        """
        True if the token is a right parenthesis ')'
        Note that while we are parsing unrecognized tokens, the parenthesis is
        considered part of the unrecognized sequence
        :param token:
        :return:
        """
        return isinstance(token, Token) and token.type == TokenKind.RPAR

    def _concepts(self):
        """
        Return the concepts currently being parsed
        :return:
        """
        res = []
        for item in self.stack:
            if isinstance(item, SyaConceptParserHelper):
                res.append(item)
        return res

    def _put_to_out(self, item):
        """
        Helper function that puts an item in the out
        :param item:
        :return:
        """
        if isinstance(item, SyaConceptParserHelper) and len(item.expected) > 0 and not item.error:
            if item.expected[0].type == TokenKind.VAR_DEF:
                item.error = "Not enough suffix parameters"
            else:
                item.error = f"token '{item.expected[0].str_value}' not found"
        if isinstance(item, SyaConceptParserHelper) and item.potential_pos != -1:
            self.out.insert(item.potential_pos, item)
        else:
            self.out.append(item)
        # put the item in the list of awaiting parameters
        self.parameters_list.append(item)
        if len(self._concepts()) > 0:
            # try to predict the final position of the current concept
            # This position can be altered by concept associativity and precedence
            # So it's only a prediction
            current = self._concepts()[-1]
            if current.expected_parameters() == len(self.parameters_list) - len(current.parameters_list_at_init):
                self._concepts()[-1].potential_pos = len(self.out)

    def _stack_isinstance(self, type):
        """
        Check the type of the top item in the stack
        :param type:
        :return:
        """
        return len(self.stack) > 0 and isinstance(self.stack[-1], type)

    def _get_lexer_nodes_from_unrecognized(self):
        """
        Use the source of self.unrecognized_tokens to find concepts or source code
        :return:
        """
        res = builtin_helpers.parse_unrecognized(self.context, self.unrecognized_tokens.source, PARSERS)
        only_parsers_results = builtin_helpers.only_parsers_results(self.context, res)
        if not only_parsers_results.status:
            return None
        return builtin_helpers.get_lexer_nodes(
            only_parsers_results.body.body,
            self.unrecognized_tokens.start,
            self.unrecognized_tokens.tokens)

    def _make_source_code_with_concept(self, start, rpar_token, end):
        """
        :param start:
        :param rpar_token:
        :param end:
        :return:
        """
        source_code = SourceCodeWithConceptNode(
            self.stack.pop(),
            UnrecognizedTokensNode(end, end, [rpar_token]),
            self.out[start + 1:]
        ).pseudo_fix_source()
        return source_code

    def _transform_to_unrecognized(self, parser_helper):
        # an Unrecognized node may have been sent to out prematurely
        if len(self.out) > 0 and isinstance(self.out[-1], UnrecognizedTokensNode):
            self.unrecognized_tokens = self.out.pop()
        if parser_helper.remember_whitespace:
            self.unrecognized_tokens.add_token(parser_helper.remember_whitespace, parser_helper.start - 1)
        for i, token in enumerate(parser_helper.tokens):
            self.unrecognized_tokens.add_token(token, parser_helper.start + i)

    def get_errors(self):
        res = []
        res.extend(self.errors)
        res.extend([item for item in self.out if isinstance(item, SyaConceptParserHelper) and item.error])
        return res

    def lock(self):
        self.is_locked = True

    def reset(self):
        if len(self.errors) > 0:
            return
        self.is_locked = False

    def manage_parameters_when_new_concept(self, parser_helper):
        """
        When a new concept is created, we need to check what to do with the parameters that were queued
        :param parser_helper: new concept
        :return:
        """
        if len(self.parameters_list) < parser_helper.expected_parameters_before_first_token:
            # The new concept expects some prefix parameters, but there are not enough of them
            parser_helper.error = "Not enough prefix parameters"
            return
        if len(self.parameters_list) > parser_helper.expected_parameters_before_first_token:
            # There are more parameters than needed by the new concept
            # The others are either
            # - parameters for the previous concept (if any)
            # - concepts on their own
            # - a syntax error
            # In all these cases, the only thing that matters is to pop what is expected by the new concept
            for i in range(parser_helper.expected_parameters_before_first_token):
                self.parameters_list.pop()
            parser_helper.parameters_list_at_init.extend(self.parameters_list)
            return
        # len(self.parameters_list) == parser_helper.expected_parameters_before_first_token
        # => We consider that the parameters are part of the new concept
        if len(self._concepts()) > 1:
            # The new concept is a parameter of the previous one.
            # So reset the potential_pos of the previous concept
            self._concepts()[-2].potential_pos = -1
        # eat them all
        self.parameters_list.clear()

    def manage_parameters(self):
        """
        Some new parameters were added to the list. What to do with them?
        :return:
        """
        # manage parenthesis that didn't find any match
        if self._is_lpar(self.stack[-1]):
            self._add_error(ParenthesisMismatchErrorNode(self.stack[-1]))
        # The parameters must be part of the current concept being parsed
        assert len(self._concepts()) != 0  # sanity check
        current_concept = self._concepts()[-1]
        while len(current_concept.expected) > 0 and current_concept.expected[0].type == TokenKind.VAR_DEF:
            # eat everything that was expected
            if len(self.parameters_list) == 0:
                current_concept.error = f"Failed to match parameter '{current_concept.expected[0].str_value}'"
                return
            del self.parameters_list[0]
            del current_concept.expected[0]

    def manage_unrecognized(self):
        if self.unrecognized_tokens.is_empty():
            return  # do not put an empty UnrecognizedTokensNode in out
        if self.unrecognized_tokens.is_whitespace():
            self.unrecognized_tokens.reset()
            return
        self.unrecognized_tokens.fix_source()
        # try to recognize concepts
        nodes_sequences = self._get_lexer_nodes_from_unrecognized()
        if nodes_sequences:
            # When more than one solution is found,
            # we create a new InFixToPostFix for each extra possibility
            if len(nodes_sequences) > 1:
                for node_sequence in nodes_sequences[1:]:
                    clone = self.clone()
                    for node in node_sequence:
                        clone._put_to_out(node)
                    clone.unrecognized_tokens = UnrecognizedTokensNode(-1, -1, [])
                    self.forked.append(clone)
            # Do not forget the first result, which goes with the current InFixToPostFix
            for node in nodes_sequences[0]:
                self._put_to_out(node)
        else:
            self._put_to_out(self.unrecognized_tokens)
        # create another instance
        self.unrecognized_tokens = UnrecognizedTokensNode(-1, -1, [])

    def get_functions_from_unrecognized(self, token, pos):
        """
        The unrecognized buffer ends with an lpar '('.
        It means that it's a function call, like foo(something)
        The problem is that we need to know if there are other concepts before the function
        ex : suffix one function(x)
        'suffix' and 'one' are not / may not be part of the name of the function
        We need to parse the buffer to recognize the parts and act accordingly
        :return: list of function_parser_res
        """
        self.unrecognized_tokens.fix_source()
        nodes_sequences = self._get_lexer_nodes_from_unrecognized()
        if nodes_sequences is None:
            return None
        res = []
        for sequence in nodes_sequences:
            if isinstance(sequence[-1], UnrecognizedTokensNode):
                function = sequence[-1]
            else:
                function = UnrecognizedTokensNode(sequence[-1].start, sequence[-1].end, sequence[-1].tokens)
            function.add_token(token, pos).fix_source()
            res.append(function_parser_res(sequence[:-1], function))
        return res

    def pop_stack_to_out(self):
        """
        Helper function that pops the stack and puts the item to the output, if needed
        :return:
        """
        item = self.stack[-1]
        # fix the concept if needed
        if isinstance(item, SyaConceptParserHelper):
            # make sure the expected parameters of this item are eaten
            if 0 < len(item.expected) <= len(self.parameters_list):
                self.manage_parameters()
            item.fix_concept()
        self.stack.pop()
        self._put_to_out(item)
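
    # A minimal sketch of the textbook pop rule that i_can_pop() below adapts
    # (illustrative only, not called by the parser): it strips away the
    # SyaConceptDef wrappers and the UnrecognizedTokensNode handling.
    # Worked example: with precedence(plus)=1 and precedence(times)=2, seeing
    # 'times' while 'plus' is on the stack never pops, so "a plus b times c"
    # comes out as "a b c times plus" ('times' binds tighter).
    @staticmethod
    def _pop_rule_sketch(current_precedence, current_associativity, stack_precedence):
        if not current_precedence:  # precedence not set (None or zero): no rule applies
            return False
        if current_associativity == SyaAssociativity.Left:
            return current_precedence <= stack_precedence
        return current_precedence < stack_precedence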

    def i_can_pop(self, concept_node):
        """
        Validate the Shunting Yard Algorithm conditions to pop out from the stack
        Note that it's a custom implementation, as I need to manage UnrecognizedTokensNode
        :param concept_node:
        :return:
        """
        if len(self.stack) == 0:
            return False
        stack_head = self.stack[-1]
        if not isinstance(stack_head, SyaConceptParserHelper):
            # mostly left parenthesis
            return False
        current = concept_node.concept
        stack = stack_head.concept
        if stack.associativity == SyaAssociativity.No and current.associativity == SyaAssociativity.No:
            self._add_error(NoneAssociativeSequenceErrorNode(current.concept, stack_head.start, concept_node.start))
        if not current.precedence:
            # precedence is not set (None or zero)
            # Do not apply any rule
            return False
        if current.associativity == SyaAssociativity.Left and current.precedence <= stack.precedence:
            return True
        if current.associativity == SyaAssociativity.Right and current.precedence < stack.precedence:
            return True
        return False

    def handle_expected_token(self, token, pos):
        """
        True if the token is part of the concept being parsed
        Example: Concept("foo a bar b").def_var("a").def_var("b")
        The expected tokens are 'foo' and 'bar' (as a and b are parameters)
        Example: Concept("foo a bar baz b").def_var("a").def_var("b")
        If the token is 'bar', it is eaten, but the keyword sequence is not finished, as 'baz' is still expected
        :param token:
        :param pos:
        :return:
        """
        def _pop_stack(c):
            while self.stack[-1] != c and not self._is_lpar(self.stack[-1]):
                self.pop_stack_to_out()
            if self._is_lpar(self.stack[-1]):
                self._add_error(ParenthesisMismatchErrorNode(self.stack[-1]))
                return False

        # Manage concepts ending with long names
        if self._stack_isinstance(SyaConceptParserHelper) and self.stack[-1].is_matched():
            self.pop_stack_to_out()
        for current_concept in reversed(self._concepts()):  # As I may lose memory again ;-)
            # it's a reversed loop to manage cases like
            # if a plus b then ...
            # The current concept is 'plus', but the token is 'then'
            # It means that I have finished parsing the 'plus' and started the second part of the 'if'
            if current_concept.is_next(token):
                current_concept.end = pos
                current_concept.tokens.append(token)
                if current_concept.eat_token(token):
                    _pop_stack(current_concept)
                return True
            if len(current_concept.expected) > 0 and current_concept.expected[0].type != TokenKind.VAR_DEF:
                if current_concept.expected[0].type == TokenKind.WHITESPACE:
                    # drop it. It's the case where an optional whitespace is missing
                    del current_concept.expected[0]
                else:
                    # error
                    # We are not parsing the concept we thought we were parsing.
                    # Transform the eaten tokens into unrecognized
                    # and discard the current SyaConceptParserHelper
                    # TODO: manage the pending LPAR, RPAR ?
                    self._transform_to_unrecognized(current_concept)
                    self.false_positives.append(current_concept)
                    self.stack.pop()
                    return False
            if current_concept.is_expected(token):
                # Fix the whitespace between var and expected if needed
                # current_concept.expected[0] is ''
                # current_concept.expected[1] is what separates the var from the expected token (normally a whitespace)
                if current_concept.expected[1].type == TokenKind.WHITESPACE:
                    self.unrecognized_tokens.pop(TokenKind.WHITESPACE)
                current_concept.end = pos
                self.manage_unrecognized()
                # manage that some clones may have been forked
                for forked in self.forked:
                    forked.handle_expected_token(token, pos)
                # manage concepts found between tokens (of a ternary, for example)
                self.manage_parameters()
                if len(self.parameters_list) > len(current_concept.parameters_list_at_init):
                    # we have eaten the parameters expected between two expected tokens
                    # But there are some remaining parameters
                    self._add_error(TooManyParametersFound(
                        current_concept.concept.concept, current_concept.start, token, self.parameters_list[:]))
                    return True  # no need to continue
                while self._stack_isinstance(SyaConceptParserHelper) and self.stack[-1] != current_concept:
                    current = self.stack[-1]
                    if current.error:
                        self._transform_to_unrecognized(current)
                        self.false_positives.append(current)
                        self.stack.pop()
                        if current_concept.expected[1].type == TokenKind.WHITESPACE:
                            self.unrecognized_tokens.pop(TokenKind.WHITESPACE)
                        self.manage_unrecognized()
                        # manage that some clones may have been forked
                        for forked in self.forked:
                            forked.handle_expected_token(token, pos)
                    else:
                        self.pop_stack_to_out()
                        self.manage_parameters()
                # maybe eat the whitespace that sits before the expected token
                if current_concept.expected[0].type == TokenKind.WHITESPACE:
                    del current_concept.expected[0]
                if current_concept.eat_token(token):
                    _pop_stack(current_concept)
                return True
        return False
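
    # How eat_token() below disambiguates a '(': when the unrecognized buffer
    # is empty (or whitespace only), "( expr )" is plain grouping and the
    # parenthesis goes on the stack; when the buffer holds tokens, as in
    # "foo(", the '(' is treated as a function call and the buffered tokens
    # become (part of) the function name. In "suffix one foo(x)", for example,
    # get_functions_from_unrecognized() decides whether 'suffix' and 'one'
    # belong to the function name or are concepts of their own.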
    def eat_token(self, token, pos):
        """
        Receive a token.
        It will be processed if it's expected by a concept or if it's a parenthesis
        :param token:
        :param pos:
        :return:
        """
        if self.is_locked:
            return
        if self.handle_expected_token(token, pos):
            # a token was found, let's check if it's part of a concept being parsed
            # example Concept(name="foo", definition="foo a bar b").def_var("a").def_var("b")
            # if the token 'bar' is found, it has to be considered as part of the concept foo
            self.debug.append(token)
            return True
        elif self._is_lpar(token):
            self.debug.append(token)
            if self.unrecognized_tokens.is_empty() or self.unrecognized_tokens.is_whitespace():
                # first, remove what was in the buffer
                self.manage_unrecognized()
                for forked in self.forked:  # manage that some clones may have been forked
                    forked.eat_token(token, pos)
                self.stack.append((token, pos))
            else:
                # the parenthesis is part of the unrecognized
                # So it's a function
                list_of_results = self.get_functions_from_unrecognized(token, pos)
                if list_of_results:
                    instances = [self]
                    for i in range(len(list_of_results) - 1):
                        clone = self.clone()
                        self.forked.append(clone)
                        instances.append(clone)
                    # Manage the result for self and its clones
                    for instance, parsing_res in zip(instances, list_of_results):
                        for to_out in parsing_res.to_out:
                            instance._put_to_out(to_out)
                        instance._put_to_out(")")  # mark where the function should end
                        instance.stack.append(parsing_res.function)
                        instance.unrecognized_tokens = UnrecognizedTokensNode(-1, -1, [])  # reset unrecognized
                else:
                    self._put_to_out(")")  # mark where the function should end
                    self.eat_unrecognized(token, pos)  # add the '(' to the rest of the unknown
                    self.stack.append(self.unrecognized_tokens.fix_source())
                    self.unrecognized_tokens = UnrecognizedTokensNode(-1, -1, [])
            return True
        elif self._is_rpar(token):
            self.debug.append(token)
            # first, remove what was in the buffer
            self.manage_unrecognized()
            for forked in self.forked:  # manage that some clones may have been forked
                forked.eat_token(token, pos)
            # pop everything but the lpar from the stack to 'out'
            while len(self.stack) > 0 and not self._is_lpar(self.stack[-1]):
                self.pop_stack_to_out()
            # check consistency: an lpar must be found
            if len(self.stack) == 0:
                self._add_error(ParenthesisMismatchErrorNode((token, pos)))
                return None
            if self._stack_isinstance(UnrecognizedTokensNode):
                # the parenthesis was a function call
                # we need to return a SourceCodeWithConceptNode
                for i in range(len(self.out) - 1, -1, -1):
                    if self.out[i] == ')':
                        start = i
                        break
                else:
                    self._add_error(ParenthesisMismatchErrorNode((token, pos)))
                    return None
                source_code = self._make_source_code_with_concept(start, token, pos)
                for item in self.out[start:]:
                    # update the parameter list
                    try:
                        self.parameters_list.remove(item)
                    except ValueError:
                        pass
                del self.out[start:]
                self._put_to_out(source_code)
                # self.pop_stack_to_out()
                # # Replace the ')' marker by its real position
                # for i in range(len(self.out) - 1, -1, -1):
                #     if self.out[i] == ')':
                #         self.out[i] = UnrecognizedTokensNode(pos, pos, [token])
            else:
                self.stack.pop()  # discard the lpar
            return True
        return False
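
    # The three shapes eat_concept() below distinguishes, depending on how
    # much of the concept key is already satisfied at its first keyword:
    #   - atom ("pi", no variables): goes straight to out,
    #   - already matched ("a squared"): only prefix parameters were expected,
    #     so it goes to out once the queued parameters are bound,
    #   - still expecting tokens or variables ("a plus b"): pushed on the
    #     stack until precedence/associativity pop it (see i_can_pop()).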
    def eat_concept(self, sya_concept_def, token, pos):
        """
        A concept has been found
        :param sya_concept_def:
        :param token:
        :param pos:
        :return:
        """
        if self.is_locked:
            return
        self.debug.append(sya_concept_def)
        parser_helper = SyaConceptParserHelper(sya_concept_def, pos)
        if self.unrecognized_tokens.last_token_type() == TokenKind.WHITESPACE:
            parser_helper.remember_whitespace = self.unrecognized_tokens.tokens[-1]
        if Token.is_whitespace(parser_helper.last_token_before_first_token):
            self.unrecognized_tokens.pop(TokenKind.WHITESPACE)
        # First, try to recognize the tokens that are waiting
        self.manage_unrecognized()
        for forked in self.forked:  # manage the fact that some clones may have been forked
            forked.eat_concept(sya_concept_def, token, pos)
        # then, check if this new concept is linked to the previous ones
        # i.e., is the previous concept fully matched?
        if parser_helper.expected_parameters_before_first_token == 0:
            # => does not expect pending parameters (it's a suffixed concept)
            while self._stack_isinstance(SyaConceptParserHelper) and self.stack[-1].potential_pos != -1:
                # => the previous one seems to have everything it needs in the parameter list
                self.pop_stack_to_out()
        if parser_helper.is_atom():
            self._put_to_out(parser_helper.fix_concept())
        else:
            # call the shunting yard algorithm
            while self.i_can_pop(parser_helper):
                self.pop_stack_to_out()
            if parser_helper.is_matched():
                # case of a prefix concept which has found happiness with self.parameters_list
                # directly put it in out
                self.manage_parameters_when_new_concept(parser_helper)
                self._put_to_out(parser_helper.fix_concept())
            else:
                self.stack.append(parser_helper)
                self.manage_parameters_when_new_concept(parser_helper)

    def eat_unrecognized(self, token, pos):
        """
        The token was not recognized, add it to the current UnrecognizedTokensNode
        :param token:
        :param pos:
        :return:
        """
        if self.is_locked:
            return
        self.debug.append(token)
        self.unrecognized_tokens.add_token(token, pos)

    def finalize(self):
        """
        Put the remaining items from the stack to out
        :return:
        """
        if self.is_locked:
            return
        if len(self.stack) == 0 and len(self.out) == 0:
            return  # no need to pop the buffer, as no concept was found
        while len(self.stack) > 0:
            parser_helper = self.stack[-1]
            # validate parenthesis
            if self._is_lpar(parser_helper) or self._is_rpar(parser_helper):
                self._add_error(ParenthesisMismatchErrorNode(parser_helper))
                return None
            self.manage_unrecognized()
            for forked in self.forked:  # manage that some clones may have been forked
                forked.finalize()
            failed_to_match = sum(map(lambda e: e.type != TokenKind.VAR_DEF, parser_helper.expected))
            if failed_to_match > 0:
                # didn't manage to read all the tokens.
                # Transform them into unrecognized
                self._transform_to_unrecognized(parser_helper)
                self.false_positives.append(parser_helper)
                self.stack.pop()  # discard the parser helper
            else:
                self.pop_stack_to_out()  # process it
        self.manage_unrecognized()
        for forked in self.forked:  # manage that some clones may have been forked
            forked.finalize()

    def clone(self):
        clone = InFixToPostFix(self.context)
        clone.is_locked = self.is_locked
        clone.out = self.out[:]
        clone.stack = [i.clone() if hasattr(i, "clone") else i for i in self.stack]
        clone.unrecognized_tokens = self.unrecognized_tokens.clone()
        clone.parameters_list = self.parameters_list[:]
        clone.errors = self.errors[:]
        clone.debug = self.debug[:]
        # clone.forked = self.forked
        return clone


@dataclass()
class PostFixToItem:
    concept: Concept
    start: int
    end: int
    has_unrecognized: bool
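

# Sketch of the postfix-to-tree step performed by SyaNodeParser.postfix_to_item
# below, reduced to plain tuples (illustrative only): operands are strings,
# operators are ("name", arity) pairs, and arguments are popped right-to-left,
# mirroring the reversed() loop over concept.metadata.variables.
def _postfix_to_tree_sketch(postfixed):
    """_postfix_to_tree_sketch(['a', 'b', ('plus', 2)]) -> ('plus', 'a', 'b')"""
    item = postfixed.pop()
    if isinstance(item, str):  # operand: a leaf
        return item
    name, arity = item
    children = [_postfix_to_tree_sketch(postfixed) for _ in range(arity)]
    return (name, *children[::-1])  # popped last-to-first, so reverse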


class SyaNodeParser(BaseNodeParser):
    def __init__(self, **kwargs):
        super().__init__("SyaNode", 50, **kwargs)
        if 'sheerka' in kwargs:
            sheerka = kwargs.get("sheerka")
            self.sya_definitions = sheerka.resolved_sya_def
        else:
            self.concepts_by_first_keyword = {}
            self.sya_definitions = {}
        self.token = None
        self.pos = -1
        self.tokens = None
        self.context: ExecutionContext = None
        self.text = None
        self.sheerka = None

    def init_from_concepts(self, context, concepts, **kwargs):
        super().init_from_concepts(context, concepts)
        sya_definitions = kwargs.get("sya", None)
        if sya_definitions:
            self.sya_definitions = sya_definitions

    @staticmethod
    def _is_eligible(concept):
        """
        Predicate that selects the concepts that must be handled by this parser
        :param concept:
        :return:
        """
        # We only keep concepts that have parameters (refuse atoms)
        # Bnf definitions are not supposed to be managed by this parser either
        return len(concept.metadata.variables) > 0 and concept.metadata.definition_type != DEFINITION_TYPE_BNF

    @staticmethod
    def _get_sya_concept_def(parser, concept):
        sya_concept_def = SyaConceptDef(concept)
        if concept.id in parser.sya_definitions:
            sya_def = parser.sya_definitions.get(concept.id)
            if sya_def[0] is not None:
                sya_concept_def.precedence = sya_def[0]
            if sya_def[1] is not None:
                sya_concept_def.associativity = sya_def[1]
        if parser.sheerka:
            concept_weight = parser.sheerka.get_concepts_weights(BuiltinConcepts.PRECEDENCE)
            if concept.id in concept_weight:
                sya_concept_def.precedence = concept_weight[concept.id]
            if associativity := concept.get_prop(BuiltinConcepts.ASSOCIATIVITY):
                sya_concept_def.associativity = SyaAssociativity(associativity)
        return sya_concept_def

    def infix_to_postfix(self, context, text):
        """
        Implements the Shunting Yard Algorithm
        :param context:
        :param text:
        :return:
        """
        if not self.reset_parser(context, text):
            return None
        forked = []

        def _add_forked_to_res():
            # check if some new InFixToPostFix were created
            for in_to_post in res:
                if len(in_to_post.forked) > 0:
                    forked.extend(in_to_post.forked)
                    in_to_post.forked.clear()
            if len(forked) > 0:
                res.extend(forked)
                forked.clear()

        res = [InFixToPostFix(context)]
        while self.next_token(False):
            for infix_to_postfix in res:
                infix_to_postfix.reset()
            token = self.get_token()
            try:
                if token.type in (TokenKind.LPAR, TokenKind.RPAR):
                    # little optimization: no need to lock, unlock or fetch the concepts for a parenthesis
                    for infix_to_postfix in res:
                        infix_to_postfix.eat_token(token, self.pos)
                    continue
                for infix_to_postfix in res:
                    if infix_to_postfix.eat_token(token, self.pos):
                        infix_to_postfix.lock()
                concepts = self.get_concepts(token, self._is_eligible, to_map=self._get_sya_concept_def)
                if not concepts:
                    for infix_to_postfix in res:
                        infix_to_postfix.eat_unrecognized(token, self.pos)
                    continue
                if len(concepts) == 1:
                    for infix_to_postfix in res:
                        infix_to_postfix.eat_concept(concepts[0], token, self.pos)
                    continue
                # make the cartesian product
                temp_res = []
                for infix_to_postfix in res:
                    for concept in concepts:
                        clone = infix_to_postfix.clone()
                        temp_res.append(clone)
                        clone.eat_concept(concept, token, self.pos)
                res = temp_res
            finally:
                _add_forked_to_res()
        # make sure that the remaining items in the stack are moved to out
        for infix_to_postfix in res:
            infix_to_postfix.reset()
            infix_to_postfix.finalize()
        _add_forked_to_res()
        return res

    def postfix_to_item(self, sheerka, postfixed):
        item = postfixed.pop()
        if isinstance(item, (UnrecognizedTokensNode, SourceCodeNode, ConceptNode)):
            return item
        if isinstance(item, SourceCodeWithConceptNode):
            items = []
            while len(item.nodes) > 0:
                res = self.postfix_to_item(sheerka, item.nodes)
                if isinstance(res, PostFixToItem):
                    items.append(ConceptNode(res.concept, res.start, res.end, self.tokens[res.start: res.end + 1]))
                else:
                    items.append(res)
                item.has_unrecognized |= (hasattr(res, "has_unrecognized") and res.has_unrecognized) or \
                    isinstance(res, UnrecognizedTokensNode)
            item.nodes = items
            item.fix_all_pos()
            item.tokens = self.tokens[item.start:item.end + 1]
            item.fix_source(True)
            return item
        # SyaConceptParserHelper
        start = item.start
        end = item.end
        has_unrecognized = False
        concept = sheerka.new_from_template(item.concept, item.concept.id)
        for param_index in reversed(range(len(concept.metadata.variables))):
            inner_item = self.postfix_to_item(sheerka, postfixed)
            if inner_item.start < start:
                start = inner_item.start
            if inner_item.end > end:
                end = inner_item.end
            has_unrecognized |= isinstance(inner_item, UnrecognizedTokensNode)
            param_name = concept.metadata.variables[param_index][0]
            param_value = inner_item.concept if hasattr(inner_item, "concept") else \
                [inner_item.return_value] if isinstance(inner_item, SourceCodeNode) else \
                inner_item
            concept.compiled[param_name] = param_value
        return PostFixToItem(concept, start, end, has_unrecognized)
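
    # parse() below ties the pieces together:
    #   1. infix_to_postfix() runs the shunting yard over the token stream,
    #      possibly forking one InFixToPostFix per ambiguous reading,
    #   2. get_valid() drops the candidates that carry errors, produced no
    #      output, or never involved a SYA concept,
    #   3. postfix_to_item() turns each surviving postfix sequence back into
    #      ConceptNode / source-code nodes, flagging unrecognized leftovers.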
    def parse(self, context, parser_input):
        """
        :param context:
        :param parser_input:
        :return:
        """
        if parser_input == "":
            return context.sheerka.ret(
                self.name, False,
                context.sheerka.new(BuiltinConcepts.IS_EMPTY)
            )
        ret = []
        valid_infix_to_postfixs = self.get_valid(self.infix_to_postfix(context, parser_input))
        if valid_infix_to_postfixs is None:
            # token error
            return self.sheerka.ret(
                self.name, False,
                context.sheerka.new(BuiltinConcepts.ERROR, body=self.error_sink))
        if len(valid_infix_to_postfixs) == 0:
            return self.sheerka.ret(
                self.name, False,
                context.sheerka.new(BuiltinConcepts.NOT_FOR_ME, body=parser_input))
        for infix_to_postfix in valid_infix_to_postfixs:
            sequence = []
            has_unrecognized = False
            while len(infix_to_postfix.out) > 0:
                item = self.postfix_to_item(context.sheerka, infix_to_postfix.out)
                has_unrecognized |= (hasattr(item, "has_unrecognized") and item.has_unrecognized) or \
                    isinstance(item, UnrecognizedTokensNode)
                if isinstance(item, PostFixToItem):
                    to_insert = ConceptNode(item.concept, item.start, item.end,
                                            self.tokens[item.start: item.end + 1])
                else:
                    to_insert = item
                sequence.insert(0, to_insert)
            if has_unrecognized:
                # Manage some sick cases where a missing parenthesis messes up the order of the sequence
                # example: "foo bar(one plus two"
                sequence.sort(key=attrgetter("start"))
            ret.append(
                self.sheerka.ret(
                    self.name, not has_unrecognized,
                    self.sheerka.new(
                        BuiltinConcepts.PARSER_RESULT,
                        parser=self,
                        source=parser_input,
                        body=sequence,
                        try_parsed=sequence)))
        if len(ret) == 1:
            self.log_result(context, parser_input, ret[0])
            return ret[0]
        else:
            self.log_multiple_results(context, parser_input, ret)
            return ret

    @staticmethod
    def get_valid(infix_to_postfixs):
        """
        Get the valid InFixToPostFix instances
        :param infix_to_postfixs:
        :return:
        """
        def _has_sya(items):
            for item in items:
                if isinstance(item, SourceCodeWithConceptNode):
                    if _has_sya(item.nodes):
                        return True
                if isinstance(item, SyaConceptParserHelper):
                    return True
            return False

        if infix_to_postfixs is None:
            return None
        result = []
        for infix_to_postfix in infix_to_postfixs:
            if len(infix_to_postfix.get_errors()) > 0:
                continue
            if len(infix_to_postfix.out) == 0:
                continue
            if infix_to_postfix in result:
                continue
            if not _has_sya(infix_to_postfix.out):
                # refuse the result if it does not involve SYA
                continue
            result.append(infix_to_postfix)
        return result