"""
Sheerka engine: start-up sequence, discovery of services and evaluators,
and the entry point used to evaluate a user input.
"""
import inspect
import logging
import sys
from dataclasses import dataclass
from operator import attrgetter
from os import path
from typing import Callable

from caching.Cache import Cache
from caching.IncCache import IncCache
from common.utils import get_logger_name, get_sub_classes, import_module_and_sub_module
from core.BuiltinConcepts import BuiltinConcepts
from core.Event import Event
from core.ExecutionContext import ContextHint, ExecutionContext, ContextActions
from core.ReturnValue import ReturnValue
from core.concept import Concept, ConceptMetadata
from core.error import ErrorContext
from ontologies.SheerkaOntologyManager import SheerkaOntologyManager
from server.authentication import User

# Ordered list of steps handed to ``execute`` when a user input is evaluated
EXECUTE_STEPS = [
    ContextActions.BEFORE_PARSING,
    ContextActions.PARSING,
    ContextActions.AFTER_PARSING,
    ContextActions.BEFORE_EVALUATION,
    ContextActions.EVALUATION,
    ContextActions.AFTER_EVALUATION
]


@dataclass
class SheerkaConfig:
    """
    Runtime configuration of the Sheerka engine
    """
    # After each execution, persist the whole execution flow as a file.
    # This file will be used by the debugger.
    save_execution_context: bool = True


@dataclass
class SheerkaMethod:
    """
    Wrapper to sheerka method, to indicate if it's safe to call
    """
    name: str  # name under which the method is registered
    service: str  # name of the service that provides the method
    method: Callable  # the bound method itself
    has_side_effect: bool  # True when calling the method can modify Sheerka's state

    def __repr__(self):
        return self.name

    def __hash__(self):
        # Explicit hash: a dataclass generated with eq=True would otherwise be unhashable
        return hash((self.name, self.service))


