import ast
import logging

from cache.Cache import Cache
from core.ast_helpers import ast_to_props
from core.builtin_concepts import BuiltinConcepts
from core.concept import Concept, ConceptParts, DEFINITION_TYPE_BNF, concept_part_value
from core.global_symbols import DEFAULT_EVALUATORS, INIT_AST_PARSERS, NotFound, NotInit
from core.rule import Rule
from core.sheerka.services.SheerkaExecute import ParserInput
from core.tokenizer import TokenKind, Tokenizer
from core.utils import as_bag

# Execution pipelines handed to sheerka.execute()
PARSE_STEPS = [BuiltinConcepts.BEFORE_PARSING, BuiltinConcepts.PARSING, BuiltinConcepts.AFTER_PARSING]
EVAL_ONLY_STEPS = [BuiltinConcepts.BEFORE_EVALUATION, BuiltinConcepts.EVALUATION, BuiltinConcepts.AFTER_EVALUATION]
EVAL_STEPS = PARSE_STEPS + EVAL_ONLY_STEPS

# Default parsers used by update_compiled() when the caller does not provide any
PARSERS = ["EmptyString", "ShortTermMemory", "Sequence", "Bnf", "Sya", "Python"]


def remove_python_nodes(context, return_values):
    """
    Try to reduce the number of return_values by removing the ones
    whose value exposes a 'get_python_node' attribute

    :param context:
    :param return_values: list of return values
    :return: a new list, the input list is not modified
    """
    return [ret_val for ret_val in return_values
            if not hasattr(context.sheerka.objvalue(ret_val), "get_python_node")]


def is_same_success(context, return_values):
    """
    Returns True if all returns values are successful and have the same value

    :param context:
    :param return_values: list of return values
    :return: True, False or None
             None when a value cannot be compared (failed status, concept not evaluated,
             empty list...). The reason is logged with context.log_error()
    """
    assert isinstance(return_values, list)

    def _get_value(ret_val):
        if not ret_val.status:
            raise Exception("Status is false")
        if isinstance(ret_val.body, Concept) and not ret_val.body.get_hints().is_evaluated:
            raise Exception("Concept is not evaluated")
        return context.sheerka.objvalue(ret_val)

    try:
        reference = _get_value(return_values[0])
        for return_value in return_values[1:]:
            if _get_value(return_value) != reference:
                return False
    except Exception as ex:
        # any failure (including the ones raised by _get_value) means 'cannot tell'
        context.log_error(ex)
        return None

    return True


def expect_one(context, return_values):
    """
    Checks if there is at least one success return value
    If there is more than one, check if it's the same value

    :param context:
    :param return_values: list of return values (anything else is returned as is)
    :return: a single return value
             * the input itself when it is not a list, or its only item when it has one item
             * a successful return value when one winner can be elected
             * a failed return value (IS_EMPTY, TOO_MANY_SUCCESS, TOO_MANY_ERRORS
               or the body produced by OneErrorEvaluator) otherwise
    """
    if not isinstance(return_values, list):
        return return_values

    sheerka = context.sheerka

    if len(return_values) == 0:
        return sheerka.ret(
            context.who,
            False,
            sheerka.new(BuiltinConcepts.IS_EMPTY, body=return_values),
            parents=return_values)

    if len(return_values) == 1:
        return return_values[0]

    debug_enabled = context.logger and context.logger.isEnabledFor(logging.DEBUG)
    successful_results = [item for item in return_values if item.status]
    number_of_successful = len(successful_results)

    # remove errors when a winner is found
    if number_of_successful == 1:
        return sheerka.ret(
            context.who,
            True,
            successful_results[0].body,
            parents=return_values)

    # too many winners, which one to choose ?
    if number_of_successful > 1:
        # first, try to remove python node results.
        # In case of conflict, the concept take precedence over the natural Python result
        # as it is considered as an override (overload ?)
        # NOTE(review): when every success is a python node, the list becomes empty
        # and the result is TOO_MANY_SUCCESS with an empty body - confirm this is intended
        successful_results = remove_python_nodes(context, successful_results)

        if len(successful_results) == 1:
            return sheerka.ret(
                context.who,
                True,
                successful_results[0].body,
                parents=return_values)

        if is_same_success(context, successful_results):
            return sheerka.ret(
                context.who,
                True,
                successful_results[0].value,
                parents=return_values)

        if debug_enabled:
            context.log("Too many successful results found by expect_one()", context.who)
            for s in successful_results:
                context.log(f"-> {s}", context.who)

        return sheerka.ret(
            context.who,
            False,
            sheerka.new(BuiltinConcepts.TOO_MANY_SUCCESS, body=successful_results),
            parents=return_values)

    # number_of_successful == 0, only errors, i cannot help you
    if debug_enabled:
        context.log("Too many errors found by expect_one()", context.who)
        # log the failing values (successful_results is empty in this branch)
        for s in return_values:
            context.log(f"-> {s}", context.who)

    # at least two errors at this point (the single item case is handled above)
    # test if only one evaluator in error
    from evaluators.OneErrorEvaluator import OneErrorEvaluator
    one_error_evaluator = OneErrorEvaluator()
    reduce_requested = sheerka.ret(context.who, True, sheerka.new(BuiltinConcepts.REDUCE_REQUESTED))
    if one_error_evaluator.matches(context, return_values + [reduce_requested]):
        return sheerka.ret(
            context.who,
            False,
            one_error_evaluator.eval(context, return_values).body,
            parents=return_values)

    return sheerka.ret(
        context.who,
        False,
        sheerka.new(BuiltinConcepts.TOO_MANY_ERRORS, body=return_values),
        parents=return_values)


