from dataclasses import dataclass

from cache.FastCache import FastCache
from cache.ListIfNeededCache import ListIfNeededCache
from core.builtin_concepts import BuiltinConcepts
from core.concept import Concept
from core.global_symbols import EVENT_CONTEXT_DISPOSED, NotFound
from core.sheerka.services.sheerka_service import BaseService, ServiceObj


@dataclass
class MemoryObject(ServiceObj):
    """
    Wrapper around an object stored in memory.

    Equality and hashing only consider ``obj`` and ``event_id``; the timestamp is ignored.
    NOTE(review): ``event_id`` is presumably the field inherited from ServiceObj - confirm.
    """

    timestamp: float
    obj: object

    def __eq__(self, other):
        if not isinstance(other, MemoryObject):
            return False

        return self.obj == other.obj and self.event_id == other.event_id

    def __hash__(self):
        return hash((self.event_id, self.obj))

    def __repr__(self):
        return f"MemoryObject({self.obj}, timestamp={self.timestamp})"


class SheerkaMemory(BaseService):
    """
    Service that manages
    - the short term memory: key/value bags attached to a context id (or to GLOBAL)
    - the memory: MemoryObject entries stored under OBJECTS_ENTRY
    - the registration: objects waiting to be committed to memory
    """

    NAME = "Memory"
    GLOBAL = "global"
    OBJECTS_ENTRY = "Memory:Objects"

    def __init__(self, sheerka):
        super().__init__(sheerka, order=13)
        self.short_term_objects = FastCache()  # context id -> {key: value}
        self.registration = {}  # key -> object waiting for commit_registered_objects()
        self.enable_memory_registration = True  # when False, register_object() does nothing

    def initialize(self):
        """
        Binds the service methods, registers the cache used for OBJECTS_ENTRY
        and subscribes to EVENT_CONTEXT_DISPOSED to drop the short term memory of disposed contexts
        :return:
        """
        self.sheerka.bind_service_method(self.NAME, self.get_from_short_term_memory, False, visible=False)
        self.sheerka.bind_service_method(self.NAME, self.get_all_short_term_memory, False, visible=False)
        self.sheerka.bind_service_method(self.NAME, self.add_to_short_term_memory, True, visible=False)
        self.sheerka.bind_service_method(self.NAME, self.remove_context, True,
                                         as_name="clear_short_term_memory", visible=False)
        self.sheerka.bind_service_method(self.NAME, self.add_to_memory, True)
        self.sheerka.bind_service_method(self.NAME, self.add_many_to_short_term_memory, True, visible=False)
        self.sheerka.bind_service_method(self.NAME, self.get_from_memory, False)
        self.sheerka.bind_service_method(self.NAME, self.get_last_from_memory, False)
        self.sheerka.bind_service_method(self.NAME, self.register_object, True, visible=False)
        self.sheerka.bind_service_method(self.NAME, self.unregister_object, True, visible=False)
        self.sheerka.bind_service_method(self.NAME, self.commit_registered_objects, True, visible=False)
        self.sheerka.bind_service_method(self.NAME, self.memory, False)
        self.sheerka.bind_service_method(self.NAME, self.mem, False)

        cache = ListIfNeededCache().auto_configure(self.OBJECTS_ENTRY)
        self.sheerka.om.register_cache(self.OBJECTS_ENTRY, cache, persist=True, use_ref=True)

        self.sheerka.subscribe(EVENT_CONTEXT_DISPOSED, self.remove_context)

    def reset(self):
        """
        Clears the short term memory and the OBJECTS_ENTRY entries
        :return:
        """
        self.short_term_objects.clear()
        self.sheerka.om.clear(self.OBJECTS_ENTRY)

    def reset_state(self):
        """
        Clears the short term memory and the pending registrations
        :return:
        """
        self.short_term_objects.clear()
        self.registration.clear()

    def get_from_short_term_memory(self, context, key):
        """
        Looks for key in the short term memory of the context, then of its parents,
        and finally in the GLOBAL one
        :param context: context to start from. None means GLOBAL
        :param key:
        :return: the first value found (the nearest one), NotFound otherwise
        """
        while True:
            try:
                id_to_use = context.id if context else self.GLOBAL
                return self.short_term_objects.cache[id_to_use][key]
            except KeyError:
                if context is None:
                    # GLOBAL was the last place to look at
                    return NotFound

                context = context.get_parent()

    def get_all_short_term_memory(self, context, recursive=False):
        """
        Returns the short term memory of a context
        :param context: context to use. None means GLOBAL
        :param recursive: when True, also gathers the entries of the parents and of GLOBAL
        :return: when not recursive, whatever the cache holds for the context.
                 When recursive, a new dict where the nearest definition of a key wins
                 (same precedence as get_from_short_term_memory)
        """
        id_to_use = context.id if context else self.GLOBAL
        if not recursive:
            return self.short_term_objects.get(id_to_use)

        all_vars = {}
        while True:
            try:
                scope = self.short_term_objects.cache[id_to_use]
            except KeyError:
                scope = {}

            # we walk from the nearest context to GLOBAL,
            # so a key that is already known must not be overridden by an outer definition
            for name, value in scope.items():
                all_vars.setdefault(name, value)

            if id_to_use == self.GLOBAL:
                break

            context = context.get_parent()
            id_to_use = context.id if context else self.GLOBAL

        return all_vars

    def add_to_short_term_memory(self, context, key, value):
        """
        Adds (or replaces) one entry in the short term memory of the context
        :param context: context to use. None means GLOBAL
        :param key:
        :param value:
        :return:
        """
        if context:
            context.stm = True  # flags the context as having short term memory
            id_to_use = context.id
        else:
            id_to_use = SheerkaMemory.GLOBAL

        if id_to_use in self.short_term_objects.cache:
            self.short_term_objects.cache[id_to_use][key] = value
        else:
            self.short_term_objects.put(id_to_use, {key: value})

    def add_many_to_short_term_memory(self, context, bag):
        """
        Sets the whole short term memory of the context.
        Unlike add_to_short_term_memory(), the bag is put as is, it is not merged with a previous one
        :param context: context to use. None means GLOBAL
        :param bag: entries to put
        :return:
        """
        if context:
            context.stm = True
            id_to_use = context.id
        else:
            # no context to flag, same fallback as add_to_short_term_memory()
            id_to_use = self.GLOBAL

        self.short_term_objects.put(id_to_use, bag)

    def remove_context(self, context):
        """
        Drops the short term memory of the context, if any
        Also bound as 'clear_short_term_memory' and used as handler for EVENT_CONTEXT_DISPOSED
        :param context:
        :return:
        """
        try:
            del self.short_term_objects.cache[context.id]
        except KeyError:
            # nothing was stored for this context
            pass

    def add_to_memory(self, context, key, concept):
        """
        Adds an object to memory
        If the latest entry for the key already holds the same object, it is replaced
        (so that the event digest and the timestamp are refreshed), otherwise the object is appended
        :param context: gives the event (digest and date) the entry is stamped with
        :param key:
        :param concept: object to remember
        :return:
        """
        previous = self.sheerka.om.get(SheerkaMemory.OBJECTS_ENTRY, key)
        if previous is not NotFound:
            # the entry is either a single MemoryObject or a list of MemoryObject
            latest = previous[-1] if isinstance(previous, list) else previous
            if latest.obj == concept:
                # replace with the new one
                self.sheerka.om.delete(SheerkaMemory.OBJECTS_ENTRY, key, latest)

        self.sheerka.om.put(SheerkaMemory.OBJECTS_ENTRY,
                            key,
                            MemoryObject(context.event.get_digest(), context.event.date.timestamp(), concept))

    def get_from_memory(self, context, key):
        """
        Returns what is stored in memory for the key
        :param context: not used
        :param key:
        :return: whatever is stored under OBJECTS_ENTRY for the key
                 (get_last_from_memory() and add_to_memory() handle a single item, a list or NotFound)
        """
        return self.sheerka.om.get(SheerkaMemory.OBJECTS_ENTRY, key)

    def get_last_from_memory(self, context, key):
        """
        Return an object from memory
        When there are multiple items, returns the last one
        :param context: not used
        :param key:
        :return: the last item, or NotFound
        """
        res = self.sheerka.om.get(SheerkaMemory.OBJECTS_ENTRY, key)
        if res is NotFound:
            return res

        if isinstance(res, list):
            return res[-1]

        return res

    def register_object(self, context, key, concept):
        """
        Before adding memory_objects to memory, they first need to be registered
        More:
        We don't want to add all evaluated concepts into memory
        (because some of them may be refs to concepts already in memory)
        So we first register them, and at the end of sheerka.evaluate_user_input()
        all remaining registered concepts will be added to memory

        Nothing is registered when the registration is disabled,
        or during the initialisation or the restoration of sheerka
        :param context:
        :param key:
        :param concept:
        :return:
        """
        if not self.enable_memory_registration or self.sheerka.during_initialisation or self.sheerka.during_restore:
            return

        self.registration[key] = concept

    def unregister_object(self, context, key):
        """
        To indicate that key is no longer to be remembered
        :param context:
        :param key:
        :return:
        """
        try:
            del self.registration[key]
        except KeyError:
            # the key was not registered
            pass

    def commit_registered_objects(self, context):
        """
        Adds all registered memory_objects to memory, then clears the registration
        :param context:
        :return:
        """
        for k, v in self.registration.items():
            self.add_to_memory(context, k, v)

        self.registration.clear()

    def memory(self, context, name):
        """
        Gets an object from memory
        First tries a direct access by name.
        If nothing is found and name parses as an expression which is not a simple variable or name,
        uses it as a predicate to filter the objects in memory, the most recent ones first
        :param context:
        :param name: name of the object (or a Concept, its name is then used), or a predicate
        :return: the object found, a NOT_FOUND concept otherwise
        """
        # the registration is disabled during the search, and put back as it was afterwards
        # (a nested call must not re-enable it while the outer call is still running)
        previous_registration_state = self.enable_memory_registration
        self.enable_memory_registration = False
        try:
            name_to_use = name.name if isinstance(name, Concept) else name
            self.unregister_object(context, name_to_use)

            # first try direct access
            found = self.get_last_from_memory(context, name_to_use)
            if found is not NotFound:
                return found.obj

            # only filter if it's a valid predicate
            from parsers.BaseExpressionParser import VariableNode, NameExprNode
            parsed_ret_val = self.sheerka.parse_expression(context, name_to_use, auto_compile=False)
            if parsed_ret_val.status and not isinstance(parsed_ret_val.body.body, (VariableNode, NameExprNode)):
                # the entries are not always lists, normalize into (copied) lists
                pending = [entry.copy() if isinstance(entry, list) else [entry]
                           for entry in self.sheerka.om.list(SheerkaMemory.OBJECTS_ENTRY)]

                while pending:
                    # takes the last item of every entry
                    layer = [history.pop(-1) for history in pending]
                    pending = [history for history in pending if history]

                    layer.sort(key=lambda o: o.timestamp, reverse=True)
                    candidates = [o.obj for o in layer]

                    res = self.sheerka.filter_objects(context, candidates, predicate=name_to_use)
                    if len(res) > 0:
                        return res[0]  # only the first, as it should have a better timestamp

            return self.sheerka.new(BuiltinConcepts.NOT_FOUND, body={"#name": name_to_use})
        finally:
            self.enable_memory_registration = previous_registration_state

    def mem(self):
        """
        Gives a summary of what sheerka.om.list() returns for OBJECTS_ENTRY
        :return: dict with the sorted items ("keys") and their count ("len")
        """
        keys = sorted(self.sheerka.om.list(SheerkaMemory.OBJECTS_ENTRY))
        return {"keys": keys, "len": len(keys)}