# Source: Sheerka-Old/src/parsers/LogicalOperatorParser.py (250 lines, 11 KiB, Python)

from itertools import product
from core.builtin_helpers import only_successful, get_inner_body, get_lexer_nodes_using_positions
from core.sheerka.services.SheerkaExecute import ParserInput
from core.sheerka.services.sheerka_service import FailedToCompileError
from core.tokenizer import TokenKind, Tokenizer, Keywords
from core.utils import get_text_from_tokens
from parsers.BaseExpressionParser import ParenthesisNode, OrNode, AndNode, NotNode, ExprNode, VariableNode, \
ComparisonNode, BaseExpressionParser
from parsers.BaseNodeParser import UnrecognizedTokensNode
from parsers.BaseParser import UnexpectedEofParsingError, ErrorSink
from parsers.PythonWithConceptsParser import PythonWithConceptsParser
from sheerkarete.common import V
from sheerkarete.conditions import Condition, AndConditions
class ReteConditionsEmitter:
    """
    Translate parsed expression nodes into Rete conditions.

    Every distinct target name met while emitting is bound to a generated
    variable name (``__x_00__``, ``__x_01__``, ...). The binding is kept in
    ``self.variables`` so the same target always resolves to the same variable.
    """

    def __init__(self, context):
        # Imported locally rather than at module level.
        # NOTE(review): presumably to avoid a circular import - confirm.
        from parsers.RelationalOperatorParser import RelationalOperatorParser

        self.context = context
        self.comparison_parser = RelationalOperatorParser()
        self.var_counter = 0  # index used to name the next generated variable
        self.variables = {}  # target name -> generated variable name

    def add_variable(self, target):
        """
        Generate a new variable name and bind it to ``target``.

        An existing binding for ``target`` is overwritten.

        :param target: name to bind
        :returns: the generated variable name (``__x_NN__``, zero padded on two digits)
        """
        var_name = f"__x_{self.var_counter:02}__"
        self.var_counter += 1
        self.variables[target] = var_name
        return var_name

    def init_variable_if_needed(self, node, res):
        """
        Make sure that ``node.name`` is bound to a generated variable.

        On first sight of the name, a ``__name__`` condition declaring the
        variable is appended to ``res``.

        :param node: node exposing a ``name`` attribute
        :param res: list of conditions, updated in place
        :returns: ``V`` wrapping the generated variable name bound to ``node.name``
        """
        if node.name not in self.variables:
            var_name = self.add_variable(node.name)
            res.append(Condition(V(var_name), "__name__", node.name))

        return V(self.variables[node.name])

    def get_conditions(self, expr_nodes):
        """
        Build the Rete conditions for a list of expression nodes.

        The tokens of each node are parsed again with the relational operator parser:

        * ``VariableNode``: the variable is declared and, when it carries
          attributes, the condition ``(var, attributes_str, True)`` is added
        * ``ComparisonNode`` whose left part is a ``VariableNode``: the condition
          ``(var, attributes_str or "__self__", <evaluated right part>)`` is added
        * anything else is ignored

        :param expr_nodes: iterable of nodes exposing a ``tokens`` attribute
        :returns: a list holding one single ``AndConditions`` made of all the conditions
        :raises FailedToCompileError: when the relational operator parser reports an error
        """
        conditions = []
        for expr_node in expr_nodes:
            # one sink per expression, so an error is always attributed to the node being parsed
            error_sink = ErrorSink()
            parser_input = ParserInput(None, tokens=expr_node.tokens).reset()
            parser_input.next_token()
            parsed = self.comparison_parser.parse_input(self.context, parser_input, error_sink)

            if error_sink.has_error:
                raise FailedToCompileError(error_sink.sink)

            if isinstance(parsed, VariableNode):
                var_name = self.init_variable_if_needed(parsed, conditions)
                if parsed.attributes_str is not None:
                    conditions.append(Condition(var_name, parsed.attributes_str, True))

            elif isinstance(parsed, ComparisonNode):
                # NOTE(review): the whole comparison handling is kept under this guard,
                # otherwise 'left' and 'attr' would be unbound (or left over from a
                # previous iteration) when the left part is not a variable - confirm
                if isinstance(parsed.left, VariableNode):
                    left = self.init_variable_if_needed(parsed.left, conditions)
                    attr = parsed.left.attributes_str or "__self__"
                    # SECURITY: eval() runs arbitrary Python taken from the expression text.
                    # This is only acceptable when the expression comes from a trusted source.
                    # If only literals are expected here, ast.literal_eval is the safe alternative.
                    right = eval(get_text_from_tokens(parsed.right.tokens))
                    conditions.append(Condition(left, attr, right))

        return [AndConditions(conditions)]
