"""Test helpers: lightweight stand-in classes (EXPR, AND, OR, EQ, ...) and
builders that convert them into the real parser node types, so unit tests can
express expected parse results compactly."""
import ast
from dataclasses import dataclass

from core.builtin_concepts import ReturnValueConcept
from core.builtin_helpers import CreateObjectIdentifiers
from core.concept import CC, Concept, ConceptParts, DoNotResolve, CIO, CMV
from core.tokenizer import Tokenizer, TokenKind, Token
from core.utils import get_text_from_tokens, tokens_index
from parsers.BaseNodeParser import scnode, utnode, cnode, SCWC, CNC, short_cnode, CN, UTN, \
    SCN, RN, UnrecognizedTokensNode, SourceCodeNode
from parsers.PythonParser import PythonNode
from parsers.SyaNodeParser import SyaConceptParserHelper
from parsers.expressions import NameExprNode, AndNode, OrNode, NotNode, VariableNode, ComparisonNode, ComparisonType
from sheerkarete.common import V
from sheerkarete.conditions import Condition, AndConditions


@dataclass
class Obj:
    # Generic test object with up to three properties and an optional parent.
    prop_a: object
    prop_b: object = None
    prop_c: object = None
    parent: object = None


class AND:
    """Test class for AndNode"""

    def __init__(self, *parts, source=None):
        self.parts = parts
        self.source = source


class OR:
    """Test class for OrNode"""

    def __init__(self, *parts, source=None):
        self.parts = parts
        self.source = source


@dataclass
class NOT:
    """Test class for NotNode"""
    expr: object
    source: str = None


@dataclass
class EXPR:
    """Test class for NameNode.
    E stands for Expression"""
    source: str


@dataclass
class VAR:
    """Test class for VarNode"""
    full_name: str
    source: str = None


# NOTE: `source` in the comparison classes below is a real dataclass field
# (annotated), so `EQ(a, b, source="...")` works like AND/OR/NOT/VAR.

@dataclass
class EQ:
    left: object
    right: object
    source: object = None


@dataclass
class NEQ:
    left: object
    right: object
    source: object = None


@dataclass
class GT:
    left: object
    right: object
    source: object = None


@dataclass
class GTE:
    left: object
    right: object
    source: object = None


@dataclass
class LT:
    left: object
    right: object
    source: object = None


@dataclass
class LTE:
    left: object
    right: object
    source: object = None


@dataclass
class IN:
    left: object
    right: object
    source: object = None


@dataclass
class NIN:  # for NOT IN
    left: object
    right: object
    source: object = None


# Maps a test-class name to the real ComparisonType enum member.
# NOTE(review): "NOT_EQUAlS" (lowercase "l") matches the member name declared
# in parsers.expressions.ComparisonType — fix both together if renaming.
comparison_type_mapping = {
    "EQ": ComparisonType.EQUALS,
    "NEQ": ComparisonType.NOT_EQUAlS,
    "LT": ComparisonType.LESS_THAN,
    "LTE": ComparisonType.LESS_THAN_OR_EQUALS,
    "GT": ComparisonType.GREATER_THAN,
    "GTE": ComparisonType.GREATER_THAN_OR_EQUALS,
    "IN": ComparisonType.IN,
    "NIN": ComparisonType.NOT_IN,
}


