from itertools import product
from core.builtin_concepts import BuiltinConcepts
from core.builtin_helpers import only_successful, get_inner_body, get_lexer_nodes_using_positions
from core.sheerka.services.SheerkaExecute import ParserInput
from core.sheerka.services.sheerka_service import FailedToCompileError
from core.tokenizer import TokenKind, Tokenizer, Keywords
from core.utils import get_text_from_tokens
from parsers.BaseNodeParser import UnrecognizedTokensNode
from parsers.BaseParser import BaseParser, UnexpectedTokenParsingError, UnexpectedEofParsingError, BaseExprParser
from parsers.PythonWithConceptsParser import PythonWithConceptsParser
from parsers.expressions import ParenthesisNode, OrNode, AndNode, NotNode, LeftPartNotFoundError, \
    ParenthesisMismatchError, NameExprNode, ExprNode, VariableNode, ComparisonNode
from sheerkarete.common import V
from sheerkarete.conditions import Condition, AndConditions


class ReteConditionsEmitter:
    """Translate parsed boolean expression nodes into rete ``Condition`` objects."""

    def __init__(self, context):
        # Local import: RelationalOperatorParser lives in the same package,
        # importing it at module level would create a circular import.
        from parsers.RelationalOperatorParser import RelationalOperatorParser
        self.context = context
        self.comparison_parser = RelationalOperatorParser()
        self.var_counter = 0  # numeric suffix for generated variable names
        self.variables = {}   # source name -> generated rete variable name

    def add_variable(self, target):
        """Allocate a fresh rete variable name (``__x_NN__``) for *target* and record it."""
        var_name = f"__x_{self.var_counter:02}__"
        self.var_counter += 1
        self.variables[target] = var_name
        return var_name

    def init_variable_if_needed(self, node, res):
        """Return the rete variable bound to *node*'s name.

        The first time a name is seen, a binding condition on ``__name__``
        is appended to *res*.
        """
        if node.name not in self.variables:
            var_name = self.add_variable(node.name)
            res.append(Condition(V(var_name), "__name__", node.name))
        return V(self.variables[node.name])

    def get_conditions(self, expr_nodes):
        """Build a single AND-group of rete conditions from *expr_nodes*.

        :param expr_nodes: expression nodes whose tokens are re-parsed as comparisons
        :returns: a one-element list holding an AndConditions of all emitted conditions
        :raises FailedToCompileError: when an expression cannot be parsed
        """
        conditions = []
        for expr_node in expr_nodes:
            parsed_ret = self.comparison_parser.parse(self.context, expr_node.tokens)
            if not parsed_ret.status:
                raise FailedToCompileError(parsed_ret.body)
            tree = parsed_ret.body.body
            if isinstance(tree, VariableNode):
                var_name = self.init_variable_if_needed(tree, conditions)
                if tree.attributes_str is not None:
                    conditions.append(Condition(var_name, tree.attributes_str, True))
            elif isinstance(tree, ComparisonNode):
                if isinstance(tree.left, VariableNode):
                    left = self.init_variable_if_needed(tree.left, conditions)
                    attr = tree.left.attributes_str or "__self__"
                    # NOTE(review): eval() on reconstructed token text — fine
                    # for trusted rule sources, unsafe on untrusted input;
                    # consider ast.literal_eval if the right-hand side is
                    # always a literal — confirm with callers.
                    right = eval(get_text_from_tokens(tree.right.tokens))
                    conditions.append(Condition(left, attr, right))
        return [AndConditions(conditions)]