def only_successful(context, return_values):
    """
    Removes all return values that are not successful
    Return error when no successful return value

    :param context:
    :param return_values: list of return values (anything else is returned as is)
    :return: a return value whose body is ONLY_SUCCESSFUL (success),
             IS_EMPTY or TOO_MANY_ERRORS (failure)
    """
    if not isinstance(return_values, list):
        return return_values

    sheerka = context.sheerka

    if len(return_values) == 0:
        return sheerka.ret(
            context.who,
            False,
            sheerka.new(BuiltinConcepts.IS_EMPTY, body=return_values),
            parents=return_values)

    successful_results = [item for item in return_values if item.status]
    if len(successful_results) == 0:
        return sheerka.ret(
            context.who,
            False,
            sheerka.new(BuiltinConcepts.TOO_MANY_ERRORS, body=return_values),
            parents=return_values)

    return sheerka.ret(
        context.who,
        True,
        sheerka.new(BuiltinConcepts.ONLY_SUCCESSFUL, body=successful_results),
        parents=return_values)


def resolve_ambiguity(context, concepts):
    """
    From the list of concepts, elect the one(s) that best suit(s) the context
    Use the PRE and WHERE metadata to choose the correct concepts

    :param context:
    :param concepts: candidates
    :return: list of elected concepts (may be empty)
    """
    from core.sheerka.services.SheerkaEvaluateConcept import EvaluationHints

    # we first sort by condition complexity. The more complex is the PRE condition, the more likely
    # the concept matches the context
    by_complexity = {}
    parts = [concept_part_value(ConceptParts.PRE), concept_part_value(ConceptParts.WHERE)]
    for c in concepts:
        by_complexity.setdefault(get_concept_complexity(context, c, parts), []).append(c)

    remaining_concepts = []
    for complexity in sorted(by_complexity.keys(), reverse=True):
        if complexity == 0:
            # no condition to check, they are all eligible
            remaining_concepts.extend(by_complexity[complexity])
        else:
            for c in by_complexity[complexity]:
                evaluated = context.sheerka.evaluate_concept(context,
                                                             c,
                                                             hints=EvaluationHints(eval_body=False,
                                                                                   expression_only=True),
                                                             metadata=[ConceptParts.PRE, ConceptParts.WHERE])
                if context.sheerka.is_success(evaluated) or evaluated.key == c.key:
                    remaining_concepts.append(c)

        if len(remaining_concepts) > 0:
            break  # no need to check concept with lower complexity

    if len(remaining_concepts) < 2:
        return remaining_concepts  # they all failed the pre conditions or one champ is found

    # for concepts with the same condition complexity, we choose the one that has the less number of variables
    # We consider that Concept("hello world") is more specific than Concept("hello a").def_var("a")
    # when the input is "hello world"
    by_number_of_vars = {}
    for c in remaining_concepts:
        by_number_of_vars.setdefault(len(c.get_metadata().variables), []).append(c)

    return by_number_of_vars[min(by_number_of_vars.keys())]


def get_condition_complexity(context, condition):
    """
    Compute the complexity of a condition
    It is the size of the largest conjunction (AndNode) found once the condition
    is compiled into disjunctions. A node that is not an AndNode counts for 1

    :param context:
    :param condition: source of the condition
    :return: 0 if the condition is None, blank or cannot be parsed
    """
    if condition is None or condition.strip() == "":
        return 0

    # count the number of conjunctions
    from parsers.LogicalOperatorParser import LogicalOperatorParser
    from parsers.BaseExpressionParser import compile_disjunctions
    from parsers.BaseExpressionParser import AndNode

    parser = LogicalOperatorParser()
    res = parser.parse(context, ParserInput(condition))
    if not res.status:
        return 0

    disjunctions = compile_disjunctions(res.body.body)
    return max((len(node.parts) if isinstance(node, AndNode) else 1 for node in disjunctions),
               default=0)


def get_concept_complexity(context, concept, concepts_parts):
    """
    Need to find a proper algorithm to compute the complexity of a concept metadata
    So far, the complexity is the sum of the complexities of the conditions found
    in the requested metadata. concepts_parts is browsed in reverse order and a
    non-empty condition at reversed index i gets an extra weight of i * 100

    :param context:
    :param concept:
    :param concepts_parts: list of metadata names (alternatives are separated by '|')
    :return: int
    """
    complexity = 0
    metadata = concept.get_metadata()
    for i, parts in enumerate(reversed(concepts_parts)):
        for part in parts.split("|"):
            part_complexity = get_condition_complexity(context, getattr(metadata, part))
            if part_complexity > 0:
                complexity += part_complexity + (i * 100)

    return complexity


