import ast from dataclasses import dataclass from typing import List, Union from core.builtin_concepts import ParserResultConcept, ReturnValueConcept from core.builtin_helpers import CreateObjectIdentifiers from core.concept import AllConceptParts, Concept, ConceptParts, DoNotResolve from core.rule import Rule from core.tokenizer import Token, TokenKind, Tokenizer from core.utils import get_text_from_tokens, str_concept, tokens_index from parsers.BaseExpressionParser import AndNode, ComparisonNode, ComparisonType, Comprehension, FunctionParameter, \ ListComprehensionNode, ListNode, NameExprNode, \ NotNode, OrNode, VariableNode, comma from parsers.BaseNodeParser import ConceptNode, RuleNode, SourceCodeNode, SourceCodeWithConceptNode, \ UnrecognizedTokensNode from parsers.FunctionParser import FunctionNode from parsers.PythonParser import PythonNode from sheerkapython.python_wrapper import sheerka_globals from sheerkarete.common import V from sheerkarete.conditions import AndConditions, Condition, NegatedCondition, NegatedConjunctiveConditions class ExprTestObj: @staticmethod def get_pos(nodes): start, end = None, None for n in nodes: if start is None or start > n.start: start = n.start if end is None or end < n.end: end = n.end return start, end @staticmethod def get_pos_from_source(source, full_text_as_tokens): if isinstance(source, tuple): source, to_skip = source[0], source[1] else: to_skip = 0 source_as_node = list(Tokenizer(source, yield_eof=False)) start = tokens_index(full_text_as_tokens, source_as_node, skip=to_skip) end = start + len(source_as_node) - 1 return start, end @staticmethod def as_tokens(source): if isinstance(source, tuple): source, to_skip = source else: source, to_skip = source, 0 return list(Tokenizer(source, yield_eof=False)), to_skip def get_expr_node(self, full_text_as_tokens=None): raise NotImplementedError() @staticmethod def safe_get_expr_node(obj, full_text_as_tokens): if obj is None: return None obj = EXPR(obj) if isinstance(obj, (str, tuple)) else obj return obj.get_expr_node(full_text_as_tokens) class AND(ExprTestObj): """ Test class for AndNode""" def __init__(self, *parts, source=None): self.parts = parts self.source = source def get_expr_node(self, full_text_as_tokens=None): parts = [part.get_expr_node(full_text_as_tokens) for part in self.parts] start, end = self.get_pos_from_source(self.source, full_text_as_tokens) if self.source else self.get_pos(parts) return AndNode(start, end, full_text_as_tokens[start: end + 1], *parts) class OR(ExprTestObj): """ Test class for OrNode""" def __init__(self, *parts, source=None): self.parts = parts self.source = source def get_expr_node(self, full_text_as_tokens=None): parts = [part.get_expr_node(full_text_as_tokens) for part in self.parts] start, end = self.get_pos_from_source(self.source, full_text_as_tokens) if self.source else self.get_pos(parts) return OrNode(start, end, full_text_as_tokens[start: end + 1], *parts) @dataclass class NOT(ExprTestObj): """ Test class for NotNode""" expr: ExprTestObj source: str = None def get_expr_node(self, full_text_as_tokens=None): part = self.expr.get_expr_node(full_text_as_tokens) start, end = self.get_pos_from_source(self.source, full_text_as_tokens) if self.source else ( part.start - 2, part.end) return NotNode(start, end, full_text_as_tokens[start: end + 1], part) @dataclass class EXPR(ExprTestObj): """Test class for NameNode""" source: str def get_expr_node(self, full_text_as_tokens=None): value_as_tokens, to_skip = self.as_tokens(self.source) start = tokens_index(full_text_as_tokens, value_as_tokens, to_skip) end = start + len(value_as_tokens) - 1 return NameExprNode(start, end, full_text_as_tokens[start: end + 1]) @dataclass class VAR(ExprTestObj): """Test class for VarNode""" full_name: str source: str = None def get_expr_node(self, full_text_as_tokens=None): value_as_tokens = list(Tokenizer(self.source or self.full_name, yield_eof=False)) start = tokens_index(full_text_as_tokens, value_as_tokens, 0) end = start + len(value_as_tokens) - 1 parts = self.full_name.split(".") if len(parts) == 1: return VariableNode(start, end, full_text_as_tokens[start: end + 1], parts[0]) else: return VariableNode(start, end, full_text_as_tokens[start: end + 1], parts[0], *parts[1:]) @dataclass class CompExprTestObj(ExprTestObj): """ Test object for comparison ==, <=, ... """ left: ExprTestObj right: ExprTestObj source: str = None def get_expr_node(self, full_text_as_tokens=None): node_type = comparison_type_mapping[type(self).__name__] left_node = self.left.get_expr_node(full_text_as_tokens) right_node = self.right.get_expr_node(full_text_as_tokens) start, end = self.get_pos_from_source(self.source, full_text_as_tokens) if self.source else \ self.get_pos([left_node, right_node]) return ComparisonNode(start, end, full_text_as_tokens[start: end + 1], node_type, left_node, right_node) @dataclass class EQ(CompExprTestObj): pass @dataclass class NEQ(CompExprTestObj): pass @dataclass class GT(CompExprTestObj): pass @dataclass class GTE(CompExprTestObj): pass @dataclass class LT(CompExprTestObj): pass @dataclass class LTE(CompExprTestObj): pass @dataclass class IN(CompExprTestObj): pass @dataclass class NIN(CompExprTestObj): # for NOT INT pass @dataclass class PAREN(ExprTestObj): # for parenthesis node node: object source: str = None class L_EXPR(ExprTestObj): def __init__(self, first, last, *items, sep=None, source=None): self.first = first self.last = last self.items = items self.sep = sep or comma self.source = source def get_expr_node(self, full_text_as_tokens=None): first = self.safe_get_expr_node(self.first, full_text_as_tokens) last = self.safe_get_expr_node(self.last, full_text_as_tokens) items = [self.safe_get_expr_node(item, full_text_as_tokens) for item in self.items] if self.source is None: source = self.first if self.first else "" source += f"{self.sep.value} ".join(item.get_source() for item in items) if self.last: source += self.last else: source = self.source start, end = self.get_pos_from_source(source, full_text_as_tokens) return ListNode(start, end, full_text_as_tokens[start: end + 1], first, last, items, self.sep) @dataclass class LCC: """ List comprehension comprehension """ target: object iterable: object if_expr: object @dataclass class LC(ExprTestObj): # for List Comprehension node element: object generators: list source: str = None def get_expr_node(self, full_text_as_tokens=None): # first transform str into NameExprTestObj (ie EXPR) if isinstance(self.element, str): self.element = EXPR(self.element) comprehensions = [] nodes = [] for comp in self.generators: target = EXPR(comp[0]) if isinstance(comp[0], (str, tuple)) else comp[0] iterable = EXPR(comp[1]) if isinstance(comp[1], (str, tuple)) else comp[1] if_expr = EXPR(comp[2]) if isinstance(comp[2], (str, tuple)) else comp[2] comprehensions.append(LCC(target, iterable, if_expr)) self.generators = comprehensions # then transform into ListComprehensionNode element = self.element.get_expr_node(full_text_as_tokens) nodes.append(element) comprehensions = [] for comp in self.generators: target = comp.target.get_expr_node(full_text_as_tokens) iterable = comp.iterable.get_expr_node(full_text_as_tokens) if_expr = comp.if_expr.get_expr_node(full_text_as_tokens) if comp.if_expr else None comprehensions.append(Comprehension(target, iterable, if_expr)) nodes.extend([target, iterable, if_expr]) start, end = self.get_pos_from_source(self.source, full_text_as_tokens) if self.source else self.get_pos(nodes) return ListComprehensionNode(start, end, full_text_as_tokens[start: end + 1], element, comprehensions) class FN(ExprTestObj): """ Test class only It matches with FunctionNode but with less constraints Thereby, FN("first", "last", ["param1," ...]) can be compared to FunctionNode(NameExprNode("first"), NameExprNode("second"), [FunctionParameter(NamesNodes("param1"), NamesNodes(", ")]) Note that FunctionParameter can easily be defined with a single string * "param" -> FunctionParameter(NameExprNode("param"), None) * "param, " -> FunctionParameter(NameExprNode("param"), NameExprNode(", ")) For more complicated situations, you can use a tuple (value, sep) to define the value part and the separator part """ def __init__(self, first, last, parameters): self.first = first self.last = last self.parameters = [] for param in parameters: if isinstance(param, tuple): self.parameters.append(param) elif isinstance(param, str) and (pos := param.find(",")) != -1: self.parameters.append((param[:pos], param[pos:])) else: self.parameters.append((param, None)) def __repr__(self): res = self.first for param in self.parameters: if param[1]: res += f"{param[0]}{param[1]} " else: res += f"{param[0]}" return res + self.last def __eq__(self, other): if id(self) == id(other): return True if isinstance(other, FN): return self.first == other.first and self.last == other.last and self.parameters == other.parameters return False def __hash__(self): return hash((self.first, self.last, self.parameters)) def transform_real_obj(self, other, get_test_obj_delegate): if isinstance(other, FN): return other if isinstance(other, FunctionNode): params = [] for self_parameter, other_parameter in zip(self.parameters, other.parameters): if isinstance(self_parameter[0], str): value = other_parameter.value.value else: value = get_test_obj_delegate(other_parameter.value, self_parameter[0]) sep = other_parameter.separator.value if other_parameter.separator else None params.append((value, sep)) return FN(other.first.value, other.last.value, params) raise Exception(f"Expecting FunctionNode but received {other=}") def get_expr_node(self, full_text_as_tokens=None): start, end = self.get_pos_from_source(self.first, full_text_as_tokens) first = NameExprNode(start, end, full_text_as_tokens[start: end + 1]) start, end = self.get_pos_from_source(self.last, full_text_as_tokens) last = NameExprNode(start, end, full_text_as_tokens[start: end + 1]) parameters = [] for param_value, sep in self.parameters: if isinstance(param_value, str): start, end = self.get_pos_from_source(param_value, full_text_as_tokens) param_as_expr_node = NameExprNode(start, end, full_text_as_tokens[start: end + 1]) else: param_as_expr_node = param_value.get_expr_node(full_text_as_tokens) if sep: sep_tokens = Tokenizer(sep, yield_eof=False) start = param_as_expr_node.end + 1 end = start + len(list(sep_tokens)) - 1 sep_as_expr_node = NameExprNode(start, end, full_text_as_tokens[start: end + 1]) else: sep_as_expr_node = None parameters.append(FunctionParameter(param_as_expr_node, sep_as_expr_node)) start, end = first.start, last.end return FunctionNode(start, end, full_text_as_tokens[start: end + 1], first, last, parameters) class HelperWithPos: def __init__(self, start=None, end=None): self.start = start self.end = end self.start_is_fixed = start is not None self.end_is_fixed = end is not None def fix_pos(self, node): """ :param node: an object or a tuple :return: """ if hasattr(node, "start"): target_start, target_end = node.start, node.end elif isinstance(node, tuple): target_start, target_end = node else: target_start, target_end = None, None if not self.start_is_fixed: if target_start is not None and (self.start is None or target_start < self.start): self.start = target_start if not self.end_is_fixed: if target_end is not None and (self.end is None or target_end > self.end): self.end = target_end return self class CC: """ Concept class for test purpose CC means concept for compiled (or concept with compiled) It matches a concept if the compiles are equals """ # The only properties that are testes are concept_key and compiled # The other properties (concept, source, start and end) # are used in tests/parsers/parsers_utils.py to help creating helper objects def __init__(self, concept, source=None, exclude_body=False, **kwargs): self.concept_key = concept.key if isinstance(concept, Concept) else concept self.compiled = kwargs self.concept = concept if isinstance(concept, Concept) else None self.source = source # to use when the key is different from the sub str to search when filling start and stop self.start = None # for debug purpose, indicate where the concept starts self.end = None # for debug purpose, indicate where the concept ends self.exclude_body = exclude_body if "body" in self.compiled: self.compiled[ConceptParts.BODY] = self.compiled["body"] del self.compiled["body"] def __eq__(self, other): if id(self) == id(other): return True if isinstance(other, Concept): if other.key != self.concept_key: return False if self.exclude_body: to_compare = {k: v for k, v in other.get_compiled().items() if k != ConceptParts.BODY} else: to_compare = other.get_compiled() if self.compiled == to_compare: return True else: return False if not isinstance(other, CC): return False if self.concept_key != other.concept_key: return False return self.compiled == other.compiled def __hash__(self): if self.concept: return hash(self.concept) return hash(self.concept_key) def __repr__(self): if self.concept: txt = f"CC(concept='{self.concept}'" else: txt = f"CC(concept_key='{self.concept_key}'" for k, v in self.compiled.items(): txt += f", {k}='{v}'" return txt + ")" def fix_pos(self, node): start = node.start if hasattr(node, "start") else \ node[0] if isinstance(node, tuple) else None end = node.end if hasattr(node, "end") else \ node[1] if isinstance(node, tuple) else None if start is not None: if self.start is None or start < self.start: self.start = start if end is not None: if self.end is None or end > self.end: self.end = end return self def transform_real_obj(self, other, to_compare_delegate): """ Transform other into CNC, to ease the comparison :param other: :param to_compare_delegate: :return: """ if isinstance(other, CC): return other if isinstance(other, Concept): if self.exclude_body: compiled = {k: v for k, v in other.get_compiled().items() if k != ConceptParts.BODY} else: compiled = other.get_compiled() self_compile_to_use = self.compiled or compiled compiled = to_compare_delegate(self_compile_to_use, compiled, to_compare_delegate) return CC(other, self.source, self.exclude_body, **compiled) raise Exception(f"Expecting Concept but received {other=}") class CB: """ Concept with body only Test class that tests only the body of the concept """ def __init__(self, concept: Union[str, Concept], body: object): self.concept_key = concept.key if isinstance(concept, Concept) else concept self.concept = concept if isinstance(concept, Concept) else None self.body = body def __eq__(self, other): if not isinstance(other, CB): return False return self.concept_key == other.concept_key and self.body == other.body def __hash__(self): return hash((self.concept, self.body)) def __repr__(self): concept_debug = f"concept={self.concept}" if self.concept else f"concept_key={self.concept_key}" return f"CB({concept_debug}, body='{self.body}')" def transform_real_obj(self, other, get_test_obj_delegate): if isinstance(other, CB): return other if isinstance(other, Concept): concept = other.key if not self.concept else other if isinstance(other.body, Concept): body = get_test_obj_delegate(other.body, self.body, get_test_obj_delegate) else: body = other.body return CB(concept, body) raise Exception(f"Expecting Concept but received {other=}") class CV: """ Concept with all values Test class that tests all the values (not the metadata, so not the properties) of a concept """ def __init__(self, concept, **kwargs): self.concept_key = concept.key if isinstance(concept, Concept) else concept self.concept = concept if isinstance(concept, Concept) else None self.values = {} for k, v in kwargs.items(): if f"#{k}#" in AllConceptParts: self.values[f"#{k}#"] = v else: self.values[k] = v def __eq__(self, other): if not isinstance(other, CV): return False return self.concept_key == other.concept_key and self.values == other.values def __hash__(self): return hash((self.concept_key, self.values)) def __repr__(self): concept_debug = f"concept={self.concept}" if self.concept else f"concept_key={self.concept_key}" return f"CV({concept_debug}, values={self.values})" def transform_real_obj(self, other, get_test_obj_delegate): if isinstance(other, CV): return other if isinstance(other, Concept): concept = other.key if not self.concept else other values = get_test_obj_delegate(other.values(), self.values, get_test_obj_delegate) return CV(concept, **values) raise Exception(f"Expecting Concept but received {other=}") class CMV: """ Concept with metadata variables CMV stands for Concept Metadata Variables Test class that only compare the key and the metadata variables """ def __init__(self, concept, source=None, **kwargs): self.concept_key = concept.key if isinstance(concept, Concept) else concept self.concept = concept if isinstance(concept, Concept) else None self.variables = kwargs self.source = source # to use when the key is different from the sub str to search when filling start and stop self.start = None # for debug purpose, indicate where the concept starts self.end = None # for debug purpose, indicate where the concept ends def __eq__(self, other): if id(self) == id(other): return True if not isinstance(other, CMV): return False if self.concept_key != other.concept_key: return False return self.variables == other.variables def __hash__(self): if self.concept: return hash(self.concept) return hash(self.concept_key) def __repr__(self): if self.concept: txt = f"CMV(concept='{self.concept}'" else: txt = f"CMV(concept_key='{self.concept_key}'" for k, v in self.variables.items(): txt += f", {k}='{v}'" return txt + ")" def fix_pos(self, node): start = node.start if hasattr(node, "start") else \ node[0] if isinstance(node, tuple) else None end = node.end if hasattr(node, "end") else \ node[1] if isinstance(node, tuple) else None if start is not None: if self.start is None or start < self.start: self.start = start if end is not None: if self.end is None or end > self.end: self.end = end return self def transform_real_obj(self, other, get_test_obj_delegate): if isinstance(other, CMV): return other if isinstance(other, Concept): concept = other.key if not self.concept else other variables = {name: value for name, value in other.get_metadata().variables} return CMV(concept, **variables) raise Exception(f"Expecting Concept but received {other=}") class CIO: """ Concept id only only test the id """ def __init__(self, concept, source=None): if isinstance(concept, str): self.concept_name = concept self.concept_id = None self.concept = None elif isinstance(concept, Concept): self.concept_id = concept.id self.concept = concept self.source = source self.start = None self.end = None def set_concept(self, concept): self.concept = concept self.concept_id = concept.id def __eq__(self, other): if id(self) == id(other): return True if not isinstance(other, CIO): return False return self.concept_id == other.concept_id def __hash__(self): return hash(self.concept_id) def __repr__(self): return f"CIO(concept='{self.concept}')" if self.concept else f"CIO(name='{self.concept_name}')" def transform_real_obj(self, other, get_test_obj_delegate): if isinstance(other, CIO): return other if isinstance(other, Concept): return CIO(other) raise Exception(f"Expecting Concept but received {other=}") class RETVAL: """ Class helper for return value for parser result """ def __init__(self, source, who=None, parser=None): self.source = source self.who = who self.parser = parser def __eq__(self, other): if id(self) == id(other): return True if not isinstance(other, RETVAL): return False return (self.source == other.source and self.who == other.who and self.parser == other.parser) def __hash__(self): return hash((self.source, self.who)) def __repr__(self): txt = f"RV(source='{self.source}'" if self.who is not None: txt += f", who={self.who}" if self.parser is not None: txt += f", parser={self.parser}" return txt + ")" def transform_real_obj(self, other, get_test_obj_delegate): """ Transform other into CNC, to ease the comparison :param other: :param get_test_obj_delegate: :return: """ if isinstance(other, RETVAL): return other if isinstance(other, ReturnValueConcept): if not isinstance(other.body, ParserResultConcept): raise Exception(f"ParserResultConcept not found body={other.body}") parser_result = other.body return RETVAL(parser_result.source, other.who if self.who is not None else None, parser_result.parser if self.parser is not None else None) raise Exception(f"Expecting ReturnValueConcept but received {other=}") class SCN(HelperWithPos): """ SourceCodeNode tester class It matches with SourceCodeNode but with less constraints SCN == SourceCodeNode if source, start, end (start and end are not validated when None) """ def __init__(self, source, start=None, end=None): super().__init__(start, end) self.source = source def __eq__(self, other): if id(self) == id(other): return True if isinstance(other, SourceCodeNode): if self.source != other.source: return False if self.start is not None and self.start != other.start: return False if self.end is not None and self.end != other.end: return False return True if not isinstance(other, SCN): return False return self.source == other.source and \ self.start == other.start and \ self.end == other.end def __hash__(self): return hash((self.source, self.start, self.end)) def __repr__(self): txt = f"SCN(source='{self.source}'" if self.start is not None: txt += f", start={self.start}" if self.end is not None: txt += f", end={self.end}" return txt + ")" def transform_real_obj(self, other, to_compare_delegate): """ Transform other into CNC, to ease the comparison :param other: :param to_compare_delegate: :return: """ if isinstance(other, SCN): return other if isinstance(other, SourceCodeNode): return SCN(other.source, other.start if self.start is not None else None, other.end if self.end is not None else None) raise Exception(f"Expecting SourceCodeNode but received {other=}") class SCWC(HelperWithPos): """ SourceNodeWithConcept tester class It matches with a SourceNodeWithConcept but it's easier to instantiate during the tests """ def __init__(self, first, last, *args): super().__init__(None, None) self.first = first self.last = last self.content = list(args) def __eq__(self, other): if id(self) == id(other): return True if isinstance(other, SourceCodeWithConceptNode): if self.first != other.first: return False if self.last != other.last: return False if len(self.content) != len(other.nodes): return False for self_node, other_node in zip(self.content, other.nodes): if self_node != other_node: return False # at last return True if not isinstance(other, SCWC): return False return (self.start == other.start and self.end == other.end and self.first == other.first and self.last == other.last and self.content == other.content) def __repr__(self): txt = "SCWC(" if self.start is not None: txt += f"start={self.start}" if self.end is not None: txt += f", end={self.end}" for item in [self.first, self.last, *self.content]: txt += f", {item}" return txt + ")" def transform_real_obj(self, other, get_test_obj_delegate): """ Transform other into CNC, to ease the comparison :param other: :param get_test_obj_delegate: :return: """ if isinstance(other, SCWC): return other if isinstance(other, SourceCodeWithConceptNode): first = get_test_obj_delegate(other.first, self.first) last = get_test_obj_delegate(other.last, self.last) content = [get_test_obj_delegate(r, t) for r, t in zip(other.nodes, self.content)] res = SCWC(first, last, *content) res.start = other.start res.end = other.end return res raise Exception(f"Expecting SourceCodeWithConceptNode but received {other=}") @property def source(self): """ this code is a copy and paste from SourceCodeWithConceptNode.pseudo_fix_source TODO: create a common function or whatever... :return: """ source = self.first.source if hasattr(self.first, "source") else self.first for n in self.content: source += " " if hasattr(n, "source"): source += n.source elif hasattr(n, "concept"): source += str(n.concept) else: source += " unknown" source += self.last.source if hasattr(self.last, "source") else self.last return source class CN(HelperWithPos): """ ConceptNode tester class It matches with ConceptNode but with less constraints CN == ConceptNode if concept key, start, end and source are the same """ def __init__(self, concept, source=None, start=None, end=None): """ :param concept: Concept or concept_key (only the key is used anyway) :param start: :param end: :param source: """ super().__init__(start, end) self.concept_key = concept.key if isinstance(concept, Concept) else concept self.source = source self.concept = concept if isinstance(concept, Concept) else None def fix_source(self, str_tokens): self.source = "".join(str_tokens) return self def __eq__(self, other): if id(self) == id(other): return True if not isinstance(other, CN): return False return self.concept_key == other.concept_key and \ self.start == other.start and \ self.end == other.end and \ self.source == other.source def __hash__(self): return hash((self.concept_key, self.start, self.end, self.source)) def __repr__(self): if self.concept: txt = f"CN(concept='{self.concept}'" else: txt = f"CN(concept_key='{self.concept_key}'" txt += f", source='{self.source}'" if self.start is not None: txt += f", start={self.start}" if self.end is not None: txt += f", end={self.end}" return txt + ")" def transform_real_obj(self, other, get_test_obj_delegate): """ Transform other into CNC, to ease the comparison :param other: :param get_test_obj_delegate: :return: """ if isinstance(other, CN): return other if isinstance(other, ConceptNode): return CN(other.concept, other.source if self.source is not None else None, other.start if self.start is not None else None, other.end if self.end is not None else None) raise Exception(f"Expecting ConceptNode but received {other=}") class CNC(CN): """ ConceptNode for Compiled tester class It matches with ConceptNode But focuses on the 'compiled' property of the concept CNC == ConceptNode if CNC.get_compiled() == ConceptNode.concept.get_compiled() """ def __init__(self, concept_key, source=None, start=None, end=None, exclude_body=False, **kwargs): super().__init__(concept_key, source, start, end) self.compiled = kwargs self.exclude_body = exclude_body if "body" in self.compiled: self.compiled[ConceptParts.BODY] = self.compiled["body"] del self.compiled["body"] def __eq__(self, other): if id(self) == id(other): return True if not isinstance(other, CNC): return False return self.concept_key == other.concept_key and \ self.start == other.start and \ self.end == other.end and \ self.source == other.source and \ self.compiled == other.compiled def __repr__(self): if self.concept: txt = f"CNC(concept='{self.concept}'" else: txt = f"CNC(concept_key='{self.concept_key}'" txt += f", source='{self.source}'" if self.start is not None: txt += f", start={self.start}" if self.end is not None: txt += f", end={self.end}" for k, v in self.compiled.items(): txt += f", {k}='{v}'" return txt + ")" def transform_real_obj(self, other, get_test_obj_delegate): """ Transform other into CNC, to ease the comparison :param other: :param get_test_obj_delegate: :return: """ if isinstance(other, CNC): return other if isinstance(other, ConceptNode): if self.exclude_body: compiled = {k: v for k, v in other.concept.get_compiled().items() if k != ConceptParts.BODY} else: compiled = other.concept.get_compiled() self_compile_to_use = self.compiled or compiled compiled = get_test_obj_delegate(compiled, self_compile_to_use, get_test_obj_delegate) return CNC(other.concept, other.source if self.source is not None else None, other.start if self.start is not None else None, other.end if self.end is not None else None, self.exclude_body, **compiled) raise Exception(f"Expecting ConceptNode but received {other=}") class UTN(HelperWithPos): """ Tester class for UnrecognizedTokenNode compare the source, and start, end if defined """ def __init__(self, source, start=None, end=None): """ :param source: :param start: :param end: """ super().__init__(start, end) self.source = source def __eq__(self, other): if id(self) == id(other): return True if isinstance(other, UnrecognizedTokensNode): return self.start == other.start and \ self.end == other.end and \ self.source == other.source if not isinstance(other, UTN): return False return self.start == other.start and \ self.end == other.end and \ self.source == other.source def __hash__(self): return hash((self.source, self.start, self.end)) def __repr__(self): txt = f"UTN(source='{self.source}'" if self.start is not None: txt += f", start={self.start}" if self.end is not None: txt += f", end={self.end}" return txt + ")" def transform_real_obj(self, other, get_test_obj_delegate): """ Transform other into CNC, to ease the comparison :param other: :param get_test_obj_delegate: :return: """ if isinstance(other, UTN): return other if isinstance(other, UnrecognizedTokensNode): return UTN(other.source, other.start, other.end) raise Exception(f"Expecting UnrecognizedTokensNode but received {other=}") class RN(HelperWithPos): """ Helper class to test RuleNode """ def __init__(self, rule, source=None, start=None, end=None): """ :param source: :param start: :param end: """ super().__init__(start, end) self.rule_id = rule.id if isinstance(rule, Rule) else rule self.source = source or str_concept((None, self.rule_id), prefix="r:") if self.rule_id else None self.rule = rule if isinstance(rule, Rule) else None def __eq__(self, other): if id(self) == id(other): return True if not isinstance(other, RN): return False return (self.rule_id == other.rule_id and self.start == other.start and self.end == other.end and self.source == other.source) def __hash__(self): return hash((self.rule_id, self.start, self.end, self.source)) def __repr__(self): if self.rule: txt = f"RN(rule='{self.rule}'" else: txt = f"RN(rule_id='{self.rule_id}'" txt += f", source='{self.source}'" if self.start is not None: txt += f", start={self.start}" if self.end is not None: txt += f", end={self.end}" return txt + ")" def transform_real_obj(self, other, get_test_obj_delegate): """ Transform other into CNC, to ease the comparison :param other: :param get_test_obj_delegate: :return: """ if isinstance(other, RN): return other if isinstance(other, RuleNode): return RN(other.rule, other.source if self.source is not None else None, other.start if self.start is not None else None, other.end if self.end is not None else None) raise Exception(f"Expecting RuleNode but received {other=}") @dataclass() class NEGCOND: """ Represents a NegatedCondition """ condition: str @dataclass() class NCCOND: """ Represents a NegatedConjunctiveConditions """ conditions: List[str] comparison_type_mapping = { "EQ": ComparisonType.EQUALS, "NEQ": ComparisonType.NOT_EQUAlS, "LT": ComparisonType.LESS_THAN, "LTE": ComparisonType.LESS_THAN_OR_EQUALS, "GT": ComparisonType.GREATER_THAN, "GTE": ComparisonType.GREATER_THAN_OR_EQUALS, "IN": ComparisonType.IN, "NIN": ComparisonType.NOT_IN, } def get_expr_node_from_test_node(full_text, test_node): """ Returns EXPR, OR, NOT, AND object to ease the comparison with the real ExprNode """ full_text_as_tokens = list(Tokenizer(full_text, yield_eof=False)) return test_node.get_expr_node(full_text_as_tokens) def _index(tokens, expr, index): """ Finds a sub list in a bigger list :param tokens: :param expr: :param index: :return: """ expected = [token.str_value for token in Tokenizer(expr) if token.type != TokenKind.EOF] for i in range(0, len(tokens) - len(expected) + 1): for j in range(len(expected)): if tokens[i + j] != expected[j]: break else: if index == 0: return i, len(expected) else: index -= 1 raise ValueError(f"substring '{expr}' not found") def compute_debug_array(res): to_compare = [] for r in res: res_debug = [] for token in r.debug: if isinstance(token, Token): if token.type == TokenKind.WHITESPACE: continue else: res_debug.append("T(" + token.value + ")") else: res_debug.append("C(" + token.concept.name + ")") to_compare.append(res_debug) return to_compare def get_node( concepts_map, expression_as_tokens, sub_expr, concept_key=None, skip=0, init_empty_body=False, exclude_body=False): """ Tries to find sub in expression When found, transform it to its correct type :param expression_as_tokens: full expression :param sub_expr: sub expression to search in the full expression :param concepts_map: hash of the known concepts :param concept_key: key of the concept if different from sub_expr :param skip: number of occurrences of sub_expr to skip :param init_empty_body: if True adds the source in the body (actually in compiled.BODY) :param exclude_body: Ask to not compare body :return: """ if isinstance(sub_expr, list): return [get_node(concepts_map, expression_as_tokens, s, concept_key, skip, init_empty_body, exclude_body) for s in sub_expr] if isinstance(sub_expr, tuple): return get_node(concepts_map, expression_as_tokens, sub_expr[0], concept_key, sub_expr[1], init_empty_body, exclude_body) if isinstance(sub_expr, (DoNotResolve, ReturnValueConcept, RETVAL)): return sub_expr if isinstance(sub_expr, SCWC): sub_expr.first = get_node(concepts_map, expression_as_tokens, sub_expr.first, skip=skip) sub_expr.last = get_node(concepts_map, expression_as_tokens, sub_expr.last, skip=skip) sub_expr.content = [get_node(concepts_map, expression_as_tokens, c, skip=skip) for c in sub_expr.content] sub_expr.fix_pos(sub_expr.first) sub_expr.fix_pos(sub_expr.last) return sub_expr if isinstance(sub_expr, SCN): node = get_node(concepts_map, expression_as_tokens, sub_expr.source, skip=skip) sub_expr.fix_pos(node) return sub_expr if isinstance(sub_expr, RN): start, length = _index(expression_as_tokens, sub_expr.source, skip) sub_expr.start = start sub_expr.end = start + length - 1 return sub_expr if isinstance(sub_expr, (CNC, CC, CN, CMV, CIO)): if sub_expr.concept is None or sub_expr.start is None or sub_expr.end is None: concept_node = get_node( concepts_map, expression_as_tokens, sub_expr.source or sub_expr.concept_key, sub_expr.concept_key, skip) if not hasattr(concept_node, "concept"): raise Exception(f"'{sub_expr.concept_key}' is not a concept. Check your map.") concept_found = concept_node.concept sub_expr.concept_key = concept_found.key sub_expr.concept = concept_found sub_expr.fix_pos( (concept_node.start, concept_node.end if hasattr(concept_node, "end") else concept_node.start)) if hasattr(sub_expr, "compiled"): for k, v in sub_expr.compiled.items(): node = get_node(concepts_map, expression_as_tokens, v, exclude_body=exclude_body) # need to get start and end positions if isinstance(v, str) and v in concepts_map: new_value_concept = concepts_map[v] new_value = CC(Concept().update_from(new_value_concept), exclude_body=exclude_body) if init_empty_body: init_body(new_value, concept_found, v) else: new_value = node sub_expr.compiled[k] = new_value sub_expr.fix_pos(node) if init_empty_body: init_body(sub_expr, concept_found, sub_expr.source) if hasattr(sub_expr, "fix_source"): sub_expr.fix_source(expression_as_tokens[sub_expr.start: sub_expr.end + 1]) return sub_expr if isinstance(sub_expr, UTN): node = get_node(concepts_map, expression_as_tokens, sub_expr.source, skip=skip) sub_expr.fix_pos(node) return sub_expr start, length = _index(expression_as_tokens, sub_expr, skip) # try to match one of the concept from the map concept_key = concept_key or sub_expr concept_found = concepts_map.get(concept_key, None) if concept_found: concept_found = Concept().update_from(concept_found) # make a copy when massively used in tests if init_empty_body: node = CNC(concept_found, sub_expr, start, start + length - 1, exclude_body=exclude_body) init_body(node, concept_found, sub_expr) return node else: return CN(concept_found, sub_expr, start, start + length - 1) else: # else an UnrecognizedTokensNode return UTN(sub_expr, start, start + length - 1) def init_body(item, concept, value): if "body" in item.compiled: item.compiled[ConceptParts.BODY] = item.compiled["body"] del (item.compiled["body"]) return if not concept or concept.get_metadata().body or ConceptParts.BODY in item.compiled: return item.compiled[ConceptParts.BODY] = DoNotResolve(value) def compute_expected_array(concepts_map, expression, expected, init_empty_body=False, exclude_body=False): """ Computes a simple but sufficient version of the result of infix_to_postfix() :param concepts_map: :param expression: :param expected: :param init_empty_body: if True adds the source in the body (actually in compiled.BODY) :param exclude_body: do not include ConceptParts.BODY in comparison :return: """ expression_as_tokens = [token.str_value for token in Tokenizer(expression) if token.type != TokenKind.EOF] return [get_node( concepts_map, expression_as_tokens, sub_expr, init_empty_body=init_empty_body, exclude_body=exclude_body) for sub_expr in expected] def get_unrecognized_node(start, text): tokens = list(Tokenizer(text, yield_eof=False)) return UnrecognizedTokensNode(start, start + len(tokens) - 1, tokens) def get_source_code_node(start, text, concepts_map, id_manager=None): id_manager = id_manager or CreateObjectIdentifiers() id_mapping = {} concept_mapping_by_id = {} # get the concepts, mapped by their new id for concept_name, concept in concepts_map.items(): concept_identifier = id_manager.get_identifier(concept, "__C__") id_mapping[concept_name] = concept_identifier concept_mapping_by_id[concept_identifier] = concept # transform the source code to use the new id tokens = list(Tokenizer(text, yield_eof=False)) text_to_compile_tokens = [] for t in tokens: if t.type == TokenKind.IDENTIFIER and t.value in id_mapping: text_to_compile_tokens.append(Token(TokenKind.IDENTIFIER, id_mapping[t.value], -1, -1, -1)) else: text_to_compile_tokens.append(t) text_to_compile = get_text_from_tokens(text_to_compile_tokens) # create the python node ast_ = ast.parse(text_to_compile, "", 'eval') python_node = PythonNode(text_to_compile, ast_, text) python_node.objects = concept_mapping_by_id return SourceCodeNode(start, start + len(tokens) - 1, tokens, text, python_node) def get_rete_conditions(*conditions): """ Transform a list of string into a list of Condition (Rete conditions) :param conditions: conditions in the form 'identifier|attribute|value' when one argument starts with "#" it means that it's a variables ex : "#__x_00__|__name__|'__ret'" -> Condition(V('#__x_00__'), '__name__', '__ret') Caution, the value part is evaluated "identifier|__name__|'True'" -> Condition(identifier, '__name__', 'True') # the string 'True' "identifier|__name__|True" -> Condition(identifier, '__name__', True) # the bool True """ def get_value(obj): if obj.startswith("#"): return V(obj[1:]) return eval(obj, sheerka_globals) res = [] for cond in conditions: if isinstance(cond, Condition): res.append(cond) elif isinstance(cond, NEGCOND): inner_cond = get_rete_conditions(cond.condition).conditions[0] res.append(NegatedCondition(inner_cond.identifier, inner_cond.attribute, inner_cond.value)) elif isinstance(cond, NCCOND): inner_conds = get_rete_conditions(*cond.conditions).conditions res.append(NegatedConjunctiveConditions(*inner_conds)) else: parts = cond.split("|") identifier = get_value(parts[0]) attribute = parts[1] value = get_value(parts[2]) res.append(Condition(identifier, attribute, value)) return AndConditions(res) def get_test_obj(real_obj, test_obj, get_test_obj_delegate=None): """ From a production object (Concept, ConceptNode, ....) Create a test object (CNC, CC ...) that can be used to validate the unit tests :param real_obj: :param test_obj: test object used as a template :param get_test_obj_delegate: :return: """ if isinstance(test_obj, list): if len(test_obj) != len(real_obj): raise Exception(f"Not the same size ! {real_obj=}, {test_obj=}") return [get_test_obj(r, t) for r, t in zip(real_obj, test_obj)] if isinstance(test_obj, dict): if len(test_obj) != len(real_obj): raise Exception(f"Not the same size ! {real_obj=}, {test_obj=}") return {k: get_test_obj(real_obj[k], v) for k, v in test_obj.items()} if hasattr(test_obj, "transform_real_obj"): return test_obj.transform_real_obj(real_obj, get_test_obj) return real_obj def prepare_nodes_comparison(concepts_map, expression, real_obj, test_obj): if isinstance(real_obj, list): assert len(real_obj) == len( test_obj), f"The two lists do not have the same size {len(real_obj)} != {len(test_obj)}" resolved_test_obj = compute_expected_array(concepts_map, expression, test_obj) real_obj_as_test = [get_test_obj(r, t) for r, t in zip(real_obj, resolved_test_obj)] return real_obj_as_test, resolved_test_obj else: resolved_test_obj = compute_expected_array(concepts_map, expression, [test_obj])[0] real_obj_as_test = get_test_obj(real_obj, resolved_test_obj) return real_obj_as_test, resolved_test_obj def compare_with_test_object(actual, expected): to_compare = get_test_obj(actual, expected) assert to_compare == expected