from dataclasses import dataclass, field from core.builtin_concepts import BuiltinConcepts, ErrorConcept, ReturnValueConcept, BuiltinErrors from core.concept import Concept, ConceptParts, PROPERTIES_FOR_NEW from parsers.BaseParser import BaseParser from sdp.sheerkaDataProvider import SheerkaDataProvider, Event, SheerkaDataProviderDuplicateKeyError import core.utils import core.builtin_helpers from core.sheerka_logger import console_handler import logging CONCEPT_EVALUATION_STEPS = [ BuiltinConcepts.BEFORE_EVALUATION, BuiltinConcepts.EVALUATION, BuiltinConcepts.AFTER_EVALUATION] CONCEPT_LEXER_PARSER_CLASS = "parsers.ConceptLexerParser.ConceptLexerParser" DEBUG_TAB_SIZE = 4 class Sheerka(Concept): """ Main controller for the project """ CONCEPTS_ENTRY = "All_Concepts" # to store all the concepts CONCEPTS_DEFINITIONS_ENTRY = "Concepts_Definitions" # to store definitions (bnf) of concepts BUILTIN_CONCEPTS_KEYS = "Builtins_Concepts" # sequential key for builtin concepts USER_CONCEPTS_KEYS = "User_Concepts" # sequential key for user defined concepts def __init__(self, skip_builtins_in_db=False, debug=False, loggers=None): self.init_logging(debug, loggers) super().__init__(BuiltinConcepts.SHEERKA, True, True, BuiltinConcepts.SHEERKA) self.log.debug("Starting Sheerka.") # cache of the most used concepts # Note that these are only templates # They are used as a footprint for instantiation # Except of source when the concept is supposed to be unique # key is the key of the concept (not the name or the id) self.concepts_cache = {} # cache for concept definitions, # Primarily used for unit test that does not have access to sdp self.concepts_definition_cache = {} # # cache for concepts grammars # a grammar is a resolved BNF self.concepts_grammars = {} # a concept can be instantiated # ex: File is a concept, but File('foo.txt') is an instance # TODO: manage contexts self.instances = [] # List of the known rules by the system # ex: hello => say('hello') self.rules = [] self.sdp: SheerkaDataProvider = None # SheerkaDataProvider self.builtin_cache = {} # cache for builtin concepts self.parsers = {} # cache for builtin parsers self.evaluators = [] # cache for builtin evaluators self.evaluators_prefix: str = None self.parsers_prefix: str = None self.skip_builtins_in_db = skip_builtins_in_db def initialize(self, root_folder: str = None): """ Starting Sheerka Loads the current configuration Notes that when it's the first time, it also create the needed working folders :param root_folder: root configuration folder :return: ReturnValue(Success or Error) """ try: self.sdp = SheerkaDataProvider(root_folder) if self.sdp.first_time: self.sdp.set_key(self.USER_CONCEPTS_KEYS, 1000) event = Event("Initializing Sheerka.") self.sdp.save_event(event) exec_context = ExecutionContext(self.key, event, self) self.initialize_builtin_concepts() self.initialize_builtin_parsers() self.initialize_builtin_evaluators() self.initialize_concepts_definitions(exec_context) except IOError as e: return ReturnValueConcept(self, False, self.get(BuiltinConcepts.ERROR), e) return ReturnValueConcept(self, True, self) def initialize_builtin_concepts(self): """ Initializes the builtin concepts :return: None """ self.init_log.debug("Initializing builtin concepts") builtins_classes = self.get_builtins_classes_as_dict() # this all initialization of the builtins seems to be little bit complicated # why do we need to update it from DB ? for key in BuiltinConcepts: concept = self if key == BuiltinConcepts.SHEERKA \ else builtins_classes[str(key)]() if str(key) in builtins_classes \ else Concept(key, True, False, key) if not concept.metadata.is_unique and str(key) in builtins_classes: self.builtin_cache[key] = builtins_classes[str(key)] if not self.skip_builtins_in_db: from_db = self.sdp.get_safe(self.CONCEPTS_ENTRY, concept.metadata.key) if from_db is None: self.init_log.debug(f"'{concept.name}' concept is not found in db. Adding.") self.set_id_if_needed(concept, True) self.sdp.add("init", self.CONCEPTS_ENTRY, concept, use_ref=True) else: self.init_log.debug(f"Found concept '{from_db}' in db. Updating.") concept.update_from(from_db) self.add_in_cache(concept) def initialize_builtin_parsers(self): """ Init the parsers :return: """ core.utils.init_package_import("parsers") base_class = core.utils.get_class("parsers.BaseParser.BaseParser") for parser in core.utils.get_sub_classes("parsers", base_class): if parser.__module__ == base_class.__module__: continue self.init_log.debug(f"Adding builtin parser '{parser.__name__}'") self.parsers[core.utils.get_full_qualified_name(parser)] = parser def initialize_builtin_evaluators(self): """ Init the evaluators :return: """ core.utils.init_package_import("evaluators") for evaluator in core.utils.get_sub_classes("evaluators", "evaluators.BaseEvaluator.OneReturnValueEvaluator"): self.init_log.debug(f"Adding builtin evaluator '{evaluator.__name__}'") self.evaluators.append(evaluator) for evaluator in core.utils.get_sub_classes("evaluators", "evaluators.BaseEvaluator.AllReturnValuesEvaluator"): self.init_log.debug(f"Adding builtin evaluator '{evaluator.__name__}'") self.evaluators.append(evaluator) def initialize_concepts_definitions(self, execution_context): self.init_log.debug("Initializing concepts definitions") definitions = self.sdp.get_safe(self.CONCEPTS_DEFINITIONS_ENTRY, load_origin=False) if definitions is None: self.init_log.debug("No BNF defined") return lexer_parser = self.parsers[CONCEPT_LEXER_PARSER_CLASS]() ret_val = lexer_parser.initialize(execution_context, definitions) if not ret_val.status: self.init_log.error("Failed to initialize concepts definitions " + str(ret_val.body)) return self.concepts_grammars = lexer_parser.concepts_grammars def evaluate_user_input(self, text: str, user_name="kodjo"): """ Note to KSI: If you try to add execution context to this function, You may end in an infinite loop :param text: :param user_name: :return: """ self.log.debug(f"Processing user input '{text}', {user_name=}.") event = Event(text, user_name) evt_digest = self.sdp.save_event(event) self.log.debug(f"{evt_digest=}") execution_context = ExecutionContext(self.key, event, self) user_input = self.ret(self.name, True, self.new(BuiltinConcepts.USER_INPUT, body=text, user_name=user_name)) reduce_requested = self.ret(self.name, True, self.new(BuiltinConcepts.REDUCE_REQUESTED)) steps = [ BuiltinConcepts.BEFORE_PARSING, BuiltinConcepts.PARSING, BuiltinConcepts.AFTER_PARSING, BuiltinConcepts.BEFORE_EVALUATION, BuiltinConcepts.EVALUATION, BuiltinConcepts.AFTER_EVALUATION ] return self.execute(execution_context, [user_input, reduce_requested], steps) def _call_parsers(self, execution_context, return_values, logger=None): result = [] # return_values must be a list if not isinstance(return_values, list): return_values = [return_values] for return_value in return_values: # make sure we only parse user input if not return_value.status or not self.isinstance(return_value.body, BuiltinConcepts.USER_INPUT): result.append(return_value) continue to_parse = self.value(return_value) if self.log.isEnabledFor(logging.DEBUG): debug_text = "'" + to_parse + "'" if isinstance(to_parse, str) \ else "'" + BaseParser.get_text_from_tokens(to_parse) + "' as tokens" execution_context.log(logger or self.log, f"Parsing {debug_text}") for parser in self.parsers.values(): p = parser(sheerka=self) if logger: p.log = logger res = p.parse(execution_context, to_parse) if hasattr(res, "__iter__"): for r in res: r.parents = [return_value] result.append(r) else: res.parents = [return_value] result.append(res) return result def _call_evaluators(self, execution_context, return_values, process_step, evaluation_context=None, logger=None): def _preprocess_evaluators(context, evaluators): if not context.preprocess: return evaluators if not hasattr(evaluators, "__iter__"): single_one = True evaluators = [evaluators] else: single_one = False for preprocess in context.preprocess: for e in evaluators: if preprocess.props["name"].value == e.name: for prop, value in preprocess.props.items(): if prop == "name": continue if hasattr(e, prop): setattr(e, prop, value.value) return evaluators[0] if single_one else evaluators # return_values must be a list if not isinstance(return_values, list): return_values = [return_values] # Evaluation context are contexts that may modify the behaviour of the execution # For example, a concept to indicate that the value is not wanted # Or a concept to indicate that we want the letter form of the response # But first, they need to be transformed into return values if evaluation_context is None: evaluation_return_values = [] else: evaluation_return_values = [self.ret(execution_context.who, True, c) for c in evaluation_context] # add the current step as part as the evaluation context evaluation_return_values.append(self.ret(execution_context.who, True, self.new(process_step))) # the pool of return values are the mix return_values.extend(evaluation_return_values) # group the evaluators by priority and sort them # The first one to be applied will be the one with the highest priority grouped_evaluators = {} instantiated_evaluators = [e_class() for e_class in self.evaluators] # pre-process evaluators if needed instantiated_evaluators = _preprocess_evaluators(execution_context, instantiated_evaluators) for evaluator in [e for e in instantiated_evaluators if e.enabled and process_step in e.steps]: if logger: evaluator.log = logger grouped_evaluators.setdefault(evaluator.priority, []).append(evaluator) # order the groups by priority, the higher first sorted_priorities = sorted(grouped_evaluators.keys(), reverse=True) # process while True: simple_digest = return_values[:] for priority in sorted_priorities: original_items = return_values[:] evaluated_items = [] to_delete = [] for evaluator in grouped_evaluators[priority]: evaluator = _preprocess_evaluators(execution_context, evaluator.__class__()) # fresh copy # process evaluators that work on return value from evaluators.BaseEvaluator import OneReturnValueEvaluator if isinstance(evaluator, OneReturnValueEvaluator): for item in original_items: if evaluator.matches(execution_context, item): result = evaluator.eval(execution_context, item) if result is None: continue elif isinstance(result, list): evaluated_items.extend(result) to_delete.append(item) elif isinstance(result, ReturnValueConcept): evaluated_items.append(result) to_delete.append(item) else: error = self.new(BuiltinConcepts.INVALID_RETURN_VALUE, body=result, evaluator=evaluator) evaluated_items.append(self.ret("sheerka.process", False, error, parents=[item])) to_delete.append(item) # process evaluators that work on all return values else: if evaluator.matches(execution_context, original_items): results = evaluator.eval(execution_context, original_items) if results is None: continue if not isinstance(results, list): results = [results] for result in results: evaluated_items.append(result) to_delete.extend(result.parents) return_values = evaluated_items return_values.extend([item for item in original_items if item not in to_delete]) # have we done something ? to_compare = return_values[:] if simple_digest == to_compare: break # inc the iteration and continue execution_context = execution_context.push(iteration=execution_context.iteration + 1) # remove all evaluation context that are not reduced return_values = core.utils.remove_list_from_list(return_values, evaluation_return_values) return return_values def execute(self, execution_context, return_values, execution_steps, logger=None): """ Executes process for all initial contexts :param execution_context: :param return_values: :param execution_steps: :param logger: logger to use (if not directly called by sheerka) :return: """ for step in execution_steps: sub_context = execution_context.push(step=step) sub_context.log(logger or self.log, f"{step=}, context='{sub_context}'") copy = return_values[:] if hasattr(return_values, "__iter__") else return_values if step == BuiltinConcepts.PARSING: return_values = self._call_parsers(sub_context, return_values, logger) else: return_values = self._call_evaluators(sub_context, return_values, step, None, logger) if copy != return_values: sub_context.log_result(logger or self.log, return_values) return return_values def set_id_if_needed(self, obj: Concept, is_builtin: bool): """ Set the key for the concept if needed :param obj: :param is_builtin: :return: """ if obj.metadata.id is not None: return obj.metadata.id = self.sdp.get_next_key(self.BUILTIN_CONCEPTS_KEYS if is_builtin else self.USER_CONCEPTS_KEYS) self.log.debug(f"Setting id '{obj.metadata.id}' to concept '{obj.metadata.name}'.") def create_new_concept(self, context, concept: Concept, logger=None): """ Adds a new concept to the system :param context: :param concept: DefConceptNode :param logger :return: digest of the new concept """ logger = logger or self.log concept.init_key() concepts_definitions = None init_ret_value = None # checks for duplicate concepts # TODO checks if it exists in cache first if self.sdp.exists(self.CONCEPTS_ENTRY, concept.key, concept.get_digest()): error = SheerkaDataProviderDuplicateKeyError(self.CONCEPTS_ENTRY + "." + concept.key, concept) return self.ret( self.create_new_concept.__name__, False, self.new(BuiltinConcepts.CONCEPT_ALREADY_DEFINED, body=concept), error.args[0]) # set id before saving in db self.set_id_if_needed(concept, False) # add the BNF if known if concept.bnf: concepts_definitions = self.get_concept_definition() concepts_definitions[concept] = concept.bnf # check if it's a valid BNF or whether it breaks the known rules concept_lexer_parser = self.parsers[CONCEPT_LEXER_PARSER_CLASS]() sub_context = context.push(self.name, desc=f"Initializing concept definition for {concept}") sub_context.concepts[concept.key] = concept # the concept is not in the real cache yet init_ret_value = concept_lexer_parser.initialize(sub_context, concepts_definitions) if not init_ret_value.status: return self.ret(self.create_new_concept.__name__, False, ErrorConcept(init_ret_value.value)) # save the new context in sdp try: self.sdp.add(context.event.get_digest(), self.CONCEPTS_ENTRY, concept, use_ref=True) if concepts_definitions is not None: self.sdp.set(context.event.get_digest(), self.CONCEPTS_DEFINITIONS_ENTRY, concepts_definitions, use_ref=True) except SheerkaDataProviderDuplicateKeyError as error: context.log_error(logger, "Failed to create a new concept.", who=self.create_new_concept.__name__) return self.ret( self.create_new_concept.__name__, False, self.new(BuiltinConcepts.CONCEPT_ALREADY_DEFINED, body=concept), error.args[0]) # Updates the caches self.concepts_cache[concept.key] = self.sdp.get_safe(self.CONCEPTS_ENTRY, concept.key) if init_ret_value is not None and init_ret_value.status: self.concepts_grammars = init_ret_value.body # process the return in needed ret = self.ret(self.create_new_concept.__name__, True, self.new(BuiltinConcepts.NEW_CONCEPT, body=concept)) return ret def add_concept_to_set(self, context, concept, concept_set, logger=None): """ Add an entry in sdp to tell that concept isa concept_set :param context: :param concept: :param concept_set: :param logger: :return: """ logger = logger or self.log context.log(logger, f"Adding concept {concept} to set {concept_set}", who=self.add_concept_to_set.__name__) assert concept.id assert concept_set.id try: ret = self.sdp.add_unique(context.event.get_digest(), "All_" + str(concept_set.id), concept.id) if ret == (None, None): # concept already in set return self.ret( self.add_concept_to_set.__name__, False, self.new(BuiltinConcepts.CONCEPT_ALREADY_IN_SET, body=concept, concept_set=concept_set)) else: return self.ret(self.add_concept_to_set.__name__, True, self.new(BuiltinConcepts.SUCCESS)) except Exception as error: context.log_error(logger, "Failed to add to set.", who=self.add_concept_to_set.__name__) return self.ret(self.create_new_concept.__name__, False, ErrorConcept(error), error.args[0]) def initialize_concept_asts(self, context, concept: Concept, logger=None): """ Updates the codes of the newly created concept Basically, it runs the parsers on all parts :param concept: :param context: :param logger: :return: """ # steps = [BuiltinConcepts.BEFORE_PARSING, BuiltinConcepts.PARSING, BuiltinConcepts.AFTER_PARSING] steps = [BuiltinConcepts.BEFORE_PARSING, BuiltinConcepts.PARSING, BuiltinConcepts.AFTER_PARSING] for part_key in ConceptParts: source = getattr(concept.metadata, part_key.value) if source is None or not isinstance(source, str) or source == "": # the only sources that I am sure to parse are strings # I refuse empty strings for performance matters, I don't want to handle useless NOPConcepts continue else: to_parse = self.ret(context.who, True, self.new(BuiltinConcepts.USER_INPUT, body=source)) concept.cached_asts[part_key] = self.execute(context, to_parse, steps, logger) for prop in concept.props: if concept.props[prop].value: to_parse = self.ret( context.who, True, self.new(BuiltinConcepts.USER_INPUT, body=concept.props[prop].value)) concept.cached_asts[prop] = self.execute(context, to_parse, steps) # Updates the cache of concepts when possible if concept.key in self.concepts_cache: entry = self.concepts_cache[concept.key] if isinstance(entry, list): # TODO : manage when there are multiple entries pass else: self.concepts_cache[concept.key].cached_asts = concept.cached_asts def evaluate_concept(self, context, concept: Concept, logger=None): """ Evaluation a concept It means that if the where clause is True, will evaluate the body :param context: :param concept: :param logger: :return: value of the evaluation or error """ if concept.metadata.is_evaluated: return concept def _resolve(resolve_context, return_value): r = self.execute(resolve_context, return_value, CONCEPT_EVALUATION_STEPS, logger or self.log) return core.builtin_helpers.expect_one(context, r) # WHERE condition should already be validated by the parser. # It's a mandatory condition for the concept before it can be recognized # # TODO : Validate the PRE condition # if len(concept.cached_asts) == 0: self.initialize_concept_asts(context, concept, logger) # to make sure of the order, it don't use ConceptParts.get_parts() # props must be evaluated first all_metadata_to_eval = ["props", "where", "pre", "post", "body"] for metadata_to_eval in all_metadata_to_eval: if metadata_to_eval == "props": for prop_name in (p for p in concept.props if p in concept.cached_asts): sub_context = context.push(desc=f"Evaluating property '{prop_name}'") sub_context.add_preprocess(self.get_evaluator_name("Concept"), return_body=True) res = _resolve(sub_context, concept.cached_asts[prop_name]) if res.status: concept.set_prop(prop_name, res.value) else: return self.new(BuiltinConcepts.CONCEPT_EVAL_ERROR, body=res.value, concept=concept, property_name=prop_name) else: part_key = ConceptParts(metadata_to_eval) if part_key in concept.cached_asts and concept.cached_asts[part_key] is not None: sub_context = context.push(desc=f"Evaluating '{part_key}'", obj=concept) sub_context.add_preprocess(self.get_evaluator_name("Concept"), return_body=True) res = _resolve(sub_context, concept.cached_asts[part_key]) if res.status: setattr(concept.metadata, metadata_to_eval, res.value) else: return self.new(BuiltinConcepts.CONCEPT_EVAL_ERROR, body=res.value, concept=concept, property_name=part_key) # # TODO : Validate the POST condition # concept.init_key() # only does it if needed concept.metadata.is_evaluated = True return concept def add_in_cache(self, concept: Concept): """ Adds a concept template in cache. The cache is used as a proxy before looking at sdp :param concept: :return: """ # sanity check if concept.key is None: concept.init_key() if concept.key is None: raise KeyError() self.concepts_cache[concept.key] = concept return concept def get(self, concept_key): """ Tries to find a concept What is return must be used a template for another concept. You must not modify the returned concept :param concept_key: :return: """ if isinstance(concept_key, BuiltinConcepts): concept_key = str(concept_key) # first search in cache if concept_key in self.concepts_cache: return self.concepts_cache[concept_key] # else look in sdp from_db = self.sdp.get_safe(self.CONCEPTS_ENTRY, concept_key) if from_db is not None: return from_db # else return new Unknown concept # Note that I don't call the new() method to prevent cyclic call unknown_concept = Concept() template = self.concepts_cache[str(BuiltinConcepts.UNKNOWN_CONCEPT)] unknown_concept.update_from(template) unknown_concept.metadata.body = concept_key return unknown_concept def new(self, concept_key, **kwargs): """ Returns an instance of a new concept When the concept is supposed to be unique, returns the same instance :param concept_key: :param kwargs: :return: """ template = self.get(concept_key) def new_from_template(t, k, **kwargs_): # manage singleton if t.metadata.is_unique: return t # otherwise, create another instance concept = self.builtin_cache[k]() if k in self.builtin_cache else Concept() concept.update_from(t) # update the properties for k, v in kwargs_.items(): if k in concept.props: concept.set_prop(k, v) elif k in PROPERTIES_FOR_NEW: setattr(concept.metadata, k, v) elif hasattr(concept, k): setattr(concept, k, v) else: return self.new(BuiltinConcepts.UNKNOWN_PROPERTY, body=k, concept=concept) # TODO : add the concept to the list of known concepts (self.instances) return concept # manage concept not found if self.isinstance(template, BuiltinConcepts.UNKNOWN_CONCEPT) and \ concept_key != BuiltinConcepts.UNKNOWN_CONCEPT: return template if not isinstance(template, list): return new_from_template(template, concept_key, **kwargs) # if template is a list, it means that there a multiple concepts under the same key concepts = [new_from_template(t, concept_key, **kwargs) for t in template] return self.new(BuiltinConcepts.ENUMERATION, body=concepts) def ret(self, who: str, status: bool, value, message=None, parents=None): """ Creates and returns a ReturnValue concept :param who: :param status: :param value: :param message: :param parents: :return: """ return self.new( BuiltinConcepts.RETURN_VALUE, who=who, status=status, value=value, message=message, parents=parents) def value(self, obj, reduce_simple_list=False): if obj is None: return None if hasattr(obj, "get_value"): return obj.get_value() if not isinstance(obj, Concept): return obj if obj.body is None: return obj if reduce_simple_list and (isinstance(obj.body, list) or isinstance(obj.body, set)) and len(obj.body) == 1: body_to_use = obj.body[0] else: body_to_use = obj.body return self.value(body_to_use) def values(self, objs): if not (isinstance(objs, list) or self.isinstance(objs, BuiltinConcepts.LIST) or self.isinstance(objs, BuiltinConcepts.ENUMERATION)): objs = [objs] return (self.value(obj) for obj in objs) def is_success(self, obj): if isinstance(obj, bool): # quick win return obj if isinstance(obj, ReturnValueConcept): return obj.status if isinstance(obj, Concept) and obj.metadata.is_builtin and obj.key in BuiltinErrors: return False return obj def isinstance(self, a, b): """ return true if the concept a is an instance of the concept b :param a: :param b: :return: """ if isinstance(a, BuiltinConcepts): # common KSI error ;-) raise SyntaxError("Remember that the first parameter of isinstance MUST be a concept") if not isinstance(a, Concept): return False b_key = b.key if isinstance(b, Concept) else str(b) # TODO : manage when a is the list of all possible b # for example, if a is a color, it will be found the entry 'All_Colors' return a.key == b_key def isa(self, a, b): """ return true if the concept a is a b Will handle when the keyword isa will be implemented :param a: :param b: :return: """ if isinstance(a, BuiltinConcepts): # common KSI error ;-) raise SyntaxError("Remember that the first parameter of isinstance MUST be a concept") if not isinstance(a, Concept): return False b_key = b.key if isinstance(b, Concept) else str(b) # TODO : manage when a is the list of all possible b # for example, if a is a color, it will be found the entry 'All_Colors' return a.key == b_key def get_evaluator_name(self, name): if self.evaluators_prefix is None: base_evaluator_class = core.utils.get_class("evaluators.BaseEvaluator.BaseEvaluator") self.evaluators_prefix = base_evaluator_class.PREFIX return self.evaluators_prefix + name def get_parser_name(self, name): if self.parsers_prefix is None: base_parser_class = core.utils.get_class("parsers.BaseParser.BaseParser") self.parsers_prefix = base_parser_class.PREFIX return self.parsers_prefix + name def get_concept_definition(self): if self.concepts_definition_cache: return self.concepts_definition_cache self.concepts_definition_cache = self.sdp.get_safe(self.CONCEPTS_DEFINITIONS_ENTRY, load_origin=False) or {} return self.concepts_definition_cache def concepts(self): res = [] lst = self.sdp.list(self.CONCEPTS_ENTRY) for item in lst: if isinstance(item, list): res.extend(item) else: res.append(item) return sorted(res, key=lambda i: int(i.id)) def test(self): return f"I have access to Sheerka !" def test_error(self): raise Exception("I can raise an error") def dump_concepts(self): lst = self.sdp.list(self.CONCEPTS_ENTRY) for item in lst: if hasattr(item, "__iter__"): for i in item: self.log.info(i) else: self.log.info(item) def dump_definitions(self): defs = self.sdp.get(self.CONCEPTS_DEFINITIONS_ENTRY) self.log.info(defs) def dump_desc(self, concept_name): c = self.get(concept_name) if self.isinstance(c, BuiltinConcepts.UNKNOWN_CONCEPT): self.log.error("Concept unknown") return False self.log.info(f"name : {c.name}") self.log.info(f"bnf : {c.metadata.definition}") self.log.info(f"key : {c.key}") self.log.info(f"body : {c.body}") @staticmethod def get_builtins_classes_as_dict(): res = {} for c in core.utils.get_classes("core.builtin_concepts"): if issubclass(c, Concept) and c != Concept: res[c().metadata.key] = c return res @staticmethod def init_logging(debug, loggers): core.sheerka_logger.set_enabled(loggers) if debug: # log_format = "%(asctime)s %(name)s [%(levelname)s] %(message)s" log_format = "%(asctime)s [%(levelname)s] %(message)s" log_level = logging.DEBUG else: log_format = "%(message)s" log_level = logging.INFO logging.basicConfig(format=log_format, level=log_level, handlers=[console_handler]) @dataclass class ExecutionContext: """ To keep track of the execution of a request """ def __init__(self, who, event: Event, sheerka: Sheerka, /, desc: str = None, obj: Concept = None, step: BuiltinConcepts = None, iteration: int = 0, concepts: dict = None): self.who = who # who is asking self.event = event # what was the (original) trigger self.sheerka = sheerka # sheerka self.step = step self.iteration = iteration self.preprocess = None self.desc = desc # human description of what is going on self.obj = obj # what is the subject of the execution context (if known) self.concepts = concepts or {} # cache for concepts that are specific to this execution self._id = ExecutionContextIdManager.get_id(event.get_digest()) self._tab = "" def add_preprocess(self, name, **kwargs): preprocess = self.sheerka.new(BuiltinConcepts.EVALUATOR_PRE_PROCESS) preprocess.set_prop("name", name) for k, v in kwargs.items(): preprocess.set_prop(k, v) if not self.preprocess: self.preprocess = set() self.preprocess.add(preprocess) return self @property def id(self): return self._id def push(self, who=None, /, **kwargs): who = who or self.who desc = kwargs.get("desc", "") obj = kwargs.get("obj", self.obj) concepts = kwargs.get("concepts", self.concepts) step = kwargs.get("step", self.step) iteration = kwargs.get("iteration", self.iteration) new = ExecutionContext( who, self.event, self.sheerka, desc=desc, obj=obj, concepts=concepts, step=step, iteration=iteration, ) new._tab = self._tab + " " * DEBUG_TAB_SIZE new.preprocess = self.preprocess return new def log_new(self, logger): logger.debug(f"[{self._id:2}]" + self._tab + str(self)) def log(self, logger, message, who=None): logger.debug(f"[{self._id:2}]" + self._tab + (f"[{who}] " if who else "") + str(message)) def log_error(self, logger, message, who=None): logger.exception(f"[{self._id:2}]" + self._tab + (f"[{who}] " if who else "") + str(message)) def log_result(self, logger, return_values): if not logger.isEnabledFor(logging.DEBUG): return if len(return_values) == 0: logger.debug(self._tab + "No return value") for r in return_values: to_str = self.return_value_to_str(r) logger.debug(f"[{self._id:2}]" + self._tab + "-> " + to_str) @staticmethod def return_value_to_str(r): value = str(r.value) if len(value) > 50: value = value[:47] + "..." to_str = f"ReturnValue(who={r.who}, status={r.status}, value={value})" return to_str def __repr__(self): msg = f"ExecutionContext(who={self.who}, id={self._id}" if self.desc: msg += f", desc='{self.desc}'" msg += ")" return msg class ExecutionContextIdManager: ids = {} @staticmethod def get_id(event_digest): if event_digest in ExecutionContextIdManager.ids: ExecutionContextIdManager.ids[event_digest] += 1 else: ExecutionContextIdManager.ids[event_digest] = 0 return ExecutionContextIdManager.ids[event_digest]