def get_concepts_complexity(context, concepts, concepts_parts):
    """
    compute the complexity of the concepts, relatively to each others

    :param context:
    :param concepts: concepts
    :param concepts_parts: metadata to use to compute the complexity
    :return: dict {concept id (or name when there is no id): complexity}
    """
    return {c.id or c.name: get_concept_complexity(context, c, concepts_parts) for c in concepts}


def only_parsers_results(context, return_values):
    """
    Filters the return_values and returns when the result is a ParserResult
    regardless of the status
    So it filters parsers in error (ERROR, NOT_FOR_ME, EMPTY...)

    :param context:
    :param return_values: list of return values (anything else is returned as is)
    :return: a return value whose body is FILTERED (success),
             IS_EMPTY, ERROR or TOO_MANY_ERRORS (failure)
    """
    from parsers.BaseNodeParser import UnrecognizedTokensNode
    from parsers.BaseParser import ParsingError

    if not isinstance(return_values, list):
        return return_values

    sheerka = context.sheerka

    if len(return_values) == 0:
        return sheerka.ret(
            context.who,
            False,
            sheerka.new(BuiltinConcepts.IS_EMPTY, body=return_values),
            parents=return_values)

    def _is_real_result(ret_val):
        # hack because some parsers don't follow the NOT_FOR_ME rule
        parsed = ret_val.body.body
        if isinstance(parsed, ParsingError):
            return False
        if isinstance(parsed, list) and len(parsed) == 1 and isinstance(parsed[0], UnrecognizedTokensNode):
            return False
        return True

    return_values_ok = [item for item in return_values
                        if sheerka.isinstance(item.body, BuiltinConcepts.PARSER_RESULT) and _is_real_result(item)]

    if len(return_values_ok) == 0:
        return sheerka.ret(
            context.who,
            False,
            sheerka.new(BuiltinConcepts.TOO_MANY_ERRORS if len(return_values) > 1 else BuiltinConcepts.ERROR,
                        body=return_values),
            parents=return_values)

    return sheerka.ret(
        context.who,
        True,
        sheerka.new(BuiltinConcepts.FILTERED,
                    body=return_values_ok,
                    iterable=return_values,
                    predicate="sheerka.isinstance(item.body, BuiltinConcepts.PARSER_RESULT)"),
        parents=return_values)


def _evaluation_hints_to_reset():
    """
    Hints that are reset when a new evaluation context is pushed
    A new set is created on every call, so the caller can safely hand it over
    """
    return {
        BuiltinConcepts.EVAL_BODY_REQUESTED,
        BuiltinConcepts.EVAL_WHERE_REQUESTED,
        BuiltinConcepts.EVAL_UNTIL_SUCCESS_REQUESTED,
        BuiltinConcepts.EVAL_QUESTION_REQUESTED,
    }


def _configure_evaluation_context(sub_context, eval_body, eval_where, expect_success, is_question, stm):
    """
    Adds the requested protected hints and short term memory entries to sub_context
    """
    if eval_body:
        sub_context.protected_hints.add(BuiltinConcepts.EVAL_BODY_REQUESTED)
    if eval_where:
        sub_context.protected_hints.add(BuiltinConcepts.EVAL_WHERE_REQUESTED)
    if expect_success:
        sub_context.protected_hints.add(BuiltinConcepts.EVAL_UNTIL_SUCCESS_REQUESTED)
    if is_question:
        sub_context.protected_hints.add(BuiltinConcepts.EVAL_QUESTION_REQUESTED)

    if stm:
        for k, v in stm.items():
            sub_context.add_to_short_term_memory(k, v)


def evaluate_from_source(context,
                         source,
                         parsers=INIT_AST_PARSERS,
                         evaluators=DEFAULT_EVALUATORS,
                         desc=None,
                         eval_body=True,
                         eval_where=True,
                         is_question=False,
                         expect_success=False,
                         stm=None):
    """
    Parses and evaluates source (EVAL_STEPS) in a dedicated sub context

    :param context:
    :param source: text to evaluate
    :param parsers: parsers to use ("all" to keep the sub context default)
    :param evaluators: evaluators to use ("all" to keep the sub context default)
    :param desc: description of the sub context (defaults to "Eval '<source>'")
    :param eval_body: adds the EVAL_BODY_REQUESTED hint
    :param eval_where: adds the EVAL_WHERE_REQUESTED hint
    :param is_question: adds the EVAL_QUESTION_REQUESTED hint
    :param expect_success: adds the EVAL_UNTIL_SUCCESS_REQUESTED hint
    :param stm: short term memories entries AKA current namespace
    :return: what sheerka.execute() returns
    """
    sheerka = context.sheerka
    desc = desc or f"Eval '{source}'"

    with context.push(BuiltinConcepts.EVALUATE_SOURCE,
                      source,
                      desc=desc,
                      reset_hints=_evaluation_hints_to_reset()) as sub_context:
        _configure_evaluation_context(sub_context, eval_body, eval_where, expect_success, is_question, stm)

        if parsers != "all":
            sub_context.preprocess_parsers = parsers
        if evaluators != "all":
            sub_context.preprocess_evaluators = evaluators

        user_input = sheerka.ret(context.who, True, sheerka.new(BuiltinConcepts.USER_INPUT, body=source))
        ret = sheerka.execute(sub_context, [user_input], EVAL_STEPS)
        sub_context.add_values(return_values=ret)

        return ret