class LogicalOperatorParser(BaseExprParser):
    """
    Parses logical expressions such as ``not (a and b or c)``.

    The resulting nodes can be used for custom filtering (e.g. with
    ExplanationConcept), or to help understand why a python expression
    returns True or False.
    """
    NAME = "LogicalOperator"

    def __init__(self, **kwargs):
        super().__init__(self.NAME, 50, False, yield_eof=True)
        # Pre-tokenized connector fragments, reused when stitching
        # recognized nodes back together in compile_conjunctions.
        self.and_tokens = list(Tokenizer(" and ", yield_eof=False))
        self.and_not_tokens = list(Tokenizer(" and not ", yield_eof=False))
        self.not_tokens = list(Tokenizer("not ", yield_eof=False))

    @staticmethod
    def clean_parenthesis_nodes(nodes):
        """Replace every ParenthesisNode in *nodes* with its inner node, in place."""
        for index, candidate in enumerate(nodes):
            if isinstance(candidate, ParenthesisNode):
                nodes[index] = candidate.node

    def parse(self, context, parser_input: ParserInput):
        """Parse *parser_input* as a logical expression.

        :param context: execution context providing logging and sheerka services
        :param parser_input: the tokens to parse
        :return: a sheerka return value, or None when *parser_input* has the wrong type
        """
        if not isinstance(parser_input, ParserInput):
            return None
        context.log(f"Parsing '{parser_input}' with {self.NAME}Parser", self.name)
        sheerka = context.sheerka
        if parser_input.is_empty():
            return context.sheerka.ret(self.name, False, sheerka.new(BuiltinConcepts.IS_EMPTY))
        if not self.reset_parser(context, parser_input):
            return self.sheerka.ret(
                self.name, False,
                context.sheerka.new(BuiltinConcepts.ERROR, body=self.error_sink))
        self.parser_input.next_token()
        tree = self.parse_input()
        # Anything left over after a complete parse is an error.
        token = self.parser_input.token
        if token and token.type != TokenKind.EOF:
            self.add_error(UnexpectedTokenParsingError(f"Unexpected token '{token}'", token, []))
        # A top-level parenthesized expression is unwrapped before returning.
        if isinstance(tree, ParenthesisNode):
            tree = tree.node
        value = self.get_return_value_body(context.sheerka, self.parser_input.as_text(), tree, tree)
        return self.sheerka.ret(self.name, not self.has_error, value)
