import inspect import logging from dataclasses import dataclass from operator import attrgetter import core.builtin_helpers import core.utils from cache.Cache import Cache from cache.IncCache import IncCache from core.builtin_concepts import ErrorConcept, ReturnValueConcept, UnknownConcept from core.builtin_concepts_ids import BuiltinErrors, BuiltinConcepts from core.concept import Concept, ConceptParts, get_concept_attrs from core.global_symbols import EVENT_USER_INPUT_EVALUATED, NotInit, NotFound, ErrorObj, EVENT_ONTOLOGY_CREATED from core.profiling import profile from core.sheerka.ExecutionContext import ExecutionContext from core.sheerka.SheerkaOntologyManager import SheerkaOntologyManager, OntologyAlreadyExists from core.sheerka_logger import console_handler from core.tokenizer import Token, TokenKind from printer.SheerkaPrinter import SheerkaPrinter from sdp.sheerkaDataProvider import Event BASE_NODE_PARSER_CLASS = "parsers.BaseNodeParser.BaseNodeParser" EXIT_COMMANDS = ("quit", "exit", "bye") EXECUTE_STEPS = [ BuiltinConcepts.BEFORE_PARSING, BuiltinConcepts.PARSING, BuiltinConcepts.AFTER_PARSING, BuiltinConcepts.BEFORE_EVALUATION, BuiltinConcepts.EVALUATION, BuiltinConcepts.AFTER_EVALUATION ] RULES_EVALUATE_STEPS = [ BuiltinConcepts.BEFORE_RULES_EVALUATION, BuiltinConcepts.RULES_EVALUATION, BuiltinConcepts.AFTER_RULES_EVALUATION, ] RULES_EXECUTE_STEPS = [ BuiltinConcepts.BEFORE_EVALUATION, BuiltinConcepts.EVALUATION, BuiltinConcepts.AFTER_EVALUATION ] # when a concept is instantiated via resolve or false_resolve # It indicate which parameter was used to recognize the concept RECOGNIZED_BY_ID = "by_id" RECOGNIZED_BY_NAME = "by_name" @dataclass class SheerkaMethod: """ Wrapper to sheerka method, to indicate if it's safe to call """ method: object has_side_effect: bool class Sheerka(Concept): """ Main controller for the project """ CONCEPTS_BY_ID_ENTRY = "ConceptManager:Concepts_By_ID" CONCEPTS_BY_NAME_ENTRY = "ConceptManager:Concepts_By_Name" CHICKEN_AND_EGG_CONCEPTS_ENTRY = "Chicken_And_Egg_Concepts" OBJECTS_IDS_ENTRY = "Objects_Ids" BUILTIN_CONCEPTS_KEYS = "Builtins_Concepts" # sequential key for builtin concepts USER_CONCEPTS_KEYS = "User_Concepts" # sequential key for user defined concepts ALL_ATTRIBUTES = [] def __init__(self, cache_only=False, debug=False, loggers=None): super().__init__(BuiltinConcepts.SHEERKA, True, True, BuiltinConcepts.SHEERKA) self.init_logging(debug, loggers) self.loggers = loggers self.cache_only = cache_only # self.log.debug("Starting Sheerka.") self.bnp = None # reference to the BaseNodeParser class (to compute first keyword token) self.return_value_concept_id = None self.error_concept_id = None self.om: SheerkaOntologyManager = None self.services = {} # sheerka plugins self.builtin_cache = {} # cache for builtin concepts self.parsers = {} # cache for builtin parsers self.evaluators = [] # cache for builtin evaluators self.evaluators_prefix: str = None self.parsers_prefix: str = None self.printer_handler = SheerkaPrinter(self) self.during_restore = False self.during_initialisation = False self._builtins_classes_cache = None self.save_execution_context = True self.enable_process_return_values = True self.enable_process_rules = True self.enable_commands_backup = True self.methods_with_context = {"test_using_context"} # only the names, the method is defined in sheerka_methods self.pipe_functions = set() self.sheerka_methods = { "test": SheerkaMethod(self.test, False), "test_using_context": SheerkaMethod(self.test_using_context, False), "test_dict": SheerkaMethod(self.test_dict, False), "test_error": SheerkaMethod(self.test_error, False), "is_sheerka": SheerkaMethod(self.is_sheerka, False), "objvalue": SheerkaMethod(self.objvalue, False), } self.concepts_ids = None def __copy__(self): return self def __deepcopy__(self, memodict={}): return self @property def chicken_and_eggs(self): return self.om.current_cache_manager().caches[self.CHICKEN_AND_EGG_CONCEPTS_ENTRY].cache @property def root_folder(self): return self.om.root_folder def bind_service_method(self, bound_method, has_side_effect, as_name=None, visible=True): """ Bind service method to sheerka instance for ease of use ? :param bound_method: :param has_side_effect: False if the method is safe :param as_name: give another name to the method :param visible: make the method visible to Sheerka :return: """ if as_name is None: as_name = bound_method.__name__ if as_name.startswith("pipe_"): as_name = as_name[5:] is_pipe_function = True else: is_pipe_function = False if visible: signature = inspect.signature(bound_method) if len(signature.parameters) > 0 and list(signature.parameters.keys())[0] == "context": self.methods_with_context.add(as_name) self.sheerka_methods[as_name] = SheerkaMethod(bound_method, has_side_effect) if is_pipe_function: self.pipe_functions.add(as_name) if not is_pipe_function: setattr(self, bound_method.__name__, bound_method) def initialize(self, root_folder: str = None, **kwargs): """ Starting Sheerka Loads the current configuration Notes that when it's the first time, it also create the needed working folders :param root_folder: root configuration folder :return: ReturnValue(Success or Error) """ self.save_execution_context = kwargs.get("save_execution_context", self.save_execution_context) self.enable_process_return_values = kwargs.get("enable_process_return_values", self.enable_process_return_values) self.enable_process_rules = kwargs.get("enable_process_rules", self.enable_process_rules) self.enable_commands_backup = kwargs.get("enable_commands_backup", self.enable_commands_backup) try: self.during_initialisation = True from sheerkapickle.sheerka_handlers import initialize_pickle_handlers initialize_pickle_handlers() self.om = SheerkaOntologyManager(self, root_folder, self.cache_only) self.builtin_cache = self.get_builtins_classes_as_dict() self.initialize_caching() self.get_builtin_parsers() self.get_builtin_evaluators() self.initialize_services() self.initialize_builtin_evaluators() self.om.init_subscribers() event = Event("Initializing Sheerka.", user_id=self.name) self.om.save_event(event) with ExecutionContext(self.key, event, self, BuiltinConcepts.INIT_SHEERKA, None, desc="Initializing Sheerka.") as exec_context: if self.om.current_sdp().first_time: self.first_time_initialisation(exec_context) self.initialize_builtin_concepts() self.initialize_services_deferred(exec_context, self.om.current_sdp().first_time) res = ReturnValueConcept(self, True, self) exec_context.add_values(return_values=res) if self.om.is_dirty(): self.om.commit(exec_context) if self.save_execution_context: self.om.save_result(exec_context, is_admin=True) # append the other ontologies if needed self.om.freeze() self.initialize_ontologies(exec_context) # self.init_log.debug(f"Sheerka successfully initialized") except IOError as e: res = ReturnValueConcept(self.name, False, self.new(BuiltinConcepts.ERROR, body=e)) finally: self.during_initialisation = False return res def initialize_caching(self): cache = IncCache().auto_configure(self.OBJECTS_IDS_ENTRY) self.om.register_cache(self.OBJECTS_IDS_ENTRY, cache) cache = Cache().auto_configure(self.CHICKEN_AND_EGG_CONCEPTS_ENTRY) self.om.register_cache(self.CHICKEN_AND_EGG_CONCEPTS_ENTRY, cache, persist=False) def initialize_services(self): """ Introspect to find services and bind them :return: """ # self.init_log.debug("Initializing services") core.utils.import_module_and_sub_module('core.sheerka.services') base_class = "core.sheerka.services.sheerka_service.BaseService" services = [service(self) for service in core.utils.get_sub_classes("core.sheerka.services", base_class)] services.sort(key=attrgetter("order")) for service in services: if hasattr(service, "initialize"): service.initialize() self.services[service.NAME] = service def initialize_services_deferred(self, context, is_first_time): """ Initialize part of services that may takes some time or that need the execution context TODO: Create a separate thread for these initialisations as they may take time :return: """ # self.init_log.debug("Initializing services (deferred)") for service in self.services.values(): if hasattr(service, "initialize_deferred"): service.initialize_deferred(context, is_first_time) def first_time_initialisation(self, context): self.record_var(context, self.name, "save_execution_context", self.save_execution_context) def initialize_builtin_concepts(self): """ Initializes the builtin concepts :return: None """ # self.init_log.debug("Initializing builtin concepts") from core.sheerka.services.SheerkaConceptManager import SheerkaConceptManager concept_service = self.services[SheerkaConceptManager.NAME] concepts_ids = concept_service.initialize_builtin_concepts() self.concepts_ids = concepts_ids self.return_value_concept_id = concepts_ids[BuiltinConcepts.RETURN_VALUE] self.error_concept_id = concepts_ids[BuiltinConcepts.ERROR] def get_builtin_parsers(self): """ Init the parsers :return: """ core.utils.import_module_and_sub_module("parsers") base_class = core.utils.get_class("parsers.BaseParser.BaseParser") modules_to_skip = ["parsers.BaseNodeParser", "parsers.BaseCustomGrammarParser", "parsers.BaseExpressionParser"] temp_result = {} for parser in core.utils.get_sub_classes("parsers", base_class): if parser.__module__ == base_class.__module__: continue qualified_name = core.utils.get_full_qualified_name(parser) # self.init_log.debug(f"Adding builtin parser '{qualified_name}'") temp_result[qualified_name] = parser # keep a reference to base_node_parser self.bnp = temp_result[BASE_NODE_PARSER_CLASS] # Now we sort the parser by name. # It's not important for the logic of their usage as they have their priority anyway, # We do that for the unit tests. They are to complicated to write otherwise for name in sorted(temp_result.keys()): parser = temp_result[name] if parser.__module__ in modules_to_skip: # base node parser module does not contains any valid parser continue self.parsers[name] = temp_result[name] def get_builtin_evaluators(self): """ get all evaluators :return: """ core.utils.import_module_and_sub_module("evaluators") evaluators = core.utils.get_sub_classes("evaluators", "evaluators.BaseEvaluator.OneReturnValueEvaluator") evaluators.extend(core.utils.get_sub_classes("evaluators", "evaluators.BaseEvaluator.AllReturnValuesEvaluator")) for evaluator in evaluators: self.evaluators.append(evaluator) def initialize_builtin_evaluators(self): """ Init the evaluators :return: """ for evaluator in self.evaluators: if hasattr(evaluator, "initialize"): evaluator.initialize(self) def initialize_ontologies(self, context): ontologies = self.om.current_sdp().load_ontologies() if not ontologies: return for ontology_name in list(reversed(ontologies))[1:]: self.om.push_ontology(ontology_name, False) self.initialize_services_deferred(context, False) # @profile(filename="profile_80") def evaluate_user_input(self, text: str, user_name="kodjo"): """ Note to KSI: If you try to add execution context to this function, You may end in an infinite loop :param text: :param user_name: :return: """ # self.log.debug(f"Processing user input '{text}', {user_name=}.") # my_debug(f"****************** Processing user input '{text}', {user_name=}.***********************************") event = Event(text, user_name) self.om.save_event(event) with ExecutionContext(self.key, event, self, BuiltinConcepts.PROCESS_INPUT, text, desc=f"Evaluating '{text}'") as execution_context: user_input = self.ret(self.name, True, self.new(BuiltinConcepts.USER_INPUT, body=text, user_name=user_name)) execution_context.add_inputs(user_input=user_input) # TODO. Must be a context hint, not a return value reduce_requested = self.ret(self.name, True, self.new(BuiltinConcepts.REDUCE_REQUESTED)) ret = self.execute(execution_context, [user_input, reduce_requested], EXECUTE_STEPS) execution_context.add_values(return_values=ret) # rule management if self.enable_process_rules: ret = self.execute_rules(execution_context, ret, RULES_EVALUATE_STEPS, RULES_EXECUTE_STEPS) if self.om.is_dirty(): self.om.commit(execution_context) self.publish(execution_context, EVENT_USER_INPUT_EVALUATED) # Do not save execution contexts from process_return_values if self.enable_process_return_values: self.process_return_values(execution_context, ret) return ret def print(self, result, instructions=None): """ Print the result to output :param result: :param instructions: :return: """ self.printer_handler.print(result, instructions) def resolve(self, concept): """ Try to find a concept by its name, id, or c:: definition A new instance (using new_from_template()) is returned when it's possible :param concept: :return: """ def add_recognized_by(c, _recognized_by): c.get_hints().recognized_by = _recognized_by return c def new_instances(concepts, _recognized_by): if hasattr(concepts, "__iter__"): return [add_recognized_by(self.new_from_template(c, c.key), _recognized_by) for c in concepts] return add_recognized_by(self.new_from_template(concepts, concepts.key), _recognized_by) if concept is None: return None # ############## # PREPROCESS # ############## # if the entry is a concept token, use its values. if isinstance(concept, Token): if concept.type == TokenKind.RULE: # do not recognize rules !!! return None concept = concept.value # concept is now a tuple if isinstance(concept, str) and \ concept.startswith("c:") and \ (tmp := core.utils.unstr_concept(concept)) != (None, None): concept = tmp # ############## # PROCESS # ############## # if the entry is a tuple # concept[0] is the name # concept[1] is the id if isinstance(concept, tuple): if concept[1]: if self.is_known(found := self.get_by_id(concept[1])): instance = self.new_from_template(found, found.key) instance.get_hints().is_evaluated = True instance.get_hints().recognized_by = RECOGNIZED_BY_ID return instance elif concept[0]: if self.is_known(found := self.get_by_name(concept[0])): instances = new_instances(found, RECOGNIZED_BY_NAME) core.builtin_helpers.set_is_evaluated(instances) return instances else: return None # otherwise search in db if isinstance(concept, str): if self.is_known(found := self.get_by_name(concept)): instances = new_instances(found, RECOGNIZED_BY_NAME) core.builtin_helpers.set_is_evaluated(instances, check_nb_variables=True) return instances return None def fast_resolve(self, key, return_new=True): def add_recognized_by(c, _recognized_by): c.get_hints().recognized_by = _recognized_by return c def new_instances(concepts, _recognized_by): if hasattr(concepts, "__iter__"): return [add_recognized_by(self.new_from_template(c, c.key), _recognized_by) for c in concepts] return add_recognized_by(self.new_from_template(concepts, concepts.key), _recognized_by) if isinstance(key, Token): if key.type == TokenKind.RULE: # do not recognize rules !!! return None else: key = key.value elif isinstance(key, str) and key.startswith("c:"): key = core.utils.unstr_concept(key) if isinstance(key, tuple): if key == (None, None): return None if key[1]: concept = self.om.get(self.CONCEPTS_BY_ID_ENTRY, key[1]) recognized_by = RECOGNIZED_BY_ID else: concept = self.om.get(self.CONCEPTS_BY_NAME_ENTRY, key[0]) recognized_by = RECOGNIZED_BY_NAME else: concept = self.om.get(self.CONCEPTS_BY_NAME_ENTRY, key) recognized_by = RECOGNIZED_BY_NAME if concept is NotFound: return None return new_instances(concept, recognized_by) if return_new else concept def new(self, concept_key, **kwargs): """ Returns an instance of a new concept When the concept is supposed to be unique, returns the same instance :param concept_key: :param kwargs: :return: """ if isinstance(concept_key, tuple): concept_key, concept_id = concept_key[0], concept_key[1] elif isinstance(concept_key, Concept): concept_key, concept_id = concept_key.key, concept_key.id else: concept_id = None template = self.get_by_id(concept_id) if not concept_key else self.get_by_key(concept_key, concept_id) # manage concept not found if self.isinstance(template, BuiltinConcepts.UNKNOWN_CONCEPT) and \ concept_key != BuiltinConcepts.UNKNOWN_CONCEPT: return template if isinstance(template, list): # if template is a list, it means that there a multiple concepts under the same key concepts = [self.new_from_template(t, concept_key, **kwargs) for t in template] return concepts else: return self.new_from_template(template, concept_key, **kwargs) def new_from_template(self, template, key, **kwargs): # core.utils.my_debug(f"Created {template}, {key=}, {kwargs=}") # manage singleton if template.get_metadata().is_unique: return template # otherwise, create another instance concept = self.builtin_cache[key]() if key in self.builtin_cache else Concept() concept.update_from(template, update_value=False) # concept.freeze_definition_hash() if len(kwargs) == 0: return concept # update the properties, values, attributes # Not quite sure that this is the correct process order for k, v in kwargs.items(): if k in get_concept_attrs(concept): concept.set_value(k, v) elif k == "body": concept.set_value(ConceptParts.BODY, v) elif hasattr(concept, k): setattr(concept, k, v) else: return self.new(BuiltinConcepts.UNKNOWN_PROPERTY, body=k, concept=concept) # TODO : add the concept to the list of known concepts (self.instances) concept.get_hints().is_evaluated = True # because we have manually set the variables return concept def push_ontology(self, context, name, cache_only=False): try: if self.om.already_on_top(name): return self.ret(self.name, True, self.new(BuiltinConcepts.SUCCESS)) except OntologyAlreadyExists: return self.ret(self.name, False, self.new(BuiltinConcepts.ONTOLOGY_ALREADY_DEFINED, body=name)) # record sheerka and services states self.om.record_sheerka_state() for service in self.services.values(): if hasattr(service, "save_state"): service.save_state(context) if hasattr(service, "reset_state"): service.reset_state() self.om.push_ontology(name, cache_only) # Not the first time for this ontology. Update the services if name in self.om.current_sdp().load_ontologies(): self.initialize_services_deferred(context, False) self.om.save_ontologies_names() self.publish(context, EVENT_ONTOLOGY_CREATED, name) return self.ret(self.name, True, self.new(BuiltinConcepts.SUCCESS)) def pop_ontology(self, context): ontology = self.om.pop_ontology(context) self.om.reset_sheerka_state() for service in self.services.values(): if hasattr(service, "restore_state"): service.restore_state() if hasattr(service, "reset_state"): service.reset_state() self.om.save_ontologies_names() return self.ret(self.name, True, self.new(BuiltinConcepts.ONTOLOGY_REMOVED, body=ontology)) def get_ontology(self, context): self.om.record_sheerka_state() for service in self.services.values(): if hasattr(service, "save_state"): service.save_state(context) return self.om.get_ontology() def add_ontology(self, context, ontology): """ Add the previously recorded ontology on the top """ # save the state of the current ontology self.om.record_sheerka_state() for service in self.services.values(): if hasattr(service, "save_state"): service.save_state(context) # if hasattr(service, "reset_state"): # no need to do it twice # service.reset_state() self.om.add_ontology(ontology) # update sheerka with this new ontology self.om.reset_sheerka_state() for service in self.services.values(): if hasattr(service, "restore_state"): service.restore_state() if hasattr(service, "reset_state"): service.reset_state() def ret(self, who: str, status: bool, value, parents=None): """ Creates and returns a ReturnValue concept :param who: :param status: :param value: :param parents: :return: """ return ReturnValueConcept( who=who, status=status, value=value, parents=parents, concept_id=self.return_value_concept_id ) def err(self, body): return ErrorConcept(body, self.error_concept_id) def objvalue(self, obj, reduce_simple_list=False): if obj is None: return None if hasattr(obj, "get_obj_value"): return obj.get_obj_value() if not isinstance(obj, Concept): return obj if obj.body is NotInit: return obj if obj.key in BuiltinErrors: # KSI 2021-04-20 # errors should be returned as soon as they are detected return obj if reduce_simple_list and (isinstance(obj.body, list) or isinstance(obj.body, set)) and len(obj.body) == 1: body_to_use = obj.body[0] else: body_to_use = obj.body return self.objvalue(body_to_use) def objvalues(self, objs): if not (isinstance(objs, list) or self.isinstance(objs, BuiltinConcepts.LIST) or self.isinstance(objs, BuiltinConcepts.ENUMERATION)): objs = [objs] if isinstance(objs, list): return (self.objvalue(obj) for obj in objs) return (self.objvalue(obj) for obj in objs.body) def get_errors(self, context, obj, **kwargs): """ Browse obj, looking for error :param context: :param obj: :param kwargs: if defined, specialize the error :return: """ def is_error(_obj): if isinstance(_obj, ErrorObj): return True if isinstance(_obj, Concept) and _obj.get_metadata().is_builtin and _obj.key in BuiltinErrors: return True return False def inner_get_errors(_obj): if self.isinstance(_obj, BuiltinConcepts.RETURN_VALUE) and _obj.status: return [] if isinstance(_obj, (list, set, tuple)): return core.utils.flatten([inner_get_errors(o) for o in _obj]) if is_error(_obj): if isinstance(_obj, Concept) and _obj.body not in (NotInit, None): return [_obj] + inner_get_errors(_obj.body) if isinstance(_obj, ErrorObj) and hasattr(_obj, "get_error"): return [_obj] + inner_get_errors(_obj.get_error()) else: return [_obj] if self.isinstance(_obj, BuiltinConcepts.FILTERED): return inner_get_errors(_obj.reason) if isinstance(_obj, Concept) and _obj.body != NotInit: return inner_get_errors(_obj.body) return [] errors = inner_get_errors(obj) return self.filter_objects(context, [e for e in errors], **kwargs) def has_error(self, context, obj, **kwargs): errors = self.get_errors(context, obj, **kwargs) return len(errors) > 0 def get_evaluator_name(self, name): if self.evaluators_prefix is None: base_evaluator_class = core.utils.get_class("evaluators.BaseEvaluator.BaseEvaluator") self.evaluators_prefix = base_evaluator_class.PREFIX return self.evaluators_prefix + name def get_parser_name(self, name): if self.parsers_prefix is None: base_parser_class = core.utils.get_class("parsers.BaseParser.BaseParser") self.parsers_prefix = base_parser_class.PREFIX return self.parsers_prefix + name @staticmethod def is_success(obj): if isinstance(obj, bool): # quick win return obj if isinstance(obj, ReturnValueConcept): return obj.status if isinstance(obj, ErrorObj): return False # other cases ? # ... # manage internal errors if isinstance(obj, Concept) and obj._metadata.is_builtin and obj.key in BuiltinErrors: return False return bool(obj) @staticmethod def is_known(obj): if not isinstance(obj, Concept): return True return obj.key not in (None, BuiltinConcepts.UNKNOWN_CONCEPT, BuiltinConcepts.UNKNOWN_RULE) @staticmethod def isinstance(a, b): """ return true if the concept a is an instance of the concept b :param a: :param b: :return: """ if not isinstance(a, Concept): return False b_key = b.key if isinstance(b, Concept) else str(b) return a.key == b_key @staticmethod def isin(a, b): """ True if the concept is the list :param a: :param b: :return: """ if not isinstance(a, Concept): return False return a.key in b def is_sheerka(self, obj): if isinstance(obj, Concept) and obj.id == self.id: return True from sheerkapython.python_wrapper import Expando if isinstance(obj, Expando) and obj.get_name() == "sheerka": return True return False @staticmethod def get_unknown(metadata): """ Returns the concept 'UnknownConcept' for a requested id or key Note that I don't call the new() method to prevent cyclic call :param metadata: :return: """ # metadata is a list of tuple that contains the known metadata for this concept # ex : (key, 'not_found) # or # (id, invalid_id) # # the metadata can be a list, if several attributes where given # (key, 'not_found), (id, invalid_id) unknown_concept = UnknownConcept() # don't use new() for prevent circular reference unknown_concept.set_value(ConceptParts.BODY, metadata) unknown_concept.get_hints().is_evaluated = True return unknown_concept @staticmethod def get_builtins_classes_as_dict(): res = {} for c in core.utils.get_classes("core.builtin_concepts"): if issubclass(c, Concept) and c != Concept: res[c()._metadata.key] = c return res @staticmethod def init_logging(debug, loggers): def add_coloring_to_emit_ansi(fn): # add methods we need to the class def new(*args): levelno = args[1].levelno if levelno >= 50: color = '\x1b[31m' # red elif levelno >= 40: color = '\x1b[31m' # red elif levelno >= 30: color = '\x1b[33m' # yellow elif levelno >= 20: color = '\x1b[32m' # green elif levelno >= 10: color = '\x1b[35m' # pink else: color = '\x1b[0m' # normal args[1].msg = color + str(args[1].msg) + '\x1b[0m' # normal # print "after" return fn(*args) return new core.sheerka_logger.init_config(loggers) if debug: log_format = "%(asctime)s" if "show-name" in loggers: log_format += " %(name)s" log_format += " [%(levelname)s] %(message)s" log_level = logging.DEBUG else: log_format = "%(message)s" log_level = logging.INFO logging.basicConfig(format=log_format, level=log_level, handlers=[console_handler]) logging.addLevelName(logging.ERROR, "\033[1;41m%s\033[1;0m" % logging.getLevelName(logging.ERROR)) # uncomment the following line to enable colors # logging.StreamHandler.emit = add_coloring_to_emit_ansi(logging.StreamHandler.emit) def test_dict(self): bag2 = { "alpha": "value4", "beta": ["item1", "item2", "item3", ] } bag = { "a": "value1", "baba": "value2", "c": "value1", "de": ["item1", "item2", "item3", ], "e": bag2 } return self.new(BuiltinConcepts.TO_DICT, body=bag) def test(self): return f"I have access to Sheerka !" def test_using_context(self, context, param): event = context.event.get_digest() return f"I have access to Sheerka ! {param=}, {event=}." def test_error(self): raise Exception("I can raise an error") def test_only_add_in_cache(self, concept: Concept): """ Adds a concept template in cache. The cache is used as a proxy before looking at sdp :param concept: :return: """ # sanity check if concept.key is None: concept.init_key() if concept.key is None: raise KeyError() self.om.add_concept(concept) return concept @staticmethod def deepdiff(a, b): from deepdiff import DeepDiff ddiff = DeepDiff(a, b, ignore_order=True) print(ddiff) return ddiff def to_profile(): sheerka = Sheerka() sheerka.initialize(save_execution_context=False, enable_process_return_values=False) event = Event("test", "kodjo") execution_context = ExecutionContext(sheerka.name, event, sheerka, BuiltinConcepts.PROCESS_INPUT, None) profile_push(execution_context) @profile(filename="profile_push") def profile_push(execution_context): for i in range(177942): execution_context.push(BuiltinConcepts.NOP, {"action": "fake"}, execution_context.sheerka.name, desc="a proper description") if __name__ == '__main__': to_profile()