def evaluate_return_values(context,
                           source,
                           return_values,
                           evaluators=DEFAULT_EVALUATORS,
                           desc=None,
                           eval_body=True,
                           eval_where=True,
                           is_question=False,
                           expect_success=False,
                           stm=None):
    """
    Evaluates already parsed return values (EVAL_ONLY_STEPS) in a dedicated sub context
    and reduces the result with expect_one()

    :param context:
    :param source: source of the return values (used for the sub context and its description)
    :param return_values: return values to evaluate (a copy is given to sheerka.execute())
    :param evaluators: evaluators to use ("all" to keep the sub context default)
    :param desc: description of the sub context
    :param eval_body: adds the EVAL_BODY_REQUESTED hint
    :param eval_where: adds the EVAL_WHERE_REQUESTED hint
    :param is_question: adds the EVAL_QUESTION_REQUESTED hint
    :param expect_success: adds the EVAL_UNTIL_SUCCESS_REQUESTED hint
    :param stm: short term memories entries AKA current namespace
    :return: the result of expect_one()
    """
    sheerka = context.sheerka
    desc = desc or f"Eval '{source}' using return values"

    with context.push(BuiltinConcepts.EVALUATE_SOURCE,
                      source,
                      desc=desc,
                      reset_hints=_evaluation_hints_to_reset()) as sub_context:
        _configure_evaluation_context(sub_context, eval_body, eval_where, expect_success, is_question, stm)

        if evaluators != "all":
            sub_context.preprocess_evaluators = evaluators

        sub_context.add_inputs(return_values=return_values)
        res = sheerka.execute(sub_context, return_values.copy(), EVAL_ONLY_STEPS)
        # NOTE(review): the reduction is done with the parent context, not sub_context
        one_r = expect_one(context, res)
        sub_context.add_values(return_values=one_r)

        return one_r


def get_lexer_nodes(return_values, start, tokens):
    """
    Transform all elements from return_values into lexer nodes
    (ConceptNode, UnrecognizedTokensNode, SourceCodeNode...)
    On the contrary of the other method (get_lexer_nodes_using_positions),
    all created lexer node will use the same offset (start)

    :param return_values:
    :param start: offset to apply
    :param tokens:
    :return: list of list (list of concept node sequence)
    :raise NotImplementedError: when the parser that produced a return value is not supported
    """
    from evaluators.BaseEvaluator import BaseEvaluator
    from parsers.BaseNodeParser import ConceptNode, LexerNode, RuleNode, SourceCodeNode

    lexer_nodes = []
    for ret_val in return_values:
        # To manage AFTER_PARSING evaluators
        who = ret_val.parents[0].who if ret_val.who.startswith(BaseEvaluator.PREFIX) else ret_val.who

        if who == "parsers.Python":
            if ret_val.body.source.strip().isidentifier():
                # Discard SourceCodeNode which seems to be a concept name
                # It may be a wrong idea, so let's see
                continue

            end = start + len(tokens) - 1
            lexer_nodes.append([SourceCodeNode(start,
                                               end,
                                               tokens,
                                               ret_val.body.source,
                                               python_node=ret_val.body.body,
                                               return_value=ret_val)])

        elif who == "parsers.ExactConcept":
            concepts = ret_val.body.body if hasattr(ret_val.body.body, "__iter__") else [ret_val.body.body]
            end = start + len(tokens) - 1
            for concept in concepts:
                lexer_nodes.append([ConceptNode(concept, start, end, tokens, ret_val.body.source)])

        elif who in ("parsers.Bnf", "parsers.Sya", "parsers.Sequence"):
            nodes = [node.clone() for node in ret_val.body.body]
            for node in nodes:
                node.start += start
                node.end += start
                if isinstance(node, ConceptNode):
                    # the lexer nodes found in the compiled variables are shifted as well
                    # (existing keys only, so the dict size does not change while iterating)
                    for k, v in node.concept.get_compiled().items():
                        if isinstance(v, LexerNode):
                            v = v.clone()
                            v.start += start
                            v.end += start
                            node.concept.get_compiled()[k] = v

            # the whole sequence is appended as one entry
            lexer_nodes.append(nodes)

        elif who == "parsers.Rule":
            rules = ret_val.body.body if hasattr(ret_val.body.body, "__iter__") else [ret_val.body.body]
            end = start + len(tokens) - 1
            for rule in rules:
                lexer_nodes.append([RuleNode(rule, start, end, tokens, ret_val.body.source)])

        else:
            raise NotImplementedError(f"get_lexer_nodes who={who}")

    return lexer_nodes