parse_input(self): return self.parse_or() def parse_or(self): start = self.parser_input.pos expr = self.parse_and() token = self.parser_input.token if token.type != TokenKind.IDENTIFIER or token.value != "or": return expr parts = [expr] while token.type == TokenKind.IDENTIFIER and token.value == "or": self.parser_input.next_token() expr = self.parse_and() if expr is None: self.add_error(UnexpectedEofParsingError("When parsing 'or'")) end = self.parser_input.pos self.clean_parenthesis_nodes(parts) return OrNode(start, end, self.parser_input.tokens[start: end + 1], *parts) parts.append(expr) token = self.parser_input.token end = parts[-1].end self.clean_parenthesis_nodes(parts) return OrNode(start, end, self.parser_input.tokens[start: end + 1], *parts) def parse_and(self): start = self.parser_input.pos expr = self.parse_not() token = self.parser_input.token if token.type != TokenKind.IDENTIFIER or token.value != "and": return expr parts = [expr] while token.type == TokenKind.IDENTIFIER and token.value == "and": self.parser_input.next_token() expr = self.parse_not() if expr is None: self.add_error(UnexpectedEofParsingError("When parsing 'and'")) end = self.parser_input.pos self.clean_parenthesis_nodes(parts) return AndNode(start, end, self.parser_input.tokens[start: end + 1], *parts) parts.append(expr) token = self.parser_input.token end = parts[-1].end self.clean_parenthesis_nodes(parts) return AndNode(start, end, self.parser_input.tokens[start: end + 1], *parts) def parse_not(self): token = self.parser_input.token start = self.parser_input.pos if token.type == TokenKind.IDENTIFIER and token.value == "not": self.parser_input.next_token() parsed = self.parse_not() node = parsed.node if isinstance(parsed, ParenthesisNode) else parsed return NotNode(start, parsed.end, self.parser_input.tokens[start: parsed.end + 1], node) else: return self.parse_names() def parse_names(self): def stop(): return token.type == TokenKind.EOF or \ paren_count == 0 and token.type == 
TokenKind.RPAR or \ token.type == TokenKind.IDENTIFIER and token.value in ("and", "or", "not") token = self.parser_input.token if token.type == TokenKind.EOF: return None if token.type == TokenKind.LPAR: start = self.parser_input.pos self.parser_input.next_token() expr = self.parse_or() token = self.parser_input.token if token.type != TokenKind.RPAR: self.error_sink.append( UnexpectedTokenParsingError(f"Unexpected token '{token}'", token, [TokenKind.RPAR])) return expr end = self.parser_input.pos self.parser_input.next_token() return ParenthesisNode(start, end, None, expr) buffer = [] paren_count = 0 last_paren = None start = self.parser_input.pos while not stop(): buffer.append(token) if token.type == TokenKind.LPAR: last_paren = token paren_count += 1 if token.type == TokenKind.RPAR: paren_count -= 1 self.parser_input.next_token(False) token = self.parser_input.token if len(buffer) == 0: if token.type != TokenKind.RPAR: self.error_sink.append(LeftPartNotFoundError()) return None if paren_count != 0: self.error_sink.append(ParenthesisMismatchError(last_paren)) return None if buffer[-1].type == TokenKind.WHITESPACE: buffer.pop() end = start + len(buffer) - 1 return NameExprNode(start, end, buffer) def compile_conjunctions(self, context, conjunctions, who): """ Transform a list of conjunctions (AND and OR) into one or multiple CompiledExpr :param context: :param conjunctions: list of ExprNode :param who: service that calls the method :returns: List Of CompiledExpr May throw FailedToRecognized if a conjunction cannot be parsed """ recognized = [] for conjunction in conjunctions: # try to recognize conjunction, one by one # negative conjunction can be a concept starting with 'not' parsed_ret = context.sheerka.parse_unrecognized( context, conjunction.get_value(), # we remove the 'NOT' part when needed to ease the recognition parsers="all", who=who, prop=Keywords.WHEN, filter_func=only_successful) if parsed_ret.status: recognized.append(get_inner_body(context, 
parsed_ret.body)) else: raise FailedToCompileError(parsed_ret.body) # for each conjunction, we have a list of recognized concepts (or python node) # we need a cartesian product of the results # Explanation for later # conjunction[0] : 'x is a y' that can be resolved with two concepts c:|1001: and c:|1002: # conjunction[1] : 'y is an z' that can also be resolved with two concepts (c:|1003: and c:|1004) # so to understand the full question 'x is a y and y is an z' # we can have c:|1001: then c:|1003: # or c:|1001: then c:|1004: # or c:|1002: then c:|1003: # or c:|1002: then c:|1004: # if one of this combination works, it means that the question 'x is a y and y is an z' was matched # hence the cartesian product product_of_recognized = list(product(*recognized)) return_values = [] for recognized_conjunctions in product_of_recognized: if len(recognized_conjunctions) == 1 and not isinstance(conjunctions[0], NotNode): return_values.append(recognized_conjunctions[0]) elif len(recognized_conjunctions) == 1 and recognized_conjunctions[0].who == "parsers.Python": # it is a negated python Node. Need to parse again ret = context.sheerka.parse_python(context, source=str(conjunctions[0])) if ret.status: return_values.append(ret) else: # find a way to track the failure pass else: # complex result. 
Use PythonWithNode lexer_nodes = get_lexer_nodes_using_positions(recognized_conjunctions, self._get_positions(conjunctions)) # put back the 'and' / 'not' node for i in range(len(lexer_nodes) - 1, 0, -1): end = lexer_nodes[i].start - 1 start = lexer_nodes[i - 1].end + 1 if isinstance(conjunctions[i], NotNode): lexer_nodes.insert(i, UnrecognizedTokensNode(start, end, self.and_not_tokens)) else: lexer_nodes.insert(i, UnrecognizedTokensNode(start, end, self.and_tokens)) # add the starting 'not' if needed # and reindex the following positions if isinstance(conjunctions[0], NotNode): lexer_nodes[0].start = 2 lexer_nodes.insert(0, UnrecognizedTokensNode(0, 1, self.not_tokens)) python_with_concept_node_ret = PythonWithConceptsParser().parse_nodes(context, lexer_nodes) if not python_with_concept_node_ret.status: # find a way to track the failure pass return_values.append(python_with_concept_node_ret) rete_cond_emitter = ReteConditionsEmitter(context) rete_disjunctions = rete_cond_emitter.get_conditions(conjunctions) return return_values, rete_disjunctions @staticmethod def _get_positions(expr_nodes): """ simply manage NotNodes to address the fact that the 'not' part in removed """ for expr in expr_nodes: if isinstance(expr, NotNode): yield ExprNode(expr.start + 2, expr.end, expr.tokens[2:]) else: yield expr