import hashlib
import json
import zlib
import time
from dataclasses import dataclass
from datetime import datetime, date

from core.sheerka_logger import get_logger
from sdp.sheerkaDataProviderIO import SheerkaDataProviderIO
from sdp.sheerkaSerializer import Serializer, SerializerContext


def json_default_converter(o):
    """
    Default formatter for json
    It's used when the json serializer does not know
    how to serialise a type
    :param o: object the json encoder could not handle
    :return: a json-serializable representation, or None (encoder then raises)
    """
    if isinstance(o, (date, datetime)):
        return o.isoformat()

    if isinstance(o, SheerkaDataProviderRef):
        return f"##XREF##:{o.target}"


class Event(object):
    """
    Class that represents something that modifies the state of the system
    """

    def __init__(self, message="", user="", date=None, parents=None):
        # NOTE(fix): the default used to be `date=datetime.now()`, which is
        # evaluated once at import time — every Event created without an
        # explicit date shared the same stale timestamp. Use a None sentinel
        # and take the timestamp per call instead.
        self.version = 1
        self.user = user
        self.date = date if date is not None else datetime.now()
        self.message = message
        self.parents = parents
        self._digest = None  # lazily computed by get_digest()

    def __str__(self):
        return f"{self.date.strftime('%d/%m/%Y %H:%M:%S')} {self.message}"

    def __repr__(self):
        return f"{self.get_digest()[:12]} {self.message}"

    def get_digest(self):
        """
        Returns the digest of the event
        The digest is computed once and cached; from_dict() can freeze it.
        :return: hexa form of the sha256
        """

        if self._digest:
            return self._digest

        if self.message == "" and self.user == "":
            self._digest = "xxx"  # to speed unit tests
            return self._digest

        if not isinstance(self.message, str):
            raise NotImplementedError

        to_hash = f"Event:{self.user}{self.date}{self.message}{self.parents}".encode("utf-8")
        self._digest = hashlib.sha256(to_hash).hexdigest()
        return self._digest

    def to_dict(self):
        return self.__dict__

    def from_dict(self, as_dict):
        self.user = as_dict["user"]
        self.date = datetime.fromisoformat(as_dict["date"])
        self.message = as_dict["message"]
        self.parents = as_dict["parents"]
        self._digest = as_dict["_digest"]  # freeze the digest


class ObjToUpdate:
    """
    Internal key value class;
    You give it an obj, and it tries to figure out what is the key of the obj
    Note that you can force the key if you want
    It was first created to make the difference between an object that has a key and {key, value}
    """

    def __init__(self, obj, key=None, digest=None):
        self.obj = obj
        # has_key / has_digest are tri-state: None = not probed yet,
        # False = probed and absent, True = known (cached in _key/_digest)
        self.has_key = None
        self.has_digest = None
        self._key = None
        self._digest = None
        if key is not None:
            self.set_key(key)
        if digest is not None:
            self.set_digest(digest)

    def get_key(self):
        """Return the key of obj (probing it once), or None if it has none."""
        if self.has_key is None:
            key = SheerkaDataProvider.get_obj_key(self.obj)
            if key is None:
                self.has_key = False
                return None
            else:
                self.has_key = True
                self._key = key
                return key
        elif not self.has_key:
            return None
        else:
            return self._key

    def get_digest(self):
        """Return the digest of obj (probing it once), or None if it has none."""
        if self.has_digest is None:
            digest = SheerkaDataProvider.get_obj_digest(self.obj)
            if digest is None:
                self.has_digest = False
                return None
            else:
                self.has_digest = True
                self._digest = digest
                return digest
        elif not self.has_digest:
            return None
        else:
            return self._digest

    def set_digest(self, digest):
        self.has_digest = True
        self._digest = digest

    def set_key(self, key):
        self.has_key = True
        self._key = key