def get_lexer_nodes_using_positions(return_values, positions):
    """
    Transform all elements from return_values into lexer nodes
    (ConceptNode, UnrecognizedTokensNode, SourceCodeNode...)
    Use positions to compute the exact new positions
    On the contrary of the other method (get_lexer_nodes), one return value is mapped with one position.
    it's not a offset, but an absolute position

    :param return_values:
    :param positions: is a list of triplets (start, end, tokens)
                      return_values and positions are paired with zip(), extra items are ignored
    :return: flat list of lexer nodes
    :raise NotImplementedError: when the parser that produced a return value is not supported
    """
    from evaluators.BaseEvaluator import BaseEvaluator
    from parsers.BaseNodeParser import ConceptNode, LexerNode, RuleNode, SourceCodeNode

    lexer_nodes = []
    for ret_val, position in zip(return_values, positions):
        # To manage AFTER_PARSING evaluators
        who = ret_val.parents[0].who if ret_val.who.startswith(BaseEvaluator.PREFIX) else ret_val.who

        if who in ("parsers.Python", 'parsers.PythonWithConcepts'):
            lexer_nodes.append(SourceCodeNode(position.start,
                                              position.end,
                                              position.tokens,
                                              ret_val.body.source,
                                              python_node=ret_val.body.body,
                                              return_value=ret_val))

        elif who == "parsers.ExactConcept":
            concepts = ret_val.body.body if hasattr(ret_val.body.body, "__iter__") else [ret_val.body.body]
            for concept in concepts:
                lexer_nodes.append(ConceptNode(concept,
                                               position.start,
                                               position.end,
                                               position.tokens,
                                               ret_val.body.source))

        elif who in ("parsers.Bnf", "parsers.Sya", "parsers.Sequence"):
            nodes = [node.clone() for node in ret_val.body.body]
            for node in nodes:
                node.start = position.start
                node.end = position.end
                if isinstance(node, ConceptNode):
                    # NOTE(review): the node gets an absolute position while the compiled
                    # lexer nodes are shifted by position.start - confirm this is intended
                    for k, v in node.concept.get_compiled().items():
                        if isinstance(v, LexerNode):
                            v = v.clone()
                            v.start += position.start
                            v.end += position.start
                            node.concept.get_compiled()[k] = v

            # the nodes of the sequence are added one by one (flat list)
            lexer_nodes.extend(nodes)

        elif who == "parsers.Rule":
            rules = ret_val.body.body if hasattr(ret_val.body.body, "__iter__") else [ret_val.body.body]
            for rule in rules:
                lexer_nodes.append(RuleNode(rule,
                                            position.start,
                                            position.end,
                                            position.tokens,
                                            ret_val.body.source))

        elif who == "parsers.Function":
            # the node is updated in place (no clone)
            node = ret_val.body.body
            node.start = position.start
            node.end = position.end
            lexer_nodes.append(node)

        else:
            raise NotImplementedError(f"get_lexer_nodes_using_positions {who=}")

    return lexer_nodes


def ensure_evaluated(context, concept, eval_body=True, metadata=None):
    """
    Evaluate a concept if it is not already evaluated

    :param context:
    :param concept:
    :param eval_body:
    :param metadata: forwarded to sheerka.evaluate_concept()
    :return: * the result of SheerkaEvaluateConcept.apply_ret() when the concept is already evaluated
             * the concept itself when one of its variables is not initialized
             * the result of sheerka.evaluate_concept() otherwise
    """
    from core.sheerka.services.SheerkaEvaluateConcept import SheerkaEvaluateConcept, EvaluationHints

    if concept.get_hints().is_evaluated:
        return SheerkaEvaluateConcept.apply_ret(concept,
                                                eval_body or context.in_context(BuiltinConcepts.EVAL_BODY_REQUESTED))

    # do not try to evaluate concept that are not fully initialized
    if concept.get_metadata().definition_type != DEFINITION_TYPE_BNF:
        for var_name, var_default_value in concept.get_metadata().variables:
            if var_default_value is None and \
                    var_name not in concept.get_compiled() and \
                    (var_name not in concept.values() or concept.get_value(var_name) == NotInit):
                return concept

    return context.sheerka.evaluate_concept(context,
                                            concept,
                                            hints=EvaluationHints(eval_body=eval_body),
                                            metadata=metadata)


def get_lexer_nodes_from_unrecognized(context, unrecognized_tokens_node, parsers):
    """
    Using parsers, try to recognize concepts from source

    :param context:
    :param unrecognized_tokens_node: node to parse (its source, start and tokens are used)
    :param parsers:
    :return: list of list of lexer nodes (see get_lexer_nodes), None if nothing is recognized
    """
    res = context.sheerka.parse_unrecognized(context, unrecognized_tokens_node.source, parsers)
    res = only_parsers_results(context, res)

    if not res.status:
        return None

    return get_lexer_nodes(res.body.body, unrecognized_tokens_node.start, unrecognized_tokens_node.tokens)


