import logging
import time

from core.builtin_concepts import BuiltinConcepts, ErrorConcept, ReturnValueConcept, BuiltinErrors, BuiltinUnique
from core.concept import Concept, ConceptParts, PROPERTIES_FOR_NEW
from parsers.BaseParser import BaseParser
from sdp.sheerkaDataProvider import SheerkaDataProvider, Event, SheerkaDataProviderDuplicateKeyError
import core.utils
import core.builtin_helpers
from core.sheerka_logger import console_handler

# Steps run (in this order) when a concept is evaluated
CONCEPT_EVALUATION_STEPS = [
    BuiltinConcepts.BEFORE_EVALUATION,
    BuiltinConcepts.EVALUATION,
    BuiltinConcepts.AFTER_EVALUATION]

# Fully qualified name under which the BNF lexer parser is registered in Sheerka.parsers
CONCEPT_LEXER_PARSER_CLASS = "parsers.ConceptLexerParser.ConceptLexerParser"

# Number of characters added to the log indentation for each nested ExecutionContext
DEBUG_TAB_SIZE = 4


class Sheerka(Concept):
    """
    Main controller for the project

    Holds the caches of concepts, parsers and evaluators, and drives the
    parsing / evaluation pipeline (see execute()).
    """

    CONCEPTS_ENTRY = "All_Concepts"  # to store all the concepts
    CONCEPTS_DEFINITIONS_ENTRY = "Concepts_Definitions"  # to store definitions (bnf) of concepts
    BUILTIN_CONCEPTS_KEYS = "Builtins_Concepts"  # sequential key for builtin concepts
    USER_CONCEPTS_KEYS = "User_Concepts"  # sequential key for user defined concepts

    def __init__(self, skip_builtins_in_db=False, debug=False, loggers=None):
        """
        Creates the controller. Nothing is loaded until initialize() is called.

        :param skip_builtins_in_db: when True, builtin concepts are neither read from nor written to sdp,
            and evaluate_user_input() does not save its result
        :param debug: when True, logging is configured at DEBUG level
        :param loggers: forwarded to core.sheerka_logger.set_enabled()
        """
        self.init_logging(debug, loggers)
        super().__init__(BuiltinConcepts.SHEERKA, True, True, BuiltinConcepts.SHEERKA)
        self.log.debug("Starting Sheerka.")

        # cache of the most used concepts
        # Note that these are only templates
        # They are used as a footprint for instantiation
        # Except of course when the concept is supposed to be unique
        # key is the key of the concept (not the name or the id)
        self.concepts_cache = {}

        # cache for concept definitions,
        # Primarily used for unit tests that do not have access to sdp
        self.concepts_definition_cache = {}

        # cache for concepts grammars
        # a grammar is a resolved BNF
        self.concepts_grammars = {}

        # a concept can be instantiated
        # ex: File is a concept, but File('foo.txt') is an instance
        # TODO: manage contexts
        self.instances = []

        # List of the known rules by the system
        # ex: hello => say('hello')
        self.rules = []

        self.sdp: SheerkaDataProvider = None  # SheerkaDataProvider

        self.builtin_cache = {}  # cache for builtin concepts
        self.parsers = {}  # cache for builtin parsers
        self.evaluators = []  # cache for builtin evaluators

        self.evaluators_prefix: str = None
        self.parsers_prefix: str = None

        self.skip_builtins_in_db = skip_builtins_in_db

    def initialize(self, root_folder: str = None):
        """
        Starting Sheerka
        Loads the current configuration
        Note that when it's the first time, it also creates the needed working folders
        :param root_folder: root configuration folder
        :return: ReturnValue(Success or Error)
        """
        try:
            self.sdp = SheerkaDataProvider(root_folder)
            if self.sdp.first_time:
                self.sdp.set_key(self.USER_CONCEPTS_KEYS, 1000)

            event = Event("Initializing Sheerka.")
            self.sdp.save_event(event)
            exec_context = ExecutionContext(self.key, event, self)

            self.initialize_builtin_concepts()
            self.initialize_builtin_parsers()
            self.initialize_builtin_evaluators()
            self.initialize_concepts_definitions(exec_context)
        except IOError as e:
            return ReturnValueConcept(self, False, self.get(BuiltinConcepts.ERROR), e)

        return ReturnValueConcept(self, True, self)

    def initialize_builtin_concepts(self):
        """
        Initializes the builtin concepts
        For every member of BuiltinConcepts, builds the template, synchronises it
        with sdp (unless skip_builtins_in_db is set) and puts it in the cache.
        :return: None
        """
        self.init_log.debug("Initializing builtin concepts")
        builtins_classes = self.get_builtins_classes_as_dict()

        # this all initialization of the builtins seems to be little bit complicated
        # why do we need to update it from DB ?
        for key in BuiltinConcepts:
            key_as_str = str(key)
            if key == BuiltinConcepts.SHEERKA:
                concept = self
            elif key_as_str in builtins_classes:
                concept = builtins_classes[key_as_str]()
            else:
                concept = Concept(key, True, False, key)

            if key in BuiltinUnique:
                concept.metadata.is_unique = True
                concept.metadata.is_evaluated = True

            # non unique builtins with a dedicated class are re-instantiated by new_from_template()
            if not concept.metadata.is_unique and key_as_str in builtins_classes:
                self.builtin_cache[key] = builtins_classes[key_as_str]

            if not self.skip_builtins_in_db:
                from_db = self.sdp.get_safe(self.CONCEPTS_ENTRY, concept.metadata.key)
                if from_db is None:
                    self.init_log.debug(f"'{concept.name}' concept is not found in db. Adding.")
                    self.set_id_if_needed(concept, True)
                    self.sdp.add("init", self.CONCEPTS_ENTRY, concept, use_ref=True)
                else:
                    self.init_log.debug(f"Found concept '{from_db}' in db. Updating.")
                    concept.update_from(from_db)

            self.add_in_cache(concept)

    def initialize_builtin_parsers(self):
        """
        Init the parsers
        Registers every subclass of BaseParser found in the 'parsers' package,
        keyed by its fully qualified name. Classes declared in the module of
        the base class are skipped.
        :return:
        """
        core.utils.init_package_import("parsers")
        base_class = core.utils.get_class("parsers.BaseParser.BaseParser")
        for parser in core.utils.get_sub_classes("parsers", base_class):
            if parser.__module__ == base_class.__module__:
                continue

            self.init_log.debug(f"Adding builtin parser '{parser.__name__}'")
            self.parsers[core.utils.get_full_qualified_name(parser)] = parser

    def initialize_builtin_evaluators(self):
        """
        Init the evaluators
        Registers the OneReturnValueEvaluator subclasses first, then the
        AllReturnValuesEvaluator subclasses.
        :return:
        """
        core.utils.init_package_import("evaluators")
        base_classes = (
            "evaluators.BaseEvaluator.OneReturnValueEvaluator",
            "evaluators.BaseEvaluator.AllReturnValuesEvaluator",
        )
        for base_class in base_classes:
            for evaluator in core.utils.get_sub_classes("evaluators", base_class):
                self.init_log.debug(f"Adding builtin evaluator '{evaluator.__name__}'")
                self.evaluators.append(evaluator)

    def initialize_concepts_definitions(self, execution_context):
        """
        Loads the concepts definitions (BNF) from sdp and resolves them into grammars
        On failure the error is logged and concepts_grammars is left untouched.
        :param execution_context:
        :return: None
        """
        self.init_log.debug("Initializing concepts definitions")
        definitions = self.sdp.get_safe(self.CONCEPTS_DEFINITIONS_ENTRY, load_origin=False)

        if definitions is None:
            self.init_log.debug("No BNF defined")
            return

        lexer_parser = self.parsers[CONCEPT_LEXER_PARSER_CLASS]()
        ret_val = lexer_parser.initialize(execution_context, definitions)
        if not ret_val.status:
            self.init_log.error("Failed to initialize concepts definitions " + str(ret_val.body))
            return

        self.concepts_grammars = lexer_parser.concepts_grammars

    def evaluate_user_input(self, text: str, user_name="kodjo"):
        """
        Saves the input as an event, then parses and evaluates it

        Note to KSI: If you try to add execution context to this function,
        You may end in an infinite loop
        :param text:
        :param user_name:
        :return: the return values produced by execute()
        """
        self.log.debug(f"Processing user input '{text}', {user_name=}.")
        event = Event(text, user_name)
        evt_digest = self.sdp.save_event(event)
        self.log.debug(f"{evt_digest=}")

        with ExecutionContext(self.key, event, self, f"Evaluating '{text}'") as execution_context:
            user_input = self.ret(
                self.name,
                True,
                self.new(BuiltinConcepts.USER_INPUT, body=text, user_name=user_name))
            reduce_requested = self.ret(self.name, True, self.new(BuiltinConcepts.REDUCE_REQUESTED))

            steps = [
                BuiltinConcepts.BEFORE_PARSING,
                BuiltinConcepts.PARSING,
                BuiltinConcepts.AFTER_PARSING,
                BuiltinConcepts.BEFORE_EVALUATION,
                BuiltinConcepts.EVALUATION,
                BuiltinConcepts.AFTER_EVALUATION
            ]

            ret = self.execute(execution_context, [user_input, reduce_requested], steps)
            execution_context.add_values(return_values=ret)

        # saved once the context is closed, so that its stop time is set
        if not self.skip_builtins_in_db:
            self.sdp.save_result(execution_context)

        return ret

    def _call_parsers(self, execution_context, return_values, logger=None):
        """
        Runs the enabled parsers, by decreasing priority, on the successful USER_INPUT return values

        Return values that are failed or that are not a USER_INPUT are passed through untouched.
        As soon as a priority group produces a successful result, lower priorities are not tried.
        :param execution_context:
        :param return_values: a return value or a list of return values
        :param logger: when given, replaces the logger of every parser
        :return: list of return values
        """
        # return_values must be a list
        if not isinstance(return_values, list):
            return_values = [return_values]

        # first make the distinction between what is for the parsers and what is not
        result = []
        to_process = []
        for r in return_values:
            if not r.status or not self.isinstance(r.body, BuiltinConcepts.USER_INPUT):
                result.append(r)
            else:
                to_process.append(r)

        if not to_process:
            return result

        # keep track of the original user inputs, as they need to be removed at the end
        user_inputs = to_process[:]

        # group the parsers by priorities
        grouped_parsers = {}
        for parser_class in self.parsers.values():
            parser = parser_class(sheerka=self)
            if not parser.enabled:
                continue
            if logger:
                parser.log = logger
            grouped_parsers.setdefault(parser.priority, []).append(parser)

        stop_processing = False
        for priority in sorted(grouped_parsers, reverse=True):
            # snapshot: what is produced by this group is only an input for the next groups
            inputs_for_this_group = to_process[:]

            for parser in grouped_parsers[priority]:
                return_value_success_found = False
                for return_value in inputs_for_this_group:
                    to_parse = return_value.body.body \
                        if self.isinstance(return_value.body, BuiltinConcepts.USER_INPUT) \
                        else return_value.body

                    with execution_context.push(desc=f"Parsing using {parser.name}") as sub_context:
                        res = parser.parse(sub_context, to_parse)

                    if res is not None:
                        # a parser may return one return value or several of them
                        produced = res if hasattr(res, "__iter__") else [res]
                        for r in produced:
                            if r is None:
                                continue
                            r.parents = [return_value]
                            result.append(r)
                            if self.isinstance(r.body, BuiltinConcepts.PARSER_RESULT):
                                to_process.append(r)
                            if r.status:
                                return_value_success_found = True

                    if return_value_success_found:
                        stop_processing = True
                        break  # Stop the other return_values (but not the other parsers with the same priority)

            if stop_processing:
                break  # Do not try the other priorities if a match is found

        result = core.utils.remove_list_from_list(result, user_inputs)
        return result

    def _call_evaluators(self, execution_context, return_values, process_step, evaluation_context=None,
                         logger=None):
        """
        Applies the evaluators registered for process_step until the return values stop changing

        :param execution_context:
        :param return_values: a return value or a list of return values (the list is not modified)
        :param process_step: current step, evaluators are selected when it is in their 'steps'
        :param evaluation_context: optional concepts that may modify the behaviour of the evaluators
        :param logger: when given, replaces the logger of every evaluator
        :return: list of return values
        """
        # local import, as in the original code (presumably to avoid an import cycle - TODO confirm)
        from evaluators.BaseEvaluator import OneReturnValueEvaluator

        def _preprocess_evaluators(context, evaluators):
            # Applies the EVALUATOR_PRE_PROCESS concepts of the context to the evaluators whose
            # name matches: every property (except 'name') that exists on the evaluator is overridden.
            # Accepts one evaluator or a list, and returns the same shape.
            if not context.preprocess:
                return evaluators

            if not hasattr(evaluators, "__iter__"):
                single_one = True
                evaluators = [evaluators]
            else:
                single_one = False

            for preprocess in context.preprocess:
                for e in evaluators:
                    if preprocess.props["name"].value == e.name:
                        for prop, value in preprocess.props.items():
                            if prop == "name":
                                continue
                            if hasattr(e, prop):
                                setattr(e, prop, value.value)

            return evaluators[0] if single_one else evaluators

        # return_values must be a list
        if not isinstance(return_values, list):
            return_values = [return_values]

        # Evaluation context are contexts that may modify the behaviour of the execution
        # For example, a concept to indicate that the value is not wanted
        # Or a concept to indicate that we want the letter form of the response
        # But first, they need to be transformed into return values
        if evaluation_context is None:
            evaluation_return_values = []
        else:
            evaluation_return_values = [self.ret(execution_context.who, True, c) for c in evaluation_context]

        # add the current step as part as the evaluation context
        evaluation_return_values.append(self.ret(execution_context.who, True, self.new(process_step)))

        # the pool of return values are the mix
        # (built as a new list: the list given by the caller must not be modified)
        return_values = return_values + evaluation_return_values

        # group the evaluators by priority and sort them
        # The first one to be applied will be the one with the highest priority
        grouped_evaluators = {}
        instantiated_evaluators = [e_class() for e_class in self.evaluators]

        # pre-process evaluators if needed
        instantiated_evaluators = _preprocess_evaluators(execution_context, instantiated_evaluators)

        for evaluator in instantiated_evaluators:
            if not (evaluator.enabled and process_step in evaluator.steps):
                continue
            if logger:
                evaluator.log = logger
            grouped_evaluators.setdefault(evaluator.priority, []).append(evaluator)

        # order the groups by priority, the higher first
        sorted_priorities = sorted(grouped_evaluators, reverse=True)

        # process
        while True:
            simple_digest = return_values[:]

            for priority in sorted_priorities:
                original_items = return_values[:]
                evaluated_items = []
                to_delete = []

                for evaluator in grouped_evaluators[priority]:
                    # fresh copy
                    evaluator = _preprocess_evaluators(execution_context, evaluator.__class__())
                    if logger:
                        # the fresh copy does not carry the logger set when grouping
                        evaluator.log = logger

                    if isinstance(evaluator, OneReturnValueEvaluator):
                        # process evaluators that work on one return value
                        for item in original_items:
                            if not evaluator.matches(execution_context, item):
                                continue

                            result = evaluator.eval(execution_context, item)
                            if result is None:
                                continue

                            if isinstance(result, list):
                                evaluated_items.extend(result)
                            elif isinstance(result, ReturnValueConcept):
                                evaluated_items.append(result)
                            else:
                                error = self.new(
                                    BuiltinConcepts.INVALID_RETURN_VALUE,
                                    body=result,
                                    evaluator=evaluator)
                                evaluated_items.append(
                                    self.ret("sheerka.process", False, error, parents=[item]))
                            to_delete.append(item)

                    else:
                        # process evaluators that work on all return values
                        if not evaluator.matches(execution_context, original_items):
                            continue

                        results = evaluator.eval(execution_context, original_items)
                        if results is None:
                            continue

                        if not isinstance(results, list):
                            results = [results]
                        for result in results:
                            evaluated_items.append(result)
                            # parents may be left to None by the evaluator
                            to_delete.extend(result.parents or [])

                # new items first, then the items that were not consumed
                return_values = evaluated_items
                return_values.extend([item for item in original_items if item not in to_delete])

            # have we done something ?
            if simple_digest == return_values:
                break

            # inc the iteration and continue
            execution_context = execution_context.push(iteration=execution_context.iteration + 1)

        # remove all evaluation context that are not reduced
        return_values = core.utils.remove_list_from_list(return_values, evaluation_return_values)
        return return_values

    def execute(self, execution_context, return_values, execution_steps, logger=None):
        """
        Executes process for all initial contexts
        The PARSING step calls the parsers, every other step calls the evaluators.
        The output of a step is the input of the next one.
        :param execution_context:
        :param return_values:
        :param execution_steps:
        :param logger: logger to use (if not directly called by sheerka)
        :return: the return values of the last step
        """
        for step in execution_steps:
            copy = return_values[:] if hasattr(return_values, "__iter__") else [return_values]
            with execution_context.push(step=step, iteration=0, desc=f"{step=}", return_values=copy) as sub_context:
                sub_context.log(logger or self.log, f"{step=}, context='{sub_context}'")

                if step == BuiltinConcepts.PARSING:
                    return_values = self._call_parsers(sub_context, return_values, logger)
                else:
                    return_values = self._call_evaluators(sub_context, return_values, step, None, logger)

                # only log the result when the step has changed something
                if copy != return_values:
                    sub_context.log_result(logger or self.log, return_values)

                sub_context.add_values(return_values=return_values)

        return return_values

    def set_id_if_needed(self, obj: Concept, is_builtin: bool):
        """
        Set the id of the concept if needed (i.e. when it has none)
        :param obj:
        :param is_builtin: selects the sequence (builtin or user) the id is taken from
        :return:
        """
        if obj.metadata.id is not None:
            return

        sequence = self.BUILTIN_CONCEPTS_KEYS if is_builtin else self.USER_CONCEPTS_KEYS
        obj.metadata.id = self.sdp.get_next_key(sequence)
        self.log.debug(f"Setting id '{obj.metadata.id}' to concept '{obj.metadata.name}'.")

    def create_new_concept(self, context, concept: Concept, logger=None):
        """
        Adds a new concept to the system
        :param context:
        :param concept: DefConceptNode
        :param logger
        :return: ReturnValue(NEW_CONCEPT) on success,
            ReturnValue(CONCEPT_ALREADY_DEFINED or error) otherwise
        """
        logger = logger or self.log

        concept.init_key()
        concepts_definitions = None
        init_ret_value = None

        # checks for duplicate concepts
        # TODO checks if it exists in cache first
        if self.sdp.exists(self.CONCEPTS_ENTRY, concept.key, concept.get_digest()):
            error = SheerkaDataProviderDuplicateKeyError(self.CONCEPTS_ENTRY + "." + concept.key, concept)
            return self.ret(
                self.create_new_concept.__name__,
                False,
                self.new(BuiltinConcepts.CONCEPT_ALREADY_DEFINED, body=concept),
                error.args[0])

        # set id before saving in db
        self.set_id_if_needed(concept, False)

        # add the BNF if known
        if concept.bnf:
            # work on a copy: the cached definitions must not be altered
            # if the BNF is rejected or if the concept cannot be saved
            concepts_definitions = dict(self.get_concept_definition())
            concepts_definitions[concept] = concept.bnf

            # check if it's a valid BNF or whether it breaks the known rules
            concept_lexer_parser = self.parsers[CONCEPT_LEXER_PARSER_CLASS]()
            with context.push(self.name, desc=f"Initializing concept definition for {concept}") as sub_context:
                # the concept is not in the real cache yet
                # NOTE(review): push() hands the parent's 'concepts' dict to the child,
                # so this entry is also visible from 'context'
                sub_context.concepts[concept.key] = concept
                sub_context.log_new(logger)
                init_ret_value = concept_lexer_parser.initialize(sub_context, concepts_definitions)
                sub_context.add_values(return_values=init_ret_value)

            if not init_ret_value.status:
                return self.ret(self.create_new_concept.__name__, False, ErrorConcept(init_ret_value.value))

        # save the new concept in sdp
        try:
            self.sdp.add(context.event.get_digest(), self.CONCEPTS_ENTRY, concept, use_ref=True)
            if concepts_definitions is not None:
                self.sdp.set(
                    context.event.get_digest(),
                    self.CONCEPTS_DEFINITIONS_ENTRY,
                    concepts_definitions,
                    use_ref=True)
        except SheerkaDataProviderDuplicateKeyError as error:
            context.log_error(logger, "Failed to create a new concept.", who=self.create_new_concept.__name__)
            return self.ret(
                self.create_new_concept.__name__,
                False,
                self.new(BuiltinConcepts.CONCEPT_ALREADY_DEFINED, body=concept),
                error.args[0])

        # Updates the caches
        self.concepts_cache[concept.key] = self.sdp.get_safe(self.CONCEPTS_ENTRY, concept.key)
        if concepts_definitions is not None:
            self.concepts_definition_cache = concepts_definitions
        if init_ret_value is not None and init_ret_value.status:
            self.concepts_grammars = init_ret_value.body

        # process the return if needed
        ret = self.ret(self.create_new_concept.__name__, True, self.new(BuiltinConcepts.NEW_CONCEPT, body=concept))
        return ret

    def add_concept_to_set(self, context, concept, concept_set, logger=None):
        """
        Add an entry in sdp to tell that concept isa concept_set
        :param context:
        :param concept:
        :param concept_set:
        :param logger:
        :return: ReturnValue(SUCCESS), ReturnValue(CONCEPT_ALREADY_IN_SET) or ReturnValue(error)
        """
        logger = logger or self.log
        who = self.add_concept_to_set.__name__
        context.log(logger, f"Adding concept {concept} to set {concept_set}", who=who)

        assert concept.id
        assert concept_set.id

        try:
            ret = self.sdp.add_unique(context.event.get_digest(), "All_" + str(concept_set.id), concept.id)
        except Exception as error:
            context.log_error(logger, "Failed to add to set.", who=who)
            # not all exceptions carry arguments
            message = error.args[0] if error.args else str(error)
            return self.ret(who, False, ErrorConcept(error), message)

        if ret == (None, None):
            # concept already in set
            return self.ret(
                who,
                False,
                self.new(BuiltinConcepts.CONCEPT_ALREADY_IN_SET, body=concept, concept_set=concept_set))

        return self.ret(who, True, self.new(BuiltinConcepts.SUCCESS))

    def initialize_concept_asts(self, context, concept: Concept, logger=None):
        """
        Updates the codes of the newly created concept
        Basically, it runs the parsers on all parts and on all properties,
        and stores what is produced in concept.cached_asts
        :param concept:
        :param context:
        :param logger:
        :return:
        """
        logger = logger or self.log
        steps = [BuiltinConcepts.BEFORE_PARSING, BuiltinConcepts.PARSING, BuiltinConcepts.AFTER_PARSING]

        for part_key in ConceptParts:
            source = getattr(concept.metadata, part_key.value)
            # the only sources that I am sure to parse are strings
            # I refuse empty strings for performance matters, I don't want to handle useless NOPConcepts
            if not isinstance(source, str) or source == "":
                continue

            with context.push(desc=f"Initializing AST for {part_key}") as sub_context:
                sub_context.log_new(logger)
                to_parse = self.ret(context.who, True, self.new(BuiltinConcepts.USER_INPUT, body=source))
                res = self.execute(sub_context, to_parse, steps, logger)
                concept.cached_asts[part_key] = res
                sub_context.add_values(return_values=res)

        for prop in concept.props:
            value = concept.props[prop].value
            if not value:
                continue

            if isinstance(value, Concept):
                # a concept does not need to be parsed
                concept.cached_asts[prop] = value
                continue

            to_parse = self.ret(context.who, True, self.new(BuiltinConcepts.USER_INPUT, body=value))
            with context.push(desc=f"Initializing AST for property {prop}") as sub_context:
                sub_context.log_new(logger)
                res = self.execute(sub_context, to_parse, steps, logger)
                concept.cached_asts[prop] = res
                sub_context.add_values(return_values=res)

        # Updates the cache of concepts when possible
        if concept.key in self.concepts_cache:
            entry = self.concepts_cache[concept.key]
            if isinstance(entry, list):
                # TODO : manage when there are multiple entries
                pass
            else:
                entry.cached_asts = concept.cached_asts

    def evaluate_concept(self, context, concept: Concept, logger=None):
        """
        Evaluates a concept
        The properties are evaluated first, then 'where', 'pre', 'post' and 'body'.
        The concept is updated in place with what is computed.
        :param context:
        :param concept:
        :param logger:
        :return: the concept once evaluated, or a CONCEPT_EVAL_ERROR concept
        """
        logger = logger or self.log

        if concept.metadata.is_evaluated:
            return concept

        def _resolve(return_value, desc, obj):
            # runs the evaluation steps on return_value and reduces the outcome to one return value
            context.log(logger, desc, self.evaluate_concept.__name__)
            with context.push(desc=desc, obj=obj) as sub_context:
                sub_context.log_new(logger)
                r = self.execute(sub_context, return_value, CONCEPT_EVALUATION_STEPS, logger)
                one_r = core.builtin_helpers.expect_one(context, r)
                sub_context.add_values(return_values=one_r)
                return one_r

        # WHERE condition should already be validated by the parser.
        # It's a mandatory condition for the concept before it can be recognized

        #
        # TODO : Validate the PRE condition
        #

        if len(concept.cached_asts) == 0:
            context.log(logger, "concept asts are not initialized. Initializing.", self.evaluate_concept.__name__)
            self.initialize_concept_asts(context, concept, logger)

        # props must be evaluated first
        # (names are listed before the loop, as the properties are updated inside it)
        for prop_name in [p for p in concept.props if p in concept.cached_asts]:
            prop_ast = concept.cached_asts[prop_name]

            if isinstance(prop_ast, Concept):
                context.log(
                    logger,
                    f"Evaluation prop={prop_name}, value={prop_ast}",
                    self.evaluate_concept.__name__)
                # the text is the description of the sub context, not its 'who'
                with context.push(desc=f"Evaluation property '{prop_name}', value='{prop_ast}'") as sub_context:
                    sub_context.log_new(logger)
                    evaluated = self.evaluate_concept(sub_context, prop_ast, logger)
                    sub_context.add_values(return_values=evaluated)
                concept.set_prop(prop_name, evaluated)
                continue

            res = _resolve(prop_ast, f"Evaluating property '{prop_name}'", None)
            if not res.status:
                return self.new(
                    BuiltinConcepts.CONCEPT_EVAL_ERROR,
                    body=res.value,
                    concept=concept,
                    property_name=prop_name)
            concept.set_prop(prop_name, res.value)

        # to make sure of the order, it doesn't use ConceptParts.get_parts()
        for metadata_to_eval in ("where", "pre", "post", "body"):
            part_key = ConceptParts(metadata_to_eval)
            if part_key not in concept.cached_asts or concept.cached_asts[part_key] is None:
                continue

            res = _resolve(concept.cached_asts[part_key], f"Evaluating '{part_key}'", concept)
            if not res.status:
                return self.new(
                    BuiltinConcepts.CONCEPT_EVAL_ERROR,
                    body=res.value,
                    concept=concept,
                    property_name=part_key)
            setattr(concept.metadata, metadata_to_eval, res.value)

        #
        # TODO : Validate the POST condition
        #

        concept.init_key()  # only does it if needed
        concept.metadata.is_evaluated = True
        return concept

    def add_in_cache(self, concept: Concept):
        """
        Adds a concept template in cache.
        The cache is used as a proxy before looking at sdp
        :param concept:
        :return: the concept
        :raises KeyError: when the key of the concept cannot be computed
        """
        # sanity check
        if concept.key is None:
            concept.init_key()

        if concept.key is None:
            raise KeyError()

        self.concepts_cache[concept.key] = concept
        return concept

    def get(self, concept_key, concept_id=None):
        """
        Tries to find a concept
        What is returned must be used as a template for another concept.
        You must not modify the returned concept
        :param concept_key: key of the concept
        :param concept_id: when multiple concepts with the same key, use the id
        :return: the template, a list of templates, or a new UNKNOWN_CONCEPT whose body is the key
        """
        if concept_key is None:
            return ErrorConcept("Concept key is undefined.")

        if isinstance(concept_key, BuiltinConcepts):
            concept_key = str(concept_key)

        # first search in cache
        if concept_key in self.concepts_cache:
            result = self.concepts_cache[concept_key]
        else:
            result = self.sdp.get_safe(self.CONCEPTS_ENTRY, concept_key)

        if result and (concept_id is None or not isinstance(result, list)):
            return result

        if isinstance(result, list):
            if not concept_id:
                return result
            for c in result:
                if c.id == concept_id:
                    return c

        # else return new Unknown concept
        # Note that I don't call the new() method to prevent cyclic call
        unknown_concept = Concept()
        template = self.concepts_cache[str(BuiltinConcepts.UNKNOWN_CONCEPT)]
        unknown_concept.update_from(template)
        unknown_concept.metadata.body = concept_key
        return unknown_concept

    def new(self, concept_key, **kwargs):
        """
        Returns an instance of a new concept
        When the concept is supposed to be unique, returns the same instance
        :param concept_key: key of the concept, or tuple (key, id)
        :param kwargs: properties / attributes to set on the new instance
        :return: the new concept, a list of concepts when several share the key,
            or an UNKNOWN_CONCEPT when the key is not found
        """
        if isinstance(concept_key, tuple):
            concept_key, concept_id = concept_key[0], concept_key[1]
        else:
            concept_id = None

        template = self.get(concept_key, concept_id)

        # manage concept not found
        # (keys are compared as strings, like in get() and isinstance())
        if self.isinstance(template, BuiltinConcepts.UNKNOWN_CONCEPT) and \
                str(concept_key) != str(BuiltinConcepts.UNKNOWN_CONCEPT):
            return template

        if not isinstance(template, list):
            return self.new_from_template(template, concept_key, **kwargs)

        # if template is a list, it means that there are multiple concepts under the same key
        return [self.new_from_template(t, concept_key, **kwargs) for t in template]

    def new_from_template(self, template, key, **kwargs):
        """
        Creates a concept from a template
        :param template: concept to copy (returned as is when it is unique)
        :param key: used to find the class to instantiate in builtin_cache
        :param kwargs: properties / attributes to set on the new instance
        :return: the new concept, or an UNKNOWN_PROPERTY concept when a kwarg matches nothing
        """
        # manage singleton
        if template.metadata.is_unique:
            return template

        # otherwise, create another instance
        concept = self.builtin_cache[key]() if key in self.builtin_cache else Concept()
        concept.update_from(template)

        # update the properties
        for k, v in kwargs.items():
            if k in concept.props:
                concept.set_prop(k, v)
            elif k in PROPERTIES_FOR_NEW:
                setattr(concept.metadata, k, v)
            elif hasattr(concept, k):
                setattr(concept, k, v)
            else:
                return self.new(BuiltinConcepts.UNKNOWN_PROPERTY, body=k, concept=concept)

        # TODO : add the concept to the list of known concepts (self.instances)
        return concept

    def ret(self, who: str, status: bool, value, message=None, parents=None):
        """
        Creates and returns a ReturnValue concept
        :param who:
        :param status:
        :param value:
        :param message:
        :param parents:
        :return:
        """
        return self.new(
            BuiltinConcepts.RETURN_VALUE,
            who=who,
            status=status,
            value=value,
            message=message,
            parents=parents)

    def value(self, obj, reduce_simple_list=False):
        """
        Returns the underlying value of obj, following the bodies of the concepts
        :param obj:
        :param reduce_simple_list: when True, a body which is a list or a set
            of one item is replaced by this item (first level only)
        :return: obj.get_value() when available, obj itself when it is not a concept
            or when it has no body, otherwise the value of its body
        """
        if obj is None:
            return None

        if hasattr(obj, "get_value"):
            return obj.get_value()

        if not isinstance(obj, Concept):
            return obj

        if obj.body is None:
            return obj

        if reduce_simple_list and isinstance(obj.body, (list, set)) and len(obj.body) == 1:
            # a set cannot be indexed, so the item is taken from an iterator
            body_to_use = next(iter(obj.body))
        else:
            body_to_use = obj.body

        return self.value(body_to_use)

    def values(self, objs):
        """
        Returns a generator on the values of objs
        :param objs: list, LIST concept, ENUMERATION concept, or a single object
        :return:
        """
        is_collection = isinstance(objs, list) \
            or self.isinstance(objs, BuiltinConcepts.LIST) \
            or self.isinstance(objs, BuiltinConcepts.ENUMERATION)
        if not is_collection:
            objs = [objs]

        return (self.value(obj) for obj in objs)

    def is_success(self, obj):
        """
        Tells whether obj stands for a success
        :param obj:
        :return: obj for a bool, the status for a ReturnValueConcept,
            False for a builtin error concept, otherwise obj itself
        """
        if isinstance(obj, bool):  # quick win
            return obj

        if isinstance(obj, ReturnValueConcept):
            return obj.status

        if isinstance(obj, Concept) and obj.metadata.is_builtin and obj.key in BuiltinErrors:
            return False

        return obj

    def is_unknown(self, obj):
        """
        Tells whether obj is unknown
        :param obj:
        :return: True when obj is not a concept or when it is an UNKNOWN_CONCEPT
        """
        if not isinstance(obj, Concept):
            return True

        # same key comparison as everywhere else (see isinstance())
        return self.isinstance(obj, BuiltinConcepts.UNKNOWN_CONCEPT)

    def isinstance(self, a, b):
        """
        return true if the concept a is an instance of the concept b
        :param a: a concept
        :param b: a concept, or something whose str() is a concept key
        :return:
        :raises SyntaxError: when a is a BuiltinConcepts member
        """
        if isinstance(a, BuiltinConcepts):  # common KSI error ;-)
            raise SyntaxError("Remember that the first parameter of isinstance MUST be a concept")

        if not isinstance(a, Concept):
            return False

        b_key = b.key if isinstance(b, Concept) else str(b)

        # TODO : manage when a is the list of all possible b
        # for example, if a is a color, it will be found the entry 'All_Colors'
        return a.key == b_key

    def isa(self, a, b):
        """
        return true if the concept a is a b
        Will handle when the keyword isa will be implemented
        For now, it is the same comparison as isinstance()
        :param a: a concept
        :param b: a concept, or something whose str() is a concept key
        :return:
        :raises SyntaxError: when a is a BuiltinConcepts member
        """
        if isinstance(a, BuiltinConcepts):  # common KSI error ;-)
            raise SyntaxError("Remember that the first parameter of isa MUST be a concept")

        if not isinstance(a, Concept):
            return False

        b_key = b.key if isinstance(b, Concept) else str(b)

        # TODO : manage when a is the list of all possible b
        # for example, if a is a color, it will be found the entry 'All_Colors'
        return a.key == b_key

    def get_evaluator_name(self, name):
        """
        Returns name prefixed by BaseEvaluator.PREFIX (the prefix is resolved once)
        """
        if self.evaluators_prefix is None:
            base_evaluator_class = core.utils.get_class("evaluators.BaseEvaluator.BaseEvaluator")
            self.evaluators_prefix = base_evaluator_class.PREFIX

        return self.evaluators_prefix + name

    def get_parser_name(self, name):
        """
        Returns name prefixed by BaseParser.PREFIX (the prefix is resolved once)
        """
        if self.parsers_prefix is None:
            base_parser_class = core.utils.get_class("parsers.BaseParser.BaseParser")
            self.parsers_prefix = base_parser_class.PREFIX

        return self.parsers_prefix + name

    def get_concept_definition(self):
        """
        Returns the concepts definitions, from the cache when it is not empty, from sdp otherwise
        :return: the cached dict (modifying it modifies the cache)
        """
        if self.concepts_definition_cache:
            return self.concepts_definition_cache

        self.concepts_definition_cache = self.sdp.get_safe(self.CONCEPTS_DEFINITIONS_ENTRY, load_origin=False) or {}
        return self.concepts_definition_cache

    def concepts(self):
        """
        Returns all the concepts found in sdp, sorted by id (ids are compared as integers)
        """
        res = []
        for item in self.sdp.list(self.CONCEPTS_ENTRY):
            # an entry is a list when several concepts share the same key
            if isinstance(item, list):
                res.extend(item)
            else:
                res.append(item)

        return sorted(res, key=lambda i: int(i.id))

    def test(self):
        """
        Returns a constant string
        """
        return "I have access to Sheerka !"

    def test_error(self):
        """
        Always raises an Exception
        """
        raise Exception("I can raise an error")

    def dump_concepts(self):
        """
        Logs (INFO) all the concepts found in sdp
        """
        for item in self.sdp.list(self.CONCEPTS_ENTRY):
            if hasattr(item, "__iter__"):
                for i in item:
                    self.log.info(i)
            else:
                self.log.info(item)

    def dump_definitions(self):
        """
        Logs (INFO) the concepts definitions found in sdp
        """
        defs = self.sdp.get(self.CONCEPTS_DEFINITIONS_ENTRY)
        self.log.info(defs)

    def dump_desc(self, *concept_names):
        """
        Logs (INFO) the description of the concepts
        :param concept_names: concepts, or keys of concepts
        :return: False as soon as a key is unknown, None otherwise
        """
        first = True
        for concept_name in concept_names:
            if isinstance(concept_name, Concept):
                concepts = concept_name
            else:
                concepts = self.get(concept_name)
                if self.isinstance(concepts, BuiltinConcepts.UNKNOWN_CONCEPT):
                    self.log.error(f"Concept '{concept_name}' is unknown")
                    return False

            if not hasattr(concepts, "__iter__"):
                concepts = [concepts]

            for c in concepts:
                # empty line between two descriptions
                if not first:
                    self.log.info("")
                self.log.info(f"name : {c.name}")
                self.log.info(f"bnf : {c.metadata.definition}")
                self.log.info(f"key : {c.key}")
                self.log.info(f"body : {c.body}")
                self.log.info(f"digest : {c.get_digest()}")
                first = False

    @staticmethod
    def get_builtins_classes_as_dict():
        """
        Returns the subclasses of Concept declared in core.builtin_concepts
        :return: dict, the key is the metadata key of a fresh instance of the class
        """
        res = {}
        for c in core.utils.get_classes("core.builtin_concepts"):
            if issubclass(c, Concept) and c is not Concept:
                res[c().metadata.key] = c

        return res

    @staticmethod
    def init_logging(debug, loggers):
        """
        Configures the logging on the console handler
        :param debug: DEBUG level with timestamps when True, INFO level with the raw message otherwise
        :param loggers: forwarded to core.sheerka_logger.set_enabled()
        :return:
        """
        core.sheerka_logger.set_enabled(loggers)
        if debug:
            # log_format = "%(asctime)s %(name)s [%(levelname)s] %(message)s"
            log_format = "%(asctime)s [%(levelname)s] %(message)s"
            log_level = logging.DEBUG
        else:
            log_format = "%(message)s"
            log_level = logging.INFO

        logging.basicConfig(format=log_format, level=log_level, handlers=[console_handler])


