First version of explain. Creating a new parser was a wrong approach. Need to reimplement
@@ -469,6 +469,50 @@ class CNC(CN):
        return txt + ")"


class UTN(HelperWithPos):
    """
    Tester class for UnrecognizedTokensNode:
    compares the source, and start/end if defined.
    """

    def __init__(self, source, start=None, end=None):
        """
        :param source: source text of the unrecognized tokens
        :param start: start position (optional)
        :param end: end position (optional)
        """
        super().__init__(start, end)
        self.source = source

    def __eq__(self, other):
        if id(self) == id(other):
            return True

        if isinstance(other, UnrecognizedTokensNode):
            return self.start == other.start and \
                   self.end == other.end and \
                   self.source == other.source

        if not isinstance(other, UTN):
            return False

        return self.start == other.start and \
               self.end == other.end and \
               self.source == other.source

    def __hash__(self):
        return hash((self.source, self.start, self.end))

    def __repr__(self):
        txt = f"UTN(source='{self.source}'"
        if self.start is not None:
            txt += f", start={self.start}"
        if self.end is not None:
            txt += f", end={self.end}"
        return txt + ")"


class BaseNodeParser(BaseParser):
    def __init__(self, name, priority, **kwargs):
        super().__init__(name, priority)
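For orientation, a minimal sketch of how this tester helper could be used in a test; the UnrecognizedTokensNode constructor shown here is an assumption, not taken from this diff:

    # Sketch only: the UnrecognizedTokensNode constructor signature is assumed.
    actual = UnrecognizedTokensNode(source="foo bar", start=3, end=10)
    assert UTN("foo bar", start=3, end=10) == actual  # compares source, start and end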
@@ -623,47 +667,3 @@ class BaseNodeParser(BaseParser):
            return token.value.value
        else:
            return token.value


class UTN(HelperWithPos):
    """
    Tester class for UnrecognizedTokenNode
    compare the source, and start, end if defined
    """

    def __init__(self, source, start=None, end=None):
        """
        :param concept: Concept or concept_key (only the key is used anyway)
        :param start:
        :param end:
        :param source:
        """
        super().__init__(start, end)
        self.source = source

    def __eq__(self, other):
        if id(self) == id(other):
            return True

        if isinstance(other, UnrecognizedTokensNode):
            return self.start == other.start and \
                   self.end == other.end and \
                   self.source == other.source

        if not isinstance(other, UTN):
            return False

        return self.start == other.start and \
               self.end == other.end and \
               self.source == other.source

    def __hash__(self):
        return hash((self.source, self.start, self.end))

    def __repr__(self):
        txt = f"UTN( source='{self.source}'"
        if self.start is not None:
            txt += f", start={self.start}"
        if self.end is not None:
            txt += f", end={self.end}"
        return txt + ")"
@@ -1,11 +1,12 @@
import logging
from dataclasses import dataclass

import core.utils
from core.builtin_concepts import BuiltinConcepts, ParserResultConcept
from core.concept import Concept
from core.tokenizer import TokenKind, Keywords, Token, Tokenizer
from core.sheerka.ExecutionContext import ExecutionContext
from core.sheerka_logger import get_logger
import core.utils
import logging
from core.tokenizer import TokenKind, Keywords, Token, Tokenizer


@dataclass()
@@ -65,6 +66,11 @@ class UnexpectedTokenErrorNode(ErrorNode):
        return hash((self.message, self.token, self.expected_tokens))


@dataclass()
class UnexpectedEof(ErrorNode):
    message: str


class BaseParser:
    PREFIX = "parsers."
@@ -203,3 +209,160 @@ class BaseParser:
            value = switcher.get(token.type, lambda t: t.value)(token)
            res += value
        return res


class BaseTokenizerIterParser(BaseParser):

    def __init__(self, name, priority, parse_word=False, none_on_eof=True):
        super().__init__(name, priority)
        self.lexer_iter = None
        self._current = None
        self.context: ExecutionContext = None
        self.text = None
        self.sheerka = None

        self.parse_word = parse_word
        self.none_on_eof = none_on_eof

    def reset_parser(self, context, text):
        self.context = context
        self.sheerka = context.sheerka

        self.text = text
        self.lexer_iter = iter(Tokenizer(text, self.parse_word))
        self._current = None

        self.next_token()

    def add_error(self, error, next_token=True):
        self.error_sink.append(error)
        if next_token:
            self.next_token()
        return error

    def get_token(self) -> Token:
        return self._current

    def next_token(self, skip_whitespace=True):
        try:
            self._current = next(self.lexer_iter)

            if self.none_on_eof and self._current.type == TokenKind.EOF:
                self._current = None
                return False

            if skip_whitespace:
                while self._current.type in (TokenKind.WHITESPACE, TokenKind.NEWLINE):
                    self._current = next(self.lexer_iter)
        except StopIteration:
            self._current = None
            return False

        return True


class BaseSplitIterParser(BaseParser):

    def __init__(self, name, priority, none_on_eof=False):
        super().__init__(name, priority)
        self._current = None
        self.context: ExecutionContext = None
        self.text = None
        self.sheerka = None
        self.iter_split = None
        self.split_and_eat_tokens = (" ", "\n", "\t")
        self.split_and_keep_tokens = ("=", ")", "(", ",")
        self.split_tokens = self.split_and_eat_tokens + self.split_and_keep_tokens

        self.none_on_eof = none_on_eof  # current token is set to None when EOF is hit

    def parse_word(self, c, index, line, column):
        end = self.split_tokens
        escaped = False
        buffer = ""

        while escaped or c not in end:
            if not escaped and c == "\\":
                escaped = True
            elif not escaped and c in ("'", '"'):
                end = [c]
            else:
                buffer += c
                escaped = False

            index, column = index + 1, column + 1
            if index == len(self.text):
                break
            c = self.text[index]

            if c == "\n":
                line += 1
                column = 0

        if c not in self.split_and_keep_tokens:  # 'not in' rather than 'in' so the terminating quote or whitespace is consumed as well
            index, column = index + 1, column + 1

        return buffer, index, line, column

    def split(self):
        index = 0
        line = 1
        column = 1

        while index < len(self.text):
            c = self.text[index]

            if c == "=":
                if index + 1 < len(self.text) and self.text[index + 1] == "=":
                    yield Token(TokenKind.EQUALSEQUALS, "==", index, line, column)
                    index, column = index + 2, column + 2
                else:
                    yield Token(TokenKind.EQUALS, "=", index, line, column)
                    index, column = index + 1, column + 1
            elif c == ")":
                yield Token(TokenKind.RPAR, ")", index, line, column)
                index, column = index + 1, column + 1
            elif c == "(":
                yield Token(TokenKind.LPAR, "(", index, line, column)
                index, column = index + 1, column + 1
            elif c == ",":
                yield Token(TokenKind.COMMA, ",", index, line, column)
                index, column = index + 1, column + 1
            else:
                buffer, end_index, end_line, end_column = self.parse_word(c, index, line, column)
                if buffer:
                    yield Token(TokenKind.WORD, buffer, index, line, column)
                index, line, column = end_index, end_line, end_column

        yield Token(TokenKind.EOF, "<eof>", index, line, column)

    def reset_parser(self, context, text):
        self.context = context
        self.sheerka = context.sheerka if context else None

        self.text = text
        self._current = None
        self.iter_split = iter(self.split())

    def add_error(self, error, next_token=True):
        self.error_sink.append(error)
        if next_token:
            self.next_token()
        return error

    def get_token(self) -> Token:
        return self._current

    def next_token(self):
        try:
            self._current = next(self.iter_split)
            if self._current.type == TokenKind.EOF:
                if self.none_on_eof:
                    self._current = None
                return False
        except StopIteration:
            self._current = None
            return False

        return True
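A quick sketch of the tokens split() is expected to yield; the name and priority passed to the constructor below are placeholders:

    # Sketch: driving BaseSplitIterParser.split() directly; "demo" and 50 are
    # placeholder constructor arguments.
    p = BaseSplitIterParser("demo", 50)
    p.reset_parser(None, "name==foo and (x=1, y=2)")
    for tok in p.split():
        print(tok.type, repr(tok.value))
    # Roughly: WORD 'name', EQUALSEQUALS '==', WORD 'foo', WORD 'and', LPAR '(',
    # WORD 'x', EQUALS '=', WORD '1', COMMA ',', WORD 'y', EQUALS '=', WORD '2',
    # RPAR ')', EOF '<eof>'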
@@ -0,0 +1,361 @@
from dataclasses import dataclass, field
from typing import List, Dict

from core.builtin_concepts import BuiltinConcepts
from core.tokenizer import LexerError, Token
from parsers.BaseParser import Node, UnexpectedTokenErrorNode, BaseSplitIterParser, UnexpectedEof, ErrorNode
from parsers.ExpressionParser import ExprNode, TrueNode, PropertyEqualsNode, PropertyContainsNode, OrNode, AndNode


@dataclass()
class ValueErrorNode(ErrorNode):
    """
    Emitted when the parsed value has an incorrect type or value.
    """
    message: str
    token: Token  # token at which the error was detected


@dataclass()
class MultipleDigestError(ErrorNode):
    message: str
    token: Token


@dataclass()
class ExplanationNode(Node):
    digest: str  # digest of the event to explain
    command: str  # original explain command
    expr: ExprNode = None
    record_digest: bool = False


@dataclass
class FilterNode(ExprNode):
    """
    Wraps a predicate and the directives attached to it.
    """
    expr: ExprNode
    directives: List[ExprNode] = field(default_factory=list)

    def eval(self, obj):
        return self.expr.eval(obj)


@dataclass
class RecurseDefNode(ExprNode):
    """
    Defines the depth of the recursion.
    """
    depth: int


@dataclass
class FormatLNode(ExprNode):
    """
    Defines the template to use when an ExecutionContext is printed on a single line.
    """
    template: str


@dataclass
class FormatDNode(ExprNode):
    """
    Defines the properties to display, and their format.
    """
    properties: Dict[str, str]


@dataclass
class UnionNode(ExprNode):
    """
    Union of filters. With only the default filter present, everything matches;
    once explicit filters are added, matches when any of them matches.
    """
    filters: List[FilterNode]

    def eval(self, obj):
        if len(self.filters) == 0:
            return False

        if len(self.filters) == 1:
            return self.filters[0].eval(obj)

        # filters[0] is the default catch-all filter; it is skipped as soon as
        # explicit filters have been added.
        res = False
        for f in self.filters[1:]:
            res |= f.eval(obj)
        return res


class ExplainParser(BaseSplitIterParser):
    def __init__(self, **kwargs):
        super().__init__("Explain", 81, none_on_eof=True)

    def parse_explain(self):

        token = self.get_token()
        if token is None:
            return BuiltinConcepts.IS_EMPTY

        if token.value != 'explain':
            self.add_error(UnexpectedTokenErrorNode("", token, ["explain"]))
            return BuiltinConcepts.NOT_FOR_ME

        digest = ""
        record_digest = False
        expr_node = UnionNode([FilterNode(TrueNode(), [])])
        self.next_token()
        while True:
            # no need to continue once an error has been recorded
            if self.has_error:
                return None

            token = self.get_token()
            if token is None:
                break

            if token.value in ("-f", "--filter"):
                self.next_token()
                expr_node.filters.append(self.parse_filter())
            elif token.value in ("-r", "--recurse"):
                self.next_token()
                expr_node.filters[-1].directives.append(self.parse_recurse())
            elif token.value == "--format_l":
                self.next_token()
                expr_node.filters[-1].directives.append(self.parse_format_l())
            elif token.value == "--format_d":
                self.next_token()
                expr_node.filters[-1].directives.append(self.parse_format_d())
            elif token.value in ("-d", "--digest"):
                self.next_token()
                digest = self.parse_digest(digest)
                record_digest = True
            elif token.value.startswith("-"):
                self.add_error(UnexpectedTokenErrorNode("", token, []))
            else:
                digest = self.parse_digest(digest)

        return ExplanationNode(digest, self.text, expr=expr_node, record_digest=record_digest)

    def parse_digest(self, digest):
        token = self.get_token()
        if token is None or token.value.startswith("-"):
            return ""

        if digest != "":
            self.add_error(MultipleDigestError("Too many digests", token))
            return None

        digest = token.value
        self.next_token()
        return digest

    def parse_filter(self):
        node = self.parse_or()
        if node is None:
            return None
        return FilterNode(node)

    def parse_or(self):
        parts = []

        node = self.parse_and()
        if node is None:
            return None

        parts.append(node)

        while True:
            token = self.get_token()
            if token is None or token.value != "or":
                break

            self.next_token()
            node = self.parse_and()
            if node is None:
                return None
            else:
                parts.append(node)

        return parts[0] if len(parts) == 1 else OrNode(*parts)

    def parse_and(self):
        parts = []

        node = self.parse_predicate()
        if node is None:
            return None

        parts.append(node)

        while True:
            token = self.get_token()
            if token is None or token.value != "and":
                break

            self.next_token()
            node = self.parse_predicate()
            if node is None:
                return None
            else:
                parts.append(node)

        return parts[0] if len(parts) == 1 else AndNode(*parts)

    def parse_predicate(self):
        token = self.get_token()
        if token is None:
            self.add_error(UnexpectedEof("Unexpected EOF while parsing filter"))
            return None

        if token.value == "(":
            self.next_token()
            expr = self.parse_or()

            token = self.get_token()
            if token is None:
                self.add_error(UnexpectedEof("Missing right parenthesis"))
                return None
            if token.value != ")":
                self.add_error(UnexpectedTokenErrorNode("Parenthesis mismatch", token, [")"]))
                return None
            self.next_token()

        else:
            expr = self.parse_property_predicate()

        return expr

    def parse_recurse(self):
        token = self.get_token()
        if token is None:
            self.add_error(UnexpectedEof("Unexpected EOF while parsing recurse"))
            return None

        try:
            depth = int(token.value)
            self.next_token()
            return RecurseDefNode(depth)
        except ValueError:
            self.add_error(ValueErrorNode(f"'{token.value}' is not an integer", token))
            return None

    def parse_format_l(self):
        token = self.get_token()
        if token is None:
            self.add_error(UnexpectedEof("Unexpected EOF while parsing format_l"))
            return None

        if token.value.startswith("-"):
            self.add_error(UnexpectedTokenErrorNode("parsing format_l", token, ["<property name>"]))
            return None

        template = token.value
        self.next_token()
        return FormatLNode(template)

    def parse_format_d(self):
        props = {}

        while True:
            token = self.get_token()
            if token is None:
                self.add_error(UnexpectedEof("Unexpected EOF while parsing format_d"))
                return None

            if token.value.startswith("-"):
                self.add_error(UnexpectedTokenErrorNode("parsing format_d", token, ["<property name>"]))
                return None

            parts = token.value.split(':')
            if len(parts) == 1:
                props[token.value] = "{" + token.value + "}"
            else:
                props[parts[0]] = parts[1]

            self.next_token()
            token = self.get_token()

            if token is None or token.value.startswith("-"):
                break
            elif token.value == ",":
                self.next_token()
            else:
                self.add_error(UnexpectedTokenErrorNode("parsing format_d", token, ["<eof>", ","]))

        return FormatDNode(props)

    def parse_property_predicate(self):
        token = self.get_token()
        if token is None:
            self.add_error(UnexpectedEof("Unexpected EOF while parsing predicate"))
            return None
        prop_name = token.value
        if prop_name.startswith("-"):
            self.add_error(UnexpectedTokenErrorNode("while parsing predicate", token, ["<property_name>"]))
            return None
        self.next_token()

        token = self.get_token()
        if token is None:
            self.add_error(UnexpectedEof("Unexpected EOF while parsing predicate"))
            return None
        operand = token.value

        if operand not in ("=", "=="):
            self.add_error(UnexpectedTokenErrorNode("Unexpected token when parsing predicate", token, ['=', "=="]))
            return None
        self.next_token()

        token = self.get_token()
        if token is None:
            self.add_error(UnexpectedEof("Unexpected EOF while parsing filter"))
            return None
        prop_value = token.value
        self.next_token()

        return PropertyEqualsNode(prop_name, prop_value) if operand == "==" else \
            PropertyContainsNode(prop_name, prop_value)

    def parse(self, context, parser_input):
        """
        parser_input can be a string, but it can also be a list of tokens
        :param context:
        :param parser_input:
        :return:
        """

        context.log(f"Parsing '{parser_input}'", self.name)
        sheerka = context.sheerka

        if not isinstance(parser_input, str):
            return sheerka.ret(self.name, False, sheerka.new(BuiltinConcepts.NOT_FOR_ME, reason=parser_input))

        explanation_node = None
        try:
            self.reset_parser(context, parser_input)
            self.next_token()
            explanation_node = self.parse_explain()
        except LexerError as e:
            self.add_error(e, False)

        if self.has_error or not isinstance(explanation_node, ExplanationNode):
            if explanation_node in (BuiltinConcepts.NOT_FOR_ME, BuiltinConcepts.IS_EMPTY):
                error_body = sheerka.new(
                    BuiltinConcepts.NOT_FOR_ME,
                    body=parser_input,
                    reason=self.error_sink if self.has_error else BuiltinConcepts.IS_EMPTY)
            else:
                error_body = sheerka.new(
                    BuiltinConcepts.ERROR,
                    body=self.error_sink)
            ret = sheerka.ret(self.name, False, error_body)
        else:
            ret = sheerka.ret(self.name, True,
                              sheerka.new(
                                  BuiltinConcepts.PARSER_RESULT,
                                  parser=self,
                                  source=parser_input,
                                  body=explanation_node))

        self.log_result(context, parser_input, ret)
        return ret
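A hedged usage sketch of the parser above, driven outside the sheerka pipeline; the digest value and filter here are illustrative, and parse() normally performs the same reset/next_token/parse_explain steps before wrapping the result in a PARSER_RESULT concept:

    # Sketch: parsing an explain command without an execution context.
    parser = ExplainParser()
    parser.reset_parser(None, "explain abcd1234 -f name==foo or name==bar -r 2")
    parser.next_token()
    node = parser.parse_explain()
    # node is an ExplanationNode: node.digest == "abcd1234" and node.expr is a
    # UnionNode whose second filter matches objects whose 'name' is 'foo' or
    # 'bar', with a RecurseDefNode(2) directive attached to that filter.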
@@ -0,0 +1,177 @@
from dataclasses import dataclass
from typing import List, Tuple, Callable

from core.builtin_concepts import BuiltinConcepts
from core.concept import Concept
from parsers.BaseParser import Node


class ExprNode(Node):
    """
    Base ExprNode.
    eval() must be overridden.
    """

    def eval(self, obj):
        return True


@dataclass
class PropertyEqualsNode(ExprNode):
    prop: str
    value: object

    def eval(self, obj):
        if hasattr(obj, self.prop):
            return str(getattr(obj, self.prop)) == self.value

        return False


@dataclass()
class PropertyContainsNode(ExprNode):
    prop: str
    value: object

    def eval(self, obj):
        if hasattr(obj, self.prop):
            return self.value in str(getattr(obj, self.prop))

        return False


@dataclass
class PropertyEqualsSequenceNode(ExprNode):
    """
    To use when the test must be done across parent and child.
    """
    props: List[str]
    values: List[object]

    def eval(self, obj):
        index = len(self.props) - 1

        while True:
            if not hasattr(obj, self.props[index]) or getattr(obj, self.props[index]) != self.values[index]:
                return False

            if index == 0:
                break

            index -= 1
            obj = obj.get_parent() if hasattr(obj, "get_parent") else obj.parent
            if obj is None:
                return False

        return True


@dataclass()
class IsaNode(ExprNode):
    """
    To use to replicate isinstance / sheerka.instanceof.
    """
    obj_class: object

    def eval(self, obj):
        if isinstance(self.obj_class, type):
            return isinstance(obj, self.obj_class)

        if isinstance(self.obj_class, (BuiltinConcepts, str)):
            return isinstance(obj, Concept) and str(self.obj_class) == obj.key

        return False


@dataclass()
class LambdaNode(ExprNode):
    """
    Generic expression to ease the tests.
    """
    lambda_exp: Callable[[object], bool]

    def eval(self, obj):
        try:
            return self.lambda_exp(obj)
        except Exception:
            return False


@dataclass(init=False)
class AndNode(ExprNode):
    parts: Tuple[ExprNode]

    def __init__(self, *parts: ExprNode):
        self.parts = parts

    def eval(self, obj):
        res = self.parts[0].eval(obj) and self.parts[1].eval(obj)
        for part in self.parts[2:]:
            res &= part.eval(obj)
        return res


@dataclass(init=False)
class OrNode(ExprNode):
    parts: Tuple[ExprNode]

    def __init__(self, *parts: ExprNode):
        self.parts = parts

    def eval(self, obj):
        res = self.parts[0].eval(obj) or self.parts[1].eval(obj)
        for part in self.parts[2:]:
            res |= part.eval(obj)
        return res


@dataclass()
class NotNode(ExprNode):
    node: ExprNode

    def eval(self, obj):
        return not self.node.eval(obj)


class FalseNode(ExprNode):
    def eval(self, obj):
        return False


class TrueNode(ExprNode):
    def eval(self, obj):
        return True


class ExpressionParser:
    """
    Will parse logic expressions
    like not (a and b or c).

    The nodes can be used for custom filtering (e.g. with ExplanationConcept),
    or to help understand why a Python expression returns True or False.
    """
    pass


class ExpressionVisitor:
    """
    Pythonic implementation of visitors for ExprNode.
    """

    def visit(self, expr_node):
        name = expr_node.__class__.__name__

        method = 'visit_' + name
        visitor = getattr(self, method, self.generic_visit)
        return visitor(expr_node)

    def generic_visit(self, expr_node):
        """Called if no explicit visitor function exists for a node."""
        for field, value in expr_node.__dict__.items():
            if isinstance(value, (list, tuple)):
                for item in value:
                    if isinstance(item, ExprNode):
                        self.visit(item)
            elif isinstance(value, ExprNode):
                self.visit(value)
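To make the node semantics concrete, a short hand-built example; SimpleNamespace stands in for whatever object is being filtered:

    # Sketch: composing expression nodes by hand and walking them with a visitor.
    from types import SimpleNamespace

    expr = AndNode(PropertyEqualsNode("kind", "event"),
                   NotNode(PropertyContainsNode("name", "debug")))

    obj = SimpleNamespace(kind="event", name="startup")
    print(expr.eval(obj))  # True: kind matches and name does not contain 'debug'


    class CollectProps(ExpressionVisitor):
        """Example visitor that records every property name referenced."""
        def __init__(self):
            self.props = []

        def visit_PropertyEqualsNode(self, node):
            self.props.append(node.prop)

        def visit_PropertyContainsNode(self, node):
            self.props.append(node.prop)


    v = CollectProps()
    v.visit(expr)
    print(v.props)  # ['kind', 'name']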