def update_compiled(context, concept, errors, parsers=None):
    """
    TL;DR;
    Recursively iterate over concept.get_compiled() to replace LexerNode
    into concepts or list of ReturnValueConcept

    Long version:
    When parsing using a LexerNodeParser (SyaNodeParser, BnfNodeParser...) the result will be a LexerNode.
    In the specific case of a ConceptNode, the compiled variables will also be LexerNode (UnrecognizedTokensNode...)
    This function iterate over the compiled to transform these nodes into concept or compiled AST

    :param context:
    :param concept: updated in place
    :param errors: a list that must be initialized by the caller (ERROR concepts are appended to it)
    :param parsers: to customize the parsers to use (defaults to PARSERS)
    :return: None
    """
    from parsers.BaseNodeParser import ConceptNode, SourceCodeNode, SourceCodeWithConceptNode, UnrecognizedTokensNode

    sheerka = context.sheerka
    parsers = parsers or PARSERS

    def _validate_concept(c):
        """
        Recursively browse the compiled properties in order to find unrecognized

        :param c:
        :return:
        """
        # only existing keys are reassigned, so the dict size does not change while iterating
        for k, v in c.get_compiled().items():
            if isinstance(v, Concept):
                _validate_concept(v)

            elif isinstance(v, ConceptNode):
                _validate_concept(v.concept)
                c.get_compiled()[k] = v.concept

            elif isinstance(v, SourceCodeNode):
                if not v.return_value:
                    raise NotImplementedError("_validate_concept SourceCodeNode ret val is False")
                c.get_compiled()[k] = [v.return_value]

            elif isinstance(v, SourceCodeWithConceptNode):
                if v.return_value:
                    res = v.return_value
                else:
                    from parsers.PythonWithConceptsParser import PythonWithConceptsParser
                    parser_helper = PythonWithConceptsParser()
                    res = parser_helper.parse_nodes(context, v.get_all_nodes())

                if res.status:
                    c.get_compiled()[k] = [res]
                else:
                    errors.append(sheerka.new(BuiltinConcepts.ERROR, body=f"Cannot parse '{v.source}'"))

            elif isinstance(v, UnrecognizedTokensNode):
                res = context.sheerka.parse_unrecognized(context, v.source, parsers)
                res = only_successful(context, res)  # only keep successful parsers
                if res.status:
                    c.get_compiled()[k] = res.body.body
                else:
                    errors.append(sheerka.new(BuiltinConcepts.ERROR, body=f"Cannot parse '{v.source}'"))

    def _get_source(compiled, var_name):
        """
        Source of the variable, when it is compiled as a single parser result
        which does not come from the short term memory. None otherwise
        """
        if var_name not in compiled:
            return None

        value = compiled[var_name]
        if not isinstance(value, list):
            return None
        if not len(value) == 1:
            return None
        if not sheerka.isinstance(value[0], BuiltinConcepts.RETURN_VALUE):
            return None
        if not sheerka.isinstance(value[0].body, BuiltinConcepts.PARSER_RESULT):
            return None
        if value[0].body.name == "parsers.ShortTermMemory":
            return None

        return value[0].body.source

    _validate_concept(concept)

    # Special case where the values of the variables are the names of the variable
    # example : Concept("a plus b").def_var("a").def_var("b")
    # and the user has entered 'a plus b'
    # Chances are that we are talking about the concept itself, and not an instantiation (like '10 plus 2')
    # This means that 'a' and 'b' don't have any real values
    variables = concept.get_metadata().variables
    if len(variables) > 0 and all(_get_source(concept.get_compiled(), name) == name for name, _ in variables):
        concept.get_hints().is_evaluated = True


def add_to_ret_val(sheerka, context, return_values, concept_key):
    """
    Appends a successful return value holding a new concept_key concept

    :param sheerka:
    :param context:
    :param return_values: list to update (in place)
    :param concept_key:
    :return: return_values
    """
    return_values.append(sheerka.ret(context.who, True, sheerka.new(concept_key)))
    return return_values


def remove_from_ret_val(sheerka, return_values, concept_key):
    """
    Removes the successful return values whose body is a concept_key concept

    :param sheerka:
    :param return_values: list to update (in place)
    :param concept_key:
    :return: return_values
    """
    # collect first, the list must not be modified while it is browsed
    to_remove = [ret_val for ret_val in return_values
                 if ret_val.status and sheerka.isinstance(ret_val.body, concept_key)]

    for item in to_remove:
        return_values.remove(item)

    return return_values


def set_is_evaluated(concepts, check_nb_variables=False):
    """
    set is_evaluated to True

    :param concepts: a concept or an iterable of concepts (nothing is done when None)
    :param check_nb_variables: only set is_evaluated if the concept has variables
    :return:
    """
    if concepts is None:
        return

    def _mark(c):
        if not check_nb_variables or len(c.get_metadata().variables) > 0:
            c.get_hints().is_evaluated = True

    if hasattr(concepts, "__iter__"):
        for concept in concepts:
            _mark(concept)
    else:
        _mark(concepts)