def get_expr_node_from_test_node(full_text, test_node):
    """ Returns EXPR, OR, NOT, AND object to ease the comparison with the real ExprNode

    :param full_text: complete source text the node positions refer to
    :param test_node: EXPR/AND/OR/NOT/VAR/comparison test object to convert
    :return: the corresponding real expression node (NameExprNode, AndNode, ...)
    """
    full_text_as_tokens = list(Tokenizer(full_text, yield_eof=False))

    def get_pos(nodes):
        # Smallest span covering all the given nodes.
        start, end = None, None
        for n in nodes:
            if start is None or start > n.start:
                start = n.start
            if end is None or end < n.end:
                end = n.end
        return start, end

    def get_pos_from_source(source):
        # Locate the token span of `source` inside the full text.
        source_as_node = list(Tokenizer(source, yield_eof=False))
        start = tokens_index(full_text_as_tokens, source_as_node)
        end = start + len(source_as_node) - 1
        return start, end

    def get_expr_node(node):
        if isinstance(node, EXPR):
            value_as_tokens = list(Tokenizer(node.source, yield_eof=False))
            start = tokens_index(full_text_as_tokens, value_as_tokens, 0)
            end = start + len(value_as_tokens) - 1
            return NameExprNode(start, end, full_text_as_tokens[start: end + 1])
        if isinstance(node, AND):
            parts = [get_expr_node(part) for part in node.parts]
            start, end = get_pos_from_source(node.source) if node.source else get_pos(parts)
            return AndNode(start, end, full_text_as_tokens[start: end + 1], *parts)
        if isinstance(node, OR):
            parts = [get_expr_node(part) for part in node.parts]
            start, end = get_pos_from_source(node.source) if node.source else get_pos(parts)
            return OrNode(start, end, full_text_as_tokens[start: end + 1], *parts)
        if isinstance(node, NOT):
            part = get_expr_node(node.expr)
            # Without an explicit source, assume the "not " keyword occupies the
            # two tokens (keyword + whitespace) preceding the inner expression.
            start, end = get_pos_from_source(node.source) if node.source else (part.start - 2, part.end)
            return NotNode(start, end, full_text_as_tokens[start: end + 1], part)
        if isinstance(node, VAR):
            value_as_tokens = list(Tokenizer(node.source or node.full_name, yield_eof=False))
            start = tokens_index(full_text_as_tokens, value_as_tokens, 0)
            end = start + len(value_as_tokens) - 1
            parts = node.full_name.split(".")
            if len(parts) == 1:
                return VariableNode(start, end, full_text_as_tokens[start: end + 1], parts[0])
            else:
                return VariableNode(start, end, full_text_as_tokens[start: end + 1], parts[0], *parts[1:])
        if isinstance(node, (EQ, NEQ, GT, GTE, LT, LTE, IN, NIN)):
            node_type = comparison_type_mapping[type(node).__name__]
            left_node, right_node = get_expr_node(node.left), get_expr_node(node.right)
            start, end = get_pos_from_source(node.source) if node.source else get_pos([left_node, right_node])
            return ComparisonNode(start, end, full_text_as_tokens[start: end + 1], node_type, left_node, right_node)

    return get_expr_node(test_node)


def _index(tokens, expr, index):
    """
    Finds a sub list in a bigger list

    :param tokens: list of token string values to search in
    :param expr: expression whose tokenized form is searched for
    :param index: number of earlier matches to skip (0 returns the first match)
    :return: (start position, length in tokens) of the match
    :raises ValueError: if the expression is not found (after skipping)
    """
    expected = [token.str_value for token in Tokenizer(expr) if token.type != TokenKind.EOF]
    for i in range(0, len(tokens) - len(expected) + 1):
        for j in range(len(expected)):
            if tokens[i + j] != expected[j]:
                break
        else:
            if index == 0:
                return i, len(expected)
            else:
                index -= 1
    raise ValueError(f"substring '{expr}' not found")


def compute_debug_array(res):
    """Flatten each result's debug tokens into readable "T(...)"/"C(...)" tags,
    skipping whitespace tokens, for easy comparison in tests."""
    to_compare = []
    for r in res:
        res_debug = []
        for token in r.debug:
            if isinstance(token, Token):
                if token.type == TokenKind.WHITESPACE:
                    continue
                else:
                    res_debug.append("T(" + token.value + ")")
            else:
                res_debug.append("C(" + token.concept.name + ")")
        to_compare.append(res_debug)
    return to_compare


