from dataclasses import dataclass
import logging

from core.builtin_concepts import BuiltinConcepts, ErrorConcept, ReturnValueConcept
from core.concept import Concept, ConceptParts
from evaluators.BaseEvaluator import OneReturnValueEvaluator
from parsers.BaseParser import BaseParser
from sdp.sheerkaDataProvider import SheerkaDataProvider, Event, SheerkaDataProviderDuplicateKeyError
import core.utils

log = logging.getLogger(__name__)


class Sheerka(Concept):
    """
    Main controller for the project.

    It is itself a concept (the SHEERKA builtin). It owns the data provider
    (``sdp``), the concept template caches, and the lists of parser and
    evaluator classes that are instantiated on every request.
    """

    # Names of the sdp entry holding the concepts, and of the two id counters.
    CONCEPTS_ENTRY = "All_Concepts"
    BUILTIN_CONCEPTS_KEYS = "Builtins_Concepts"
    USER_CONCEPTS_KEYS = "User_Concepts"

    def __init__(self, debug=False):
        """
        Creates the controller. Nothing is loaded until initialize() is called.

        :param debug: when True, init_logging() configures verbose DEBUG logging
        """
        log.debug("Starting Sheerka.")
        super().__init__(BuiltinConcepts.SHEERKA, True, True, BuiltinConcepts.SHEERKA)

        # Cache of the most used concepts, indexed by concept key.
        # Note that these are only templates:
        # they are used as a footprint for instantiation.
        self.concepts_cache = {}

        # Cache for builtin types, indexed by BuiltinConcepts member.
        # It allows instantiation of a builtin class.
        self.builtin_cache = {}

        # A concept can be instantiated.
        # ex: File is a concept, but File('foo.txt') is an instance
        # TODO: manage contexts
        self.instances = []

        # List of the rules known by the system.
        # ex: hello => say('hello')
        self.rules = []

        # Data provider; created by initialize().
        self.sdp = None

        # Parser and evaluator *classes*; instances are created per call.
        self.parsers = []
        self.evaluators = []

        self.debug = debug

    def initialize(self, root_folder=None):
        """
        Starts Sheerka: configures logging, opens the data provider and
        registers the builtin concepts, parsers and evaluators.

        When the data provider reports its first use (``sdp.first_time``),
        the user concept id counter is seeded at 1000.
        NOTE(review): the data provider presumably also creates the working
        folders on first use -- confirm in SheerkaDataProvider.

        :param root_folder: root configuration folder
        :return: ReturnValue(Success or Error)
        """
        try:
            self.init_logging()
            self.sdp = SheerkaDataProvider(root_folder)
            if self.sdp.first_time:
                self.sdp.set_key(self.USER_CONCEPTS_KEYS, 1000)

            self.initialize_builtin_concepts()
            self.initialize_builtin_parsers()
            self.initialize_builtin_evaluators()
        except IOError as e:
            # Do not go through self.get() here: the failure may have happened
            # before the data provider or the builtin templates exist, in which
            # case get() would crash and hide the original I/O error.
            error_key = str(BuiltinConcepts.ERROR)
            if error_key in self.concepts_cache:
                error = self.concepts_cache[error_key]
            else:
                error = ErrorConcept(e)
            return ReturnValueConcept(self, False, error, e)

        return ReturnValueConcept(self, True, self)

    def set_id_if_needed(self, obj, is_builtin):
        """
        Gives an id to the concept when it does not have one yet.

        The id is taken from the builtin or the user counter of the data
        provider, depending on is_builtin.

        :param obj: concept to update (modified in place)
        :param is_builtin: True to use the builtin counter, False for the user one
        :return: None
        """
        if obj.id is not None:
            return

        counter = self.BUILTIN_CONCEPTS_KEYS if is_builtin else self.USER_CONCEPTS_KEYS
        obj.id = self.sdp.get_next_key(counter)
        log.debug(f"Setting id '{obj.id}' to concept '{obj.name}'.")

    def initialize_builtin_concepts(self):
        """
        Initializes the builtin concepts.

        For every member of BuiltinConcepts, a concept is built (Sheerka itself,
        a dedicated builtin class when one exists, or a plain Concept), then
        either stored in the data provider when unknown, or refreshed from the
        stored version. It is finally added to the template cache.

        :return: None
        """
        log.debug("Initializing builtin concepts")
        builtins_classes = self.get_builtins_classes_as_dict()

        # this all initialization of the builtins seems to be little bit complicated
        # why do we need to update it from DB ?
        for key in BuiltinConcepts:
            class_key = str(key)
            if key == BuiltinConcepts.SHEERKA:
                concept = self
            elif class_key in builtins_classes:
                concept = builtins_classes[class_key]()
            else:
                concept = Concept(key, True, False, key)

            # Remember the class of non unique builtins, so new() can instantiate them.
            if not concept.is_unique and class_key in builtins_classes:
                self.builtin_cache[key] = builtins_classes[class_key]

            from_db = self.sdp.get_safe(self.CONCEPTS_ENTRY, concept.key)
            if from_db is None:
                log.debug(f"'{concept.name}' concept is not found in db. Adding.")
                self.set_id_if_needed(concept, True)
                self.sdp.add("init", self.CONCEPTS_ENTRY, concept, use_ref=True)
            else:
                log.debug(f"Found concept '{from_db}' in db. Updating.")
                concept.update_from(from_db)

            self.add_in_cache(concept)

    def initialize_builtin_parsers(self):
        """
        Registers the parser classes returned by core.utils.get_sub_classes
        for the 'parsers' package and the BaseParser base class.

        :return: None
        """
        for parser in core.utils.get_sub_classes("parsers", "parsers.BaseParser.BaseParser"):
            log.debug(f"Adding builtin parser '{parser.__name__}'")
            self.parsers.append(parser)

    def initialize_builtin_evaluators(self):
        """
        Registers the evaluator classes returned by core.utils.get_sub_classes
        for the 'evaluators' package: first the ones working on one return
        value, then the ones working on all the return values.

        :return: None
        """
        base_classes = (
            "evaluators.BaseEvaluator.OneReturnValueEvaluator",
            "evaluators.BaseEvaluator.AllReturnValuesEvaluator",
        )
        for base_class in base_classes:
            for evaluator in core.utils.get_sub_classes("evaluators", base_class):
                log.debug(f"Adding builtin evaluator '{evaluator.__name__}'")
                self.evaluators.append(evaluator)

    def init_logging(self):
        """
        Configures the root logger: detailed format at DEBUG level in debug
        mode, bare messages at INFO level otherwise.

        :return: None
        """
        if self.debug:
            log_format = "%(asctime)s %(name)s [%(levelname)s] %(message)s"
            log_level = logging.DEBUG
        else:
            log_format = "%(message)s"
            log_level = logging.INFO

        logging.basicConfig(format=log_format, level=log_level)

    def eval(self, text):
        """
        Evaluates a user input.

        The input is saved as an event, then the evaluators are run three
        times, each time with a marker concept telling them which stage is in
        progress: BEFORE_PARSING (nothing parsed yet), PARSING (on the parsers
        results) and AFTER_PARSING.

        :param text: text to evaluate
        :return: list of return values
        """
        evt_digest = self.sdp.save_event(Event(text))
        exec_context = ExecutionContext(self.key, evt_digest, self)

        return_values = self._run_stage(exec_context, [], BuiltinConcepts.BEFORE_PARSING)

        parsing_results = self.parse(exec_context, text)
        return_values.extend(parsing_results)

        return_values = self._run_stage(exec_context, return_values, BuiltinConcepts.PARSING)
        return_values = self._run_stage(exec_context, return_values, BuiltinConcepts.AFTER_PARSING)

        return return_values

    def _run_stage(self, context, return_values, stage):
        """
        Runs process() with a marker concept for the given stage, then removes
        the marker from the result.

        :param context: execution context
        :param return_values: return values to process
        :param stage: BuiltinConcepts member identifying the stage
        :return: the processed return values, without the stage marker
        """
        marker = self.ret(self.eval.__name__, True, self.new(stage))
        processed = self.process(context, return_values, [marker])
        return core.utils.remove_from_list(processed, [marker])

    def expect_one(self, context, items):
        """
        Reduces a list of return values to a single one.

        :param context: execution context (its 'who' signs the errors)
        :param items: a return value or a list of return values
        :return: the only successful item when there is exactly one,
                 otherwise a failed return value holding IS_EMPTY,
                 TOO_MANY_SUCCESS or TOO_MANY_ERRORS
        """
        if not isinstance(items, list):
            items = [items]

        if not items:
            return self.ret(context.who, False, self.new(BuiltinConcepts.IS_EMPTY, obj=items))

        successful_results = [item for item in items if item.status]
        number_of_successful = len(successful_results)
        total_items = len(items)

        # remove errors when a winner is found
        if number_of_successful == 1:
            return successful_results[0]

        # too many winners, which one to choose ?
        if number_of_successful > 1:
            log.debug(f"{number_of_successful} / {total_items} good items. Too many success")
            return self.ret(context.who, False, self.new(BuiltinConcepts.TOO_MANY_SUCCESS, obj=successful_results))

        # only errors, i cannot help you
        log.debug(f"{total_items} items. Only errors")
        return self.ret(context.who, False, self.new(BuiltinConcepts.TOO_MANY_ERRORS, obj=items))

    def parse(self, context, text):
        """
        Runs every registered parser on the input and gathers the results.

        :param context: execution context
        :param text: a string, or tokens (rendered with
                     BaseParser.get_text_from_tokens for the debug log)
        :return: flat list of what the parsers returned
        """
        result = []

        # Only build the debug text when it is going to be logged.
        if log.isEnabledFor(logging.DEBUG):
            if isinstance(text, str):
                debug_text = "'" + text + "'"
            else:
                debug_text = "'" + BaseParser.get_text_from_tokens(text) + "' as tokens"
            log.debug(f"Parsing {debug_text}")

        for parser in self.parsers:
            res = parser().parse(context, text)
            if isinstance(res, list):
                result.extend(res)
            else:
                result.append(res)

        return result

    def process(self, context, return_values, contextual_concepts=None):
        """
        Applies the evaluators on the return values until nothing changes.

        Evaluators are grouped by priority and the highest priority runs
        first. Inside a group, every evaluator sees the same items; the items
        it consumes are replaced by what it produces. The whole cycle restarts
        as long as the resulting list differs from the previous one.

        :param context: execution context
        :param return_values: a return value or a list of return values
                              (the given list is not modified)
        :param contextual_concepts: extra return values added before evaluation
        :return: list of return values
        """
        log.debug("Evaluating parsing result.")

        # return_values must be a list.
        # Work on a copy so the list given by the caller is left untouched.
        if isinstance(return_values, list):
            return_values = list(return_values)
        else:
            return_values = [return_values]

        # adds contextual concepts
        if contextual_concepts:
            return_values.extend(contextual_concepts)

        # group the evaluators by priority and sort them
        # The first one to be applied will be the one with the highest priority
        grouped_evaluators = {}
        for item in [e() for e in self.evaluators]:
            grouped_evaluators.setdefault(item.priority, []).append(item)
        sorted_priorities = sorted(grouped_evaluators.keys(), reverse=True)

        # process
        while True:
            # snapshot used to detect that a full cycle changed nothing
            simple_digest = return_values[:]

            for priority in sorted_priorities:
                original_items = return_values[:]
                evaluated_items = []
                to_delete = []

                for evaluator in grouped_evaluators[priority]:

                    # process evaluators that work on return value
                    if isinstance(evaluator, OneReturnValueEvaluator):
                        for item in original_items:
                            if not evaluator.matches(context, item):
                                continue

                            result = evaluator.eval(context, item)
                            if result is None:
                                # the evaluator did nothing: the item is kept
                                continue

                            if isinstance(result, list):
                                evaluated_items.extend(result)
                            elif isinstance(result, ReturnValueConcept):
                                evaluated_items.append(result)
                            else:
                                error = self.new(BuiltinConcepts.INVALID_RETURN_VALUE,
                                                 body=result,
                                                 evaluator=evaluator)
                                evaluated_items.append(self.ret("sheerka.process", False, error, parents=[item]))
                            to_delete.append(item)

                    # process evaluators that work on all return values
                    elif evaluator.matches(context, original_items):
                        results = evaluator.eval(context, original_items)
                        if not isinstance(results, list):
                            results = [results]
                        for result in results:
                            evaluated_items.append(result)
                            # parents defaults to None in ret(): nothing to remove then
                            to_delete.extend(result.parents or [])

                # new items first, then the ones that were not consumed
                return_values = evaluated_items
                return_values.extend([item for item in original_items if item not in to_delete])

            # have we done something ?
            if simple_digest == return_values:
                break

        return return_values

    def create_new_concept(self, context, concept):
        """
        Adds a new concept to the system.

        :param context: execution context (its event digest is stored with the concept)
        :param concept: DefConceptNode
        :return: a successful return value holding a NEW_CONCEPT concept whose
                 body is the new concept, or a failed return value holding an
                 ErrorConcept when the concept already exists
        """
        concept.init_key()

        # checks for duplicate concepts
        if self.sdp.exists(self.CONCEPTS_ENTRY, concept.key, concept.get_digest()):
            error = SheerkaDataProviderDuplicateKeyError(self.CONCEPTS_ENTRY + "." + concept.key, concept)
            return self.ret(self.create_new_concept.__name__, False, ErrorConcept(error), error.args[0])

        # set id before saving in db
        self.set_id_if_needed(concept, False)

        # save the new concept in sdp
        try:
            self.sdp.add(context.event_digest, self.CONCEPTS_ENTRY, concept, use_ref=True)
        except SheerkaDataProviderDuplicateKeyError as error:
            return self.ret(self.create_new_concept.__name__, False, ErrorConcept(error), error.args[0])

        # add in cache for quick further reference
        self.add_in_cache(concept)

        # process the return if needed
        ret = self.ret(self.create_new_concept.__name__, True, self.new(BuiltinConcepts.NEW_CONCEPT, body=concept))
        return ret

    def add_codes_to_concept(self, context, concept):
        """
        Updates the codes of the newly created concept.

        Basically, it runs the parsers on every part (ConceptParts) that has a
        source, and stores the single expected result in concept.codes.

        :param context: execution context
        :param concept: concept to update (modified in place)
        :return: None
        """
        for part_key in ConceptParts:
            source = getattr(concept, part_key.value)
            if source is None or source == "":
                continue

            ret_val = self.expect_one(context, self.parse(context, source))
            concept.codes[part_key] = ret_val

    def add_in_cache(self, concept):
        """
        Adds a concept template in cache.
        The cache is used as a proxy before looking at sdp.

        :param concept: concept to cache, indexed by its key
        :return: None
        """
        self.concepts_cache[concept.key] = concept

    def get(self, concept_key):
        """
        Tries to find a concept.

        What is returned must be used as a template for another concept.
        You must not modify the returned concept.

        :param concept_key: key of the concept (a BuiltinConcepts member is
                            converted with str())
        :return: the template from the cache, else from sdp, else a new
                 UNKNOWN_CONCEPT whose body is the requested key
        """
        if isinstance(concept_key, BuiltinConcepts):
            concept_key = str(concept_key)

        # first search in cache
        if concept_key in self.concepts_cache:
            return self.concepts_cache[concept_key]

        # else look in sdp
        from_db = self.sdp.get_safe(self.CONCEPTS_ENTRY, concept_key)
        if from_db is not None:
            return from_db

        # else return new Unknown concept
        # Note that I don't call the new() method, as it uses get() -> cyclic call
        unknown_concept = Concept()
        template = self.concepts_cache[str(BuiltinConcepts.UNKNOWN_CONCEPT)]
        unknown_concept.update_from(template)
        unknown_concept.body = concept_key
        return unknown_concept

    def new(self, concept_key, **kwargs):
        """
        Returns an instance of a new concept.
        When the concept is supposed to be unique, returns the same instance.

        :param concept_key: key of the concept to instantiate
        :param kwargs: properties or attributes to set on the new instance
        :return: the new instance, the shared template for a unique concept,
                 an UNKNOWN_CONCEPT when the key is not found, or an
                 UNKNOWN_PROPERTY when a kwarg matches nothing on the concept
        """
        template = self.get(concept_key)

        # manage concept not found
        if self.isinstance(template, BuiltinConcepts.UNKNOWN_CONCEPT) and \
                concept_key != BuiltinConcepts.UNKNOWN_CONCEPT:
            return template

        # manage singleton
        if template.is_unique:
            return template

        # otherwise, create another instance
        if concept_key in self.builtin_cache:
            concept = self.builtin_cache[concept_key]()
        else:
            concept = Concept()
        concept.update_from(template)

        # update the properties
        for k, v in kwargs.items():
            if k in concept.props:
                concept.set_prop(k, v)
            elif hasattr(concept, k):
                setattr(concept, k, v)
            else:
                return self.new(BuiltinConcepts.UNKNOWN_PROPERTY, body=k, concept=concept)

        # TODO : add the concept to the list of known concepts (self.instances)
        return concept

    def ret(self, who, status, value, message=None, parents=None):
        """
        Creates and returns a ReturnValue concept.

        :param who: who produced the value
        :param status: True on success, False on error
        :param value: the value itself
        :param message: optional message
        :param parents: optional list of the return values this one replaces
        :return: new RETURN_VALUE concept
        """
        return self.new(
            BuiltinConcepts.RETURN_VALUE,
            who=who,
            status=status,
            value=value,
            message=message,
            parents=parents)

    def isinstance(self, a, b):
        """
        Returns True if the concept a is an instance of the concept b,
        i.e. when they have the same key.

        :param a: concept to test (anything that is not a Concept gives False)
        :param b: a Concept, or something whose str() is a concept key
        :return: bool
        :raises SyntaxError: when a is a BuiltinConcepts member (arguments swapped)
        """
        if isinstance(a, BuiltinConcepts):  # common KSI error ;-)
            raise SyntaxError("Remember that the first parameter of isinstance MUST be a concept")

        if not isinstance(a, Concept):
            return False

        b_key = b.key if isinstance(b, Concept) else str(b)

        # TODO : manage when a is the list of all possible b
        # for example, if a is a color, it will be found the entry 'All_Colors'
        return a.key == b_key

    @staticmethod
    def get_builtins_classes_as_dict():
        """
        Lists the Concept subclasses found in core.builtin_concepts.

        :return: dict of the classes, indexed by the key of a default instance
        """
        res = {}
        for c in core.utils.get_classes("core.builtin_concepts"):
            if issubclass(c, Concept) and c != Concept:
                res[c().key] = c

        return res

    @staticmethod
    def get_builtin_parsers():
        """
        Lists the classes found recursively in the 'parsers' package.

        NOTE(review): the filter on BaseParser subclasses is disabled, so every
        class found is returned and base_class is only resolved, never used.

        :return: list of classes
        """
        res = []
        base_class = core.utils.get_class("parsers.BaseParser.BaseParser")
        for c in core.utils.get_classes_recursive("parsers"):
            res.append(c)

        return res

    @staticmethod
    def test():
        """
        Returns a fixed string.

        NOTE(review): presumably used to check that Sheerka is reachable from
        evaluated code -- confirm with the callers.
        """
        return "I have access to Sheerka !"


@dataclass
class ExecutionContext:
    """
    To keep track of the execution of a request
    """
    who: object  # who is asking for the execution
    event_digest: str  # digest of the event that triggered the execution
    sheerka: Sheerka  # the controller

    def push(self, who):
        """
        Returns a new context for another requester, keeping the same event
        and the same controller.

        :param who: the new requester
        :return: new ExecutionContext
        """
        return ExecutionContext(who, self.event_digest, self.sheerka)