def update_concepts_hints(concepts, is_evaluated=None, recognized_by=None, is_instance=None):
    """
    Updates the hints of the concepts. A hint is left untouched when its new value is None

    :param concepts: a concept or an iterable of concepts (nothing is done when None)
    :param is_evaluated:
    :param recognized_by:
    :param is_instance:
    :return:
    """
    if concepts is None:
        return

    def update_concept_hints(c, _is_evaluated, _recognized_by, _is_instance):
        if _is_evaluated is not None:
            c.get_hints().is_evaluated = _is_evaluated

        if _recognized_by is not None:
            c.get_hints().recognized_by = _recognized_by

        if _is_instance is not None:
            c.get_hints().is_instance = _is_instance

    if hasattr(concepts, "__iter__"):
        for concept in concepts:
            update_concept_hints(concept, is_evaluated, recognized_by, is_instance)
    else:
        update_concept_hints(concepts, is_evaluated, recognized_by, is_instance)


def ensure_concept(*concepts):
    """
    Makes sure that all the arguments are concepts

    :param concepts:
    :raise TypeError: on the first argument that is not a Concept
    """
    # concepts is a tuple (var args), so it is always iterable
    for concept in concepts:
        if not isinstance(concept, Concept):
            raise TypeError(f"'{concept}' must be a concept")


def ensure_rule(*rules):
    """
    Makes sure that all the arguments are rules

    :param rules:
    :raise TypeError: on the first argument that is not a Rule
    """
    # rules is a tuple (var args), so it is always iterable
    for rule in rules:
        if not isinstance(rule, Rule):
            raise TypeError(f"'{rule}' must be a rule")


def ensure_concept_or_rule(*items):
    """
    Makes sure that all the arguments are concepts or rules

    :param items:
    :raise TypeError: on the first argument that is neither a Concept nor a Rule
    """
    # items is a tuple (var args), so it is always iterable
    for item in items:
        if not isinstance(item, (Concept, Rule)):
            raise TypeError(f"'{item}' must be a concept or rule")


def ensure_bnf(context, concept, parser_name="BaseNodeParser"):
    """
    Parses the definition of a BNF concept when its bnf is not set yet
    Nothing is done for the other concepts

    :param context:
    :param concept: updated in place (set_bnf)
    :param parser_name: 'who' of the sub context
    :raise Exception: when the definition cannot be parsed
    """
    if concept.get_metadata().definition_type == DEFINITION_TYPE_BNF and not concept.get_bnf():
        from parsers.BnfDefinitionParser import BnfDefinitionParser
        regex_parser = BnfDefinitionParser()
        desc = f"Resolving BNF '{concept.get_metadata().definition}'"
        with context.push(BuiltinConcepts.INIT_BNF,
                          concept,
                          who=parser_name,
                          obj=concept,
                          desc=desc) as sub_context:
            sub_context.add_inputs(parser_input=concept.get_metadata().definition)
            bnf_parsing_ret_val = regex_parser.parse(sub_context, concept.get_metadata().definition)
            sub_context.add_values(return_values=bnf_parsing_ret_val)

        if not bnf_parsing_ret_val.status:
            raise Exception(bnf_parsing_ret_val.value)

        concept.set_bnf(bnf_parsing_ret_val.body.body)


# properties definitions already computed by evaluate_expression(), by expression
expressions_cache = Cache()


def evaluate_expression(expr, bag):
    """
    Try to evaluate expr in context of bag

    :param expr: source of the expression
    :param bag: namespace (supports 'in' and item access)
    :return: None when expr is None or blank
             bag[expr] when expr is a key of the bag
             the result of evaluate_object() otherwise
    """
    if expr is None or expr.strip() == "":
        return None

    if expr in bag:
        return bag[expr]

    props_definitions = expressions_cache.get(expr)
    if props_definitions is NotFound:
        # first time this expression is seen, compute its properties and remember them
        _ast = ast.parse(expr, mode="eval")
        props_definitions = []
        ast_to_props(props_definitions, _ast.body, None)
        props_definitions.reverse()
        expressions_cache.put(expr, props_definitions)

    return evaluate_object(bag, props_definitions)


def evaluate_object(bag, properties):
    """
    Evaluate the properties of an object
    Works with evaluate_expression

    :param bag: namespace where the first property is searched
    :param properties: List of ast_helpers.PropDef
    :return: value of the last property, None as soon as a property is None
    :raise NameError: when a property is found neither in the bag nor in bag["self"]
    """
    for prop in properties:
        try:
            obj = bag[prop.prop]
        except KeyError:
            # second chance, the property may be defined by 'self'
            try:
                obj = bag["self"][prop.prop]
            except Exception as ex:
                raise NameError(prop.prop) from ex

        if obj is None:
            return None

        if prop.index is not None:
            obj = obj[prop.index]

        # the next property is searched in the current object
        bag = as_bag(obj)

    return obj


def is_a_question(context, concept):
    """
    Returns True if the concept must be executed in the context of BuiltinConcepts.EVAL_QUESTION_REQUESTED
    The only two ways that are currently supported are
    * is_question() appears in the pre condition
    * context.in_context(BuiltinConcepts.EVAL_QUESTION_REQUESTED) appears in the pre condition

    :param context:
    :param concept: concept to analyse
    :return: False when there is no pre condition or when it cannot be parsed,
             the verdict of IsAQuestionVisitor otherwise
    """
    pre = concept.get_metadata().pre

    if pre in (None, NotInit, ""):
        return False

    res = context.sheerka.parse_expression(context, pre)
    if not res.status:
        return False

    from parsers.BaseExpressionParser import IsAQuestionVisitor
    return IsAQuestionVisitor().is_a_question(res.body.body)