def get_node(
        concepts_map, expression_as_tokens, sub_expr, concept_key=None, skip=0, is_bnf=False, sya=False,
        init_empty_body=False, exclude_body=False):
    """
    Tries to find sub in expression
    When found, transform it to its correct type

    :param expression_as_tokens: full expression
    :param sub_expr: sub expression to search in the full expression
    :param concepts_map: hash of the known concepts
    :param concept_key: key of the concept if different from sub_expr
    :param skip: number of occurrences of sub_expr to skip
    :param is_bnf: True if the concept to search is a bnf definition
    :param sya: Return SyaConceptParserHelper instead of a ConceptNode when needed
    :param init_empty_body: if True adds the source in the body (actually in compiled.BODY)
    :param exclude_body: Ask to not compare body
    :return:
    """
    if sub_expr == "')'":
        return ")"
    if isinstance(sub_expr, ReturnValueConcept):
        return sub_expr
    if isinstance(sub_expr, (scnode, utnode, DoNotResolve)):
        return sub_expr
    if isinstance(sub_expr, CIO):
        sub_expr.set_concept(concepts_map[sub_expr.concept_name])
        source = sub_expr.source or sub_expr.concept_name
        if source:
            # Resolve the source to recover start/end positions.
            node = get_node(concepts_map, expression_as_tokens, source, sya=sya)
            sub_expr.start = node.start
            sub_expr.end = node.end
        return sub_expr
    if isinstance(sub_expr, cnode):
        # for cnode, map the concept key to the one from concepts_maps if needed
        if sub_expr.concept_key.startswith("#"):
            return cnode(
                concepts_map[sub_expr.concept_key[1:]].key, sub_expr.start, sub_expr.end, sub_expr.source
            )
        else:
            return sub_expr
    if isinstance(sub_expr, SCWC):
        sub_expr.first = get_node(concepts_map, expression_as_tokens, sub_expr.first, sya=sya)
        sub_expr.last = get_node(concepts_map, expression_as_tokens, sub_expr.last, sya=sya)
        sub_expr.content = [get_node(concepts_map, expression_as_tokens, c, sya=sya) for c in sub_expr.content]
        sub_expr.fix_pos(sub_expr.first)
        sub_expr.fix_pos(sub_expr.last)
        return sub_expr
        # return SourceCodeWithConceptNode(first, last, content).pseudo_fix_source()
    if isinstance(sub_expr, SCN):
        node = get_node(concepts_map, expression_as_tokens, sub_expr.source, sya=sya)
        sub_expr.fix_pos(node)
        return sub_expr
    if isinstance(sub_expr, RN):
        start, length = _index(expression_as_tokens, sub_expr.source, skip)
        sub_expr.start = start
        sub_expr.end = start + length - 1
        return sub_expr
    if isinstance(sub_expr, (CNC, CC, CN)):
        concept_node = get_node(
            concepts_map, expression_as_tokens, sub_expr.source or sub_expr.concept_key, sub_expr.concept_key,
            sya=sya)
        if not hasattr(concept_node, "concept"):
            raise Exception(f"'{sub_expr.concept_key}' is not a concept. Check your map.")
        concept_found = concept_node.concept
        sub_expr.concept_key = concept_found.key
        sub_expr.concept = concept_found
        sub_expr.fix_pos((concept_node.start, concept_node.end if hasattr(concept_node, "end") else concept_node.start))
        if hasattr(sub_expr, "compiled"):
            for k, v in sub_expr.compiled.items():
                node = get_node(concepts_map, expression_as_tokens, v, sya=sya, exclude_body=exclude_body)
                # need to get start and end positions
                if isinstance(v, str) and v in concepts_map:
                    new_value_concept = concepts_map[v]
                    new_value = CC(Concept().update_from(new_value_concept), exclude_body=exclude_body)
                    if init_empty_body:
                        init_body(new_value, concept_found, v)
                else:
                    new_value = node
                sub_expr.compiled[k] = new_value
                sub_expr.fix_pos(node)
        if init_empty_body:
            init_body(sub_expr, concept_found, sub_expr.source)
        if hasattr(sub_expr, "fix_source"):
            sub_expr.fix_source(expression_as_tokens[sub_expr.start: sub_expr.end + 1])
        return sub_expr
    if isinstance(sub_expr, UTN):
        node = get_node(concepts_map, expression_as_tokens, sub_expr.source)
        sub_expr.fix_pos(node)
        return sub_expr
    if isinstance(sub_expr, short_cnode):
        return get_node(concepts_map, expression_as_tokens, sub_expr.source, concept_key=sub_expr.concept_key,
                        skip=skip, is_bnf=True, sya=sya)
    if isinstance(sub_expr, tuple):
        # (expression, skip-count) shorthand.
        return get_node(concepts_map, expression_as_tokens, sub_expr[0], concept_key=concept_key, skip=sub_expr[1],
                        is_bnf=is_bnf, sya=sya)

    # sub_expr is a plain string: locate it in the token stream.
    start, length = _index(expression_as_tokens, sub_expr, skip)
    # special case of python source code
    if "+" in sub_expr and sub_expr.strip() != "+":
        return SCN(sub_expr, start, start + length - 1)
    # try to match one of the concept from the map
    concept_key = concept_key or sub_expr
    concept_found = concepts_map.get(concept_key, None)
    if concept_found:
        concept_found = Concept().update_from(concept_found)  # make a copy when massively used in tests
        if sya and len(concept_found.get_metadata().variables) > 0 and not is_bnf:
            return SyaConceptParserHelper(concept_found, start, start + length - 1)
        elif init_empty_body:
            node = CNC(concept_found, start, start + length - 1, source=sub_expr, exclude_body=exclude_body)
            init_body(node, concept_found, sub_expr)
            return node
        else:
            return CN(concept_found, start, start + length - 1, source=sub_expr)
    else:
        # else an UnrecognizedTokensNode
        return UTN(sub_expr, start, start + length - 1)


