Implemented a first, basic version of a Rete rule engine

This commit is contained in:
2021-02-09 16:06:32 +01:00
parent 821dbed189
commit a2a8d5c5e5
110 changed files with 7301 additions and 1654 deletions
+32
View File
@@ -62,6 +62,38 @@ class KeywordNotFound(CustomGrammarParserNode, ParsingError):
return hash(self.keywords)
@dataclass()
class NameNode(CustomGrammarParserNode):
    """Node wrapping the tokens that make up a (possibly multi-word) name."""

    def get_name(self):
        """Join the meaningful tokens with single spaces.

        EOF stops the scan, whitespace tokens are dropped, and string
        tokens contribute their content without the surrounding quotes.
        """
        parts = []
        for token in self.tokens:
            if token.type == TokenKind.EOF:
                break
            if token.type == TokenKind.WHITESPACE:
                continue
            if token.type == TokenKind.STRING:
                parts.append(token.value[1:-1])
            else:
                parts.append(str(token.value))
        return " ".join(parts)

    def __repr__(self):
        return self.get_name()

    def __eq__(self, other):
        # Two name nodes are equal when their normalized names match.
        return isinstance(other, NameNode) and self.get_name() == other.get_name()

    def __hash__(self):
        return hash(self.get_name())
class BaseCustomGrammarParser(BaseParser):
"""
Base class for sheerka specific grammars
+15
View File
@@ -42,6 +42,9 @@ class LexerNode(Node):
def to_short_str(self):
raise NotImplementedError
def get_source_to_parse(self):
    """Return the source text this lexer node should be (re)parsed from."""
    return self.source
class UnrecognizedTokensNode(LexerNode):
def __init__(self, start, end, tokens):
@@ -296,6 +299,12 @@ class SourceCodeNode(LexerNode):
def to_short_str(self):
    """Compact one-line representation of this source-code node."""
    return "SCN('{}')".format(self.source)
def get_python_node(self):
    """Return the python node the lexer associated with this source."""
    return self.python_node
def get_source_to_parse(self):
    """Return the source text of the wrapped python node (the text to re-parse)."""
    return self.python_node.source
class SourceCodeWithConceptNode(LexerNode):
"""
@@ -409,6 +418,12 @@ class SourceCodeWithConceptNode(LexerNode):
def to_short_str(self):
    """Compact one-line representation: first token, inner nodes, last token.

    Bug fix: ``n.to_short_str`` was passed to ``str.join`` without being
    called, so joining a non-empty node list raised ``TypeError``
    (sequence items were bound methods, not strings).
    """
    return f"SCWC({self.first}" + ", ".join(n.to_short_str() for n in self.nodes) + f"{self.last})"
def get_python_node(self):
    """Return the python node the lexer associated with this source."""
    return self.python_node
def get_source_to_parse(self):
    """Return the source text of the wrapped python node (the text to re-parse)."""
    return self.python_node.source
@dataclass()
class GrammarErrorNode(ParsingError):
+2 -41
View File
@@ -1,33 +1,12 @@
import logging
from dataclasses import dataclass
from typing import Union
from core.builtin_concepts import BuiltinConcepts, ParserResultConcept
from core.concept import Concept
from core.error import ErrorObj
from core.global_symbols import ErrorObj
from core.sheerka.ExecutionContext import ExecutionContext
from core.sheerka.services.SheerkaExecute import ParserInput
from core.sheerka_logger import get_logger
from core.tokenizer import TokenKind, Token, Tokenizer, LexerError
# # keep a cache for the parser input
# pi_cache = Cache(default=lambda key: ParserInput(key), max_size=20)
#
#
# def get_parser_input(text, tokens=None, length=None):
# """
# Returns new or existing parser input
# :param text:
# :param tokens:
# :param length:
# :return:
# """
# if tokens is None or pi_cache.has(text):
# return pi_cache.get(text)
# pi = ParserInput(text, tokens, length)
# pi_cache.put(text, pi)
# return pi
from core.tokenizer import TokenKind, Token, LexerError
@dataclass()
@@ -35,13 +14,6 @@ class Node:
pass
class NotInitializedNode(Node):
pass
def __repr__(self):
return "**N/A**"
@dataclass()
class ParsingError(Node, ErrorObj):
    # Marker base: a parser node that also carries error semantics.
    # Concrete parsing errors subclass this instead of ErrorObj directly.
    pass