def get_inner_body(context, concept):
    """
    For container concept, returns the body

    :param context:
    :param concept:
    :return: the body of the body when concept.body is ONLY_SUCCESSFUL, concept.body otherwise
    """
    if context.sheerka.isinstance(concept.body, BuiltinConcepts.ONLY_SUCCESSFUL):
        return concept.body.body

    return concept.body


def get_possible_variables_from_concept(context, concept):
    """
    Given concept definition, gives the variables of the concept
    that can be considered as a parameter in another function

    Example (from the original documentation, not an executable doctest):
        get_possible_variables_from_concept(context, Concept("a plus b").def_var("a").def_var("b"))
            gives {"a", "b"}
        get_possible_variables_from_concept(context, Concept("twenties", definition="twenty (one|two)=n").def_var("n"))
            gives set()

    :param context:
    :param concept:
    :return: set of names
    """
    if len(concept.name) <= 1:
        return set()

    concept_name = [t.str_value for t in Tokenizer(concept.name, yield_eof=False)]

    # the default value takes precedence over the name of the variable
    # a default value which is not a string (None...) is considered as not provided
    names = [(v_value.strip() if isinstance(v_value, str) else "") or v_name
             for v_name, v_value in concept.get_metadata().variables
             if v_name in concept_name]

    to_keep = set()
    for var in names:
        if not context.sheerka.is_not_a_concept_name(var):
            continue

        # keep it as soon as one of its tokens is an identifier or a keyword
        if any(t.type in (TokenKind.IDENTIFIER, TokenKind.KEYWORD) for t in Tokenizer(var, yield_eof=False)):
            to_keep.add(var)

    return to_keep


def is_only_successful(sheerka, return_value):
    """
    Returns True when return_value is a return value whose body is ONLY_SUCCESSFUL

    :param sheerka:
    :param return_value:
    :return:
    """
    return sheerka.isinstance(return_value, BuiltinConcepts.RETURN_VALUE) and \
        sheerka.isinstance(return_value.body, BuiltinConcepts.ONLY_SUCCESSFUL)


def debug_nodes(nodes):
    """
    Simplified view of the nodes, for debugging purpose

    :param nodes:
    :return: list with the source of the UnrecognizedTokensNode,
             the concept of the nodes that define get_concept() and the other nodes as is
    """
    from parsers.BaseNodeParser import UnrecognizedTokensNode

    res = []
    for node in nodes:
        if isinstance(node, UnrecognizedTokensNode):
            res.append(node.source)
        elif hasattr(node, "get_concept"):
            res.append(node.get_concept())
        else:
            res.append(node)

    return res


def get_new_variables_definitions(concept):
    """
    Return a new set of variable definition, where the default value
    are initialized with what was compiled

    :param concept:
    :return: list of tuples (name, default value)
             The default value is the source of the compiled value for the variables
             that are parameters and whose compiled value has a 'source' attribute
    """
    new_variables = []
    for var_name, var_default_value in concept.get_metadata().variables:
        if var_name in concept.get_metadata().parameters and hasattr(concept.get_compiled()[var_name], "source"):
            new_variables.append((var_name, concept.get_compiled()[var_name].source))
        else:
            new_variables.append((var_name, var_default_value))

    return new_variables


class CreateObjectIdentifiers:
    """
    Class that creates unique identifiers for Concept or Rule objects
    """

    def __init__(self):
        self.identifiers = {}  # id(obj) -> identifier already given to this object
        self.identifiers_key = {}  # base identifier -> number of collisions so far

    @staticmethod
    def sanitize(identifier, default="0"):
        """
        Replaces all the non alphanumeric characters

        :param identifier:
        :param default: replacement string
        :return: empty string when identifier is None
        """
        if identifier is None:
            return ""

        return "".join(c if c.isalnum() else default for c in identifier)

    def get_identifier(self, obj, wrapper):
        """
        Get an identifier for a concept.
        Make sure to return the same identifier if the same concept
        Make sure to return a different identifier if same name but different concept

        :param obj: object that defines key, name and id
        :param wrapper: string or char that will wrap the result (ex '__C__' or '__R__')
        :return: the identifier
        """
        # objects are tracked by identity
        if id(obj) in self.identifiers:
            return self.identifiers[id(obj)]

        identifier = wrapper + self.sanitize(obj.key or obj.name)
        if obj.id:
            identifier += "__" + self.sanitize(obj.id, "_")

        if identifier in self.identifiers_key:
            # same base identifier but another object, a counter makes it unique
            self.identifiers_key[identifier] += 1
            identifier += f"_{self.identifiers_key[identifier]}"
        else:
            self.identifiers_key[identifier] = 0

        identifier += wrapper
        self.identifiers[id(obj)] = identifier
        return identifier