from collections import namedtuple from dataclasses import dataclass, field from operator import attrgetter from typing import List from core import builtin_helpers from core.builtin_concepts import BuiltinConcepts from core.builtin_helpers import parse_function from core.concept import Concept, DEFINITION_TYPE_BNF from core.sheerka.services.SheerkaComparisonManager import SheerkaComparisonManager from core.sheerka.services.SheerkaExecute import ParserInput from core.tokenizer import Token, TokenKind, Tokenizer from core.utils import get_n_clones from parsers.BaseNodeParser import UnrecognizedTokensNode, ConceptNode, SourceCodeNode, SyaAssociativity, \ SourceCodeWithConceptNode, BaseNodeParser from parsers.BaseParser import ErrorNode PARSERS = ["BnfNode", "AtomNode", "Python"] function_parser_res = namedtuple("FunctionParserRes", 'to_out function') DEBUG_PUSH = "PUSH" DEBUG_PUSH_UNREC = "PUSH_UNREC" DEBUG_POP = "POP" DEBUG_EAT = "EAT" DEBUG_RECOG = "RECOG" @dataclass() class DebugInfo: """ Debug item to trace how the sya parser worked Possible action: PUSH: push the token or the concept to the stack PUSH_UNREC: push the token to the UnrecognizedTokensNode POP: pop item to out EAT: eat the current token (it means that it was part of the concept currently being parsed) RECOG: when tokens from UnrecognizedTokensNode are parsed and recognized """ pos: int = -1 # position of the parser input token: Token = None # current token concept: Concept = None # current concept if ay action: str = None # action taken def __repr__(self): token_repr = self.token.repr_value if isinstance(self.token, Token) else self.token msg = f"{self.pos:3}:{token_repr}" if self.pos != -1 else " _:" if self.concept: msg += f"({self.concept})" return msg + f" => {self.action}" class ParenthesisMismatchErrorNode(ErrorNode): def __init__(self, error_int): if isinstance(error_int, tuple): if isinstance(error_int[0], Token): self.token_value = error_int[0].value self.token = error_int[0] else: self.token_value = error_int[0] self.token = None self.pos = error_int[1] elif isinstance(error_int, Token): self.token = error_int self.token_value = error_int.value self.pos = -1 else: # isinstance(UnrecognizedTokensNode) for i, t in reversed(list(enumerate(error_int.tokens))): if t.type == TokenKind.LPAR: self.token = t self.token_value = t.value self.pos = i + error_int.start def __eq__(self, other): if id(self) == id(other): return True if not isinstance(other, ParenthesisMismatchErrorNode): return False return self.token_value == other.token_value and self.pos == other.pos def __hash__(self): return hash(self.pos) def __repr__(self): return f"ParenthesisMismatchErrorNode('{self.token_value}', {self.pos}" @dataclass() class NoneAssociativeSequenceErrorNode(ErrorNode): concept: Concept first: int second: int tokens: List[Token] = None @dataclass() class TooManyParametersFound(ErrorNode): concept: Concept pos: int # position of the concept token: Token # token of the concept where the error was noticed parameters: list # list of unmatched parameters def __repr__(self): return f"Too many parameters found for '{self.concept}' before token '{self.token}'" @dataclass() class SyaConceptDef: """ Wrapper to concept It gives the precedence and the associativity for the concept """ concept: Concept precedence: int = SheerkaComparisonManager.DEFAULT_COMPARISON_VALUE associativity: SyaAssociativity = SyaAssociativity.Right @dataclass() class SyaConceptParserHelper: """ Use because the is not enough information to create the final ConceptNode """ concept: Concept start: int # position of the token in the tokenizer (Caution, it is not token.index) end: int = field(default=-1, repr=False, compare=False, hash=None) expected: List[Token] = field(default_factory=list, repr=False, compare=False, hash=None) expected_parameters_before_first_token: int = field(default=0, repr=False, compare=False, hash=None) last_token_before_first_token: Token = field(default=None, repr=False, compare=False, hash=None) potential_pos: int = field(default=-1, repr=False, compare=False, hash=None) parameters_list_at_init: list = field(default_factory=list, repr=False, compare=False, hash=None) tokens: List[Token] = field(default_factory=list, repr=False, compare=False, hash=None) # tokens eaten remember_whitespace: Token = field(default=None, repr=False, compare=False, hash=None) error: str = None def __post_init__(self): concept = self.concept.concept if isinstance(self.concept, SyaConceptDef) else self.concept if self.end == -1: self.end = self.start first_keyword_found = None for token in Tokenizer(concept.key, yield_eof=False): if not first_keyword_found and token.type != TokenKind.WHITESPACE and token.type != TokenKind.VAR_DEF: first_keyword_found = token if first_keyword_found: self.expected.append(token) else: self.last_token_before_first_token = token if token.type != TokenKind.WHITESPACE: self.expected_parameters_before_first_token += 1 self.eat_token(first_keyword_found) # remove the first token self.tokens.append(first_keyword_found) def is_matched(self): return len(self.expected) == 0 def is_atom(self): return len(self.concept.concept.metadata.variables) == 0 and len(self.expected) == 0 def is_next(self, token): """ To match long named concepts :param token: :return: """ if self.is_matched() or len(self.expected) == 0: return False # True if the next token is the one that is expected # Or if the next token is a whitespace and the expected one is the one after # (whitespace are sometimes not mandatory) return token.strip_quote == self.expected[0].strip_quote or \ self.expected[0].type == TokenKind.WHITESPACE and token.strip_quote == self.expected[1].strip_quote def is_expected(self, token): if self.is_matched() or token.type == TokenKind.WHITESPACE: return False for expected in self.expected: if expected.type != TokenKind.VAR_DEF and expected.strip_quote == token.strip_quote: return True return False def expected_parameters(self): return sum(map(lambda e: e.type == TokenKind.VAR_DEF, self.expected)) def eat_token(self, until_token): """ eat until token 'until' :param until_token: :return: """ # No check, as it is used only after is_expected() or is_next() while self.expected[0].strip_quote != until_token.strip_quote: del self.expected[0] del self.expected[0] # return True is a whole sequence of keyword is eaten # example # Concept("foo a bar baz qux b").def_var("a").def_var("b") # 'bar' is just eaten. We will return False because 'baz' and 'qux' are still waiting if len(self.expected) == 0: return True # also return True at the end of a name sequence # ... bar baz qux # return True after 'qux', to indicate all the parameters from must be processed return self.expected[0].type == TokenKind.VAR_DEF def eat_parameter(self, parameter): if self.is_matched() and parameter == self: return # not a error if self.is_matched(): self.error = "No more parameter expected" return if self.expected[0].type != TokenKind.VAR_DEF: self.error = "Parameter was not expected" return del self.expected[0] def fix_concept(self): """ When the SYA is done, we only need the concept, not the sya concept :return: """ if isinstance(self.concept, SyaConceptDef): self.concept = self.concept.concept return self def clone(self): clone = SyaConceptParserHelper(self.concept, self.start, self.end) clone.expected = self.expected[:] clone.expected_parameters_before_first_token = self.expected_parameters_before_first_token clone.potential_pos = self.potential_pos clone.parameters_list_at_init = self.parameters_list_at_init clone.error = self.error return clone class InFixToPostFix: def __init__(self, context, debug_enabled=False): self.context = context self.debug_enabled = debug_enabled self.is_locked = False # when locked, cannot process input self.out = [] # shunting yard algo out self.stack = [] # shunting yard algo stack self.unrecognized_tokens = UnrecognizedTokensNode(-1, -1, []) # buffer that keeps tracks of tokens positions self.parameters_list = [] # list of the parameters that need to be associated to a concept self.errors = [] # Not quite sure that I can handle more than one error self.debug = [] self.false_positives = [] # concepts that looks like known one, but not (for debug purpose) self.forked = [] # use to fork InFixToPostFix when multiple parsers recognize the unrecognized_tokens self.parsing_function = False # indicate that we are currently parsing a function def __repr__(self): return f"InFixToPostFix({self.debug})" def __eq__(self, other): if id(self) == id(other): return True if not isinstance(other, InFixToPostFix): return False return self.out == other.out and self.errors == other.errors def __hash__(self): return len(self.sequence) + len(self.errors) def _add_error(self, error): if self.debug_enabled: self.debug.append(DebugInfo(action=f"=> ERROR {error}")) self.errors.append(error) def _is_lpar(self, token): """ True if the token is a left parenthesis '(' Note that when we are parsing non recognized tokens, we consider that the parenthesis are part of the non recognized :param token: :return: """ # return isinstance(token, Token) and token.type == TokenKind.LPAR if isinstance(token, Token) and token.type == TokenKind.LPAR: return True if isinstance(token, tuple) and token[0].type == TokenKind.LPAR: return True if isinstance(token, UnrecognizedTokensNode) and token.parenthesis_count > 0: return True return False def _is_rpar(self, token): """ True if the token is a right parenthesis ')' Note that when we are parsing non recognized tokens, we consider that the parenthesis are part of the non recognized :param token: :return: """ return isinstance(token, Token) and token.type == TokenKind.RPAR def _concepts(self): """ Return the concept currently being parsed :return: """ res = [] for item in self.stack: if isinstance(item, SyaConceptParserHelper): res.append(item) return res def _put_to_out(self, item): """ Helper function that Put an item in the out :param item: :return: """ if isinstance(item, SyaConceptParserHelper) and len(item.expected) > 0 and not item.error: if item.expected[0].type == TokenKind.VAR_DEF: item.error = "Not enough suffix parameters" else: item.error = f"token '{item.expected[0].strip_quote}' not found" if self.debug_enabled: self.debug.append(DebugInfo(action=f"ERROR {item.error}")) if self.debug_enabled: self.debug.append(DebugInfo(action=f"{DEBUG_POP} {item}")) if isinstance(item, SyaConceptParserHelper) and item.potential_pos != -1: self.out.insert(item.potential_pos, item) else: self.out.append(item) # put the item to the list of awaiting parameters only if it's not the end of function marker if item != ")": self.parameters_list.append(item) if len(self._concepts()) > 0: # try to predict the final position of the current concept # This position can be altered by concept associativity and precedence # So it's only a prediction current = self._concepts()[-1] if current.expected_parameters() == len(self.parameters_list) - len(current.parameters_list_at_init): self._concepts()[-1].potential_pos = len(self.out) def _stack_isinstance(self, type): """ Check the type of the top item in the stack :param type: :return: """ return len(self.stack) > 0 and isinstance(self.stack[-1], type) def _make_source_code_with_concept(self, start, rpar_token, end): """ :param start: :param rpar_token: :param end: :return: """ source_code = SourceCodeWithConceptNode( self.stack.pop(), UnrecognizedTokensNode(end, end, [rpar_token]), self.out[start + 1:] ).pseudo_fix_source() return source_code def _transform_to_unrecognized(self, parser_helper): # an Unrecognized when sent to out too prematurely if len(self.out) > 0 and isinstance(self.out[-1], UnrecognizedTokensNode): self.unrecognized_tokens = self.out.pop() if parser_helper.remember_whitespace: self.unrecognized_tokens.add_token(parser_helper.remember_whitespace, parser_helper.start - 1) for i, token in enumerate(parser_helper.tokens): self.unrecognized_tokens.add_token(token, parser_helper.start + i) def _remove_debug_info_if_needed(self): """ Before trying to manage the unrecognized, a line is added to explain the token which has triggered the recognition try This line is useless if self.unrecognized_tokens was irrelevant :return: """ if len(self.debug) > 0 and self.debug[-1].action == "??": self.debug.pop() def _debug_nodes(self, nodes_sequences): res = "[" first = True for sequence in nodes_sequences: if not first: res += ", " res += "[" + ", ".join([n.to_short_str() for n in sequence]) + "]" first = False return res + "]" def get_errors(self): def has_error(item): if isinstance(item, SyaConceptParserHelper) and item.error: return True if isinstance(item, SourceCodeWithConceptNode): for n in item.nodes: if hasattr(n, "error") and n.error: return True return False res = [] res.extend(self.errors) res.extend([item for item in self.out if has_error(item)]) return res def lock(self): self.is_locked = True def reset(self): if len(self.errors) > 0: return self.is_locked = False def manage_parameters_when_new_concept(self, parser_helper): """ When a new concept is create, we need to check what to do with the parameters that were queued :param parser_helper: new concept :return: """ if len(self.parameters_list) < parser_helper.expected_parameters_before_first_token: # The new concept expect some prefix parameters, but there's not enough parser_helper.error = "Not enough prefix parameters" return if len(self.parameters_list) > parser_helper.expected_parameters_before_first_token: # There are more parameters than needed by the new concept # These others parameters are either # - parameters for the previous suffixed concept (if any) # - concepts on their own # - syntax error # In all the cases, the only thing that matter is to pop what is expected by the new concept for i in range(parser_helper.expected_parameters_before_first_token): self.parameters_list.pop() parser_helper.parameters_list_at_init.extend(self.parameters_list) return # len(self.parameters_list) == temp_concept_node.expected_parameters_before_first_token # => We consider that the parameter are part of the new concept if len(self._concepts()) > 1: # The new concept is a parameter of the previous one. # So reset the potential_pos of the previous concept self._concepts()[-2].potential_pos = -1 # eat them all self.parameters_list.clear() def manage_parameters(self): """ Some new parameters were added to the list. What to do with them ? :return: """ # manage parenthesis that didn't find any match if self._is_lpar(self.stack[-1]): self._add_error(ParenthesisMismatchErrorNode(self.stack[-1])) # The parameter must be part the current concept being parsed assert len(self._concepts()) != 0 # sanity check current_concept = self._concepts()[-1] while len(current_concept.expected) > 0 and current_concept.expected[0].type == TokenKind.VAR_DEF: # eat everything that was expected if len(self.parameters_list) == 0: current_concept.error = f"Failed to match parameter '{current_concept.expected[0].strip_quote}'" return del self.parameters_list[0] del current_concept.expected[0] def manage_unrecognized(self): if self.unrecognized_tokens.is_empty(): return # do not put empty UnrecognizedToken in out if self.unrecognized_tokens.is_whitespace(): self.unrecognized_tokens.reset() return self.unrecognized_tokens.fix_source() if self.unrecognized_tokens.parenthesis_count > 0: # parenthesis mismatch detected, do not try to resolve the unrecognized self._add_error(ParenthesisMismatchErrorNode(self.unrecognized_tokens)) self._put_to_out(self.unrecognized_tokens) else: # try to recognize concepts nodes_sequences = builtin_helpers.get_lexer_nodes_from_unrecognized( self.context, self.unrecognized_tokens, PARSERS) if nodes_sequences: # There are more than one solution found # In the case, we create a new InfixToPostfix for each new possibility if self.debug_enabled: self.debug.append(DebugInfo(action=f"{DEBUG_RECOG} {self._debug_nodes(nodes_sequences)}")) if len(nodes_sequences) > 1: for node_sequence in nodes_sequences[1:]: clone = self.clone() for node in node_sequence: clone._put_to_out(node) clone.unrecognized_tokens = UnrecognizedTokensNode(-1, -1, []) self.forked.append(clone) # Do not forget the first result that will go with the current InfixToPostfix for node in nodes_sequences[0]: self._put_to_out(node) else: self._put_to_out(self.unrecognized_tokens) # create another instance self.unrecognized_tokens = UnrecognizedTokensNode(-1, -1, []) def get_functions_names_from_unrecognized(self, token, pos): """ The unrecognized ends with an lpar '(' It means that its a function like foo(something) The problem is that we need to know if there are other concepts before the function ex : suffix one function(x) suffix and one are not / may not be part of the name of the function We need to call the function to recognize the parts and act accordingly :return: list of function_parser_res """ self.unrecognized_tokens.fix_source() nodes_sequences = builtin_helpers.get_lexer_nodes_from_unrecognized( self.context, self.unrecognized_tokens, PARSERS) if not nodes_sequences: nodes_sequences = [[self.unrecognized_tokens.clone()]] res = [] for sequence in nodes_sequences: last_node = sequence[-1] if len(last_node.tokens) > 1: if isinstance(last_node, UnrecognizedTokensNode): to_out = [UnrecognizedTokensNode(last_node.start, pos - 2, last_node.tokens[:-1]).fix_source()] function_name = UnrecognizedTokensNode(pos - 1, pos - 1, [last_node.tokens[-1]]) function_name.add_token(token, pos) else: to_out = [last_node.fix_source()] function_name = None else: # len(last_node.tokens) == 1 if not isinstance(last_node, UnrecognizedTokensNode): function_name = UnrecognizedTokensNode(last_node.start, last_node.end, last_node.tokens) else: function_name = last_node function_name.add_token(token, pos) to_out = [] res.append(function_parser_res(sequence[:-1] + to_out, function_name)) return res def pop_stack_to_out(self): """ Helper function that pops the stack and put the item to the output, if needed :return: """ item = self.stack[-1] # fix the concept is needed if isinstance(item, SyaConceptParserHelper): # make sure the expected parameters of this item are eaten if 0 < len(item.expected) <= len(self.parameters_list): self.manage_parameters() item.fix_concept() self.stack.pop() self._put_to_out(item) def i_can_pop(self, concept_node): """ Validate the Shunting Yard Algorithm conditions to pop out from the stack Note that it's a custom implementation as I need to manage UnrecognizedTokensNode :param concept_node: :return: """ if len(self.stack) == 0: return False stack_head = self.stack[-1] if not isinstance(stack_head, SyaConceptParserHelper): # mostly left parenthesis return False current = concept_node.concept stack = stack_head.concept if stack.associativity == SyaAssociativity.No and current.associativity == SyaAssociativity.No: self._add_error(NoneAssociativeSequenceErrorNode(current.concept, stack_head.start, concept_node.start)) if current.associativity == SyaAssociativity.Left and current.precedence <= stack.precedence: return True if current.associativity == SyaAssociativity.Right and current.precedence < stack.precedence: return True return False def handle_expected_token(self, token, pos): """ True if the token is part of the concept being parsed and the last token in a sequence is eaten Example : Concept("foo a bar b").def_var("a").def_var("b") The expected tokens are 'foo' and 'bar' (as a and b are parameters) Example: Concept("foo a bar baz b").def_var("a").def_var("b") If the token is 'bar', it will be eaten but handle_expected_token() will return False as we still expect 'baz' :param token: :param pos: :return: """ def _pop_stack(c): while self.stack[-1] != c and not self._is_lpar(c): self.pop_stack_to_out() if self._is_lpar(self.stack[-1]): self._add_error(ParenthesisMismatchErrorNode(self.stack[-1])) return False # Manage concepts ending with long names if self._stack_isinstance(SyaConceptParserHelper) and self.stack[-1].is_matched(): self.pop_stack_to_out() for current_concept in reversed(self._concepts()): # As I may loose memory again ;-) # it's a reversed loop to manage cases like # if a plus b then ... # The current concept is 'plus', but the token is 'then' # It's means that I have finished to parse the 'plus' and started the second part of the 'if' if current_concept.is_next(token): current_concept.end = pos current_concept.tokens.append(token) if current_concept.eat_token(token): _pop_stack(current_concept) return True if len(current_concept.expected) > 0 and current_concept.expected[0].type != TokenKind.VAR_DEF: if current_concept.expected[0].type == TokenKind.WHITESPACE: # drop it. It's the case where an optional whitespace is missing del (current_concept.expected[0]) else: # error # We are not parsing the concept we thought we were parsing. # Transform the eaten tokens into unrecognized # and discard the current SyaConceptParserHelper # TODO: manage the pending LPAR, RPAR ? self._transform_to_unrecognized(current_concept) self.false_positives.append(current_concept) self.stack.pop() return False if current_concept.is_expected(token): # Fix the whitespace between var and expected if needed # current_concept.expected[0] is '' # current_concept.expected[1] is what separate var from expected (normally a whitespace) if current_concept.expected[1].type == TokenKind.WHITESPACE: self.unrecognized_tokens.pop(TokenKind.WHITESPACE) current_concept.end = pos if self.debug_enabled: self.debug.append(DebugInfo(pos, token, None, "??")) self.manage_unrecognized() # manage that some clones may have been forked for forked in self.forked: forked.handle_expected_token(token, pos) # manage concepts found between tokens (of ternary for example) self.manage_parameters() if len(self.parameters_list) > len(current_concept.parameters_list_at_init): # we have eaten the parameters expected between two expected tokens # But there are some remaining parameters self._add_error(TooManyParametersFound( current_concept.concept.concept, current_concept.start, token, self.parameters_list[:])) return True # no need to continue while self._stack_isinstance(SyaConceptParserHelper) and self.stack[-1] != current_concept: current = self.stack[-1] if current.error: self._transform_to_unrecognized(current) self.false_positives.append(current) self.stack.pop() if current_concept.expected[1].type == TokenKind.WHITESPACE: self.unrecognized_tokens.pop(TokenKind.WHITESPACE) self.manage_unrecognized() # manage that some clones may have been forked for forked in self.forked: forked.handle_expected_token(token, pos) else: self.pop_stack_to_out() self.manage_parameters() # maybe eat whitespace that was between and expected token if current_concept.expected[0].type == TokenKind.WHITESPACE: del current_concept.expected[0] if current_concept.eat_token(token): _pop_stack(current_concept) return True return False def eat_token(self, token, pos): """ Receive at token. It will be processed if it's expected by a concept or if it's a parenthesis :param token: :param pos: :return: """ if self.is_locked: return if self.parsing_function: if self.debug_enabled: self.debug.append(DebugInfo(pos, token, None, DEBUG_PUSH_UNREC)) self.unrecognized_tokens.add_token(token, pos) if self.unrecognized_tokens.parenthesis_count == 0: self.unrecognized_tokens.fix_source() res = parse_function(self.context, self.unrecognized_tokens.source, self.unrecognized_tokens.tokens[:], self.unrecognized_tokens.start) instances = get_n_clones(self, len(res)) self.forked.extend(instances[1:]) for instance, res_i in zip(instances, res): if res_i.status or instance.context.sheerka.isinstance(res_i.body, BuiltinConcepts.PARSER_RESULT): # 1. we manage to recognize a function # 2. we almost manage, ex func(one two). It's not a function but almost instance._put_to_out(res_i.body.body) instance.unrecognized_tokens.reset() else: # it is not a function, try to recognized the token # This situation is unlikely to occur instance.manage_unrecognized() instance.parsing_function = False return True if self.handle_expected_token(token, pos): # a token is found, let's check if it's part of a concepts being parsed # example Concept(name="foo", definition="foo a bar b").def_var("a").def_var("b") # if the token 'bar' is found, it has to be considered as part of the concept foo if self.debug_enabled: self._remove_debug_info_if_needed() self.debug.append(DebugInfo(pos, token, None, DEBUG_EAT)) return True elif self._is_lpar(token): if self.debug_enabled: self.debug.append(DebugInfo(pos, token, None, DEBUG_PUSH_UNREC)) if self.unrecognized_tokens.is_empty() or self.unrecognized_tokens.is_whitespace(): # first, remove what was in the buffer self.manage_unrecognized() for forked in self.forked: # manage that some clones may have been forked forked.eat_token(token, pos) self.stack.append((token, pos)) else: # the parenthesis is part of the unrecognized # So it's maybe a function call list_of_results = self.get_functions_names_from_unrecognized(token, pos) instances = [self] for i in range(len(list_of_results) - 1): clone = self.clone() self.forked.append(clone) instances.append(clone) # Manage the result for self and its clones for instance, parsing_res in zip(instances, list_of_results): for to_out in parsing_res.to_out: instance._put_to_out(to_out) if parsing_res.function: instance.unrecognized_tokens = parsing_res.function instance.parsing_function = True else: # special case of "twenty two(". It's not considered as a function # The manage_unrecognized() what somewhat done by get_functions_names_from_unrecognized() # So we just put the unrecognized to out instance.unrecognized_tokens.reset() # make sure to pop the current concept if self._stack_isinstance(SyaConceptParserHelper): self.pop_stack_to_out() instance.stack.append((token, pos)) # # instance._put_to_out(")") # mark where the function should end # # instance.stack.append(parsing_res.function) # # instance.unrecognized_tokens = UnrecognizedTokensNode(-1, -1, []) # reset unrecognized # else: # # handle when there are multiple pending tokens # if len(self.unrecognized_tokens.tokens) > 1: # unrecognized = UnrecognizedTokensNode(self.unrecognized_tokens.start, # pos - 2, # self.unrecognized_tokens.tokens[:-1]) # unrecognized.fix_source() # self._put_to_out(unrecognized) # last_token = self.unrecognized_tokens.tokens[-1] # self.unrecognized_tokens.reset() # self.unrecognized_tokens.add_token(last_token, pos - 1) # # self.eat_unrecognized(token, pos) # add the '(' to the rest of the unknown # self.parsing_function = True # # self.stack.append(self.unrecognized_tokens.fix_source()) # # self.unrecognized_tokens = UnrecognizedTokensNode(-1, -1, []) return True elif self._is_rpar(token): if self.debug_enabled: self.debug.append(DebugInfo(pos, token, None, DEBUG_EAT)) # first, remove what was in the buffer self.manage_unrecognized() for forked in self.forked: # manage that some clones may have been forked forked.eat_token(token, pos) # pop everything but the lpar from stack to 'out' while len(self.stack) > 0 and not self._is_lpar(self.stack[-1]): self.pop_stack_to_out() # checks consistency if an lpar is found if len(self.stack) == 0: self._add_error(ParenthesisMismatchErrorNode((token, pos))) return None if self._stack_isinstance(UnrecognizedTokensNode): # the parenthesis was a function # we need to return a SourceCodeWithConceptNode for i in range(len(self.out) - 1, -1, -1): if self.out[i] == ')': start = i break else: self._add_error(ParenthesisMismatchErrorNode((token, pos))) return None source_code = self._make_source_code_with_concept(start, token, pos) for item in self.out[start:]: # update the parameter list try: self.parameters_list.remove(item) except ValueError: pass del self.out[start:] self._put_to_out(source_code) # self.pop_stack_to_out() # # Replace the ')' marker by its real position # for i in range(len(self.out) - 1, -1, -1): # if self.out[i] == ')': # self.out[i] = UnrecognizedTokensNode(pos, pos, [token]) else: self.stack.pop() # discard the lpar return True return False def eat_concept(self, sya_concept_def, token, pos, first_pass=True): """ a concept is found :param sya_concept_def: :param token: :param pos: :param first_pass: When not called from a fork after manage_unrecognized() :return: """ if self.is_locked: return parser_helper = SyaConceptParserHelper(sya_concept_def, pos) if first_pass: if self.debug_enabled: self.debug.append(DebugInfo(pos, token, sya_concept_def, "??")) if self.unrecognized_tokens.last_token_type() == TokenKind.WHITESPACE: parser_helper.remember_whitespace = self.unrecognized_tokens.tokens[-1] if Token.is_whitespace(parser_helper.last_token_before_first_token): self.unrecognized_tokens.pop(TokenKind.WHITESPACE) # First, try to recognize the tokens that are waiting self.manage_unrecognized() for forked in self.forked: # manage the fact that some clone may have been forked forked.eat_concept(sya_concept_def, token, pos, first_pass=False) # then, check if this new concept is linked to the previous ones # ie, is the previous concept fully matched ? if parser_helper.expected_parameters_before_first_token == 0: # => does not expect pending parameter (it's suffixed concept) while self._stack_isinstance(SyaConceptParserHelper) and self.stack[-1].potential_pos != -1: # => previous seems to have everything it needs in the parameter list self.pop_stack_to_out() if parser_helper.is_atom(): self._put_to_out(parser_helper.fix_concept()) else: # call shunting yard algorithm while self.i_can_pop(parser_helper): self.pop_stack_to_out() if parser_helper.is_matched(): # case of a prefix concept which has found happiness with self.parameters_list # directly put it in out self.manage_parameters_when_new_concept(parser_helper) self._put_to_out(parser_helper.fix_concept()) else: if self.debug_enabled: self._remove_debug_info_if_needed() self.debug.append(DebugInfo(pos, token, sya_concept_def, DEBUG_PUSH)) self.stack.append(parser_helper) self.manage_parameters_when_new_concept(parser_helper) def eat_unrecognized(self, token, pos): """ The token was not recognized, add to the current UnrecognizedTokensNode :param token: :param pos: :return: """ if self.is_locked: return if self.debug_enabled: self.debug.append(DebugInfo(pos, token, None, DEBUG_PUSH_UNREC)) self.unrecognized_tokens.add_token(token, pos) def finalize(self, pos): """ Put the remaining items from the stack to out :return: """ if self.is_locked: return if len(self.stack) == 0 and len(self.out) == 0: # check for parenthesis mismatch if self.unrecognized_tokens.parenthesis_count > 0: self._add_error(ParenthesisMismatchErrorNode(self.unrecognized_tokens)) return # no need to pop the buffer, as no concept is found if self.debug_enabled: self.debug.append(DebugInfo(pos, "", None, "??")) while len(self.stack) > 0: parser_helper = self.stack[-1] # validate parenthesis if self._is_lpar(parser_helper) or self._is_rpar(parser_helper): self._add_error(ParenthesisMismatchErrorNode(parser_helper)) return None self.manage_unrecognized() for forked in self.forked: # manage that some clones may have been forked forked.finalize(pos) failed_to_match = sum(map(lambda e: e.type != TokenKind.VAR_DEF, parser_helper.expected)) if failed_to_match > 0: # didn't manage to read all tokens. # Transform them into unrecognized self._transform_to_unrecognized(parser_helper) self.false_positives.append(parser_helper) self.stack.pop() # discard the parser helper else: self.pop_stack_to_out() # process it self.manage_unrecognized() for forked in self.forked: # manage that some clones may have been forked forked.finalize(pos) def clone(self): clone = InFixToPostFix(self.context, self.debug_enabled) clone.is_locked = self.is_locked clone.out = self.out[:] clone.stack = [i.clone() if hasattr(i, "clone") else i for i in self.stack] clone.unrecognized_tokens = self.unrecognized_tokens.clone() clone.parameters_list = self.parameters_list[:] clone.errors = self.errors[:] clone.debug = self.debug[:] # clone.forked = self.forked return clone @dataclass() class PostFixToItem: concept: Concept start: int end: int has_unrecognized: bool class SyaNodeParser(BaseNodeParser): def __init__(self, **kwargs): super().__init__("SyaNode", 50, **kwargs) if 'sheerka' in kwargs: sheerka = kwargs.get("sheerka") self.sya_definitions = sheerka.resolved_sya_def else: self.concepts_by_first_keyword = {} self.sya_definitions = {} # self.token = None # self.pos = -1 # self.tokens = None # # self.context: ExecutionContext = None # self.text = None # self.sheerka = None def init_from_concepts(self, context, concepts, **kwargs): super().init_from_concepts(context, concepts) sya_definitions = kwargs.get("sya", None) if sya_definitions: self.sya_definitions = sya_definitions @staticmethod def _is_eligible(concept): """ Predicate that select concepts that must handled by AtomNodeParser :param concept: :return: """ # We only concepts that has parameter (refuse atoms) # Bnf definitions are not supposed to be managed by this parser either return len(concept.metadata.variables) > 0 and concept.metadata.definition_type != DEFINITION_TYPE_BNF @staticmethod def _get_sya_concept_def(parser, concept): sya_concept_def = SyaConceptDef(concept) if concept.id in parser.sya_definitions: # Manage when precedence and associativity are given in the unit tests sya_def = parser.sya_definitions.get(concept.id) if sya_def[0] is not None: sya_concept_def.precedence = sya_def[0] if sya_def[1] is not None: sya_concept_def.associativity = sya_def[1] if parser.sheerka: concept_weight = parser.sheerka.get_concepts_weights(BuiltinConcepts.PRECEDENCE) if concept.id in concept_weight: sya_concept_def.precedence = concept_weight[concept.id] if associativity := concept.get_prop(BuiltinConcepts.ASSOCIATIVITY): sya_concept_def.associativity = SyaAssociativity(associativity) return sya_concept_def def infix_to_postfix(self, context, parser_input: ParserInput): """ Implementing Shunting Yard Algorithm :param context: :param parser_input: :return: """ if not self.reset_parser(context, parser_input): return None forked = [] def _add_forked_to_res(): # check that if some new InfixToPostfix are created for in_to_post in res: if len(in_to_post.forked) > 0: forked.extend(in_to_post.forked) in_to_post.forked.clear() if len(forked) > 0: res.extend(forked) forked.clear() res = [InFixToPostFix(context, context.in_context(BuiltinConcepts.DEBUG))] while self.parser_input.next_token(False): for infix_to_postfix in res: infix_to_postfix.reset() token = self.parser_input.token try: if token.type in (TokenKind.LPAR, TokenKind.RPAR): # little optim, no need to lock, unlock or get the concept when parenthesis for infix_to_postfix in res: infix_to_postfix.eat_token(token, self.parser_input.pos) continue for infix_to_postfix in res: if infix_to_postfix.eat_token(token, self.parser_input.pos): infix_to_postfix.lock() concepts = self.get_concepts(token, self._is_eligible, to_map=self._get_sya_concept_def) if not concepts: for infix_to_postfix in res: infix_to_postfix.eat_unrecognized(token, self.parser_input.pos) continue if len(concepts) == 1: for infix_to_postfix in res: infix_to_postfix.eat_concept(concepts[0], token, self.parser_input.pos) continue # make the cartesian product temp_res = [] for infix_to_postfix in res: for concept in concepts: clone = infix_to_postfix.clone() temp_res.append(clone) clone.eat_concept(concept, token, self.parser_input.pos) res = temp_res finally: _add_forked_to_res() # make sure that remaining items in stack are moved to out for infix_to_postfix in res: infix_to_postfix.reset() infix_to_postfix.finalize(self.parser_input.pos) _add_forked_to_res() if context.in_context(BuiltinConcepts.DEBUG): context.debug(f"Parsing {parser_input}") context.debug(f"{len(res)} InfixToPostFix(s) found") for i, r in enumerate(res): context.debug(f"#{i}") for line in r.debug: context.debug(line) return res def postfix_to_item(self, sheerka, postfixed): item = postfixed.pop() if isinstance(item, (UnrecognizedTokensNode, SourceCodeNode, ConceptNode)): return item if isinstance(item, SourceCodeWithConceptNode): items = [] while len(item.nodes) > 0: res = self.postfix_to_item(sheerka, item.nodes) if isinstance(res, PostFixToItem): items.append( ConceptNode(res.concept, res.start, res.end, self.parser_input.tokens[res.start: res.end + 1])) else: items.append(res) item.has_unrecognized |= hasattr(res, "has_unrecognized") and res.has_unrecognized or \ isinstance(res, UnrecognizedTokensNode) item.nodes = items item.fix_all_pos() item.tokens = self.parser_input.tokens[item.start:item.end + 1] item.fix_source(True) return item # ParserHelper start = item.start end = item.end has_unrecognized = False concept = sheerka.new_from_template(item.concept, item.concept.key) for param_index in reversed(range(len(concept.metadata.variables))): inner_item = self.postfix_to_item(sheerka, postfixed) if inner_item.start < start: start = inner_item.start if inner_item.end > end: end = inner_item.end has_unrecognized |= isinstance(inner_item, (UnrecognizedTokensNode, SourceCodeWithConceptNode)) or \ hasattr(inner_item, "has_unrecognized") and inner_item.has_unrecognized param_name = concept.metadata.variables[param_index][0] param_value = inner_item.concept if hasattr(inner_item, "concept") else \ [inner_item.return_value] if isinstance(inner_item, SourceCodeNode) else \ inner_item concept.compiled[param_name] = param_value return PostFixToItem(concept, start, end, has_unrecognized) def parse(self, context, parser_input: ParserInput): """ :param context: :param parser_input: :return: """ if not isinstance(parser_input, ParserInput): return None if parser_input.is_empty(): return context.sheerka.ret( self.name, False, context.sheerka.new(BuiltinConcepts.IS_EMPTY) ) ret = [] valid_infix_to_postfixs = self.get_valid(self.infix_to_postfix(context, parser_input)) if valid_infix_to_postfixs is None: # token error return self.sheerka.ret( self.name, False, context.sheerka.new(BuiltinConcepts.ERROR, body=self.error_sink)) if len(valid_infix_to_postfixs) == 0: return self.sheerka.ret( self.name, False, context.sheerka.new(BuiltinConcepts.NOT_FOR_ME, body=parser_input.as_text())) for infix_to_postfix in valid_infix_to_postfixs: sequence = [] has_unrecognized = False while len(infix_to_postfix.out) > 0: item = self.postfix_to_item(context.sheerka, infix_to_postfix.out) has_unrecognized |= hasattr(item, "has_unrecognized") and item.has_unrecognized or \ isinstance(item, UnrecognizedTokensNode) if isinstance(item, PostFixToItem): to_insert = ConceptNode(item.concept, item.start, item.end, self.parser_input.tokens[item.start: item.end + 1]) else: to_insert = item sequence.insert(0, to_insert) if has_unrecognized: # Manage some sick cases where missing parenthesis mess the order or the sequence # example "foo bar(one plus two" # too lazy to fix the why... sequence.sort(key=attrgetter("start")) ret.append( self.sheerka.ret( self.name, not has_unrecognized, self.sheerka.new( BuiltinConcepts.PARSER_RESULT, parser=self, source=parser_input, body=sequence, try_parsed=sequence))) if len(ret) == 1: self.log_result(context, parser_input, ret[0]) return ret[0] else: self.log_multiple_results(context, parser_input, ret) return ret @staticmethod def get_valid(infix_to_postfixs): """ Gets the valid infixToPostfix :param infix_to_postfixs: :return: """ def _has_sya(items): for item in items: if isinstance(item, SourceCodeWithConceptNode): if _has_sya(item.nodes): return True if isinstance(item, SyaConceptParserHelper): return True return False if infix_to_postfixs is None: return None result = [] for infix_to_postfix in infix_to_postfixs: if len(infix_to_postfix.get_errors()) > 0: continue if len(infix_to_postfix.out) == 0: continue if infix_to_postfix in result: continue if not _has_sya(infix_to_postfix.out): # refuses the result if it does not involve SYA continue result.append(infix_to_postfix) return result # @staticmethod # def init_sheerka(self, sheerka): # if hasattr(BaseNodeParser, "init_sheerka"): # BaseNodeParser.init_sheerka(sheerka) # # # init syadefinitins