def init_body(item, concept, value):
    """Normalize the body entry of `item.compiled`: promote a literal "body"
    key to ConceptParts.BODY, or seed an unresolved body from `value`."""
    if "body" in item.compiled:
        item.compiled[ConceptParts.BODY] = item.compiled["body"]
        del item.compiled["body"]
        return
    if not concept or concept.get_metadata().body or ConceptParts.BODY in item.compiled:
        return
    item.compiled[ConceptParts.BODY] = DoNotResolve(value)


def compute_expected_array(concepts_map, expression, expected, sya=False, init_empty_body=False, exclude_body=False):
    """
    Computes a simple but sufficient version of the result of infix_to_postfix()

    :param concepts_map:
    :param expression:
    :param expected:
    :param sya: if true, generate an SyaConceptParserHelper instead of a cnode
    :param init_empty_body: if True adds the source in the body (actually in compiled.BODY)
    :param exclude_body: do not include ConceptParts.BODY in comparison
    :return:
    """
    expression_as_tokens = [token.str_value for token in Tokenizer(expression) if token.type != TokenKind.EOF]
    return [get_node(
        concepts_map, expression_as_tokens, sub_expr, sya=sya, init_empty_body=init_empty_body,
        exclude_body=exclude_body) for sub_expr in expected]


def get_unrecognized_node(start, text):
    """Build an UnrecognizedTokensNode covering the tokens of `text` starting at `start`."""
    tokens = list(Tokenizer(text, yield_eof=False))
    return UnrecognizedTokensNode(start, start + len(tokens) - 1, tokens)