class ExecutionContext:
    """
    To keep track of the execution of a request
    Contexts are nested (see push()); used as a context manager, it measures the elapsed time.
    Extra keyword arguments are kept in a bag and can be read as attributes.
    """

    def __init__(self, who, event: Event, sheerka: Sheerka, desc: str = None, **kwargs):
        self._parent = None
        self._id = ExecutionContextIdManager.get_id(event.get_digest())
        self._tab = ""
        self._bag = {}  # other variables
        self._start = 0
        self._stop = 0

        self.who = who  # who is asking
        self.event = event  # what was the (original) trigger
        self.sheerka = sheerka  # sheerka
        self.desc = desc  # human description of what is going on
        self.children = []
        self.preprocess = None
        self.values = {}  # what was produced by the execution context

        self.obj = kwargs.pop("obj", None)
        self.concepts = kwargs.pop("concepts", {})

        # update the other elements
        self._bag.update(kwargs)

    @property
    def elapsed(self):
        """
        Elapsed time in nanoseconds: 0 when not started, up to now when not stopped
        """
        if self._start == 0:
            return 0

        return (self._stop if self._stop > 0 else time.time_ns()) - self._start

    @property
    def elapsed_str(self):
        """
        Elapsed time as text, in ms under one second, in s otherwise
        """
        nano_sec = self.elapsed
        dt = nano_sec / 1e6
        return f"{dt} ms" if dt < 1000 else f"{dt / 1000} s"

    @property
    def id(self):
        return self._id

    def __getattr__(self, item):
        # Only called when the normal lookup fails: gives access to the bag.
        # The bag is read from __dict__, otherwise a missing '_bag'
        # (instance created without __init__) would call __getattr__ again, endlessly
        bag = self.__dict__.get("_bag")
        if bag is not None and item in bag:
            return bag[item]

        raise AttributeError(f"'ExecutionContext' object has no attribute '{item}'")

    def __enter__(self):
        self._start = time.time_ns()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # exceptions are not swallowed (nothing is returned)
        self._stop = time.time_ns()

    def __repr__(self):
        msg = f"ExecutionContext(who={self.who}, id={self._id}"
        if self.desc:
            msg += f", desc='{self.desc}'"
        msg += ")"
        return msg

    def add_preprocess(self, name, **kwargs):
        """
        Registers an EVALUATOR_PRE_PROCESS concept for the evaluator called name
        :param name: name of the evaluator
        :param kwargs: properties to override on the evaluator
        :return: self
        """
        preprocess = self.sheerka.new(BuiltinConcepts.EVALUATOR_PRE_PROCESS)
        preprocess.set_prop("name", name)
        for k, v in kwargs.items():
            preprocess.set_prop(k, v)

        if not self.preprocess:
            self.preprocess = set()
        self.preprocess.add(preprocess)
        return self

    def add_values(self, **kwargs):
        """
        Records what was produced by this context
        :return: self
        """
        self.values.update(kwargs)
        return self

    def new_concept(self, key, **kwargs):
        """
        Creates a concept, looking for the template in this context first
        Search order: self.obj, the properties of self.obj, self.concepts, then sheerka
        :param key:
        :param kwargs:
        :return: the new concept (or the raw value of the property when it is not a concept)
        """
        # search in obj
        if self.obj:
            if self.obj.key == key:
                return self.sheerka.new_from_template(self.obj, key, **kwargs)

            if key in self.obj.props:
                value = self.obj.props[key].value
                if isinstance(value, Concept):
                    return self.sheerka.new_from_template(value, key, **kwargs)
                return value

        if self.concepts and key in self.concepts:
            return self.sheerka.new_from_template(self.concepts[key], key, **kwargs)

        return self.sheerka.new(key, **kwargs)

    def push(self, who=None, desc=None, **kwargs):
        """
        Creates a child context
        The child receives obj, concepts (the same dict, not a copy), the bag and
        preprocess of this context; kwargs override them.
        :param who: defaults to the 'who' of this context
        :param desc:
        :param kwargs:
        :return: the child context
        """
        who = who or self.who
        _kwargs = {"obj": self.obj, "concepts": self.concepts}
        _kwargs.update(self._bag)
        _kwargs.update(kwargs)

        new = ExecutionContext(
            who,
            self.event,
            self.sheerka,
            desc,
            **_kwargs,
        )
        new._parent = self
        new._tab = self._tab + " " * DEBUG_TAB_SIZE
        new.preprocess = self.preprocess

        self.children.append(new)
        return new

    def log_new(self, logger):
        logger.debug(f"[{self._id:2}]" + self._tab + str(self))

    def log(self, logger, message, who=None):
        logger.debug(f"[{self._id:2}]" + self._tab + (f"[{who}] " if who else "") + str(message))

    def log_error(self, logger, message, who=None):
        logger.exception(f"[{self._id:2}]" + self._tab + (f"[{who}] " if who else "") + str(message))

    def log_result(self, logger, return_values):
        """
        Logs (DEBUG) one line per return value
        """
        if not logger.isEnabledFor(logging.DEBUG):
            return

        prefix = f"[{self._id:2}]" + self._tab
        if len(return_values) == 0:
            logger.debug(prefix + "No return value")

        for r in return_values:
            logger.debug(prefix + "-> " + self.return_value_to_str(r))

    def to_dict(self):
        # local import, as in the original code
        from core.sheerka_transform import SheerkaTransform
        st = SheerkaTransform(self.sheerka)
        return st.to_dict(self)

    @staticmethod
    def return_value_to_str(r):
        """
        Short text for a return value (the value is truncated to 50 characters)
        """
        value = str(r.value)
        if len(value) > 50:
            value = value[:47] + "..."
        return f"ReturnValue(who={r.who}, status={r.status}, value={value})"


class ExecutionContextIdManager:
    """
    Gives sequential ids (starting at 0) to the execution contexts of one event
    """

    ids = {}  # last id given, by event digest

    @staticmethod
    def get_id(event_digest):
        ids = ExecutionContextIdManager.ids
        ids[event_digest] = ids[event_digest] + 1 if event_digest in ids else 0
        return ids[event_digest]