class Sheerka:
    """
    Engine of the so called Sheerka
    Holds the ontology manager, the services, the evaluators and the methods
    that the services bind on the instance
    """
    OBJECTS_IDS_ENTRY = "Objects_Ids"
    CHICKEN_AND_EGG_CONCEPTS_ENTRY = "Chicken_And_Egg_Concepts"

    def __init__(self):
        """
        Create an empty, not yet initialized, engine
        Call initialize() to make it usable
        """
        self.name = "Sheerka"
        self.om: SheerkaOntologyManager = None  # created by initialize()
        self.config = SheerkaConfig()
        self.during_initialisation = False  # True only while initialize() is running

        self.log = logging.getLogger(get_logger_name(__name__))
        self.init_log = logging.getLogger(get_logger_name("init." + __name__))

        self.services = {}  # sheerka plugins
        self.evaluators = {}  # cache for evaluators
        self.sheerka_methods = {}
        self.methods_with_context = set()  # only the names, the method is defined in sheerka_methods
        self.global_context_hints = set()

    def bind_service_method(self, service_name, bound_method, can_modify_state, as_name=None, visible=True):
        """
        Bind service method to sheerka instance for ease of use
        :param service_name: name of the service that owns the method
        :param bound_method: the method to bind
        :param can_modify_state: Can update the state of Sheerka => can produce side_effect
        :param as_name: give another name to the method (defaults to the method's own name)
        :param visible: make the method visible to Sheerka (registered in sheerka_methods)
        :return:
        """
        if as_name is None:
            as_name = bound_method.__name__

        # NOTE(review): nesting reconstructed from a flattened source.
        # Registration is assumed to apply to every visible method, setattr to every method - confirm
        if visible:
            # Remember the methods whose first parameter is the execution context
            first_parameter = next(iter(inspect.signature(bound_method).parameters), None)
            if first_parameter == "context":
                self.methods_with_context.add(as_name)

            self.sheerka_methods[as_name] = SheerkaMethod(as_name, service_name, bound_method, can_modify_state)

        # The instance attribute always uses the method's own name, even when as_name is given
        setattr(self, bound_method.__name__, bound_method)

    def initialize(self, root_folder: str = None, **kwargs):
        """
        Starting Sheerka
        Loads the current configuration
        Runs the first time initialisation when the ontology manager reports a first start
        :param root_folder: root configuration folder (defaults to ~/.sheerka)
        :param kwargs: only "save_execution_context" is read, it overrides the configuration
        :return: ReturnValue(Success or Error)
        """
        if root_folder is None:
            root_folder = path.abspath(path.join(path.expanduser("~"), ".sheerka"))

        self.initialize_logging(False, root_folder)
        self.config.save_execution_context = kwargs.get("save_execution_context",
                                                        self.config.save_execution_context)

        # Bound before the try block: an IOError can be raised before the execution context
        # is created, and the error handler below must still be able to reference it
        exec_context = None
        try:
            self.init_log.info("Starting Sheerka")
            self.during_initialisation = True

            # from sheerkapickle.sheerka_handlers import initialize_pickle_handlers
            # initialize_pickle_handlers()

            self.om = SheerkaOntologyManager(self, root_folder)
            # self.builtin_cache, self.builtin_cache_by_class_name = self.get_builtins_classes_as_dict()

            self.initialize_bind_methods()
            self.initialize_caching()
            self.initialize_evaluators()
            self.initialize_services()
            # self.initialize_builtin_evaluators()
            # self.om.init_subscriptions()

            event = Event("Initializing Sheerka.", user_id=self.name)
            self.om.save_event(event)

            # NOTE(review): nesting reconstructed from a flattened source.
            # The commit is assumed to be inside the context, the save after it - confirm
            with ExecutionContext(self.name,
                                  event,
                                  self,
                                  ContextActions.INIT_SHEERKA,
                                  None,
                                  desc="Initializing Sheerka.") as exec_context:
                if self.om.current_sdp().first_time:
                    self.first_time_initialisation(exec_context)

                self.initialize_services_deferred(exec_context, self.om.current_sdp().first_time)

                res = ReturnValue(self.name, True, self.get_startup_config())
                exec_context.add_values(return_values=res)

                if self.om.is_dirty():
                    self.om.commit(exec_context)

            if self.config.save_execution_context:
                self.om.save_execution_context(exec_context, is_admin=True)

            # append the other ontologies if needed
            self.om.freeze()
            self.initialize_ontologies(exec_context)

            # self.init_log.debug(f"Sheerka successfully initialized")

        except IOError as e:
            # exec_context is None when the failure happened before the context was created
            res = ReturnValue(self.name, False, ErrorContext(self.name, exec_context, e))
        finally:
            self.during_initialisation = False

        return res

    def initialize_bind_methods(self):
        """
        Add some methods to the list of available methods
        :return:
        :rtype:
        """
        self.bind_service_method(self.name, self.echo, False)

    @staticmethod
    def initialize_logging(is_debug, root_folder):
        """
        Configure the root logger to write on stdout
        :param is_debug: True for DEBUG level with level and logger names, False for INFO with the message only
        :param root_folder: not used by the current implementation
        :return:
        """
        if is_debug:
            # log_format = "%(asctime)s %(name)s"
            log_format = "[%(levelname)s] [%(name)s]"
            log_format += " %(message)s"
            log_level = logging.DEBUG
        else:
            log_format = "%(message)s"
            log_level = logging.INFO

        logging.basicConfig(format=log_format, level=log_level, handlers=[logging.StreamHandler(sys.stdout)])

        # Wrap the ERROR level name in the "\033[1;41m" ... "\033[1;0m" escape sequences.
        # Level names are global to the process, so the decoration is applied only once,
        # otherwise every call would wrap the already decorated name again
        highlight, reset = "\033[1;41m", "\033[1;0m"
        error_name = logging.getLevelName(logging.ERROR)
        if not error_name.startswith(highlight):
            logging.addLevelName(logging.ERROR, f"{highlight}{error_name}{reset}")

    def initialize_ontologies(self, context):
        """
        Push the ontologies returned by load_ontologies() onto the ontology manager
        They are pushed in reverse order, the first one of the reversed list is skipped
        :param context: execution context (not used by the current implementation)
        :return:
        """
        ontologies = self.om.current_sdp().load_ontologies()
        if not ontologies:
            return

        for ontology_name in list(reversed(ontologies))[1:]:
            self.om.push_ontology(ontology_name, False)
        # self.initialize_services_deferred(context, False)

    def first_time_initialisation(self, context):
        """
        Hook called by initialize() on the very first start
        Does nothing for now
        :param context: execution context
        :return:
        """
        pass
        # self.record_var(context, self.name, "save_execution_context", self.save_execution_context)

    def initialize_caching(self):
        """
        Register the caches owned by the engine in the ontology manager
        :return:
        """
        cache = IncCache().auto_configure(self.OBJECTS_IDS_ENTRY)
        self.om.register_cache(self.OBJECTS_IDS_ENTRY, cache)

        # this cache is registered with persist=False
        cache = Cache().auto_configure(self.CHICKEN_AND_EGG_CONCEPTS_ENTRY)
        self.om.register_cache(self.CHICKEN_AND_EGG_CONCEPTS_ENTRY, cache, persist=False)

    def initialize_services(self):
        """
        Introspect to find services and bind them
        Services are instantiated, sorted by their 'order' attribute, initialized in that order
        and registered under their NAME
        :return:
        """
        self.init_log.info("Initializing services")
        import_module_and_sub_module('services')
        base_class = "services.BaseService.BaseService"

        services = [service(self) for service in get_sub_classes("services", base_class)]
        services.sort(key=attrgetter("order"))

        for service in services:
            # initialize() is optional on a service
            if hasattr(service, "initialize"):
                service.initialize()

            self.services[service.NAME] = service

        self.init_log.info(f"{len(services)} service(s) found.")

    def initialize_services_deferred(self, context, is_first_time):
        """
        Initialize part of services that may take some time or that need the execution context
        :param context: execution context forwarded to the services
        :param is_first_time: forwarded to the services, True on the very first start
        :return:
        """
        self.init_log.debug(f"Initializing services (deferred, {is_first_time=})")
        for service in self.services.values():
            # initialize_deferred() is optional on a service
            if hasattr(service, "initialize_deferred"):
                service.initialize_deferred(context, is_first_time)

    def initialize_evaluators(self):
        """
        Introspect to find the evaluators and index one instance of each by its NAME
        :return:
        """
        self.init_log.info("Initializing evaluators")
        base_class1 = "evaluators.base_evaluator.OneReturnValueEvaluator"
        base_class2 = "evaluators.base_evaluator.AllReturnValuesEvaluator"

        import_module_and_sub_module('evaluators')
        evaluators = [evaluator() for evaluator in get_sub_classes("evaluators", base_class1)] + \
                     [evaluator() for evaluator in get_sub_classes("evaluators", base_class2)]

        self.evaluators = {e.NAME: e for e in evaluators}
        self.init_log.info(f"{len(evaluators)} evaluator(s) found.")

    def bind_services_methods(self):
        """
        Placeholder, all the bindings are currently commented out
        :return:
        """
        # init methods
        # self.bind_service_method(self.name, self.test, False)
        # self.bind_service_method(self.name, self.test_using_context, False)
        # self.bind_service_method(self.name, self.test_dict, False)
        # self.bind_service_method(self.name, self.test_error, False)
        # self.bind_service_method(self.name, self.is_sheerka, False)
        # self.bind_service_method(self.name, self.objvalue, False)
        pass

    def get_startup_config(self):
        """
        Return a dictionary with current configuration, used for initialization
        :return: {"config": <attributes of self.config>}
        :rtype: dict
        """
        return {
            "config": self.config.__dict__
        }

    def publish(self, context, topic, data=None):
        """
        To be removed as it must be part of the EventManager service
        Does nothing for now
        :param context:
        :type context:
        :param topic:
        :type topic:
        :param data:
        :type data:
        :return:
        :rtype:
        """
        pass

    def evaluate_user_input(self, command: str, user: User):
        """
        Evaluate a command entered by a user
        The command is saved as an event, wrapped into a USER_INPUT concept
        and executed through EXECUTE_STEPS
        :param command: text entered by the user
        :param user: author of the command, its email identifies the event and the context
        :return: what self.execute() returns
        """
        self.log.info("Processing '%s' from '%s'", command, user.email)

        event = Event(command, user_id=user.email)
        self.om.save_event(event)

        with ExecutionContext(user.email,
                              event,
                              self,
                              ContextActions.EVALUATE_USER_INPUT,
                              command,
                              desc=f"Evaluating '{command}'",
                              global_hints=self.global_context_hints.copy()) as exec_context:
            user_input = ReturnValue(self.name, True, self.newn(BuiltinConcepts.USER_INPUT, command=command))
            exec_context.private_hints.add(ContextHint.REDUCE_CONCEPTS)

            # KSI : 2023-04-30
            # I am still missing 'execute' and the whole SheerkaProcessUserInput class
            exec_context.add_inputs(user_input=user_input)
            ret = self.execute(exec_context, [user_input], EXECUTE_STEPS)
            exec_context.add_values(return_values=ret)

            if self.om.is_dirty():
                self.om.commit(exec_context)

        return ret

    def isinstance(self, a, b):
        """
        Returns true if 'a' is a concept of type 'b'
        Note that this function can be moved into ConceptManager
        I keep it here for quick access
        :param a: object to test, anything that is not a Concept gives False
        :type a:
        :param b: a Concept or ConceptMetadata (compared by id), a "c:#...": string
                  (the text between the 3 first characters and the last one is compared with the id)
                  or any other string (compared with the key)
        :type b:
        :return:
        :rtype: bool
        """
        if not isinstance(a, Concept):
            return False

        if isinstance(b, (Concept, ConceptMetadata)):
            return a.id == b.id

        if b.startswith("c:#"):
            # drop the "c:#" prefix and the last character
            return a.id == b[3:-1]

        return a.key == b

    def echo(self, msg):
        """
        test function
        :param msg: anything
        :type msg:
        :return: msg, unchanged
        :rtype:
        """
        return msg