class State:
    """
    Class that represents the state of the system (dictionary of all known entries)
    """

    def __init__(self):
        self.version = 1
        self.date = None      # date of the last modification
        self.parents = []     # digest(s) of the previous state snapshot
        self.events = []      # digest(s) of the event(s) that produced this state
        self.data = {}        # entry name -> stored object / dict / list

    @staticmethod
    def check_duplicate(items, obj: ObjToUpdate, key):
        """Raise SheerkaDataProviderDuplicateKeyError if obj's digest is already in items."""
        digest = obj.get_digest()
        if digest is None:
            return

        if not hasattr(items, "__iter__"):
            items = [items]

        for item in items:
            item_digest = SheerkaDataProvider.get_obj_digest(item)
            if item_digest == digest:
                raise SheerkaDataProviderDuplicateKeyError(key, obj.obj)

    def update(self, entry, obj: ObjToUpdate, append=True):
        """
        adds obj to entry
        :param entry:
        :param obj:
        :param append: if True, duplicate keys will create lists
        :return:
        """
        obj_to_use = {obj.get_key(): obj.obj} if obj.has_key else obj.obj

        if entry not in self.data:
            self.data[entry] = obj_to_use

        elif not append:
            if isinstance(obj_to_use, dict):
                self.data[entry].update(obj_to_use)
            else:
                self.data[entry] = obj_to_use

        elif isinstance(self.data[entry], list):
            self.check_duplicate(self.data[entry], obj, entry)
            self.data[entry].append(obj.obj)

        elif isinstance(obj_to_use, dict):
            # keyed object merged into a keyed entry: lists are created on collision
            for k in obj_to_use:
                if k not in self.data[entry]:
                    self.data[entry][k] = obj_to_use[k]
                elif isinstance(self.data[entry][k], list):
                    self.check_duplicate(self.data[entry][k], obj, entry + "." + k)
                    self.data[entry][k].append(obj_to_use[k])
                else:
                    self.check_duplicate(self.data[entry][k], obj, entry + "." + k)
                    self.data[entry][k] = [self.data[entry][k], obj_to_use[k]]

        elif isinstance(self.data[entry], dict):
            raise SheerkaDataProviderError(f"Cannot found key on '{obj.obj}' while all other elements have.", obj.obj)

        else:
            self.check_duplicate(self.data[entry], obj, entry)
            self.data[entry] = [self.data[entry], obj_to_use]

    def modify(self, entry, key, obj, obj_key):
        # if the key changes, make sure to remove the previous entry
        append = False
        if obj_key != key:
            self.remove(entry, lambda k, o: k == key)  # modify from one object to another
            append = True

        self.update(entry, ObjToUpdate(obj, obj_key), append=append)

    def modify_in_list(self, entry, key, obj, obj_key, obj_origin, load_ref_if_needed, save_ref_if_needed):
        """
        Replace, inside the list stored at entry.key, the item whose origin
        matches obj_origin. References are resolved/re-saved via the two callables.
        :return: the new digest when the item was stored as a reference, else None
        """
        found = False
        to_remove = None
        new_digest = None

        def _get_item_origin(o):
            # best-effort extraction of the digest an item was loaded from
            if hasattr(o, Serializer.ORIGIN):
                return getattr(o, Serializer.ORIGIN)

            if isinstance(o, dict) and Serializer.ORIGIN in o:
                return o[Serializer.ORIGIN]

            if hasattr(o, "get_digest"):
                return o.get_digest()

            if isinstance(o, str):
                return o

            return None

        for i in range(len(self.data[entry][key])):
            item, is_ref = load_ref_if_needed(self.data[entry][key][i])
            item_origin = _get_item_origin(item)
            if item_origin is None:
                continue
            if item_origin == obj_origin:
                obj = save_ref_if_needed(is_ref, obj)
                if is_ref:
                    new_digest = obj[len(SheerkaDataProvider.REF_PREFIX):]
                if obj_key == key:
                    self.data[entry][key][i] = obj
                else:
                    # key changed: re-file under the new key, delete the old slot below
                    to_remove = i
                    self.update(entry, ObjToUpdate(obj, obj_key), append=True)
                found = True
                break

        if not found:
            raise SheerkaDataProviderError(f"Cannot modify '{entry}.{key}'. Item '{obj_origin}' not found.", obj)

        if to_remove is not None:
            del self.data[entry][key][to_remove]

        return new_digest

    def remove(self, entry, filter):
        """
        Remove entry (filter is None) or the elements of entry matched by filter.
        For dict entries the filter receives (key, element); otherwise (element).
        """
        if filter is None:
            del self.data[entry]

        elif isinstance(self.data[entry], dict):
            keys_to_remove = []
            for key, element in self.data[entry].items():
                if filter(key, element):
                    keys_to_remove.append(key)
            for key in keys_to_remove:
                del self.data[entry][key]

        elif not isinstance(self.data[entry], list):
            if filter(self.data[entry]):
                del self.data[entry]

        else:
            # NOTE(fix): the previous code removed elements from the list while
            # iterating it, which silently skips the element following each
            # removal. Rebuild the list instead.
            self.data[entry] = [element for element in self.data[entry] if not filter(element)]

    def get_digest(self):
        as_json = json.dumps(self.__dict__, default=json_default_converter)
        return hashlib.sha256(as_json.encode("utf-8")).hexdigest()

    def contains(self, entry, key):
        """
        if key is None, returns True if entry exists
        if key has a value
        returns True if entry is a dict and contains key
        :param entry:
        :param key:
        :return:
        """
        if entry not in self.data:
            return False
        if key is None:
            return entry in self.data
        if not isinstance(self.data[entry], dict):
            return False
        return key in self.data[entry]


