from dataclasses import dataclass from core.builtin_concepts import BuiltinConcepts from core.builtin_helpers import expect_one, only_successful, evaluate, ensure_concept from core.concept import Concept, DoNotResolve, ConceptParts, InfiniteRecursionResolved, AllConceptParts, \ concept_part_value from core.global_symbols import NotInit, CURRENT_OBJ from core.rule import Rule from core.sheerka.services.SheerkaConceptManager import SheerkaConceptManager from core.sheerka.services.SheerkaExecute import ParserInput from core.sheerka.services.sheerka_service import BaseService from core.tokenizer import Tokenizer from core.utils import unstr_concept from parsers.BaseNodeParser import ConceptNode from parsers.LogicalOperatorParser import LogicalOperatorParser from parsers.expressions import TrueifyVisitor CONCEPT_EVALUATION_STEPS = [ BuiltinConcepts.BEFORE_EVALUATION, BuiltinConcepts.EVALUATION, BuiltinConcepts.AFTER_EVALUATION] @dataclass class ChickenAndEggException(Exception): error: Concept @dataclass class WhereClauseDef: concept: Concept # concept on which the where clause is applied clause: str # original where clause trueified: str # modified where clause (where unresolvable variables are removed) prop: str # variable to test compiled: object # trueified where clause Python compiled class SheerkaEvaluateConcept(BaseService): NAME = "EvaluateConcept" def __init__(self, sheerka): super().__init__(sheerka) def initialize(self): self.sheerka.bind_service_method(self.evaluate_concept, True) self.sheerka.bind_service_method(self.set_auto_eval, True) @staticmethod def infinite_recursion_detected(context, concept): """ Browse the parents, looking for another evaluation of the same concept :param context: :param concept: :return: """ if concept is None: return False parent = context.get_parent() while parent is not None: if (parent.who == context.who and parent.action == BuiltinConcepts.EVALUATING_CONCEPT and parent.obj == concept and parent.obj.get_compiled() == concept.get_compiled()): return True parent = parent.get_parent() return False @staticmethod def get_infinite_recursion_resolution(obj): if isinstance(obj, InfiniteRecursionResolved): return obj if isinstance(obj, Concept) and isinstance(obj.body, InfiniteRecursionResolved): return obj.body return None @staticmethod def apply_ret(concept): """ Check if a concept has its RET part defined If True, returns it :param concept: :return: """ return concept.get_value(ConceptParts.RET) if ConceptParts.RET in concept.values else concept @staticmethod def get_needed_metadata(concept, concept_part, check_vars, check_body): """ Check if the concept_part has to be evaluated It also checks if the variables and the body need to be evaluated prior to it :param concept: :param concept_part: :param check_vars: :param check_body: :return: """ ret = [] vars_needed = False body_needed = False if concept_part in concept.get_compiled() and concept.get_compiled()[concept_part] is not None: concept_part_source = getattr(concept.get_metadata(), concept_part_value(concept_part)) assert concept_part_source is not None tokens = [t.str_value for t in Tokenizer(concept_part_source)] if check_vars: for var_name in (v[0] for v in concept.get_metadata().variables): if var_name in tokens: vars_needed = True ret.append("variables") break if check_body and "self" in tokens: body_needed = True ret.append(ConceptParts.BODY) ret.append(concept_part) return ret, vars_needed, body_needed @staticmethod def get_where_clause_def(context, concept, var_name): """ Returns the compiled code to be executed :param context: :param concept: :param var_name: :return: """ if concept.get_metadata().where is None or concept.get_metadata().where.strip() == "": return None ret = LogicalOperatorParser().parse(context, ParserInput(concept.get_metadata().where)) if not ret.status: # TODO: manage invalid where clause return None expr = ret.body.body to_trueify = [v[0] for v in concept.get_metadata().variables if v[0] != var_name] trueified_where = str(TrueifyVisitor(to_trueify, [var_name]).visit(expr)) tokens = [t.str_value for t in Tokenizer(trueified_where)] if var_name in tokens: compiled = None try: compiled = compile(trueified_where, "", "eval") except Exception: pass return WhereClauseDef(concept, concept.get_metadata().where, trueified_where, var_name, compiled) else: return None def get_recursive_definitions(self, concept, return_values): """ Returns the name of the parsers that will resolve to a recursive evaluation :param concept: :param return_values: :return: """ if concept.name in concept.variables(): # There is a variable with the same name as the concept # During evaluation, inner variables take precedence other concepts # So there won't be any cyclic reference, the variable will be picked return for parser in [r.body for r in return_values if r.status and self.sheerka.isinstance(r.body, BuiltinConcepts.PARSER_RESULT)]: parsed = parser.body if isinstance(parser.body, list) else [parser.body] for parsed_item in parsed: if isinstance(parsed_item, Concept) and parsed_item.id == concept.id: yield parser.parser elif isinstance(parsed_item, ConceptNode) and parsed_item.concept.id == concept.id: yield parser.parser def apply_where_clause(self, context, where_clause_def, return_values): """ Apply intermediate where clause when evaluating concept variables :param context: :param where_clause_def: :param return_values: :return: """ ret = [] valid_return_values = [r for r in return_values if r.status] for r in valid_return_values: if where_clause_def.compiled: try: if eval(where_clause_def.compiled, {where_clause_def.prop: self.sheerka.objvalue(r)}): ret.append(r) except NameError: ret.append(r) # it cannot be solved unitary, let's give a chance to the global where condition else: # it means that the where condition is an expression that needs to be executed evaluation_res = evaluate(context, where_clause_def.trueified, desc=f"Apply where clause on '{where_clause_def.prop}'", expect_success=True, is_question=True, stm={where_clause_def.prop: r.body}) one_res = expect_one(context, evaluation_res) if one_res.status: value = context.sheerka.objvalue(one_res) if isinstance(value, bool) and value: ret.append(r) if len(ret) > 0: return ret reason = [r.body for r in return_values] if len(valid_return_values) == 0 else None return self.sheerka.new(BuiltinConcepts.CONDITION_FAILED, body=where_clause_def.clause, concept=where_clause_def.concept, prop=where_clause_def.prop, reason=reason) def manage_infinite_recursion(self, context): """ We look for the fist parent that has a body that means something We use the eval function with no local or global :param context: :return: """ parent = context concepts_found = set() while parent and parent.obj: if parent.who == context.who and parent.action == BuiltinConcepts.EVALUATING_CONCEPT: body = parent.obj.get_metadata().body try: return self.sheerka.ret(self.NAME, True, InfiniteRecursionResolved(eval(body))) except Exception: pass concepts_found.add(parent.obj) parent = parent.get_parent() return self.sheerka.ret( self.NAME, False, self.sheerka.new(BuiltinConcepts.CHICKEN_AND_EGG, body=concepts_found)) def initialize_concept_asts(self, context, concept: Concept): """ Updates the codes of the newly created concept Basically, it runs the parsers on all parts :param concept: :param context: :return: """ def is_only_successful(r): """ :param r: return_value :return: """ return context.sheerka.isinstance(r, BuiltinConcepts.RETURN_VALUE) and \ context.sheerka.isinstance(r.body, BuiltinConcepts.ONLY_SUCCESSFUL) def parse_token_concept(s): """ :param s: source :return: """ if s.startswith("c:") and (identifier := unstr_concept(s)) != (None, None): return self.sheerka.fast_resolve(identifier) return None def get_return_value(current_context, c, s, p): """ :param current_context: :param c: concept :param s: source :param p: part of the concept being parsed :return: """ while True: return_value = current_context.sheerka.parse_unrecognized(current_context, s, parsers="all", prop=p, filter_func=only_successful) if not return_value.status: if current_context.preprocess: raise ChickenAndEggException(self.sheerka.new(BuiltinConcepts.CHICKEN_AND_EGG, body={c})) else: raise Exception(f"Failed to build '{s}'. But it doesn't seems to be recursion") return_value = return_value.body.body if is_only_successful(return_value) else [return_value] recursive_parsers = list(self.get_recursive_definitions(c, return_value)) if len(recursive_parsers) == 0: return return_value desc = f"Removing parsers {recursive_parsers}" current_context = current_context.push(context.action, context.action_context, desc=desc) for recursive_parser in recursive_parsers: current_context.add_preprocess(recursive_parser.name, enabled=False) for part_key in AllConceptParts: if part_key in concept.get_compiled(): continue source = getattr(concept.get_metadata(), concept_part_value(part_key)) if source is None: # or not isinstance(source, str): continue if not isinstance(source, str): raise Exception("Invalid concept init. metadata must be a string") if source.strip() == "": concept.get_compiled()[part_key] = DoNotResolve(source) else: if concept_found := parse_token_concept(source): # the compiled can be a reference to another concept... context.log(f"Recognized concept '{concept_found}'", self.NAME) concept.get_compiled()[part_key] = concept_found else: # ...or a list of ReturnValueConcept to resolve concept.get_compiled()[part_key] = get_return_value(context, concept, source, part_key) for var_name, default_value in concept.get_metadata().variables: if var_name in concept.get_compiled(): continue if default_value is None: continue if not isinstance(default_value, str): raise Exception("Invalid concept init. variable metadata must be a string") if default_value.strip() == "": concept.get_compiled()[var_name] = DoNotResolve(default_value) else: if concept_found := parse_token_concept(default_value): # the compiled can be a reference to another concept... context.log(f"Recognized concept '{concept_found}'", self.NAME) concept.get_compiled()[var_name] = concept_found else: # ...or a list of ReturnValueConcept to resolve concept.get_compiled()[var_name] = get_return_value(context, concept, default_value, var_name) # Updates the cache of concepts when possible # This piece of code is not used, a the compile part is removed by sheerka.new_from_template() service = context.sheerka.services[SheerkaConceptManager.NAME] if service.has_id(concept.id): self.sheerka.get_by_id(concept.id).set_compiled(concept.get_compiled()) def resolve(self, context, to_resolve, current_prop, current_concept, force_evaluation, where_clause_def): """ Resolve a variable or a Concept :param context: current execution context :param to_resolve: Concept or list of ReturnValueConcept to resolve :param current_prop: current property or ConceptPart :param current_concept: current concept :param force_evaluation: Force body evaluation :param where_clause_def: intermediate where clause for variables :return: """ def get_path(context_, prop_name): concept_name = f'"{context_.action_context.name}"' if isinstance(context_.action_context, Concept) \ else "'N/A'" prefix = context_.path if hasattr(context_, "path") else concept_name value = prop_name.name if isinstance(current_prop, ConceptParts) else prop_name return prefix + "." + value if isinstance(to_resolve, DoNotResolve): return to_resolve.value # manage infinite loop if self.infinite_recursion_detected(context, current_concept): with context.push(BuiltinConcepts.MANAGE_INFINITE_RECURSION, current_concept, desc="Infinite recursion detected", obj=current_concept) as sub_context: # I create a sub context in order to log what happened ret_val = self.manage_infinite_recursion(context) sub_context.add_values(return_values=ret_val) return ret_val.body path = get_path(context, current_prop) desc = f"Evaluating {path} (concept={current_concept})" with context.push(BuiltinConcepts.EVALUATING_ATTRIBUTE, current_prop, desc=desc, obj=current_concept) as sub_context: sub_context.add_inputs(path=path) if force_evaluation: sub_context.protected_hints.add(BuiltinConcepts.EVAL_BODY_REQUESTED) if current_prop in (ConceptParts.WHERE, ConceptParts.PRE): sub_context.protected_hints.add(BuiltinConcepts.EVAL_UNTIL_SUCCESS_REQUESTED) if current_prop == ConceptParts.WHERE: sub_context.protected_hints.add(BuiltinConcepts.EVAL_QUESTION_REQUESTED) # when it's a concept, evaluate it if isinstance(to_resolve, Concept) and \ not context.sheerka.isinstance(to_resolve, BuiltinConcepts.RETURN_VALUE): evaluated = self.evaluate_concept(sub_context, to_resolve) sub_context.add_values(return_values=evaluated) if not context.sheerka.is_success(evaluated) and evaluated.key != to_resolve.key: error = evaluated else: return evaluated elif isinstance(to_resolve, Rule): raise NotImplementedError() # how to resolve rules ? # otherwise, execute all return values to find out what is the value else: # update short term memory with current concept variables if current_concept: for var in current_concept.get_metadata().variables: value = current_concept.get_value(var[0]) if value != NotInit: sub_context.add_to_short_term_memory(var[0], current_concept.get_value(var[0])) use_copy = [r for r in to_resolve] if hasattr(to_resolve, "__iter__") else to_resolve r = self.sheerka.execute(sub_context, use_copy, CONCEPT_EVALUATION_STEPS) if where_clause_def: # apply intermediate where clause r = self.apply_where_clause(context, where_clause_def, r) if self.sheerka.isinstance(r, BuiltinConcepts.CONDITION_FAILED): return r else: one_r = expect_one(context, r) sub_context.add_values(return_values=one_r) if one_r.status: return one_r.value else: error = one_r.value return error if self.sheerka.isinstance(error, BuiltinConcepts.CHICKEN_AND_EGG) \ else self.sheerka.new(BuiltinConcepts.CONCEPT_EVAL_ERROR, body=error, concept=current_concept, property_name=current_prop) def resolve_list(self, context, list_to_resolve, current_prop, current_concept, force_evaluation, where_clause_def): """When dealing with a list, there are two possibilities""" # It may be a list of ReturnValueConcept to execute (always the case for metadata) # or a list of single values (may be the case for properties) # in this latter case, all values are to be processed one by one and a list should be returned if len(list_to_resolve) == 0: return [] if self.sheerka.isinstance(list_to_resolve[0], BuiltinConcepts.RETURN_VALUE): return self.resolve(context, list_to_resolve, current_prop, current_concept, force_evaluation, where_clause_def) res = [] for to_resolve in list_to_resolve: # sanity check if self.sheerka.isinstance(to_resolve, BuiltinConcepts.RETURN_VALUE): return self.sheerka.new(BuiltinConcepts.CONCEPT_EVAL_ERROR, body="Mix between real values and return values", concept=current_concept, property_name=current_prop) r = self.resolve(context, to_resolve, current_prop, current_concept, force_evaluation, where_clause_def) if self.sheerka.isinstance(r, BuiltinConcepts.CONCEPT_EVAL_ERROR): return r res.append(r) return res def evaluate_concept(self, context, concept: Concept, eval_body=False, metadata=None): """ Evaluation a concept ie : resolve its body :param context: :param concept: :param eval_body: :param metadata: list of metadata to evaluate ('pre', 'post'...) :return: value of the evaluation or error """ if concept.get_metadata().is_evaluated: return concept # I cannot use cache because of concept like 'number'. # They don't have variables, but their values change every time they are instantiated # TODO: Need to find a way to cache despite of them # need_body = eval_body or context.in_context(BuiltinConcepts.EVAL_BODY_REQUESTED) # if need_body and len(concept.get_metadata().variables) == 0 and context.sheerka.has_id(concept.id): # from_cache = context.sheerka.get_by_id(concept.id) # if from_cache.get_metadata().is_evaluated: # concept.set_value(ConceptParts.BODY, from_cache.body) # concept.get_metadata().is_evaluated = True # return concept desc = f"Evaluating concept {concept}" with context.push(BuiltinConcepts.EVALUATING_CONCEPT, concept, desc=desc) as sub_context: sub_context.add_inputs(eval_body=eval_body) if eval_body: # ask for body evaluation sub_context.protected_hints.add(BuiltinConcepts.EVAL_BODY_REQUESTED) # auto evaluate commands if context.sheerka.isa(concept, context.sheerka.new(BuiltinConcepts.AUTO_EVAL)): sub_context.protected_hints.add(BuiltinConcepts.EVAL_BODY_REQUESTED) sub_context.add_to_short_term_memory(CURRENT_OBJ, concept) try: self.initialize_concept_asts(sub_context, concept) except ChickenAndEggException as ex: return ex.error # to make sure of the order, it don't use ConceptParts.get_parts() # variables must be evaluated first, body must be evaluated before where all_metadata_to_eval = metadata or self.compute_metadata_to_eval(sub_context, concept) for metadata_to_eval in all_metadata_to_eval: if metadata_to_eval == "variables": for var_name in (v for v in concept.variables() if v in concept.get_compiled()): prop_ast = concept.get_compiled()[var_name] w_clause = self.get_where_clause_def(context, concept, var_name) # TODO, manage when the where clause cannot be parsed if isinstance(prop_ast, list): # Do not send the current concept for the properties resolved = self.resolve_list(sub_context, prop_ast, var_name, None, True, w_clause) else: # Do not send the current concept for the properties resolved = self.resolve(sub_context, prop_ast, var_name, None, True, w_clause) if isinstance(resolved, Concept) and not sub_context.sheerka.is_success(resolved): resolved.set_value("concept", concept) # since current concept was not sent return resolved else: concept.set_value(var_name, resolved) else: part_key = metadata_to_eval # do not evaluate where when the body is a set # Indeed, the way that the where clause is expressed is not a valid python or concept code if part_key == ConceptParts.WHERE and self.sheerka.isaset(sub_context, concept.body): continue if part_key not in concept.get_compiled() or concept.get_compiled()[part_key] is None: continue metadata_ast = concept.get_compiled()[part_key] # if part_key is PRE, POST or WHERE, the concept need to be evaluated # if we want the predicates to be resolved => so force_eval = True # otherwise no need to force force_concept_eval = False if part_key == ConceptParts.BODY else True # resolve resolved = self.resolve(sub_context, metadata_ast, part_key, concept, force_concept_eval, None) # 'FATAL' error is detected, let's stop if isinstance(resolved, Concept) and not sub_context.sheerka.is_success(resolved): return resolved concept.set_value(part_key, self.get_infinite_recursion_resolution(resolved) or resolved) # validate PRE and WHERE condition if part_key in (ConceptParts.PRE, ConceptParts.WHERE) and not self.sheerka.objvalue(resolved): return self.sheerka.new(BuiltinConcepts.CONDITION_FAILED, body=getattr(concept.get_metadata(), concept_part_value(metadata_to_eval)), concept=concept, prop=part_key) # # TODO : Validate the POST condition # concept.init_key() # Necessary for old unit tests. To remove someday if ConceptParts.BODY in all_metadata_to_eval: concept.get_metadata().is_evaluated = True # # update the cache for concepts with no variables # Cannot use cache. See the comment at the beginning of this method # if len(concept.get_metadata().variables) == 0: # self.sheerka.om.put(self.sheerka.CONCEPTS_BY_ID_ENTRY, concept.id, concept) if not concept.get_metadata().is_builtin: self.sheerka.register_object(sub_context, concept.name, concept) # manage RET metadata if sub_context.in_context(BuiltinConcepts.EVAL_BODY_REQUESTED) and ConceptParts.RET in concept.values(): return concept.get_value(ConceptParts.RET) else: return concept def compute_metadata_to_eval(self, context, concept): to_eval = [] needed, variables, body = self.get_needed_metadata(concept, ConceptParts.PRE, True, True) to_eval.extend(needed) if context.in_context(BuiltinConcepts.EVAL_WHERE_REQUESTED) or concept.get_metadata().need_validation: # What are the cases where we do not need a validation ? # see test_sheerka_non_reg::test_i_can_evaluate_bnf_concept_with_where_clause() # res = sheerka.evaluate_user_input("foobar") needed, v, b = self.get_needed_metadata(concept, ConceptParts.WHERE, not variables, not body) variables |= v body |= b to_eval.extend(needed) if context.in_context(BuiltinConcepts.EVAL_BODY_REQUESTED): needed, v, b = self.get_needed_metadata(concept, ConceptParts.RET, not variables, not body) variables |= v body |= b to_eval.extend(needed) needed, v, b = self.get_needed_metadata(concept, ConceptParts.POST, not variables, not body) variables |= v body |= b to_eval.extend(needed) if context.in_context(BuiltinConcepts.EVAL_BODY_REQUESTED): if not variables: to_eval.append('variables') if not body: to_eval.append(ConceptParts.BODY) return to_eval def set_auto_eval(self, context, concept): """ add AUTO_EVAL to ISA :param context: :param concept: :return: """ ensure_concept(concept) return self.sheerka.set_isa(context, concept, self.sheerka.new(BuiltinConcepts.AUTO_EVAL))