@@ -206,17 +178,6 @@ class BaseParser:
return parser_input.value
# @staticmethod
# def manage_eof(lst, strip_eof):
# if strip_eof:
# if len(lst) and lst[-1].type == TokenKind.EOF:
# lst.pop()
# return lst
#
# if len(lst) == 0 or not lst[-1].type == TokenKind.EOF:
# lst.append(Token(TokenKind.EOF, "", -1, -1, -1))
# return lst
@staticmethod
def get_tokens_boundaries(tokens):
"""
+185
View File
@@ -0,0 +1,185 @@
from typing import Union, List
from core.builtin_concepts_ids import BuiltinConcepts
from core.sheerka.services.SheerkaExecute import ParserInput, SheerkaExecute
from core.tokenizer import TokenKind, Token
from core.utils import get_text_from_tokens
from parsers.BaseParser import BaseParser
from parsers.expressions import ComparisonNode, ParenthesisMismatchError, NameExprNode, ComparisonType, VariableNode
class ComparisonParser(BaseParser):
    """
    Parses ``xxx (== | > | < | >= | <= | != | in | not in) yyy``.
    Nothing else: no boolean connectors, no chained comparisons.
    """

    def __init__(self, **kwargs):
        super().__init__("Expression", 60, False, yield_eof=True)

    def parse(self, context, parser_input: Union[ParserInput, List[Token]]):
        """
        Parse a single comparison expression.

        :param context: execution context (provides sheerka and logging)
        :param parser_input: a ParserInput, or a raw token list that is
            wrapped into one via the execute service
        :return: a sheerka return value, or None when the input type is unsupported
        """
        if isinstance(parser_input, list):
            parser_input = context.sheerka.services[SheerkaExecute.NAME].get_parser_input(None, tokens=parser_input)
        elif not isinstance(parser_input, ParserInput):
            return None
        context.log(f"Parsing '{parser_input}' with ComparisonExpressionParser", self.name)
        sheerka = context.sheerka
        if parser_input.is_empty():
            return context.sheerka.ret(self.name,
                                       False,
                                       sheerka.new(BuiltinConcepts.IS_EMPTY))
        if not self.reset_parser(context, parser_input):
            return self.sheerka.ret(
                self.name,
                False,
                context.sheerka.new(BuiltinConcepts.ERROR, body=self.error_sink))
        self.parser_input.next_token()
        node = self.parse_compare()
        value = self.get_return_value_body(context.sheerka, self.parser_input.as_text(), node, node)
        ret = self.sheerka.ret(
            self.name,
            not self.has_error,
            value)
        return ret

    def parse_compare(self):
        """Parse ``<names> [<comparison-op> <names>]`` into a ComparisonNode."""
        start = self.parser_input.pos
        left = self.parse_names()
        if left is None:
            return None
        if (comp := self.eat_comparison()) is None:
            # No operator: the whole input is a single name expression.
            return left
        right = self.parse_names()
        end = right.end if right else self.parser_input.pos
        return ComparisonNode(start, end, self.parser_input.tokens[start: end + 1], comp, left, right)

    def parse_names(self):
        """
        Consume tokens up to the next top-level comparison operator (or EOF)
        and return a name-expression node, or None on error / empty operand.
        """
        token = self.parser_input.token
        if token.type == TokenKind.EOF:
            return None
        buffer = []
        paren_count = 0
        last_lparen = None
        last_rparen = None
        start = self.parser_input.pos
        # Inside parentheses a comparison operator does not end the operand.
        while (paren_count > 0 or not self.eat_comparison(False)) and token.type != TokenKind.EOF:
            buffer.append(token)
            if token.type == TokenKind.LPAR:
                last_lparen = token
                paren_count += 1
            if token.type == TokenKind.RPAR:
                last_rparen = token
                paren_count -= 1
            self.parser_input.next_token(False)
            token = self.parser_input.token
        # Bug fix: removed dead `if paren_count != 0: pass` block.
        if paren_count > 0:
            self.error_sink.append(ParenthesisMismatchError(last_lparen))
            return None
        if paren_count < 0:
            self.error_sink.append(ParenthesisMismatchError(last_rparen))
            return None
        if buffer and buffer[-1].type == TokenKind.WHITESPACE:
            buffer.pop()
        if not buffer:
            # Bug fix: input starting with a comparison operator (or only
            # whitespace before it) used to raise IndexError / produce an
            # empty-name node here; report "no operand" instead.
            return None
        end = start + len(buffer) - 1
        return self.try_to_recognize(NameExprNode(start, end, buffer))

    def eat_comparison(self, eat=True):
        """
        Return the ComparisonType at the current position, or None.

        :param eat: when True the operator token(s) are consumed,
                    when False this is a pure lookahead.
        """
        token = self.parser_input.token
        if token.type == TokenKind.EQUALSEQUALS:
            if eat:
                self.parser_input.next_token()
            return ComparisonType.EQUALS
        if token.type == TokenKind.LESS:
            if self.parser_input.the_token_after(False).type == TokenKind.EQUALS:
                if eat:
                    self.parser_input.next_token()
                    self.parser_input.next_token()
                return ComparisonType.LESS_THAN_OR_EQUALS
            else:
                if eat:
                    self.parser_input.next_token()
                return ComparisonType.LESS_THAN
        if token.type == TokenKind.GREATER:
            if self.parser_input.the_token_after(False).type == TokenKind.EQUALS:
                if eat:
                    self.parser_input.next_token()
                    self.parser_input.next_token()
                return ComparisonType.GREATER_THAN_OR_EQUALS
            else:
                if eat:
                    self.parser_input.next_token()
                return ComparisonType.GREATER_THAN
        if token.type == TokenKind.IDENTIFIER and token.value == "not":
            if self.parser_input.the_token_after(True).value == "in":
                if eat:
                    self.parser_input.next_token()
                    self.parser_input.next_token()
                return ComparisonType.NOT_IN
        if token.type == TokenKind.IDENTIFIER and token.value == "in":
            if eat:
                self.parser_input.next_token()
            return ComparisonType.IN
        if token.type == TokenKind.EMARK and self.parser_input.the_token_after(False).type == TokenKind.EQUALS:
            if eat:
                self.parser_input.next_token()
                self.parser_input.next_token()
            # NOTE(review): member spelling 'NOT_EQUAlS' follows the enum as
            # declared in parsers.expressions — do not "fix" it here alone.
            return ComparisonType.NOT_EQUAlS
        return None

    @staticmethod
    def try_to_recognize(expr: NameExprNode):
        """
        Upgrade a NameExprNode to a VariableNode when its tokens look like a
        plain (possibly dotted) identifier; otherwise return the node as-is.
        """
        not_a_variable = False
        expect_dot = False
        for t in expr.tokens:
            if expect_dot and t.type != TokenKind.DOT:
                not_a_variable = True
            if t.type == TokenKind.DOT:
                break  # Only interested in the root part
            elif t.type == TokenKind.WHITESPACE:
                # Whitespace in a candidate variable is only legal right before a dot.
                expect_dot = True
            elif t.type == TokenKind.LPAR:
                pass  # TODO: try to recognize a function call
            elif not str(t.value).isidentifier():
                not_a_variable = True
        if not_a_variable:
            return expr
        full_name = get_text_from_tokens(expr.tokens)
        split = full_name.split(".")
        if len(split) == 1:
            return VariableNode(expr.start, expr.end, expr.tokens, split[0])
        else:
            return VariableNode(expr.start, expr.end, expr.tokens, split[0], *split[1:])
+24 -73
View File
@@ -1,13 +1,14 @@
from dataclasses import dataclass, field
from dataclasses import dataclass
import core.builtin_helpers
import core.utils
from core.builtin_concepts import BuiltinConcepts, ReturnValueConcept, ParserResultConcept
from core.concept import ConceptParts, DEFINITION_TYPE_BNF, DEFINITION_TYPE_DEF
from core.builtin_concepts import BuiltinConcepts, ReturnValueConcept
from core.concept import DEFINITION_TYPE_BNF, DEFINITION_TYPE_DEF
from core.global_symbols import NotInit
from core.sheerka.services.SheerkaExecute import ParserInput, SheerkaExecute
from core.tokenizer import TokenKind, Keywords
from parsers.BaseCustomGrammarParser import BaseCustomGrammarParser, SyntaxErrorNode
from parsers.BaseParser import Node, ParsingError, NotInitializedNode, UnexpectedTokenParsingError
from parsers.BaseCustomGrammarParser import BaseCustomGrammarParser, SyntaxErrorNode, NameNode, CustomGrammarParserNode
from parsers.BaseParser import ParsingError, UnexpectedTokenParsingError
from parsers.BnfDefinitionParser import BnfDefinitionParser
@@ -17,15 +18,7 @@ class ParsingException(Exception):
@dataclass()
class DefConceptParsingResult(Node):
"""
Base node for all default parser nodes
"""
tokens: list = field(compare=False, repr=False)
@dataclass()
class DefConceptParsingError(DefConceptParsingResult, ParsingError):
class DefConceptParsingError(CustomGrammarParserNode, ParsingError):
pass
@@ -38,63 +31,21 @@ class CannotHandleParsingError(DefConceptParsingError):
@dataclass()
class NameNode(DefConceptParsingResult):
def get_name(self):
name = ""
first = True
for token in self.tokens:
if token.type == TokenKind.EOF:
break
if token.type == TokenKind.WHITESPACE:
continue
if not first:
name += " "
name += token.value[1:-1] if token.type == TokenKind.STRING else str(token.value)
first = False
return name
def __repr__(self):
return self.get_name()
def __eq__(self, other):
if not isinstance(other, NameNode):
return False
return self.get_name() == other.get_name()
def __hash__(self):
return hash(self.get_name())
@dataclass()
class DefConceptNode(DefConceptParsingResult):
name: NameNode = NotInitializedNode()
where: ReturnValueConcept = NotInitializedNode()
pre: ReturnValueConcept = NotInitializedNode()
post: ReturnValueConcept = NotInitializedNode()
body: ReturnValueConcept = NotInitializedNode()
ret: ReturnValueConcept = NotInitializedNode()
definition: ReturnValueConcept = NotInitializedNode()
class DefConceptNode(CustomGrammarParserNode):
name: NameNode = NotInit
where: ReturnValueConcept = NotInit
pre: ReturnValueConcept = NotInit
post: ReturnValueConcept = NotInit
body: ReturnValueConcept = NotInit
ret: ReturnValueConcept = NotInit
definition: ReturnValueConcept = NotInit
definition_type: str = None
def get_asts(self):
asts = {}
for part_key in ConceptParts:
prop_value = getattr(self, part_key.value)
if isinstance(prop_value, ReturnValueConcept) and \
isinstance(prop_value.body, ParserResultConcept) and \
hasattr(prop_value.body.body, "ast_"):
asts[part_key] = prop_value
return asts
@dataclass()
class IsaConceptNode(DefConceptParsingResult):
concept: NameNode = NotInitializedNode()
set: NameNode = NotInitializedNode()
class IsaConceptNode(CustomGrammarParserNode):
concept: NameNode = NotInit
set: NameNode = NotInit
class DefConceptParser(BaseCustomGrammarParser):
@@ -201,12 +152,12 @@ class DefConceptParser(BaseCustomGrammarParser):
def get_concept_definition(self, current_concept_def, parts):
if Keywords.FROM not in parts:
return None, NotInitializedNode()
return None, NotInit
tokens = parts[Keywords.FROM]
if len(tokens) == 1:
self.add_error(SyntaxErrorNode([], f"Empty '{tokens[0].value}' declaration."), False)
return None, NotInitializedNode()
return None, NotInit
if tokens[1].value == Keywords.BNF.value:
return self.get_concept_bnf_definition(current_concept_def, core.utils.strip_tokens(tokens[2:]))
@@ -216,7 +167,7 @@ class DefConceptParser(BaseCustomGrammarParser):
def get_concept_bnf_definition(self, current_concept_def, tokens):
if len(tokens) == 0:
self.add_error(SyntaxErrorNode([], "Empty 'bnf' declaration"), False)
return None, NotInitializedNode()
return None, NotInit
if tokens[0].type == TokenKind.COLON:
tokens = self.get_body(tokens[1:])
@@ -233,7 +184,7 @@ class DefConceptParser(BaseCustomGrammarParser):
if not parsing_result.status:
self.add_error(parsing_result.value)
return None, NotInitializedNode()
return None, NotInit
return DEFINITION_TYPE_BNF, parsing_result
@@ -243,7 +194,7 @@ class DefConceptParser(BaseCustomGrammarParser):
tokens = core.utils.strip_tokens(tokens[start:])
if len(tokens) == 0:
self.add_error(SyntaxErrorNode([], f"Empty 'from' declaration."), False)
return None, NotInitializedNode()
return None, NotInit
if tokens[0].type == TokenKind.COLON:
tokens = self.get_body(tokens[1:])
@@ -252,7 +203,7 @@ class DefConceptParser(BaseCustomGrammarParser):
def get_ast(self, keyword, parts):
if keyword not in parts:
return NotInitializedNode()
return NotInit
tokens = parts[keyword]
if len(tokens) == 1:
-122
View File
@@ -1,122 +0,0 @@
from dataclasses import dataclass
import core.utils
from core.builtin_concepts import BuiltinConcepts, ReturnValueConcept
from core.sheerka.services.SheerkaExecute import ParserInput
from core.sheerka.services.SheerkaRuleManager import SheerkaRuleManager, FormatAstNode
from core.tokenizer import Keywords
from parsers.BaseCustomGrammarParser import BaseCustomGrammarParser, KeywordNotFound
from parsers.BaseParser import BaseParser, Node
@dataclass
class FormatRuleNode(Node):
    """Result of parsing a 'when ... print ...' formatting rule."""
    tokens: dict  # keyword -> token list, as produced by get_parts()
    rule: ReturnValueConcept = None  # compiled 'when' predicate(s)
    format_ast: FormatAstNode = None  # compiled 'print' format AST
class DefFormatRuleParser(BaseCustomGrammarParser):
    """
    Class that will parse formatting rules definitions
    eg: when xxx print yyy
    where xxx will be evaluated in the context of BuiltinConcepts.EVAL_QUESTION_REQUESTED
    and yyy is an internal way to describe a format (yet another one)
    """
    KEYWORDS = [Keywords.WHEN, Keywords.PRINT]
    KEYWORDS_VALUES = [k.value for k in KEYWORDS]

    def __init__(self, **kwargs):
        BaseCustomGrammarParser.__init__(self, "DefFormatRule", 60)

    def parse(self, context, parser_input: ParserInput):
        """
        Parse a 'when ... print ...' rule definition.

        :param context: execution context
        :param parser_input: input to parse (must be a ParserInput)
        :return: a sheerka return value, or None for unsupported input types
        """
        if not isinstance(parser_input, ParserInput):
            return None
        if parser_input.from_tokens:
            # Rule definitions are only recognized from raw text, not token streams.
            ret = context.sheerka.ret(
                self.name,
                False,
                context.sheerka.new(BuiltinConcepts.NOT_FOR_ME, body=parser_input))
            self.log_result(context, parser_input, ret)
            return ret
        context.log(f"Parsing '{parser_input}' with FunctionParser", self.name)
        sheerka = context.sheerka
        if parser_input.is_empty():
            return sheerka.ret(self.name,
                               False,
                               sheerka.new(BuiltinConcepts.IS_EMPTY))
        if not self.reset_parser(context, parser_input):
            return self.sheerka.ret(self.name,
                                    False,
                                    context.sheerka.new(BuiltinConcepts.ERROR, body=self.error_sink))
        self.parser_input.next_token()
        rule = self.parse_rule()
        body = self.get_return_value_body(sheerka, parser_input.as_text(), rule, rule)
        ret = sheerka.ret(self.name, not self.has_error, body)
        self.log_result(context, parser_input.as_text(), ret)
        return ret

    def parse_rule(self):
        """Split the input on the rule keywords and compile the 'when'/'print' parts."""
        parts = self.get_parts(self.KEYWORDS_VALUES, strip_tokens=True)
        if parts is None:
            return None
        node = FormatRuleNode(parts)
        try:
            res = self.get_when(parts[Keywords.WHEN])
            if res is None:
                # Compilation failed; errors were already recorded in get_when.
                return node
            node.rule = res
            parsed = self.get_print(parts[Keywords.PRINT])
            if parsed is None:
                return node
            node.format_ast = parsed
        except KeyError as e:
            # A mandatory keyword was missing from the parts mapping.
            self.add_error(KeywordNotFound([], [e.args[0].value]))
            return None
        return node

    def get_when(self, tokens):
        """
        Validate the when part of the rule.
        :param tokens: tokens following (and including) the 'when' keyword
        :return: the compiled predicate list, or None on error
        """
        source = core.utils.get_text_from_tokens(core.utils.strip_tokens(tokens[1:]))
        res = self.sheerka.services[SheerkaRuleManager.NAME].compile_when(self.context, self.name, source)
        if not isinstance(res, list):
            self.add_error(res.value)
            return None
        return res

    def get_print(self, tokens):
        """
        Validate the print part
        :param tokens: tokens following (and including) the 'print' keyword
        :return: the compiled format AST body, or None on error
        """
        source = core.utils.get_text_from_tokens(core.utils.strip_tokens(tokens[1:]))
        res = self.sheerka.services[SheerkaRuleManager.NAME].compile_print(self.context, source)
        if not res.status:
            self.add_error(res.value)
            return None
        return res.body
+231
View File
@@ -0,0 +1,231 @@
from dataclasses import dataclass
from typing import List
import core.utils
from core.builtin_concepts import ReturnValueConcept
from core.builtin_concepts_ids import BuiltinConcepts
from core.global_symbols import NotInit
from core.sheerka.services.SheerkaExecute import ParserInput
from core.sheerka.services.SheerkaRuleManager import SheerkaRuleManager, FormatAstNode, RuleCompiledPredicate
from core.sheerka.services.sheerka_service import FailedToCompileError
from core.tokenizer import Keywords, TokenKind
from parsers.BaseCustomGrammarParser import BaseCustomGrammarParser, NameNode, KeywordNotFound, SyntaxErrorNode
from parsers.BaseParser import Node, UnexpectedEofParsingError
from sheerkarete.conditions import Condition
@dataclass()
class DefRuleNode(Node):
    """Base parse result for a rule definition ('def rule ... as when ...')."""
    tokens: dict  # keyword -> token list, as produced by get_parts()
    name: NameNode = NotInit  # optional rule name (from 'def rule <name> as')
    when: List[RuleCompiledPredicate] = NotInit  # compiled 'when' predicates
    rete: List[List[Condition]] = NotInit  # rete condition disjunctions
@dataclass()
class DefExecRuleNode(DefRuleNode):
    """Rule with an executable action: 'when ... then ...'."""
    then: ReturnValueConcept = NotInit  # compiled 'then' action
@dataclass
class DefFormatRuleNode(DefRuleNode):
    """Rule with a formatting action: 'when ... print ...'."""
    print: FormatAstNode = NotInit  # compiled 'print' format AST
class DefRuleParser(BaseCustomGrammarParser):
    """
    Parses rule definitions:
    'def rule <name> as when <predicates> (then|print) <action>'
    or the anonymous forms starting directly with 'when' or 'print'.
    """
    DEF_KEYWORDS = [Keywords.RULE, Keywords.AS]
    DEF_KEYWORDS_VALUES = [k.value for k in DEF_KEYWORDS]
    RULE_KEYWORDS = [Keywords.WHEN, Keywords.THEN, Keywords.PRINT]
    RULE_KEYWORDS_VALUES = [k.value for k in RULE_KEYWORDS]

    def __init__(self, **kwargs):
        BaseCustomGrammarParser.__init__(self, "DefRule", 60)

    def parse(self, context, parser_input: ParserInput):
        """
        Parse a rule definition.

        :param context: execution context
        :param parser_input: input to parse (must be a ParserInput)
        :return: a sheerka return value, or None for unsupported input types
        """
        if not isinstance(parser_input, ParserInput):
            return None
        # rule parser can only manage string text
        if parser_input.from_tokens:
            ret = context.sheerka.ret(
                self.name,
                False,
                context.sheerka.new(BuiltinConcepts.NOT_FOR_ME, body=parser_input))
            self.log_result(context, parser_input, ret)
            return ret
        context.log(f"Parsing '{parser_input}' with DefRuleParser", self.name)
        sheerka = context.sheerka
        if parser_input.is_empty():
            return sheerka.ret(self.name,
                               False,
                               sheerka.new(BuiltinConcepts.IS_EMPTY))
        if not self.reset_parser(context, parser_input):
            return self.sheerka.ret(self.name,
                                    False,
                                    context.sheerka.new(BuiltinConcepts.ERROR, body=self.error_sink))
        self.parser_input.next_token()
        node = self.parse_def_rule()
        body = self.get_return_value_body(sheerka, parser_input.as_text(), node, node)
        ret = sheerka.ret(self.name, not self.has_error, body)
        self.log_result(context, parser_input.as_text(), ret)
        return ret

    def parse_def_rule(self):
        """Dispatch on the first token: 'def' (named rule) or 'when'/'print' (anonymous rule)."""
        token = self.parser_input.token
        if token.value == Keywords.DEF.value:
            return self.parse_rule_name()
        elif token.value in (Keywords.WHEN.value, Keywords.PRINT.value):
            return self.parse_rule()
        else:
            self.add_error(KeywordNotFound([], [Keywords.WHEN.value]))
            return None

    def parse_rule_name(self):
        """
        Parses def rule xxx as yyyy
        """
        self.parser_input.next_token()  # eat def
        token = self.parser_input.token
        if token.value != Keywords.RULE.value:
            self.add_error(KeywordNotFound([token], [Keywords.RULE.value]))
            return None
        buffer = []
        # Collect the name tokens up to the 'as' keyword (whitespace kept).
        while self.parser_input.next_token(skip_whitespace=False):
            token = self.parser_input.token
            if token.value == Keywords.AS.value:
                break
            else:
                buffer.append(token)
        else:  # while/else: input exhausted, 'as' keyword not found
            self.add_error(KeywordNotFound([], [Keywords.AS.value]))
            return None
        if not self.parser_input.next_token():  # eat as
            self.add_error(UnexpectedEofParsingError("While parsing 'when'."))
            return None
        rule = self.parse_rule()
        name_node = self.get_concept_name(buffer)
        if name_node is None:
            # Invalid name: return the (possibly partial) rule body anyway.
            return rule
        # NOTE(review): if parse_rule() returned None and the name is valid,
        # the next line raises AttributeError — confirm upstream guarantees.
        rule.name = name_node
        return rule

    def parse_rule(self):
        """
        Parses 'when xxx then yyy'
        or 'when xxx print yyy'
        """
        parts = self.get_parts(self.RULE_KEYWORDS_VALUES, strip_tokens=True)
        if Keywords.THEN in parts and Keywords.PRINT in parts:
            self.add_error(SyntaxErrorNode([], "Cannot have both 'print' and 'then' keywords"))
            return None
        if Keywords.THEN not in parts and Keywords.PRINT not in parts:
            self.add_error(KeywordNotFound([], [Keywords.THEN.value, Keywords.PRINT.value]))
            return None
        return self.parse_format_rule(parts) if Keywords.PRINT in parts else self.parse_exec_rule(parts)

    def parse_exec_rule(self, parts):
        """Build a DefExecRuleNode from the 'when' and 'then' parts."""
        node = DefExecRuleNode(parts)
        try:
            compiled_result = self.get_when(parts[Keywords.WHEN])
            if compiled_result is None:
                # Compilation failed; errors were already recorded.
                return node
            node.when = compiled_result.compiled_predicates
            node.rete = compiled_result.rete_disjunctions
            parsed = self.get_then(parts[Keywords.THEN])
            if parsed is None:
                return node
            node.then = parsed
        except KeyError as e:
            # A mandatory keyword was missing from the parts mapping.
            self.add_error(KeywordNotFound([], [e.args[0].value]))
            return None
        return node

    def parse_format_rule(self, parts):
        """Build a DefFormatRuleNode from the 'when' and 'print' parts."""
        node = DefFormatRuleNode(parts)
        try:
            compiled_result = self.get_when(parts[Keywords.WHEN])
            if compiled_result is None:
                return node
            node.when = compiled_result.compiled_predicates
            node.rete = compiled_result.rete_disjunctions
            parsed = self.get_print(parts[Keywords.PRINT])
            if parsed is None:
                return node
            node.print = parsed
        except KeyError as e:
            self.add_error(KeywordNotFound([], [e.args[0].value]))
            return None
        return node

    def get_when(self, tokens):
        """
        Validate the when part of the rule.
        :param tokens: tokens following (and including) the 'when' keyword
        :return: the compilation result, or None on failure
        """
        source = core.utils.get_text_from_tokens(core.utils.strip_tokens(tokens[1:]))
        try:
            rule_manager_service = self.sheerka.services[SheerkaRuleManager.NAME]
            compiled_result = rule_manager_service.compile_when(self.context, self.name, source)
        except FailedToCompileError as ex:
            # Surface every underlying compilation error individually.
            for c in ex.cause:
                self.add_error(c)
            return None
        return compiled_result

    def get_then(self, tokens):
        """Compile the 'then' action part; returns None (and records errors) on failure."""
        source = core.utils.get_text_from_tokens(core.utils.strip_tokens(tokens[1:]))
        res = self.sheerka.services[SheerkaRuleManager.NAME].compile_exec(self.context, source)
        if not res.status:
            self.add_error(res.value)
            return None
        return res

    def get_print(self, tokens):
        """
        Validate the print part
        :param tokens: tokens following (and including) the 'print' keyword
        :return: the compiled format AST body, or None on failure
        """
        source = core.utils.get_text_from_tokens(core.utils.strip_tokens(tokens[1:]))
        res = self.sheerka.services[SheerkaRuleManager.NAME].compile_print(self.context, source)
        if not res.status:
            self.add_error(res.value)
            return None
        return res.body

    def get_concept_name(self, tokens):
        """Validate and wrap the rule-name tokens; empty names and newlines are rejected."""
        name_tokens = core.utils.strip_tokens(tokens)
        if len(name_tokens) == 0:
            self.add_error(SyntaxErrorNode([], "Name is mandatory"))
            return None
        for token in name_tokens:
            if token.type == TokenKind.NEWLINE:
                self.add_error(SyntaxErrorNode([token], "Newline are not allowed in name."))
                return None
        name_node = NameNode(name_tokens)
        return name_node
+184 -224
View File
@@ -1,183 +1,64 @@
from dataclasses import dataclass
from typing import List, Tuple, Callable
from itertools import product
from core.builtin_concepts import BuiltinConcepts
from core.concept import Concept
from core.builtin_helpers import only_successful, parse_unrecognized, get_inner_body, parse_python, \
get_lexer_nodes_using_positions
from core.sheerka.services.SheerkaExecute import ParserInput
from core.tokenizer import TokenKind, Token
from parsers.BaseParser import Node, BaseParser, UnexpectedTokenParsingError, UnexpectedEofParsingError, ParsingError
from core.sheerka.services.sheerka_service import FailedToCompileError
from core.tokenizer import TokenKind, Tokenizer, Keywords
from core.utils import get_text_from_tokens
from parsers.BaseNodeParser import UnrecognizedTokensNode
from parsers.BaseParser import BaseParser, UnexpectedTokenParsingError, UnexpectedEofParsingError
from parsers.PythonWithConceptsParser import PythonWithConceptsParser
from parsers.expressions import ParenthesisNode, OrNode, AndNode, NotNode, LeftPartNotFoundError, \
ParenthesisMismatchError, NameExprNode, ExprNode, VariableNode, ComparisonNode
from sheerkarete.common import V
from sheerkarete.conditions import Condition, AndConditions
class ExprNode(Node):
"""
Base ExprNode
eval() must be overridden
"""
class ReteConditionsEmitter:
def eval(self, obj):
return True
def __init__(self, context):
from parsers.ComparisonParser import ComparisonParser
self.context = context
self.comparison_parser = ComparisonParser()
self.var_counter = 0
self.variables = {}
def add_variable(self, target):
var_name = f"__x_{self.var_counter:02}__"
self.var_counter += 1
self.variables[target] = var_name
return var_name
@dataclass()
class LeftPartNotFoundError(ParsingError):
"""
When the expression starts with 'or' or 'and'
"""
pass
def init_variable_if_needed(self, node, res):
if node.name not in self.variables:
var_name = self.add_variable(node.name)
res.append(Condition(V(var_name), "__name__", node.name))
return V(self.variables[node.name])
class NameExprNode(ExprNode):
def __init__(self, tokens):
self.tokens = tokens
self.value = "".join([t.str_value for t in self.tokens])
def get_conditions(self, expr_nodes):
conditions = []
for expr_node in expr_nodes:
parsed_ret = self.comparison_parser.parse(self.context, expr_node.tokens)
if not parsed_ret.status:
raise FailedToCompileError(parsed_ret.body)
tree = parsed_ret.body.body
def eval(self, obj):
return self.value
if isinstance(tree, VariableNode):
var_name = self.init_variable_if_needed(tree, conditions)
if tree.attributes_str is not None:
conditions.append(Condition(var_name, tree.attributes_str, True))
def __repr__(self):
return f"NameExprNode('{self.value}')"
elif isinstance(tree, ComparisonNode):
if isinstance(tree.left, VariableNode):
left = self.init_variable_if_needed(tree.left, conditions)
attr = tree.left.attributes_str or "__self__"
right = eval(get_text_from_tokens(tree.right.tokens))
conditions.append(Condition(left, attr, right))
def __str__(self):
return self.value
@dataclass
class PropertyEqualsNode(ExprNode):
    """Matches when ``obj.<prop>`` exists and its string form equals ``value``."""
    prop: str
    value: object

    def eval(self, obj):
        if not hasattr(obj, self.prop):
            return False
        return str(getattr(obj, self.prop)) == self.value
@dataclass()
class PropertyContainsNode(ExprNode):
    """Matches when ``value`` is a substring of ``str(obj.<prop>)``."""
    prop: str
    value: object

    def eval(self, obj):
        # Missing attribute means no match; otherwise substring test.
        return hasattr(obj, self.prop) and self.value in str(getattr(obj, self.prop))
@dataclass
class PropertyEqualsSequenceNode(ExprNode):
    """
    To use when the test must be done across parent and child:
    the last props/values pair is checked on ``obj`` itself, each earlier
    pair on the next ancestor up the parent chain.
    """
    props: List[str]  # property names, outermost ancestor first
    values: List[object]  # expected values, parallel to ``props``

    def eval(self, obj):
        # Walk from the object (last pair) up the parent chain (first pair).
        index = len(self.props) - 1
        while True:
            if not hasattr(obj, self.props[index]) or getattr(obj, self.props[index]) != self.values[index]:
                return False
            if index == 0:
                break
            index -= 1
            # Prefer the accessor when available; fall back to the raw attribute.
            obj = obj.get_parent() if hasattr(obj, "get_parent") else obj.parent
            if obj is None:
                # Chain shorter than the pair list: no match.
                return False
        return True
@dataclass()
class IsaNode(ExprNode):
    """
    To use to replicate instanceof, sheerka.instanceof,
    """
    obj_class: object  # a Python type, or a concept key (BuiltinConcepts / str)

    def eval(self, obj):
        if isinstance(self.obj_class, type):
            # Plain Python type: regular isinstance check.
            return isinstance(obj, self.obj_class)
        if isinstance(self.obj_class, (BuiltinConcepts, str)):
            # Concept match: compare the concept key rather than the Python type.
            return isinstance(obj, Concept) and str(self.obj_class) == obj.key
        return False
@dataclass()
class LambdaNode(ExprNode):
    """
    Generic expression to ease the tests: wraps an arbitrary predicate.
    """
    lambda_exp: Callable[[object], bool]  # predicate evaluated against the object

    def eval(self, obj):
        """Apply the predicate; a raising predicate counts as a non-match.

        Bug fix: the original swallowed the exception with ``pass`` and so
        implicitly returned None; eval() is expected to yield a bool.
        """
        try:
            return self.lambda_exp(obj)
        except Exception:
            # Best-effort by design: a failing predicate simply does not match.
            return False
@dataclass(init=False)
class AndNode(ExprNode):
    """Conjunction of expression nodes: matches only if every part matches."""
    parts: Tuple[ExprNode]

    def __init__(self, *parts: ExprNode):
        self.parts = parts

    def eval(self, obj):
        # Bug fix: the original indexed parts[0] and parts[1] unconditionally
        # (IndexError with fewer than two parts) and mixed logical 'and' with
        # bitwise '&='. all() handles any arity, short-circuits, and always
        # returns a plain bool.
        return all(part.eval(obj) for part in self.parts)

    def __repr__(self):
        return "AndNode(" + ", ".join([repr(p) for p in self.parts]) + ")"

    def __str__(self):
        return " and ".join([str(p) for p in self.parts])
@dataclass(init=False)
class OrNode(ExprNode):
parts: Tuple[ExprNode]
def __init__(self, *parts: ExprNode):
self.parts = parts
def eval(self, obj):
res = self.parts[0].eval(obj) or self.parts[1].eval(obj)
for part in self.parts[2:]:
res |= part.eval(obj)
return res
def __repr__(self):
return f"OrNode(" + ", ".join([repr(p) for p in self.parts]) + ")"
def __str__(self):
return " or ".join([str(p) for p in self.parts])
@dataclass()
class NotNode(ExprNode):
    """Logical negation of a wrapped expression node."""
    node: ExprNode  # the expression whose result is inverted

    def eval(self, obj):
        return not self.node.eval(obj)
class FalseNode(ExprNode):
    """Constant expression that never matches."""

    def eval(self, obj):
        return False
class TrueNode(ExprNode):
    """Constant expression that always matches."""

    def eval(self, obj):
        return True
return [AndConditions(conditions)]
class ExpressionParser(BaseParser):
@@ -191,6 +72,15 @@ class ExpressionParser(BaseParser):
def __init__(self, **kwargs):
super().__init__("Expression", 50, False, yield_eof=True)
self.and_tokens = list(Tokenizer(" and ", yield_eof=False))
self.and_not_tokens = list(Tokenizer(" and not ", yield_eof=False))
self.not_tokens = list(Tokenizer("not ", yield_eof=False))
@staticmethod
def clean_parenthesis_nodes(nodes):
for i, node in enumerate(nodes):
if isinstance(node, ParenthesisNode):
nodes[i] = node.node
def parse(self, context, parser_input: ParserInput):
"""
@@ -232,6 +122,7 @@ class ExpressionParser(BaseParser):
return ret
def parse_or(self):
start = self.parser_input.pos
expr = self.parse_and()
token = self.parser_input.token
if token.type != TokenKind.IDENTIFIER or token.value != "or":
@@ -243,14 +134,19 @@ class ExpressionParser(BaseParser):
expr = self.parse_and()
if expr is None:
self.add_error(UnexpectedEofParsingError("When parsing 'or'"))
return OrNode(*parts)
end = self.parser_input.pos
self.clean_parenthesis_nodes(parts)
return OrNode(start, end, self.parser_input.tokens[start: end + 1], *parts)
parts.append(expr)
token = self.parser_input.token
return OrNode(*parts)
end = parts[-1].end
self.clean_parenthesis_nodes(parts)
return OrNode(start, end, self.parser_input.tokens[start: end + 1], *parts)
def parse_and(self):
expr = self.parse_names()
start = self.parser_input.pos
expr = self.parse_not()
token = self.parser_input.token
if token.type != TokenKind.IDENTIFIER or token.value != "and":
return expr
@@ -258,27 +154,46 @@ class ExpressionParser(BaseParser):
parts = [expr]
while token.type == TokenKind.IDENTIFIER and token.value == "and":
self.parser_input.next_token()
expr = self.parse_names()
expr = self.parse_not()
if expr is None:
self.add_error(UnexpectedEofParsingError("When parsing 'and'"))
return AndNode(*parts)
end = self.parser_input.pos
self.clean_parenthesis_nodes(parts)
return AndNode(start, end, self.parser_input.tokens[start: end + 1], *parts)
parts.append(expr)
token = self.parser_input.token
return AndNode(*parts)
end = parts[-1].end
self.clean_parenthesis_nodes(parts)
return AndNode(start, end, self.parser_input.tokens[start: end + 1], *parts)
def parse_not(self):
token = self.parser_input.token
start = self.parser_input.pos
if token.type == TokenKind.IDENTIFIER and token.value == "not":
self.parser_input.next_token()
parsed = self.parse_not()
node = parsed.node if isinstance(parsed, ParenthesisNode) else parsed
return NotNode(start,
parsed.end,
self.parser_input.tokens[start: parsed.end + 1],
node)
else:
return self.parse_names()
def parse_names(self):
def stop():
return token.type == TokenKind.EOF or \
paren_count == 0 and token.type == TokenKind.RPAR or \
token.type == TokenKind.IDENTIFIER and token.value in ("and", "or")
token.type == TokenKind.IDENTIFIER and token.value in ("and", "or", "not")
token = self.parser_input.token
if token.type == TokenKind.EOF:
return None
if token.type == TokenKind.LPAR:
start = self.parser_input.pos
self.parser_input.next_token()
expr = self.parse_or()
token = self.parser_input.token
@@ -286,14 +201,18 @@ class ExpressionParser(BaseParser):
self.error_sink.append(
UnexpectedTokenParsingError(f"Unexpected token '{token}'", token, [TokenKind.RPAR]))
return expr
end = self.parser_input.pos
self.parser_input.next_token()
return expr
return ParenthesisNode(start, end, None, expr)
buffer = []
paren_count = 0
last_paren = None
start = self.parser_input.pos
while not stop():
buffer.append(token)
if token.type == TokenKind.LPAR:
last_paren = token
paren_count += 1
if token.type == TokenKind.RPAR:
paren_count -= 1
@@ -305,65 +224,106 @@ class ExpressionParser(BaseParser):
self.error_sink.append(LeftPartNotFoundError())
return None
if paren_count != 0:
self.error_sink.append(ParenthesisMismatchError(last_paren))
return None
if buffer[-1].type == TokenKind.WHITESPACE:
buffer.pop()
return NameExprNode(buffer)
end = start + len(buffer) - 1
return NameExprNode(start, end, buffer)
def compile_conjunctions(self, context, conjunctions, who):
"""
Transform a list of conjunctions (AND and OR) into one or multiple CompiledExpr
:param context:
:param conjunctions: list of ExprNode
:param who: service that calls the method
:returns: List Of CompiledExpr
May throw FailedToRecognized if a conjunction cannot be parsed
"""
recognized = []
for conjunction in conjunctions:
# try to recognize conjunction, one by one
# negative conjunction can be a concept starting with 'not'
parsed_ret = parse_unrecognized(
context,
conjunction.get_value(), # we remove the 'NOT' part when needed to ease the recognition
parsers="all",
who=who,
prop=Keywords.WHEN,
filter_func=only_successful)
class ExpressionVisitor:
"""
Pyhtonic implementation of visitors for ExprNode
"""
if parsed_ret.status:
recognized.append(get_inner_body(context, parsed_ret.body))
else:
raise FailedToCompileError(parsed_ret.body)
def visit(self, expr_node):
name = expr_node.__class__.__name__
# for each conjunction, we have a list of recognized concepts (or python node)
# we need a cartesian product of the results
# Explanation for later
# conjunction[0] : 'x is a y' that can be resolved with two concepts c:|1001: and c:|1002:
# conjunction[1] : 'y is an z' that can also be resolved with two concepts (c:|1003: and c:|1004)
# so to understand the full question 'x is a y and y is an z'
# we can have c:|1001: then c:|1003:
# or c:|1001: then c:|1004:
# or c:|1002: then c:|1003:
# or c:|1002: then c:|1004:
# if one of this combination works, it means that the question 'x is a y and y is an z' was matched
# hence the cartesian product
product_of_recognized = list(product(*recognized))
method = 'visit_' + name
visitor = getattr(self, method, self.generic_visit)
return visitor(expr_node)
return_values = []
for recognized_conjunctions in product_of_recognized:
if len(recognized_conjunctions) == 1 and not isinstance(conjunctions[0], NotNode):
return_values.append(recognized_conjunctions[0])
elif len(recognized_conjunctions) == 1 and recognized_conjunctions[0].who == "parsers.Python":
# it is a negated python Node. Need to parse again
ret = parse_python(context, source=str(conjunctions[0]))
if ret.status:
return_values.append(ret)
else:
# find a way to track the failure
pass
else:
# complex result. Use PythonWithNode
lexer_nodes = get_lexer_nodes_using_positions(recognized_conjunctions,
self._get_positions(conjunctions))
def generic_visit(self, expr_node):
"""Called if no explicit visitor function exists for a node."""
for field, value in expr_node.__dict__.items():
if isinstance(value, (list, tuple)):
for item in value:
if isinstance(item, ExprNode):
self.visit(item)
elif isinstance(value, ExprNode):
self.visit(value)
# put back the 'and' / 'not' node
for i in range(len(lexer_nodes) - 1, 0, -1):
end = lexer_nodes[i].start - 1
start = lexer_nodes[i - 1].end + 1
if isinstance(conjunctions[i], NotNode):
lexer_nodes.insert(i, UnrecognizedTokensNode(start, end, self.and_not_tokens))
else:
lexer_nodes.insert(i, UnrecognizedTokensNode(start, end, self.and_tokens))
# add the starting 'not' if needed
# and reindex the following positions
if isinstance(conjunctions[0], NotNode):
lexer_nodes[0].start = 2
lexer_nodes.insert(0, UnrecognizedTokensNode(0, 1, self.not_tokens))
class TrueifyVisitor(ExpressionVisitor):
"""
Visit an ExprNode
replace all the nodes containing a variable to 'trueify' with True
The node containing both variables to trueify and to skip are skipped
"""
python_with_concept_node_ret = PythonWithConceptsParser().parse_nodes(context, lexer_nodes)
if not python_with_concept_node_ret.status:
# find a way to track the failure
pass
return_values.append(python_with_concept_node_ret)
def __init__(self, to_trueify, to_skip):
self.to_trueify = to_trueify
self.to_skip = to_skip
rete_cond_emitter = ReteConditionsEmitter(context)
rete_disjunctions = rete_cond_emitter.get_conditions(conjunctions)
def visit_AndNode(self, expr_node):
parts = []
for part in expr_node.parts:
parts.append(self.visit(part))
return AndNode(*parts)
return return_values, rete_disjunctions
def visit_OrNode(self, expr_node):
parts = []
for part in expr_node.parts:
parts.append(self.visit(part))
return OrNode(*parts)
def visit_NameExprNode(self, expr_node):
return_true = False
for t in expr_node.tokens:
if t.type == TokenKind.IDENTIFIER:
if t.value in self.to_skip:
return expr_node
if t.value in self.to_trueify:
return_true = True
return NameExprNode([Token(TokenKind.IDENTIFIER, "True", -1, -1, -1)]) if return_true else expr_node
@staticmethod
def _get_positions(expr_nodes):
"""
simply manage NotNodes to address the fact that the 'not' part in removed
"""
for expr in expr_nodes:
if isinstance(expr, NotNode):
yield ExprNode(expr.start + 2, expr.end, expr.tokens[2:])
else:
yield expr
+7
View File
@@ -44,9 +44,15 @@ class NamesNode(FunctionParserNode):
return "".join([t.str_value for t in self.tokens])
def to_unrecognized(self):
"""
UnrecognizedTokensNode with all tokens
"""
return UnrecognizedTokensNode(self.start, self.end, self.tokens).fix_source()
def to_str_unrecognized(self):
"""
UnrecognizedTokensNode with one token, which is a string token of all the tokens
"""
token = Token(TokenKind.STRING,
"'" + self.str_value() + "'",
self.tokens[0].index,
@@ -342,6 +348,7 @@ class FunctionParser(BaseParser):
res = [SourceCodeWithConceptNode(function_node.first.to_unrecognized(), function_node.last.to_unrecognized())]
# try to recognize every parameter, one by one
for param in function_node.parameters:
if isinstance(param.value, NamesNode):
# try to recognize concepts
+10 -3
View File
@@ -35,8 +35,9 @@ class ConceptDetectedError(ParsingError):
class PythonNode(Node):
def __init__(self, source, ast_=None, objects=None):
self.source = source
def __init__(self, source, ast_=None, original_source=None, objects=None):
self.source = source # what was parsed
self.original_source = original_source or source # to remember source before concept id replacement
self.ast_ = ast_ # if ast_ else ast.parse(source, mode="eval") if source else None
self.objects = objects or {} # when objects (mainly concepts or rules) are recognized in the expression
self.compiled = None
@@ -64,6 +65,9 @@ class PythonNode(Node):
if self.source != other.source:
return False
if self.original_source != other.original_source:
return False
if self.ast_ and other.ast_:
self_dump = self.get_dump(self.ast_)
other_dump = self.get_dump(other.ast_)
@@ -74,6 +78,9 @@ class PythonNode(Node):
def __hash__(self):
return hash((self.source, self.ast_.hash))
def get_python_node(self):
return self
@staticmethod
def get_dump(ast_):
if not ast_:
@@ -156,7 +163,7 @@ class PythonParser(BaseParser):
BuiltinConcepts.PARSER_RESULT,
parser=self,
source=parser_input.as_text(),
body=PythonNode(source_code, tree, tracker),
body=PythonNode(source_code, tree, objects=tracker),
try_parsed=None))
self.log_result(context, parser_input.as_text(), ret)
+21 -58
View File
@@ -1,9 +1,8 @@
from core.builtin_concepts import BuiltinConcepts
from core.sheerka.services.SheerkaExecute import SheerkaExecute
from core.builtin_helpers import parse_python, CreateObjectIdentifiers
from parsers.BaseNodeParser import ConceptNode, RuleNode
from parsers.BaseNodeParser import SourceCodeWithConceptNode
from parsers.BaseParser import BaseParser
from parsers.PythonParser import PythonParser
from parsers.UnrecognizedNodeParser import UnrecognizedNodeParser
unrecognized_nodes_parser = UnrecognizedNodeParser()
@@ -13,16 +12,6 @@ class PythonWithConceptsParser(BaseParser):
def __init__(self, **kwargs):
super().__init__("PythonWithConcepts", 20)
@staticmethod
def sanitize(identifier):
if identifier is None:
return ""
res = ""
for c in identifier:
res += c if c.isalnum() else "0"
return res
@staticmethod
def get_nodes(nodes):
for node in nodes:
@@ -46,73 +35,47 @@ class PythonWithConceptsParser(BaseParser):
source = ""
to_parse = ""
identifiers = {}
identifiers_key = {}
python_ids_mappings = {}
def _get_identifier(c, wrapper):
"""
Get an identifier for a concept.
Make sure to return the same identifier if the same concept
Make sure to return a different identifier if same name but different concept
Internal function because I don't want identifiers, identifiers_key and python_ids_mappings
to be instance variables
I would like to keep this parser as stateless as possible
:param c:
:return:
"""
if id(c) in identifiers:
return identifiers[id(c)]
identifier = wrapper + self.sanitize(c.key or c.name)
if c.id:
identifier += "__" + c.id
if identifier in identifiers_key:
identifiers_key[identifier] += 1
identifier += f"_{identifiers_key[identifier]}"
else:
identifiers_key[identifier] = 0
identifier += wrapper
identifiers[id(c)] = identifier
return identifier
last_token_index = 0
ids_manager = CreateObjectIdentifiers()
for node in self.get_nodes(nodes):
if isinstance(node, ConceptNode):
source += node.source
if to_parse:
if node.start != last_token_index + 1 and source: # put back missing whitespace
source += " "
to_parse += " "
source += node.source
concept = node.concept
python_id = _get_identifier(concept, "__C__")
python_id = ids_manager.get_identifier(concept, "__C__")
to_parse += python_id
python_ids_mappings[python_id] = concept
last_token_index = node.end
elif isinstance(node, RuleNode):
source += node.source
if to_parse:
if node.start != last_token_index + 1 and source: # put back missing whitespace
source += " "
to_parse += " "
source += node.source
rule = node.rule
python_id = _get_identifier(rule, "__R__")
python_id = ids_manager.get_identifier(rule, "__R__")
to_parse += python_id
python_ids_mappings[python_id] = rule
last_token_index = node.end
else:
source += node.source
to_parse += node.source
to_parse += node.get_source_to_parse()
last_token_index = node.end
if hasattr(node, "get_python_node"):
python_ids_mappings.update(node.get_python_node().objects)
with context.push(BuiltinConcepts.PARSE_CODE,
{"language": "Python", "source": to_parse},
"Trying Python for '" + to_parse + "'") as sub_context:
parser_input = context.sheerka.services[SheerkaExecute.NAME].get_parser_input(to_parse)
python_parser = PythonParser()
result = python_parser.parse(sub_context, parser_input)
result = parse_python(context, to_parse, "Trying Python for '" + to_parse + "'")
if result.status:
python_node = result.body.body
python_node.source = source
python_node.original_source = source
python_node.objects = python_ids_mappings
return sheerka.ret(
+6 -6
View File
@@ -53,7 +53,7 @@ class UnrecognizedNodeParser(BaseParser):
if not res.status:
self.add_error(res.body)
else:
sequences_found = core.utils.product(sequences_found, [res.body])
sequences_found = core.utils.sheerka_product(sequences_found, [res.body])
elif isinstance(node, UnrecognizedTokensNode):
res = parse_unrecognized(context, node.source, PARSERS)
@@ -62,16 +62,16 @@ class UnrecognizedNodeParser(BaseParser):
lexer_nodes = get_lexer_nodes(res.body.body, node.start, node.tokens)
if lexer_nodes:
# make lexer_nodes is not empty (for example, some Python result are discarded)
sequences_found = core.utils.product(sequences_found, lexer_nodes)
sequences_found = core.utils.sheerka_product(sequences_found, lexer_nodes)
else:
sequences_found = core.utils.product(sequences_found, [node])
sequences_found = core.utils.sheerka_product(sequences_found, [node])
has_unrecognized = True
else:
sequences_found = core.utils.product(sequences_found, [node])
sequences_found = core.utils.sheerka_product(sequences_found, [node])
has_unrecognized = True
elif isinstance(node, SourceCodeNode):
sequences_found = core.utils.product(sequences_found, [node])
sequences_found = core.utils.sheerka_product(sequences_found, [node])
has_unrecognized = True # to let PythonWithConceptParser validate the code
elif isinstance(node, SourceCodeWithConceptNode):
@@ -82,7 +82,7 @@ class UnrecognizedNodeParser(BaseParser):
break
else:
node.nodes[i] = res.body
sequences_found = core.utils.product(sequences_found, [node])
sequences_found = core.utils.sheerka_product(sequences_found, [node])
has_unrecognized = True # to let PythonWithConceptParser validate the code
else: # cannot happen as of today :-)
+414
View File
@@ -0,0 +1,414 @@
from dataclasses import dataclass
from typing import List, Tuple
from core.tokenizer import Token, TokenKind, Tokenizer
from core.utils import tokens_are_matching
from parsers.BaseParser import Node, ParsingError
class ComparisonType:
    """String codes naming the comparison operators used by Rete conditions."""
    EQUALS = "EQ"
    NOT_EQUALS = "NOT_EQ"
    # Backward-compatible alias: the original constant name carried a
    # lowercase-'l' typo ("NOT_EQUAlS"); keep it so existing callers work.
    NOT_EQUAlS = NOT_EQUALS
    LESS_THAN = "LT"
    LESS_THAN_OR_EQUALS = "LTE"
    GREATER_THAN = "GT"
    GREATER_THAN_OR_EQUALS = "GTE"
    IN = "IN"
    NOT_IN = "NOT_IN"
@dataclass()
class LeftPartNotFoundError(ParsingError):
    """Raised when an expression begins with a binary keyword ('or'/'and'),
    i.e. the left-hand operand is missing."""
@dataclass()
class ParenthesisMismatchError(ParsingError):
    """Unbalanced parentheses were found while parsing an expression."""
    # The offending parenthesis token (presumably the last unmatched '(' seen
    # by the caller — confirm against parse_names, which tracks last_paren).
    token: Token
@dataclass
class ExprNode(Node):
    """Base class for expression-tree nodes.

    Subclasses are expected to override eval(); the base implementation
    evaluates to True.
    """
    start: int  # index of the first token
    end: int  # index of the last token
    tokens: List[Token]

    def eval(self, obj):
        # Neutral default: an unspecialised node always holds.
        return True

    def __eq__(self, other):
        if not isinstance(other, ExprNode):
            return False
        # A None tokens attribute on 'other' acts as a wildcard:
        # only the token span is compared in that case.
        same_span = self.start == other.start and self.end == other.end
        tokens_match = other.tokens is None or other.tokens == self.tokens
        return same_span and tokens_match

    def __hash__(self):
        # Tokens-independent on purpose, mirroring the wildcard in __eq__.
        return hash((self.start, self.end))
class NameExprNode(ExprNode):
    """A run of tokens forming a name expression; evaluates to its text."""

    def __init__(self, start, end, tokens):
        super().__init__(start, end, tokens)
        self.tokens = tokens
        # Concatenated textual value of all tokens, e.g. "foo.bar".
        self.value = "".join(t.str_value for t in tokens)

    def eval(self, obj):
        # A bare name evaluates to its own text; truthiness is up to callers.
        return self.value

    def get_value(self):
        return self.value

    def __repr__(self):
        return f"NameExprNode('{self.value}')"

    def __str__(self):
        return self.value

    def __eq__(self, other):
        return isinstance(other, NameExprNode) and super().__eq__(other)

    def __hash__(self):
        return super().__hash__()
@dataclass(init=False)
class AndNode(ExprNode):
    """Conjunction ('and') of sub-expressions."""
    parts: Tuple[ExprNode]

    def __init__(self, start, end, tokens, *parts: ExprNode):
        super().__init__(start, end, tokens)
        self.parts = parts

    def eval(self, obj):
        """Evaluate every part against *obj* and fold with logical 'and'.

        Returns the first falsy / last truthy part value, following
        Python's 'and' semantics.
        """
        # Evaluate all parts up front (the original also evaluated every
        # part, without short-circuiting), then fold with logical 'and'.
        # This fixes two defects of the previous implementation:
        #  * 'res &= ...' is the *bitwise* operator — it raises TypeError
        #    for non-int part results (NameExprNode.eval returns a string)
        #    and is not boolean 'and' in general;
        #  * indexing parts[1] crashed with fewer than two parts.
        values = [part.eval(obj) for part in self.parts]
        if not values:
            return True  # empty conjunction is vacuously true
        res = values[0]
        for value in values[1:]:
            res = res and value
        return res

    def __repr__(self):
        return f"AndNode(start={self.start}, end={self.end}, " + ", ".join([repr(p) for p in self.parts]) + ")"

    def __str__(self):
        return " and ".join([str(p) for p in self.parts])

    def __eq__(self, other):
        if not isinstance(other, AndNode):
            return False
        if self.start != other.start or self.end != other.end:
            return False
        # tokens=None on 'other' is a wildcard (same convention as ExprNode).
        if other.tokens is not None and other.tokens != self.tokens:
            return False
        return self.parts == other.parts

    def __hash__(self):
        return hash((self.start, self.end, self.parts))
@dataclass(init=False)
class OrNode(ExprNode):
    """Disjunction ('or') of sub-expressions."""
    parts: Tuple[ExprNode]

    def __init__(self, start, end, tokens, *parts: ExprNode):
        super().__init__(start, end, tokens)
        self.parts = parts

    def eval(self, obj):
        """Evaluate every part against *obj* and fold with logical 'or'.

        Returns the first truthy / last falsy part value, following
        Python's 'or' semantics.
        """
        # Evaluate all parts up front (the original also evaluated every
        # part, without short-circuiting), then fold with logical 'or'.
        # This fixes two defects of the previous implementation:
        #  * 'res |= ...' is the *bitwise* operator — it raises TypeError
        #    for non-int part results (NameExprNode.eval returns a string)
        #    and is not boolean 'or' in general;
        #  * indexing parts[1] crashed with fewer than two parts.
        values = [part.eval(obj) for part in self.parts]
        if not values:
            return False  # empty disjunction is vacuously false
        res = values[0]
        for value in values[1:]:
            res = res or value
        return res

    def __repr__(self):
        return f"OrNode(start={self.start}, end={self.end}, " + ", ".join([repr(p) for p in self.parts]) + ")"

    def __str__(self):
        return " or ".join([str(p) for p in self.parts])

    def __eq__(self, other):
        if not isinstance(other, OrNode):
            return False
        if self.start != other.start or self.end != other.end:
            return False
        # tokens=None on 'other' is a wildcard (same convention as ExprNode).
        if other.tokens is not None and other.tokens != self.tokens:
            return False
        return self.parts == other.parts

    def __hash__(self):
        return hash((self.start, self.end, self.parts))
@dataclass()
class NotNode(ExprNode):
    """Logical negation of a wrapped expression."""
    node: ExprNode

    def eval(self, obj):
        return not self.node.eval(obj)

    def get_value(self):
        # Delegate: the textual value is that of the negated expression.
        return self.node.get_value()

    def __repr__(self):
        return f"NotNode(start={self.start}, end={self.end}, {self.node!r})"

    def __str__(self):
        return f"not {self.node}"

    def __eq__(self, other):
        if not isinstance(other, NotNode):
            return False
        # tokens=None on 'other' is a wildcard (same convention as ExprNode).
        same_span = self.start == other.start and self.end == other.end
        tokens_match = other.tokens is None or other.tokens == self.tokens
        return same_span and tokens_match and self.node == other.node

    def __hash__(self):
        return hash((self.start, self.end, self.node))
@dataclass()
class ParenthesisNode(ExprNode):
    """Marks an expression that was written inside parentheses.

    Exists only to remember the parenthesis boundaries; semantics come
    from the wrapped node.
    """
    node: ExprNode

    def __eq__(self, other):
        if not isinstance(other, ParenthesisNode):
            return False
        if (self.start, self.end) != (other.start, other.end):
            return False
        # tokens=None on 'other' is a wildcard (same convention as ExprNode).
        if other.tokens is not None and other.tokens != self.tokens:
            return False
        return self.node == other.node

    def __hash__(self):
        return hash((self.start, self.end, self.node))
class VariableNode(ExprNode):
    """A variable reference, optionally followed by a dotted attribute path
    (e.g. 'obj.attr.sub')."""

    def __init__(self, start, end, tokens, name, *attributes):
        super().__init__(start, end, tokens)
        self.name = name.strip()
        self.attributes = [attr.strip() for attr in attributes]
        # Pre-joined "a.b.c" form, or None when there is no attribute path.
        self.attributes_str = ".".join(self.attributes) if self.attributes else None

    def __eq__(self, other):
        if self is other:
            return True
        if not isinstance(other, VariableNode):
            return False
        # Identity is name + attribute path; the token span is irrelevant.
        return self.name == other.name and self.attributes == other.attributes

    def __hash__(self):
        # self.attributes is a list, which is unhashable: the original
        # hash((self.name, self.attributes)) raised TypeError on every call.
        # Convert to a tuple so VariableNode can be used in sets/dict keys.
        return hash((self.name, tuple(self.attributes)))

    def __repr__(self):
        prefix = f"VariableNode(start={self.start}, end={self.end}, '{self.name}"
        if len(self.attributes) > 0:
            return prefix + "." + ".".join(self.attributes) + "')"
        else:
            return prefix + "')"

    def __str__(self):
        if self.attributes:
            return self.name + "." + ".".join(self.attributes)
        else:
            return self.name
@dataclass
class ComparisonNode(ExprNode):
    """Binary comparison between two sub-expressions.

    'comp' holds one of the ComparisonType codes.
    """
    comp: str
    left: ExprNode
    right: ExprNode

    def __eq__(self, other):
        if self is other:
            return True
        return (isinstance(other, ComparisonNode)
                and self.comp == other.comp
                and self.left == other.left
                and self.right == other.right)

    def __hash__(self):
        return hash((self.comp, self.left, self.right))

    def __repr__(self):
        return f"ComparisonNode(start={self.start}, end={self.end}, {self.left!r} {self.comp} {self.right!r})"

    def __str__(self):
        return f"{self.left} {self.comp} {self.right}"
class ExpressionVisitor:
    """Pythonic visitor dispatch for ExprNode trees.

    visit() dispatches to a 'visit_<ClassName>' method when one is defined;
    otherwise generic_visit() recursively walks child ExprNodes.
    """

    def visit(self, expr_node):
        method_name = "visit_" + type(expr_node).__name__
        handler = getattr(self, method_name, self.generic_visit)
        return handler(expr_node)

    def generic_visit(self, expr_node):
        """Called if no explicit visitor function exists for a node."""
        for value in expr_node.__dict__.values():
            if isinstance(value, (list, tuple)):
                for item in value:
                    if isinstance(item, ExprNode):
                        self.visit(item)
            elif isinstance(value, ExprNode):
                self.visit(value)
class TrueifyVisitor(ExpressionVisitor):
    """Replace name expressions mentioning a 'to_trueify' variable with True.

    A node that mentions any 'to_skip' variable is left untouched, even if
    it also mentions a variable to trueify.
    """

    def __init__(self, to_trueify, to_skip):
        self.to_trueify = to_trueify
        self.to_skip = to_skip

    def visit_AndNode(self, expr_node):
        visited_parts = [self.visit(part) for part in expr_node.parts]
        return AndNode(expr_node.start, expr_node.end, expr_node.tokens, *visited_parts)

    def visit_OrNode(self, expr_node):
        visited_parts = [self.visit(part) for part in expr_node.parts]
        return OrNode(expr_node.start, expr_node.end, expr_node.tokens, *visited_parts)

    def visit_NameExprNode(self, expr_node):
        # 'skip' wins: return the node unchanged as soon as a skipped
        # identifier shows up, even if a trueify identifier was seen first.
        found_trueify = False
        for token in expr_node.tokens:
            if token.type != TokenKind.IDENTIFIER:
                continue
            if token.value in self.to_skip:
                return expr_node
            if token.value in self.to_trueify:
                found_trueify = True
        if not found_trueify:
            return expr_node
        # Same span as the original node, but a single literal-True token.
        return NameExprNode(expr_node.start,
                            expr_node.end,
                            [Token(TokenKind.IDENTIFIER, "True", -1, -1, -1)])
# Pre-tokenized reference patterns, compared against NameExprNode tokens by
# IsAQuestionVisitor to spot question-detection calls inside an expression.
is_question_tokens = list(Tokenizer("is_question()"))
eval_question_requested_in_context = list(Tokenizer("context.in_context(BuiltinConcepts.EVAL_QUESTION_REQUESTED)"))
class IsAQuestionVisitor(ExpressionVisitor):
    """Detect whether an expression contains a question marker.

    A NameExprNode matching ``is_question()`` or
    ``context.in_context(BuiltinConcepts.EVAL_QUESTION_REQUESTED)`` yields
    True.  Results combine with three-valued logic where None means
    "no information" (unvisited/unknown nodes fall back to generic_visit,
    which returns None).
    """

    def visit_NameExprNode(self, expr_node):
        # True when the node is one of the two question markers; otherwise
        # None ("unknown") — a plain name says nothing about question-ness.
        if tokens_are_matching(expr_node.tokens, is_question_tokens) or \
                tokens_are_matching(expr_node.tokens, eval_question_requested_in_context):
            return True
        return None

    def visit_AndNode(self, expr_node):
        """
        AND   | True  | False | None
        ------+-------+-------+----------
        False | False | False | False
        True  | True  | False | True
        None  | True  | False | None
        """
        # False dominates; True absorbs None (see table above).  The previous
        # implementation contradicted its own truth table: it returned None
        # for [None, True] (table: True) and for [True, None] (table: True).
        res = None
        for part in expr_node.parts:
            visited = self.visit(part)
            if visited is False:
                return False
            if visited is True:
                res = True
        return res

    def visit_OrNode(self, expr_node):
        """
        OR    | True  | False | None
        ------+-------+-------+----------
        True  | True  | True  | True
        False | True  | False | False
        None  | True  | False | None
        """
        # True dominates; False absorbs None (see table above).  The previous
        # implementation contradicted its own truth table: it returned None
        # for [False, None] (table: False) and for [None, False] (table: False).
        res = None
        for part in expr_node.parts:
            visited = self.visit(part)
            if visited is True:
                return True
            if visited is False:
                res = False
        return res

    def visit_NotNode(self, expr_node):
        """
              | NOT
        ------+-------
        False | True
        True  | False
        None  | None
        """
        visited = self.visit(expr_node.node)
        return None if visited is None else not visited

    def is_a_question(self, expr_node):
        """Return True only for a definite True result; None counts as no."""
        res = self.visit(expr_node)
        return isinstance(res, bool) and res