class LogicalOperatorParser(BaseExpressionParser):
    """
    Parse logical expressions
    like not (a and b or c)
    The nodes can be used for custom filtering (ex with ExplanationConcept)
    Or to help to understand why a python expression returns True or False

    Precedence, from the lowest to the highest: 'or', 'and', 'not'.
    """
    NAME = "LogicalOperator"

    def __init__(self, **kwargs):
        """
        :param kwargs: only 'expr_parser' is read (defaults to None)
        """
        super().__init__(self.NAME, 50, False, yield_eof=True)

        # pre-tokenized separators, used to put the operators back between
        # the recognized conjunctions (see compile_conjunctions)
        self.and_tokens = list(Tokenizer(" and ", yield_eof=False))
        self.and_not_tokens = list(Tokenizer(" and not ", yield_eof=False))
        self.not_tokens = list(Tokenizer("not ", yield_eof=False))

        self.expr_parser = kwargs.get("expr_parser", None)

    @staticmethod
    def clean_parenthesis_nodes(nodes):
        """
        Replace, in place, every ParenthesisNode of the list by the node it wraps.

        :param nodes: list of nodes, modified in place
        """
        for i, node in enumerate(nodes):
            if isinstance(node, ParenthesisNode):
                nodes[i] = node.node

    @staticmethod
    def _is_keyword(token, keyword):
        """
        True when the token is the identifier ``keyword``
        """
        return token.type == TokenKind.IDENTIFIER and token.value == keyword

    def _parse_keyword_chain(self, context, parser_input, error_sink, keyword, parse_operand, node_type):
        """
        Parse ``operand (keyword operand)*``. Shared implementation of parse_or and parse_and.

        :param context:
        :param parser_input:
        :param error_sink: receives an UnexpectedEofParsingError when an operand is missing after the keyword
        :param keyword: 'or' or 'and'
        :param parse_operand: bound method that parses one operand (the level of higher precedence)
        :param node_type: class of the node to create (OrNode or AndNode)
        :returns: the operand itself when the keyword is not found after it,
                  otherwise a ``node_type`` that holds all the operands
        """
        start = parser_input.pos
        first = parse_operand(context, parser_input, error_sink)
        if not self._is_keyword(parser_input.token, keyword):
            return first

        parts = [first]
        while self._is_keyword(parser_input.token, keyword):
            parser_input.next_token()
            operand = parse_operand(context, parser_input, error_sink)
            if operand is None:
                # nothing after the keyword: report it and keep what was parsed so far
                error_sink.add_error(UnexpectedEofParsingError(f"When parsing '{keyword}'"))
                end = parser_input.pos
                break
            parts.append(operand)
        else:
            # normal exit: the node stops where the last operand stops
            end = parts[-1].end

        self.clean_parenthesis_nodes(parts)
        return node_type(start, end, parser_input.tokens[start: end + 1], *parts)

    def parse_input(self, context, parser_input, error_sink):
        """
        Entry point: starts with the operator of lowest precedence ('or').
        """
        return self.parse_or(context, parser_input, error_sink)

    def parse_or(self, context, parser_input, error_sink):
        """
        Parse ``and_expr ('or' and_expr)*``

        :returns: the single operand when there is no 'or', otherwise an OrNode
        """
        return self._parse_keyword_chain(context, parser_input, error_sink, "or", self.parse_and, OrNode)

    def parse_and(self, context, parser_input, error_sink):
        """
        Parse ``not_expr ('and' not_expr)*``

        :returns: the single operand when there is no 'and', otherwise an AndNode
        """
        return self._parse_keyword_chain(context, parser_input, error_sink, "and", self.parse_not, AndNode)

    def parse_not(self, context, parser_input, error_sink):
        """
        Parse ``'not' not_expr``, or delegate to parse_tokens when the current token is not a negation.

        'not' followed by 'in' is not a negation (it is the 'not in' operator).

        :returns: a NotNode, the result of parse_tokens, or None when nothing
                  follows the 'not' (an UnexpectedEofParsingError is then added to error_sink)
        """
        token = parser_input.token
        start = parser_input.pos

        is_negation = (token.type == TokenKind.IDENTIFIER and
                       token.value == "not" and
                       parser_input.the_token_after(True).value != "in")
        if not is_negation:
            return self.parse_tokens(context, parser_input, error_sink)

        parser_input.next_token()
        parsed = self.parse_not(context, parser_input, error_sink)
        if parsed is None:
            # same handling as the missing operand of 'or' / 'and',
            # rather than failing with an AttributeError on parsed.end
            error_sink.add_error(UnexpectedEofParsingError("When parsing 'not'"))
            return None

        node = parsed.node if isinstance(parsed, ParenthesisNode) else parsed
        return NotNode(start,
                       parsed.end,
                       parser_input.tokens[start: parsed.end + 1],
                       node)

    def parse_tokens_stop_condition(self, token, parser_input):
        """
        True when the token starts a logical operator: 'and', 'or', or a 'not' that is not followed by 'in'
        """
        if token.type == TokenKind.IDENTIFIER and token.value in ("and", "or"):
            return True

        # NOTE(review): unlike parse_not, the type of the token is not checked for 'not'.
        # Kept as is to preserve the behavior - confirm whether it is intended
        return token.value == "not" and parser_input.the_token_after(True).value != "in"

    def compile_conjunctions(self, context, conjunctions, who):
        """
        Transform a list of conjunctions (AND and OR) into one or multiple CompiledExpr
        :param context:
        :param conjunctions: list of ExprNode
        :param who: service that calls the method
        :returns: tuple (return values, rete conditions)
                  - return values: one entry per usable combination of the recognized conjunctions
                  - rete conditions: what ReteConditionsEmitter.get_conditions() gives for the conjunctions
        :raises FailedToCompileError: if a conjunction cannot be recognized,
                  or if the rete conditions cannot be emitted
        """
        recognized = []
        for conjunction in conjunctions:
            # try to recognize conjunction, one by one
            # negative conjunction can be a concept starting with 'not'
            parsed_ret = context.sheerka.parse_unrecognized(
                context,
                conjunction.get_value(),  # we remove the 'NOT' part when needed to ease the recognition
                parsers="all",
                who=who,
                prop=Keywords.WHEN,
                filter_func=only_successful)

            if not parsed_ret.status:
                raise FailedToCompileError(parsed_ret.body)

            recognized.append(get_inner_body(context, parsed_ret.body))

        # for each conjunction, we have a list of recognized concepts (or python node)
        # we need a cartesian product of the results
        # Explanation for later
        # conjunction[0] : 'x is a y' that can be resolved with two concepts c:|1001: and c:|1002:
        # conjunction[1] : 'y is an z' that can also be resolved with two concepts (c:|1003: and c:|1004)
        # so to understand the full question 'x is a y and y is an z'
        # we can have c:|1001: then c:|1003:
        #          or c:|1001: then c:|1004:
        #          or c:|1002: then c:|1003:
        #          or c:|1002: then c:|1004:
        # if one of this combination works, it means that the question 'x is a y and y is an z' was matched
        # hence the cartesian product
        return_values = []
        for recognized_conjunctions in product(*recognized):
            if len(recognized_conjunctions) == 1 and not isinstance(conjunctions[0], NotNode):
                # one single positive conjunction: what was recognized can be used as is
                return_values.append(recognized_conjunctions[0])

            elif len(recognized_conjunctions) == 1 and recognized_conjunctions[0].who == "parsers.Python":
                # it is a negated python Node. Need to parse again
                ret = context.sheerka.parse_python(context, source=str(conjunctions[0]))
                if ret.status:
                    return_values.append(ret)
                # else: the combination is dropped
                # TODO: find a way to track the failure

            else:
                # complex result. Use PythonWithNode
                lexer_nodes = get_lexer_nodes_using_positions(recognized_conjunctions,
                                                              self._get_positions(conjunctions))

                # put back the 'and' / 'not' node
                # backward iteration, so the insertions do not shift the indexes still to visit
                for i in range(len(lexer_nodes) - 1, 0, -1):
                    end = lexer_nodes[i].start - 1
                    start = lexer_nodes[i - 1].end + 1
                    if isinstance(conjunctions[i], NotNode):
                        lexer_nodes.insert(i, UnrecognizedTokensNode(start, end, self.and_not_tokens))
                    else:
                        lexer_nodes.insert(i, UnrecognizedTokensNode(start, end, self.and_tokens))

                # add the starting 'not' if needed
                # and reindex the following positions
                if isinstance(conjunctions[0], NotNode):
                    lexer_nodes[0].start = 2
                    lexer_nodes.insert(0, UnrecognizedTokensNode(0, 1, self.not_tokens))

                # the result is kept even when its status is falsy
                # TODO: find a way to track the failure
                python_with_concept_node_ret = PythonWithConceptsParser().parse_nodes(context, lexer_nodes)
                return_values.append(python_with_concept_node_ret)

        rete_cond_emitter = ReteConditionsEmitter(context)
        rete_disjunctions = rete_cond_emitter.get_conditions(conjunctions)

        return return_values, rete_disjunctions

    @staticmethod
    def _get_positions(expr_nodes):
        """
        simply manage NotNodes to address the fact that the 'not' part is removed

        For a NotNode, the first two tokens are skipped
        (NOTE(review): presumably the 'not' and the whitespace that follows - confirm).
        The other nodes are yielded unchanged.
        """
        for expr in expr_nodes:
            if isinstance(expr, NotNode):
                yield ExprNode(expr.start + 2, expr.end, expr.tokens[2:])
            else:
                yield expr