def get_source_code_node(start, text, concepts_map, id_manager=None):
    """Build a SourceCodeNode for `text`, rewriting concept-name identifiers to
    generated ids so the text can be compiled with ast.parse (eval mode).

    :param start: token position of the node in the enclosing expression
    :param text: python source text
    :param concepts_map: concepts (by name) whose identifiers are replaced
    :param id_manager: identifier generator; a fresh CreateObjectIdentifiers by default
    """
    id_manager = id_manager or CreateObjectIdentifiers()
    id_mapping = {}
    concept_mapping_by_id = {}
    # get the concepts, mapped by their new id
    for concept_name, concept in concepts_map.items():
        concept_identifier = id_manager.get_identifier(concept, "__C__")
        id_mapping[concept_name] = concept_identifier
        concept_mapping_by_id[concept_identifier] = concept
    # transform the source code to use the new id
    tokens = list(Tokenizer(text, yield_eof=False))
    text_to_compile_tokens = []
    for t in tokens:
        if t.type == TokenKind.IDENTIFIER and t.value in id_mapping:
            text_to_compile_tokens.append(Token(TokenKind.IDENTIFIER, id_mapping[t.value], -1, -1, -1))
        else:
            text_to_compile_tokens.append(t)
    text_to_compile = get_text_from_tokens(text_to_compile_tokens)
    # create the python node
    ast_ = ast.parse(text_to_compile, "", 'eval')
    python_node = PythonNode(text_to_compile, ast_, text)
    python_node.objects = concept_mapping_by_id
    return SourceCodeNode(start, start + len(tokens) - 1, tokens, text, python_node)


def resolve_test_concept(concept_map, hint):
    """Resolve a test hint (concept name, CC or CMV) against `concept_map`,
    recursing into CC.compiled values."""
    if isinstance(hint, str):
        return concept_map[hint]
    if isinstance(hint, CC):
        concept = concept_map[hint.concept_key]
        compiled = {k: resolve_test_concept(concept_map, v) for k, v in hint.compiled.items()}
        return CC(concept, source=hint.source, exclude_body=hint.exclude_body, **compiled)
    if isinstance(hint, CMV):
        concept = concept_map[hint.concept_key]
        return CMV(concept, **hint.variables)
    # CV
    # CMV
    # CIO
    raise NotImplementedError()


def get_rete_conditions(*conditions_as_string):
    """ Transform a list of string into a list of Condition (Rete conditions)

    :param conditions_as_string: conditions in the form 'identifier|attribute|value'
        when one argument starts with "#" it means that it's a variables
        ex : "#__x_00__|__name__|'__ret'" -> Condition(V('#__x_00__'), '__name__', '__ret')
    Caution, the value part is evaluated
        "identifier|__name__|'True'" -> Condition(identifier, '__name__', 'True')  # the string 'True'
        "identifier|__name__|True" -> Condition(identifier, '__name__', True)  # the bool True
    """
    res = []
    for as_string in conditions_as_string:
        identifier, attribute, value = as_string.split("|")
        if identifier.startswith("#"):
            identifier = V(identifier[1:])
        if value.startswith("'"):
            # Quoted: strip the quotes, keep as string.
            value = value[1:-1]
        elif value in ("True", "False"):
            value = (value == "True")
        else:
            value = int(value)
        res.append(Condition(identifier, attribute, value))
    return AndConditions(res)


def get_test_obj(test_obj, real_obj, to_compare_delegate=None):
    """
    From a production object (Concept, ConceptNode, ....)
    Create a test object (CNC, CC ...) that can be used to validate the unit tests

    :param test_obj:
    :param real_obj:
    :param to_compare_delegate:
    :return:
    """
    if isinstance(test_obj, list):
        if len(test_obj) != len(real_obj):
            raise Exception(f"Not the same size ! {test_obj=}, {real_obj=}")
        return [get_test_obj(t, r) for t, r in zip(test_obj, real_obj)]
    if isinstance(test_obj, dict):
        if len(test_obj) != len(real_obj):
            raise Exception(f"Not the same size ! {test_obj=}, {real_obj=}")
        return {k: get_test_obj(v, real_obj[k]) for k, v in test_obj.items()}
    if not hasattr(test_obj, "to_compare"):
        return real_obj
    return test_obj.to_compare(real_obj, get_test_obj)