class SheerkaDataProviderError(Exception):
    def __init__(self, message, obj):
        Exception.__init__(self, message)
        self.obj = obj


class SheerkaDataProviderDuplicateKeyError(Exception):
    def __init__(self, key, obj):
        Exception.__init__(self, "Duplicate object.")
        self.key = key
        self.obj = obj


@dataclass
class SheerkaDataProviderResult:
    """
    Object that is returned after adding, setting or modifying an entry
    """
    obj: object                   # obj that was given to store/modify
    entry: str                    # entry where the object is put
    key: str                      # key to use to retrieve the object
    digest: str                   # digest used to store the reference
    already_exists: bool = False  # the same object was already persisted


@dataclass
class SheerkaDataProviderRef:
    """
    Object that tells where an object is store (target is the digest of the reference)
    """
    key: str                    # key of the object
    target: str                 # digest of the reference
    original_target: str = None  # when the object is modified, previous digest

    def get_digest(self):
        return self.original_target

    def get_key(self):
        return self.key


class SheerkaDataProvider:
    """Manages the state of the system"""

    EventFolder = "events"
    StateFolder = "state"
    ObjectsFolder = "objects"
    CacheFolder = "cache"
    HeadFile = "HEAD"
    LastEventFile = "LAST_EVENT"
    KeysFile = "keys"
    REF_PREFIX = "##REF##:"

    def __init__(self, root=None, sheerka=None):
        self.log = get_logger(__name__)
        self.init_log = get_logger("init." + __name__)
        self.init_log.debug("Initializing sdp.")

        self.sheerka = sheerka
        self.io = SheerkaDataProviderIO.get(root)
        self.first_time = self.io.first_time

        self.serializer = Serializer()

    @staticmethod
    def get_obj_key(obj):
        """
        Tries to find the key of an object
        Look for .key, .get_key()
        :param obj:
        :return: String version of that is found, None otherwise
        """
        return str(obj.get_key()) if hasattr(obj, "get_key") \
            else str(obj.key) if hasattr(obj, "key") \
            else None

    @staticmethod
    def get_obj_digest(obj):
        """
        Tries to find the digest of an object
        Look for .digest, .get_digest()
        :param obj:
        :return: digest, None otherwise
        """
        if isinstance(obj, str) and obj.startswith(SheerkaDataProvider.REF_PREFIX):
            return obj[len(SheerkaDataProvider.REF_PREFIX):]

        return obj.digest if hasattr(obj, "digest") \
            else obj.get_digest() if hasattr(obj, "get_digest") \
            else None

    @staticmethod
    def get_obj_origin(obj):
        """
        Get the digest used to save obj if set
        """
        if isinstance(obj, dict) and Serializer.ORIGIN in obj:
            return obj[Serializer.ORIGIN]

        if hasattr(obj, Serializer.ORIGIN):
            return getattr(obj, Serializer.ORIGIN)

        if isinstance(obj, SheerkaDataProviderRef):
            return obj.original_target

        return None

    @staticmethod
    def get_stream_digest(stream):
        """Compute the sha256 of a binary stream in 4 KiB chunks, then rewind it."""
        sha256_hash = hashlib.sha256()
        for byte_block in iter(lambda: stream.read(4096), b""):
            sha256_hash.update(byte_block)

        stream.seek(0)
        return sha256_hash.hexdigest()

    @staticmethod
    def is_reference(obj):
        return isinstance(obj, str) and obj.startswith(SheerkaDataProvider.REF_PREFIX)

    def reset(self):
        self.first_time = self.io.first_time
        if hasattr(self.io, "reset"):
            self.io.reset()

    def add(self, event_digest: str, entry, obj, allow_multiple=True, use_ref=False):
        """
        Adds obj to the entry 'entry'
        :param event_digest: digest of the event that triggers the modification of the state
        :param entry: entry of the state to update
        :param obj: obj to insert or add
        :param allow_multiple: if set to true, the same key can be added several times.
        All entries will be put in a list
        :param use_ref: if True the actual object is saved under 'objects' folder,
        only a reference is saved in the state
        :return: (entry, key) to retrieve the object
        """

        original_obj = obj.copy() if isinstance(obj, dict) else obj

        snapshot = self.get_snapshot(SheerkaDataProvider.HeadFile)
        state = self.load_state(snapshot)

        self.log.debug(f"Adding obj '{obj}' in entry '{entry}' (allow_multiple={allow_multiple}, use_ref={use_ref})")

        if not isinstance(obj, ObjToUpdate):
            obj = ObjToUpdate(obj)

        # check uniqueness, cannot add the same key twice if allow_multiple == False
        key = obj.get_key()
        if key:
            self.log.debug(f"key found : '{key}'")
        else:
            self.log.debug("No key found")
        if not allow_multiple:
            if isinstance(obj.obj, dict):
                for k in obj.obj:
                    if state.contains(entry, k):
                        raise IndexError(f"{entry}.{k}")
            else:
                if state.contains(entry, key):
                    raise IndexError(f"{entry}.{key}" if key else entry)

        state.parents = [] if snapshot is None else [snapshot]
        state.events = [event_digest]
        state.date = datetime.now()

        if use_ref:
            obj.set_digest(self.save_obj(obj.obj))
            obj.obj = self.REF_PREFIX + obj.get_digest()

        state.update(entry, obj)

        new_snapshot = self.save_state(state)
        self.set_snapshot(SheerkaDataProvider.HeadFile, new_snapshot)
        return SheerkaDataProviderResult(original_obj, entry, obj.get_key(), obj.get_digest())

    def add_with_auto_key(self, event_digest: str, entry, obj):
        """
        Add obj to entry. An autogenerated key created for obj
        :param event_digest:
        :param entry:
        :param obj:
        :return:
        """

        original_obj = obj.copy() if isinstance(obj, dict) else obj

        next_key = self.get_next_key(entry)
        if hasattr(obj, "set_key"):
            obj.set_key(next_key)
        res = self.add(event_digest, entry, ObjToUpdate(obj, next_key))
        return SheerkaDataProviderResult(original_obj, res.entry, res.key, res.digest)

    def add_unique(self, event_digest: str, entry, obj):
        """Add an entry and make sure it's unique"""

        original_obj = obj.copy() if isinstance(obj, dict) else obj

        snapshot = self.get_snapshot(SheerkaDataProvider.HeadFile)
        state = self.load_state(snapshot)

        state.parents = [] if snapshot is None else [snapshot]
        state.events = [event_digest]
        state.date = datetime.now()
        if entry not in state.data:
            state.data[entry] = {obj}  # entry is stored as a set to enforce uniqueness
            already_exist = False
        else:
            already_exist = obj in state.data[entry]
            if not already_exist:
                state.data[entry].add(obj)

        new_snapshot = self.save_state(state)
        self.set_snapshot(SheerkaDataProvider.HeadFile, new_snapshot)
        return SheerkaDataProviderResult(
            original_obj,
            entry,
            None,
            None,
            already_exist)

    def set(self, event_digest, entry, obj, use_ref=False, is_ref=False):
        """
        Add or replace an entry. The entry is reinitialized.
        If the previous value was dict, all keys are lost
        :param event_digest:
        :param entry:
        :param obj:
        :param use_ref: Do not save obj in State (save it under objects), use_ref in State
        :param is_ref: obj is supposed to be a reference
        :return:
        """

        original_obj = obj.copy() if isinstance(obj, dict) else obj

        if use_ref and is_ref:
            raise SheerkaDataProviderError("Cannot use use_ref and is_ref at the same time", None)

        if is_ref and not isinstance(obj, dict):
            raise SheerkaDataProviderError("is_ref can only be used with dictionaries", obj)

        snapshot = self.get_snapshot(SheerkaDataProvider.HeadFile)
        state = self.load_state(snapshot)

        state.parents = [] if snapshot is None else [snapshot]
        state.events = [event_digest]
        state.date = datetime.now()

        key = self.get_obj_key(obj)
        obj = self.save_ref_if_needed(use_ref, obj)

        if is_ref:
            # values are digests: prefix them so they are recognized as references
            for k, v in obj.items():
                obj[k] = self.REF_PREFIX + v

        state.data[entry] = obj if key is None else {key: obj}

        new_snapshot = self.save_state(state)
        self.set_snapshot(SheerkaDataProvider.HeadFile, new_snapshot)
        return SheerkaDataProviderResult(original_obj, entry, key, self.get_obj_digest(obj))

    def modify(self, event_digest, entry, key, obj):
        """
        Replace an element
        If the key is not provided, has the same effect than set eg, the entry is reset
        :param event_digest:
        :param entry:
        :param key: key of the object to update
        :param obj: new data
        :return:
        """

        original_obj = obj.copy() if isinstance(obj, dict) else obj

        if key is None:
            raise SheerkaDataProviderError("Key is mandatory.", None)

        snapshot = self.get_snapshot(SheerkaDataProvider.HeadFile)
        state = self.load_state(snapshot)

        if entry not in state.data:
            raise IndexError(entry)

        if key is not None and key not in state.data[entry]:
            raise IndexError(f"{entry}.{key}")

        state.parents = [] if snapshot is None else [snapshot]
        state.events = [event_digest]
        state.date = datetime.now()

        # Gets obj original key, it will help to know if the key has changed
        obj_key = self.get_obj_key(obj) or key
        digest = None

        if isinstance(state.data[entry][key], list):
            obj_origin = self.get_obj_origin(obj)
            if obj_origin is None:
                raise SheerkaDataProviderError(f"Multiple entries under '{entry}.{key}'", obj)

            digest = state.modify_in_list(
                entry,
                key,
                obj,
                obj_key,
                obj_origin,
                self.load_ref_if_needed,
                self.save_ref_if_needed)

        else:
            # keep the storage mode (inline vs reference) of the previous value
            was_saved_as_reference = self.is_reference(state.data[entry][key])
            if was_saved_as_reference:
                obj = self.save_ref_if_needed(True, obj)
                digest = self.get_obj_digest(obj)

            state.modify(entry, key, obj, obj_key)

        new_snapshot = self.save_state(state)
        self.set_snapshot(SheerkaDataProvider.HeadFile, new_snapshot)
        return SheerkaDataProviderResult(original_obj, entry, obj_key, digest)

    def list(self, entry, filter=None):
        """
        Lists elements of entry 'entry'
        :param entry: name of the entry to list
        :param filter: filter to use
        :return: list of elements
        """
        snapshot = self.get_snapshot(SheerkaDataProvider.HeadFile)
        state = self.load_state(snapshot)
        if entry not in state.data:
            return []

        elements = state.data[entry]

        if isinstance(elements, dict):
            # manage when elements have a key
            filter_to_use = (lambda k, o: True) if filter is None else filter
            for key, element in elements.items():
                if filter_to_use(key, element):
                    if isinstance(element, list):
                        yield [self.load_ref_if_needed(e)[0] for e in element]
                    else:
                        yield self.load_ref_if_needed(element)[0]
        else:
            # manage when no key is defined for the elements
            if not isinstance(elements, list) and not isinstance(elements, set):
                elements = [elements]

            filter_to_use = (lambda o: True) if filter is None else filter
            for element in elements:
                if filter_to_use(element):
                    yield self.load_ref_if_needed(element)[0]

    def remove(self, event_digest, entry, filter=None, silent_remove=True):
        """
        Removes elements under the entry 'entry'
        :param event_digest: event that triggers the deletion
        :param entry:
        :param filter: filter to use
        :param silent_remove: Do not throw exception if entry does not exist
        :return: new sha256 of the state
        TODO: Remove by key
        """
        snapshot = self.get_snapshot(SheerkaDataProvider.HeadFile)
        state = self.load_state(snapshot)

        if entry not in state.data:
            if silent_remove:
                return snapshot
            else:
                raise IndexError(entry)

        state.parents = [] if snapshot is None else [snapshot]
        state.events = [event_digest]
        state.date = datetime.now()
        state.remove(entry, filter)

        new_snapshot = self.save_state(state)
        self.set_snapshot(SheerkaDataProvider.HeadFile, new_snapshot)
        return new_snapshot

    def get(self, entry, key=None, load_origin=True):
        """
        Retrieve an element by its key
        :param entry:
        :param key:
        :param load_origin: if True, adds the origin (parent digest) to the object
        :return:
        """
        snapshot = self.get_snapshot(SheerkaDataProvider.HeadFile)
        state = self.load_state(snapshot)

        if entry not in state.data:
            raise IndexError(entry)

        if key is not None and key not in state.data[entry]:
            raise IndexError(f"{entry}.{key}")

        item = state.data[entry] if key is None else state.data[entry][key]
        if isinstance(item, list):
            return [self.load_ref_if_needed(i, load_origin)[0] for i in item]

        return self.load_ref_if_needed(item, load_origin)[0]

    def get_safe(self, entry, key=None, load_origin=True):
        """
        Retrieve an element by its key. Return None if the element does not exist
        :param entry:
        :param key:
        :param load_origin: if True, adds the origin (parent digest) to the object
        :return:
        """
        snapshot = self.get_snapshot(SheerkaDataProvider.HeadFile)
        state = self.load_state(snapshot)

        if entry not in state.data:
            return None

        if key is not None and key not in state.data[entry]:
            return None

        item = state.data[entry] if key is None else state.data[entry][key]
        if isinstance(item, list):
            return [self.load_ref_if_needed(i, load_origin)[0] for i in item]

        return self.load_ref_if_needed(item, load_origin)[0]

    def get_ref(self, entry, key=None):
        """
        Returns the reference of an object if the object exists
        This function allows to retrieve obj.##origin## without loading the object
        :param entry:
        :param key:
        :return:
        """

        snapshot = self.get_snapshot(SheerkaDataProvider.HeadFile)
        state = self.load_state(snapshot)

        if entry not in state.data:
            raise IndexError(entry)

        if key is not None and key not in state.data[entry]:
            raise IndexError(f"{entry}.{key}")

        item = state.data[entry] if key is None else state.data[entry][key]
        if isinstance(item, list):
            res = []
            for element in item:
                if not self.is_reference(element):
                    raise SheerkaDataProviderError("Not a reference", f"{entry}.{key}")
                res.append(self.get_obj_digest(element))
            return res

        if not self.is_reference(item):
            raise SheerkaDataProviderError("Not a reference", f"{entry}.{key}")

        return self.get_obj_digest(item)

    def exists(self, entry, key=None, digest=None):
        """
        Returns true if the entry is defined
        :param key:
        :param entry:
        :param digest: digest of the object, when several entries share the same key
        :return:
        """
        snapshot = self.get_snapshot(SheerkaDataProvider.HeadFile)
        state = self.load_state(snapshot)
        exist = entry in state.data

        if not exist or key is None:
            return exist

        items = state.data[entry]
        exist = key in items
        if not exist or digest is None:
            return exist

        items = items[key]
        if not isinstance(items, list):
            items = [items]

        for item in items:
            item_digest = SheerkaDataProvider.get_obj_digest(item)
            if item_digest == digest:
                return True

        return False

    def save_event(self, event: Event):
        """
        Persist an event (chained to the last saved event)
        :param event:
        :return: digest of the event
        """
        parent = self.get_snapshot(SheerkaDataProvider.LastEventFile)
        event.parents = [parent] if parent else None
        digest = event.get_digest()  # must be call after setting the parents

        target_path = self.io.get_obj_path(SheerkaDataProvider.EventFolder, digest)
        if self.io.exists(target_path):
            return digest

        self.io.write_binary(target_path, self.serializer.serialize(event, None).read())
        self.set_snapshot(SheerkaDataProvider.LastEventFile, digest)

        return digest

    def load_event(self, digest=None):
        """
        return an event, given its digest
        :param digest:
        :return:
        """
        digest = digest or self.get_snapshot(SheerkaDataProvider.LastEventFile)
        if digest is None:
            return None

        target_path = self.io.get_obj_path(SheerkaDataProvider.EventFolder, digest)

        with self.io.open(target_path, "rb") as f:
            return self.serializer.deserialize(f, None)

    def load_events(self, page_size, start=0):
        """
        Load multiple events in the same command
        :param start:
        :param page_size:
        :return:
        """

        digest = None
        if start:
            # skip the first 'start' events by walking the parent chain
            for i in range(start):
                event = self.load_event(digest)
                if event is None or event.parents is None:
                    return
                digest = event.parents[0]

        count = 0
        while count < page_size or page_size <= 0:
            event = self.load_event(digest)
            if event is None:
                return

            yield event

            if event.parents is None:
                return

            digest = event.parents[0]
            count += 1

    def get_result_file_path(self, digest, is_admin):
        ext = "_admin_result" if is_admin else "_result"
        return self.io.get_obj_path(SheerkaDataProvider.EventFolder, digest) + ext

    def has_result(self, digest, is_admin=False):
        """
        Check is a result file was created for a specific event
        :param digest:
        :param is_admin: True is the result is an internal admin result file
        :return:
        """
        target_path = self.get_result_file_path(digest, is_admin)
        return self.io.exists(target_path)

    def save_result(self, execution_context, is_admin=False):
        """
        Save the execution context associated with an event
        To make a long story short,
        for every single user input, there is an event (which is the first thing that is created)
        and a result (the ExecutionContext created by sheerka.evaluate_user_input()
        :param execution_context:
        :param is_admin: True is the result is an internal admin result file
        :return:
        """
        start = time.time()
        message = execution_context.event.message
        digest = execution_context.event.get_digest()
        self.log.debug(f"Saving execution context. digest={digest}, message={message}")
        target_path = self.get_result_file_path(digest, is_admin)
        if self.io.exists(target_path):
            return digest

        context = SerializerContext(sheerka=self.sheerka)
        length = self.io.write_binary(target_path, self.serializer.serialize(execution_context, context).read())
        elapsed = time.time() - start
        self.log.debug(f"Saved execution context. message={message}, length={length}, elapsed={elapsed}")
        return digest

    def load_result(self, digest, is_admin=False):
        """
        Load and deserialize a result file
        :param digest:
        :param is_admin: True is the result is an internal admin result file
        :return:
        """
        target_path = self.get_result_file_path(digest, is_admin)

        with self.io.open(target_path, "rb") as f:
            context = SerializerContext(sheerka=self.sheerka)
            return self.serializer.deserialize(f, context)

    def save_state(self, state: State):
        """Persist a state snapshot under its content digest (idempotent)."""
        digest = state.get_digest()
        self.log.debug(f"Saving new state. digest={digest}")
        target_path = self.io.get_obj_path(SheerkaDataProvider.StateFolder, digest)
        if self.io.exists(target_path):
            return digest

        context = SerializerContext(sheerka=self.sheerka)
        self.io.write_binary(target_path, self.serializer.serialize(state, context).read())
        return digest

    def load_state(self, digest):
        """Load a state snapshot; a None digest yields a fresh empty State."""
        if digest is None:
            return State()

        target_path = self.io.get_obj_path(SheerkaDataProvider.StateFolder, digest)
        with self.io.open(target_path, "rb") as f:
            context = SerializerContext(sheerka=self.sheerka)
            return self.serializer.deserialize(f, context)

    def save_obj(self, obj):
        """Persist obj under the 'objects' folder, keyed by its digest (idempotent)."""
        self.log.debug(f"Saving '{obj}' as reference...")
        context = SerializerContext(user_name="kodjo", sheerka=self.sheerka)
        stream = self.serializer.serialize(obj, context)
        digest = obj.get_digest() if hasattr(obj, "get_digest") else self.get_stream_digest(stream)

        target_path = self.io.get_obj_path(SheerkaDataProvider.ObjectsFolder, digest)
        if self.io.exists(target_path):
            self.log.debug(f"...already saved. digest is {digest}")
            return digest

        self.io.write_binary(target_path, stream.read())

        self.log.debug(f"...digest={digest}.")
        return digest

    def load_obj(self, digest, add_origin=True):
        """Load an object from the 'objects' folder; None if missing."""
        if digest is None:
            return None

        target_path = self.io.get_obj_path(SheerkaDataProvider.ObjectsFolder, digest)
        if not self.io.exists(target_path):
            return None

        with self.io.open(target_path, "rb") as f:
            context = SerializerContext(origin=digest, sheerka=self.sheerka)
            obj = self.serializer.deserialize(f, context)

        # set the origin of the object
        if add_origin:
            if isinstance(obj, dict):
                obj[Serializer.ORIGIN] = digest
            elif not isinstance(obj, str):
                setattr(obj, Serializer.ORIGIN, digest)
        return obj

    def load_ref_if_needed(self, obj, load_origin=True):
        """
        Resolve obj when it is a reference.
        :return: (resolved_or_original_obj, was_a_string_reference)
        """
        if isinstance(obj, SheerkaDataProviderRef):
            resolved = self.load_obj(obj.target, load_origin)
            return resolved, False

        if not isinstance(obj, str) or not obj.startswith(SheerkaDataProvider.REF_PREFIX):
            return obj, False

        resolved = self.load_obj(obj[len(SheerkaDataProvider.REF_PREFIX):], load_origin)
        return (obj, False) if resolved is None else (resolved, True)

    def save_ref_if_needed(self, save_ref, obj):
        """Save obj and return its prefixed reference string when save_ref is True."""
        if not save_ref:
            return obj

        digest = self.save_obj(obj)
        return self.REF_PREFIX + digest

    def get_cache_params(self, category, key):
        digest = hashlib.sha3_256(f"{category}:{key}".encode("utf-8")).hexdigest()
        cache_path = self.io.get_obj_path(SheerkaDataProvider.CacheFolder, digest)
        return digest, cache_path

    def add_to_cache(self, category, key, obj, update=False):
        """
        Save obj in the internal cache system
        :param category:
        :param key:
        :param obj:
        :param update:
        :return:
        """
        digest, cache_path = self.get_cache_params(category, key)

        if self.io.exists(cache_path) and not update:
            return digest

        self.io.write_binary(cache_path, zlib.compress(obj.encode("utf-8"), 9))
        return digest

    def load_from_cache(self, category, key):
        """
        Reload a compress object from the cache
        :param category:
        :param key:
        :return:
        """
        digest, cache_path = self.get_cache_params(category, key)

        if not self.io.exists(cache_path):
            raise IndexError(f"{category}.{key}")

        with self.io.open(cache_path, "rb") as f:
            return zlib.decompress(f.read()).decode("utf-8")

    def remove_from_cache(self, category, key):
        """
        Remove an entry from the cache if present
        :param category:
        :param key:
        :return:
        """
        digest, cache_path = self.get_cache_params(category, key)
        if self.io.exists(cache_path):
            self.io.remove(cache_path)

        return digest

    def in_cache(self, category, key):
        """
        Returns true if the key is in cache
        :param category:
        :param key:
        :return:
        """
        digest, cache_path = self.get_cache_params(category, key)
        return self.io.exists(cache_path)

    def get_snapshot(self, file):
        head_file = self.io.path_join(file)
        if not self.io.exists(head_file):
            return None
        return self.io.read_text(head_file)

    def set_snapshot(self, file, digest):
        head_file = self.io.path_join(file)
        return self.io.write_text(head_file, digest)

    def load_keys(self):
        keys_file = self.io.path_join(SheerkaDataProvider.KeysFile)
        if not self.io.exists(keys_file):
            keys = {}
        else:
            with self.io.open(keys_file, "r") as f:
                keys = json.load(f)
        return keys

    def save_keys(self, keys):
        keys_file = self.io.path_join(SheerkaDataProvider.KeysFile)
        with self.io.open(keys_file, "w") as f:
            json.dump(keys, f)

    def get_next_key(self, entry):
        """Return the next auto-increment key for entry, as a string."""
        keys = self.load_keys()

        next_key = keys.get(entry, 0) + 1
        keys[entry] = next_key

        self.save_keys(keys)
        return str(next_key)

    def set_key(self, entry, value):
        keys = self.load_keys()
        keys[entry] = value
        self.save_keys(keys)
        return str(value)

    def dump_state(self, digest=None):
        digest = digest or self.get_snapshot(SheerkaDataProvider.HeadFile)
        state = self.load_state(digest)
        print(json.dumps(state.data, sort_keys=True, default=json_default_converter, indent=True))

    def dump_obj(self, digest):
        obj = self.load_obj(digest)
        print(json.dumps(obj.__dict__, sort_keys=True, default=json_default_converter, indent=True))