diff --git a/src/core/sheerka/Services/SheerkaCreateNewConcept.py b/src/core/sheerka/Services/SheerkaCreateNewConcept.py index f11a475..f6e082f 100644 --- a/src/core/sheerka/Services/SheerkaCreateNewConcept.py +++ b/src/core/sheerka/Services/SheerkaCreateNewConcept.py @@ -1,7 +1,6 @@ import core.utils from core.builtin_concepts import BuiltinConcepts, ErrorConcept from core.concept import Concept -from sdp.sheerkaDataProvider_Old import SheerkaDataProviderDuplicateKeyError BNF_NODE_PARSER_CLASS = "parsers.BnfNodeParser_Old.BnfNodeParser" BASE_NODE_PARSER_CLASS = "parsers.BaseNodeParser.BaseNodeParser" diff --git a/src/core/sheerka/Services/SheerkaDump.py b/src/core/sheerka/Services/SheerkaDump.py index 7d89c82..4c6154a 100644 --- a/src/core/sheerka/Services/SheerkaDump.py +++ b/src/core/sheerka/Services/SheerkaDump.py @@ -18,7 +18,7 @@ class SheerkaDump: self.sheerka = sheerka def dump_concepts(self): - lst = self.sheerka.sdp.list(self.sheerka.CONCEPTS_BY_KEY_ENTRY) + lst = self.sheerka.sdp.list(self.sheerka.CONCEPTS_BY_ID_ENTRY) for item in lst: if hasattr(item, "__iter__"): for i in item: diff --git a/src/sdp/sheerkaDataProvider.py b/src/sdp/sheerkaDataProvider.py index 9ba36f8..51629df 100644 --- a/src/sdp/sheerkaDataProvider.py +++ b/src/sdp/sheerkaDataProvider.py @@ -89,6 +89,13 @@ class State: return hashlib.sha256(as_json.encode("utf-8")).hexdigest() +class SheerkaDataProviderDuplicateKeyError(Exception): + def __init__(self, key, obj): + Exception.__init__(self, "Duplicate object.") + self.key = key + self.obj = obj + + @dataclass class SheerkaDataProviderResult: """ @@ -224,6 +231,39 @@ class SheerkaDataProvider: return {self.load_ref_if_needed(i, load_origin) for i in item} return self.load_ref_if_needed(item, load_origin) + def list(self, entry, filter=None): + """ + Lists elements of entry 'entry' + :param entry: name of the entry to list + :param filter: filter to use + :return: list of elements + """ + snapshot = self.get_snapshot(SheerkaDataProvider.HeadFile) + state = self.load_state(snapshot) + if entry not in state.data: + return [] + + elements = state.data[entry] + + if isinstance(elements, dict): + # manage when elements have a key + filter_to_use = (lambda k, o: True) if filter is None else filter + for key, element in elements.items(): + if filter_to_use(key, element): + if isinstance(element, list): + yield [self.load_ref_if_needed(e) for e in element] + else: + yield self.load_ref_if_needed(element) + else: + # manage when no key is defined for the elements + if not isinstance(elements, list) and not isinstance(elements, set): + elements = [elements] + + filter_to_use = (lambda o: True) if filter is None else filter + for element in elements: + if filter_to_use(element): + yield self.load_ref_if_needed(element) + def exists(self, entry, key=None): """ Returns true if the entry is defined diff --git a/src/sdp/sheerkaDataProvider_Old.py b/src/sdp/sheerkaDataProvider_Old.py index f8dd4e4..dd7080c 100644 --- a/src/sdp/sheerkaDataProvider_Old.py +++ b/src/sdp/sheerkaDataProvider_Old.py @@ -1,1087 +1,1087 @@ -import hashlib -import json -import zlib -import time -from dataclasses import dataclass -from datetime import datetime, date - -from core.sheerka_logger import get_logger -from sdp.sheerkaDataProviderIO import SheerkaDataProviderIO -from sdp.sheerkaSerializer import Serializer, SerializerContext - - -def json_default_converter(o): - """ - Default formatter for json - It's used when the json serializer does not know - how to serialise a type - :param o: - :return: - """ - if isinstance(o, (date, datetime)): - return o.isoformat() - - if isinstance(o, SheerkaDataProviderRef): - return f"##XREF##:{o.target}" - - -class Event(object): - """ - Class that represents something that modifies the state of the system - """ - - def __init__(self, message="", user="", date=datetime.now(), parents=None): - self.version = 1 - self.user = user - self.date = date - self.message = message - self.parents = parents - self._digest = None - - def __str__(self): - return f"{self.date.strftime('%d/%m/%Y %H:%M:%S')} {self.message}" - - def __repr__(self): - return f"{self.get_digest()[:12]} {self.message}" - - def get_digest(self): - """ - Returns the digest of the event - :return: hexa form of the sha256 - """ - - if self._digest: - return self._digest - - if self.message == "" and self.user == "": - self._digest = "xxx" # to speed unit tests - return self._digest - - if not isinstance(self.message, str): - raise NotImplementedError - - to_hash = f"Event:{self.user}{self.date}{self.message}{self.parents}".encode("utf-8") - self._digest = hashlib.sha256(to_hash).hexdigest() - return self._digest - - def to_dict(self): - return self.__dict__ - - def from_dict(self, as_dict): - self.user = as_dict["user"] - self.date = datetime.fromisoformat(as_dict["date"]) - self.message = as_dict["message"] - self.parents = as_dict["parents"] - self._digest = as_dict["_digest"] # freeze the digest - - -class ObjToUpdate: - """ - Internal key value class; - You give it an obj, and it tries to figure out what is the key of the obj - Note that you can force the key if you want - It was first create to make the difference between an object that has a key and {key, value} - """ - - def __init__(self, obj, key=None, digest=None): - self.obj = obj - self.has_key = None - self.has_digest = None - self._key = None - self._digest = None - if key is not None: - self.set_key(key) - if digest is not None: - self.set_digest(digest) - - def get_key(self): - if self.has_key is None: - key = SheerkaDataProvider.get_obj_key(self.obj) - if key is None: - self.has_key = False - return None - else: - self.has_key = True - self._key = key - return key - elif not self.has_key: - return None - else: - return self._key - - def get_digest(self): - if self.has_digest is None: - digest = SheerkaDataProvider.get_obj_digest(self.obj) - if digest is None: - self.has_digest = False - return None - else: - self.has_digest = True - self._digest = digest - return digest - elif not self.has_digest: - return None - else: - return self._digest - - def set_digest(self, digest): - self.has_digest = True - self._digest = digest - - def set_key(self, key): - self.has_key = True - self._key = key - - -class State: - """ - Class that represents the state of the system (dictionary of all known entries) - """ - - def __init__(self): - self.version = 1 - self.date = None - self.parents = [] - self.events = [] - self.data = {} - - @staticmethod - def check_duplicate(items, obj: ObjToUpdate, key): - digest = obj.get_digest() - if digest is None: - return - - if not hasattr(items, "__iter__"): - items = [items] - - for item in items: - item_digest = SheerkaDataProvider.get_obj_digest(item) - if item_digest == digest: - raise SheerkaDataProviderDuplicateKeyError(key, obj.obj) - - def update(self, entry, obj: ObjToUpdate, append=True): - """ - adds obj to entry - :param entry: - :param obj: - :param append: if True, duplicate keys will create lists - :return: - """ - obj_to_use = {obj.get_key(): obj.obj} if obj.has_key else obj.obj - - if entry not in self.data: - self.data[entry] = obj_to_use - - elif not append: - if isinstance(obj_to_use, dict): - self.data[entry].update(obj_to_use) - else: - self.data[entry] = obj_to_use - - elif isinstance(self.data[entry], list): - self.check_duplicate(self.data[entry], obj, entry) - self.data[entry].append(obj.obj) - - elif isinstance(obj_to_use, dict): - for k in obj_to_use: - if k not in self.data[entry]: - self.data[entry][k] = obj_to_use[k] - elif isinstance(self.data[entry][k], list): - self.check_duplicate(self.data[entry][k], obj, entry + "." + k) - self.data[entry][k].append(obj_to_use[k]) - else: - self.check_duplicate(self.data[entry][k], obj, entry + "." + k) - self.data[entry][k] = [self.data[entry][k], obj_to_use[k]] - - elif isinstance(self.data[entry], dict): - raise SheerkaDataProviderError(f"Cannot found key on '{obj.obj}' while all other elements have.", obj.obj) - - else: - self.check_duplicate(self.data[entry], obj, entry) - self.data[entry] = [self.data[entry], obj_to_use] - - def modify(self, entry, key, obj, obj_key): - # if the key changes, make sure to remove the previous entry - append = False - if obj_key != key: - self.remove(entry, lambda k, o: k == key) # modify from on object to another - append = True - - self.update(entry, ObjToUpdate(obj, obj_key), append=append) - - def modify_in_list(self, entry, key, obj, obj_key, obj_origin, load_ref_if_needed, save_ref_if_needed): - found = False - to_remove = None - new_digest = None - - def _get_item_origin(o): - if hasattr(o, Serializer.ORIGIN): - return getattr(o, Serializer.ORIGIN) - - if isinstance(o, dict) and Serializer.ORIGIN in o: - return o[Serializer.ORIGIN] - - if hasattr(o, "get_digest"): - return o.get_digest() - - if isinstance(o, str): - return o - - return None - - for i in range(len(self.data[entry][key])): - item, is_ref = load_ref_if_needed(self.data[entry][key][i]) - item_origin = _get_item_origin(item) - if item_origin is None: - continue - if item_origin == obj_origin: - obj = save_ref_if_needed(is_ref, obj) - if is_ref: - new_digest = obj[len(SheerkaDataProvider.REF_PREFIX):] - if obj_key == key: - self.data[entry][key][i] = obj - else: - to_remove = i - self.update(entry, ObjToUpdate(obj, obj_key), append=True) - found = True - break - - if not found: - raise (SheerkaDataProviderError(f"Cannot modify '{entry}.{key}'. Item '{obj_origin}' not found.", obj)) - - if to_remove is not None: - del self.data[entry][key][to_remove] - - return new_digest - - def remove(self, entry, filter): - if filter is None: - del (self.data[entry]) - - elif isinstance(self.data[entry], dict): - keys_to_remove = [] - for key, element in self.data[entry].items(): - if filter(key, element): - keys_to_remove.append(key) - for key in keys_to_remove: - del (self.data[entry][key]) - - elif not isinstance(self.data[entry], list): - if filter(self.data[entry]): - del (self.data[entry]) - - else: - for element in self.data[entry]: - if filter(element): - self.data[entry].remove(element) - - def get_digest(self): - as_json = json.dumps(self.__dict__, default=json_default_converter) - return hashlib.sha256(as_json.encode("utf-8")).hexdigest() - - def contains(self, entry, key): - """ - if key is None, returns True if entry exists - if key has a value - returns True if entry is an dict and contains key - :param entry: - :param key: - :return: - """ - if entry not in self.data: - return False - if key is None: - return entry in self.data - if not isinstance(self.data[entry], dict): - return False - return key in self.data[entry] - - -class SheerkaDataProviderError(Exception): - def __init__(self, message, obj): - Exception.__init__(self, message) - self.obj = obj - - -class SheerkaDataProviderDuplicateKeyError(Exception): - def __init__(self, key, obj): - Exception.__init__(self, "Duplicate object.") - self.key = key - self.obj = obj - - -@dataclass -class SheerkaDataProviderResult: - """ - Object that is returned after adding, setting or modifying an entry - """ - obj: object # obj that was given to store/modify - entry: str # entry where the object is put - key: str # key to use to retrieve the object - digest: str # digest used to store the reference - already_exists: bool = False # the same object was already persisted - - -@dataclass -class SheerkaDataProviderRef: - """ - Object that tells where an object is store (target is the digest of the reference) - """ - key: str # key of the object - target: str # digest of the reference - original_target: str = None # when the object is modified, previous digest - - def get_digest(self): - return self.original_target - - def get_key(self): - return self.key - - -class SheerkaDataProvider: - """Manages the state of the system""" - - EventFolder = "events" - StateFolder = "state" - ObjectsFolder = "objects" - CacheFolder = "cache" - HeadFile = "HEAD" - LastEventFile = "LAST_EVENT" - KeysFile = "keys" - REF_PREFIX = "##REF##:" - - def __init__(self, root=None, sheerka=None): - self.log = get_logger(__name__) - self.init_log = get_logger("init." + __name__) - self.init_log.debug("Initializing sdp.") - - self.sheerka = sheerka - self.io = SheerkaDataProviderIO.get(root) - self.first_time = self.io.first_time - - self.serializer = Serializer() - - @staticmethod - def get_obj_key(obj): - """ - Tries to find the key of an object - Look for .key, .get_key() - :param obj: - :return: String version of that is found, None otherwise - """ - return str(obj.get_key()) if hasattr(obj, "get_key") \ - else str(obj.key) if hasattr(obj, "key") \ - else None - - @staticmethod - def get_obj_digest(obj): - """ - Tries to find the key of an object - Look for .digest, .get_digest() - :param obj: - :return: digest, None otherwise - """ - if isinstance(obj, str) and obj.startswith(SheerkaDataProvider.REF_PREFIX): - return obj[len(SheerkaDataProvider.REF_PREFIX):] - - return obj.digest if hasattr(obj, "digest") \ - else obj.get_digest() if hasattr(obj, "get_digest") \ - else None - - @staticmethod - def get_obj_origin(obj): - """ - Get the digest used to save obj if set - """ - if isinstance(obj, dict) and Serializer.ORIGIN in obj: - return obj[Serializer.ORIGIN] - - if hasattr(obj, Serializer.ORIGIN): - return getattr(obj, Serializer.ORIGIN) - - if isinstance(obj, SheerkaDataProviderRef): - return obj.original_target - - return None - - @staticmethod - def get_stream_digest(stream): - sha256_hash = hashlib.sha256() - for byte_block in iter(lambda: stream.read(4096), b""): - sha256_hash.update(byte_block) - - stream.seek(0) - return sha256_hash.hexdigest() - - @staticmethod - def is_reference(obj): - return isinstance(obj, str) and obj.startswith(SheerkaDataProvider.REF_PREFIX) - - def reset(self): - self.first_time = self.io.first_time - if hasattr(self.io, "reset"): - self.io.reset() - - def add(self, event_digest: str, entry, obj, allow_multiple=True, use_ref=False): - """ - Adds obj to the entry 'entry' - :param event_digest: digest of the event that triggers the modification of the state - :param entry: entry of the state to update - :param obj: obj to insert or add - :param allow_multiple: if set to true, the same key can be added several times. - All entries will be put in a list - :param use_ref: if True the actual object is saved under 'objects' folder, - only a reference is saved in the state - :return: (entry, key) to retrieve the object - """ - - original_obj = obj.copy() if isinstance(obj, dict) else obj - - snapshot = self.get_snapshot(SheerkaDataProvider.HeadFile) - state = self.load_state(snapshot) - - self.log.debug(f"Adding obj '{obj}' in entry '{entry}' (allow_multiple={allow_multiple}, use_ref={use_ref})") - - if not isinstance(obj, ObjToUpdate): - obj = ObjToUpdate(obj) - - # check uniqueness, cannot add the same key twice if allow_multiple == False - key = obj.get_key() - self.log.debug(f"key found : '{key}'") if key else self.log.debug("No key found") - if not allow_multiple: - if isinstance(obj.obj, dict): - for k in obj.obj: - if state.contains(entry, k): - raise IndexError(f"{entry}.{k}") - else: - if state.contains(entry, key): - raise IndexError(f"{entry}.{key}" if key else entry) - - state.parents = [] if snapshot is None else [snapshot] - state.events = [event_digest] - state.date = datetime.now() - - if use_ref: - obj.set_digest(self.save_obj(obj.obj)) - obj.obj = self.REF_PREFIX + obj.get_digest() - - state.update(entry, obj) - - new_snapshot = self.save_state(state) - self.set_snapshot(SheerkaDataProvider.HeadFile, new_snapshot) - return SheerkaDataProviderResult(original_obj, entry, obj.get_key(), obj.get_digest()) - - def add_with_auto_key(self, event_digest: str, entry, obj): - """ - Add obj to entry. An autogenerated key created for obj - :param event_digest: - :param entry: - :param obj: - :return: - """ - - original_obj = obj.copy() if isinstance(obj, dict) else obj - - next_key = self.get_next_key(entry) - if hasattr(obj, "set_key"): - obj.set_key(next_key) - res = self.add(event_digest, entry, ObjToUpdate(obj, next_key)) - return SheerkaDataProviderResult(original_obj, res.entry, res.key, res.digest) - - def add_unique(self, event_digest: str, entry, obj): - """Add an entry and make sure it's unique""" - - original_obj = obj.copy() if isinstance(obj, dict) else obj - - snapshot = self.get_snapshot(SheerkaDataProvider.HeadFile) - state = self.load_state(snapshot) - - state.parents = [] if snapshot is None else [snapshot] - state.events = [event_digest] - state.date = datetime.now() - if entry not in state.data: - state.data[entry] = {obj} - already_exist = False - else: - already_exist = obj in state.data[entry] - if not already_exist: - state.data[entry].add(obj) - - new_snapshot = self.save_state(state) - self.set_snapshot(SheerkaDataProvider.HeadFile, new_snapshot) - return SheerkaDataProviderResult( - original_obj, - entry, - None, - None, - already_exist) - - def set(self, event_digest, entry, obj, use_ref=False, is_ref=False): - """ - Add or replace an entry. The entry is reinitialized. - If the previous value was dict, all keys are lost - :param event_digest: - :param entry: - :param obj: - :param use_ref: Do not save obj in State (save it under objects), use_ref in State - :param is_ref: obj is supposed to be a reference - :return: - """ - - original_obj = obj.copy() if isinstance(obj, dict) else obj - - if use_ref and is_ref: - raise SheerkaDataProviderError("Cannot use use_ref and is_ref at the same time", None) - - if is_ref and not isinstance(obj, dict): - raise SheerkaDataProviderError("is_ref can only be used with dictionaries", obj) - - snapshot = self.get_snapshot(SheerkaDataProvider.HeadFile) - state = self.load_state(snapshot) - - state.parents = [] if snapshot is None else [snapshot] - state.events = [event_digest] - state.date = datetime.now() - - key = self.get_obj_key(obj) - obj = self.save_ref_if_needed(use_ref, obj) - - if is_ref: - for k, v in obj.items(): - obj[k] = self.REF_PREFIX + v - - state.data[entry] = obj if key is None else {key: obj} - - new_snapshot = self.save_state(state) - self.set_snapshot(SheerkaDataProvider.HeadFile, new_snapshot) - return SheerkaDataProviderResult(original_obj, entry, key, self.get_obj_digest(obj)) - - def modify(self, event_digest, entry, key, obj): - """ - Replace an element - If the key is not provided, has the same effect than set eg, the entry is reset - :param event_digest: - :param entry: - :param key: key of the object to update - :param obj: new data - :return: - """ - - original_obj = obj.copy() if isinstance(obj, dict) else obj - - if key is None: - raise SheerkaDataProviderError("Key is mandatory.", None) - - snapshot = self.get_snapshot(SheerkaDataProvider.HeadFile) - state = self.load_state(snapshot) - - if entry not in state.data: - raise IndexError(entry) - - if key is not None and key not in state.data[entry]: - raise IndexError(f"{entry}.{key}") - - state.parents = [] if snapshot is None else [snapshot] - state.events = [event_digest] - state.date = datetime.now() - - # Gets obj original key, it will help to know if the key has changed - obj_key = self.get_obj_key(obj) or key - digest = None - - if isinstance(state.data[entry][key], list): - obj_origin = self.get_obj_origin(obj) - if obj_origin is None: - raise (SheerkaDataProviderError(f"Multiple entries under '{entry}.{key}'", obj)) - - digest = state.modify_in_list( - entry, - key, - obj, - obj_key, - obj_origin, - self.load_ref_if_needed, - self.save_ref_if_needed) - - else: - was_saved_as_reference = self.is_reference(state.data[entry][key]) - if was_saved_as_reference: - obj = self.save_ref_if_needed(True, obj) - digest = self.get_obj_digest(obj) - - state.modify(entry, key, obj, obj_key) - - new_snapshot = self.save_state(state) - self.set_snapshot(SheerkaDataProvider.HeadFile, new_snapshot) - return SheerkaDataProviderResult(original_obj, entry, obj_key, digest) - - def list(self, entry, filter=None): - """ - Lists elements of entry 'entry' - :param entry: name of the entry to list - :param filter: filter to use - :return: list of elements - """ - snapshot = self.get_snapshot(SheerkaDataProvider.HeadFile) - state = self.load_state(snapshot) - if entry not in state.data: - return [] - - elements = state.data[entry] - - if isinstance(elements, dict): - # manage when elements have a key - filter_to_use = (lambda k, o: True) if filter is None else filter - for key, element in elements.items(): - if filter_to_use(key, element): - if isinstance(element, list): - yield [self.load_ref_if_needed(e)[0] for e in element] - else: - yield self.load_ref_if_needed(element)[0] - else: - # manage when no key is defined for the elements - if not isinstance(elements, list) and not isinstance(elements, set): - elements = [elements] - - filter_to_use = (lambda o: True) if filter is None else filter - for element in elements: - if filter_to_use(element): - yield self.load_ref_if_needed(element)[0] - - def remove(self, event_digest, entry, filter=None, silent_remove=True): - """ - Removes elements under the entry 'entry' - :param event_digest: event that triggers the deletion - :param entry: - :param filter: filter to use - :param silent_remove: Do not throw exception if entry does not exist - :return: new sha256 of the state - TODO: Remove by key - """ - snapshot = self.get_snapshot(SheerkaDataProvider.HeadFile) - state = self.load_state(snapshot) - - if entry not in state.data: - if silent_remove: - return snapshot - else: - raise IndexError(entry) - - state.parents = [] if snapshot is None else [snapshot] - state.events = [event_digest] - state.date = datetime.now() - state.remove(entry, filter) - - new_snapshot = self.save_state(state) - self.set_snapshot(SheerkaDataProvider.HeadFile, new_snapshot) - return new_snapshot - - def get(self, entry, key=None, load_origin=True): - """ - Retrieve an element by its key - :param entry: - :param key: - :param load_origin: if True, adds the origin (parent digest) to the object - :return: - """ - snapshot = self.get_snapshot(SheerkaDataProvider.HeadFile) - state = self.load_state(snapshot) - - if entry not in state.data: - raise IndexError(entry) - - if key is not None and key not in state.data[entry]: - raise IndexError(f"{entry}.{key}") - - item = state.data[entry] if key is None else state.data[entry][key] - if isinstance(item, list): - return [self.load_ref_if_needed(i, load_origin)[0] for i in item] - - return self.load_ref_if_needed(item, load_origin)[0] - - def get_safe(self, entry, key=None, load_origin=True): - """ - Retrieve an element by its key. Return None if the element does not exist - :param entry: - :param key: - :param load_origin: if True, adds the origin (parent digest) to the object - :return: - """ - snapshot = self.get_snapshot(SheerkaDataProvider.HeadFile) - state = self.load_state(snapshot) - - if entry not in state.data: - return None - - if key is not None and key not in state.data[entry]: - return None - - item = state.data[entry] if key is None else state.data[entry][key] - if isinstance(item, list): - return [self.load_ref_if_needed(i, load_origin)[0] for i in item] - - return self.load_ref_if_needed(item, load_origin)[0] - - def get_ref(self, entry, key=None): - """ - Returns the reference of an object if the object exists - This function allows to retrieve obj.##origin## without loading the object - :param entry: - :param key: - :return: - """ - - snapshot = self.get_snapshot(SheerkaDataProvider.HeadFile) - state = self.load_state(snapshot) - - if entry not in state.data: - raise IndexError(entry) - - if key is not None and key not in state.data[entry]: - raise IndexError(f"{entry}.{key}") - - item = state.data[entry] if key is None else state.data[entry][key] - if isinstance(item, list): - res = [] - for element in item: - if not self.is_reference(element): - raise SheerkaDataProviderError("Not a reference", f"{entry}.{key}") - res.append(self.get_obj_digest(element)) - return res - - if not self.is_reference(item): - raise SheerkaDataProviderError("Not a reference", f"{entry}.{key}") - - return self.get_obj_digest(item) - - def exists(self, entry, key=None, digest=None): - """ - Returns true if the entry is defined - :param key: - :param entry: - :param digest: digest of the object, when several entries share the same key - :return: - """ - snapshot = self.get_snapshot(SheerkaDataProvider.HeadFile) - state = self.load_state(snapshot) - exist = entry in state.data - - if not exist or key is None: - return exist - - items = state.data[entry] - exist = key in items - if not exist or digest is None: - return exist - - items = items[key] - if not isinstance(items, list): - items = [items] - - for item in items: - item_digest = SheerkaDataProvider.get_obj_digest(item) - if item_digest == digest: - return True - - return False - - def save_event(self, event: Event): - """ - return an event, given its digest - :param event: - :return: digest of the event - """ - parent = self.get_snapshot(SheerkaDataProvider.LastEventFile) - event.parents = [parent] if parent else None - digest = event.get_digest() # must be call after setting the parents - - target_path = self.io.get_obj_path(SheerkaDataProvider.EventFolder, digest) - if self.io.exists(target_path): - return digest - - self.io.write_binary(target_path, self.serializer.serialize(event, None).read()) - self.set_snapshot(SheerkaDataProvider.LastEventFile, digest) - - return digest - - def load_event(self, digest=None): - """ - return an event, given its digest - :param digest: - :return: - """ - digest = digest or self.get_snapshot(SheerkaDataProvider.LastEventFile) - if digest is None: - return None - - target_path = self.io.get_obj_path(SheerkaDataProvider.EventFolder, digest) - - with self.io.open(target_path, "rb") as f: - return self.serializer.deserialize(f, None) - - def load_events(self, page_size, start=0): - """ - Load multiple events in the same command - :param start: - :param page_size: - :return: - """ - - digest = None - if start: - for i in range(start): - event = self.load_event(digest) - if event is None or event.parents is None: - return - digest = event.parents[0] - - count = 0 - while count < page_size or page_size <= 0: - event = self.load_event(digest) - if event is None: - return - - yield event - - if event.parents is None: - return - - digest = event.parents[0] - count += 1 - - def get_result_file_path(self, digest, is_admin): - ext = "_admin_result" if is_admin else "_result" - return self.io.get_obj_path(SheerkaDataProvider.EventFolder, digest) + ext - - def has_result(self, digest, is_admin=False): - """ - Check is a result file was created for a specific event - :param digest: - :param is_admin: True is the result is an internal admin result file - :return: - """ - target_path = self.get_result_file_path(digest, is_admin) - return self.io.exists(target_path) - - def save_result(self, execution_context, is_admin=False): - """ - Save the execution context associated with an event - To make a long story short, - for every single user input, there is an event (which is the first thing that is created) - and a result (the ExecutionContext created by sheerka.evaluate_user_input() - :param execution_context: - :param is_admin: True is the result is an internal admin result file - :return: - """ - start = time.time() - message = execution_context.event.message - digest = execution_context.event.get_digest() - self.log.debug(f"Saving execution context. digest={digest}, message={message}") - target_path = self.get_result_file_path(digest, is_admin) - if self.io.exists(target_path): - return digest - - context = SerializerContext(sheerka=self.sheerka) - length = self.io.write_binary(target_path, self.serializer.serialize(execution_context, context).read()) - elapsed = time.time() - start - self.log.debug(f"Saved execution context. message={message}, length={length}, elapsed={elapsed}") - return digest - - def load_result(self, digest, is_admin=False): - """ - Load and deserialize a result file - :param digest: - :param is_admin: True is the result is an internal admin result file - :return: - """ - target_path = self.get_result_file_path(digest, is_admin) - - with self.io.open(target_path, "rb") as f: - context = SerializerContext(sheerka=self.sheerka) - return self.serializer.deserialize(f, context) - - def save_state(self, state: State): - digest = state.get_digest() - self.log.debug(f"Saving new state. digest={digest}") - target_path = self.io.get_obj_path(SheerkaDataProvider.StateFolder, digest) - if self.io.exists(target_path): - return digest - - context = SerializerContext(sheerka=self.sheerka) - self.io.write_binary(target_path, self.serializer.serialize(state, context).read()) - return digest - - def load_state(self, digest): - if digest is None: - return State() - - target_path = self.io.get_obj_path(SheerkaDataProvider.StateFolder, digest) - with self.io.open(target_path, "rb") as f: - context = SerializerContext(sheerka=self.sheerka) - return self.serializer.deserialize(f, context) - - def save_obj(self, obj): - self.log.debug(f"Saving '{obj}' as reference...") - context = SerializerContext(user_name="kodjo", sheerka=self.sheerka) - stream = self.serializer.serialize(obj, context) - digest = obj.get_digest() if hasattr(obj, "get_digest") else self.get_stream_digest(stream) - - target_path = self.io.get_obj_path(SheerkaDataProvider.ObjectsFolder, digest) - if self.io.exists(target_path): - self.log.debug(f"...already saved. digest is {digest}") - return digest - - self.io.write_binary(target_path, stream.read()) - - self.log.debug(f"...digest={digest}.") - return digest - - def load_obj(self, digest, add_origin=True): - if digest is None: - return None - - target_path = self.io.get_obj_path(SheerkaDataProvider.ObjectsFolder, digest) - if not self.io.exists(target_path): - return None - - with self.io.open(target_path, "rb") as f: - context = SerializerContext(origin=digest, sheerka=self.sheerka) - obj = self.serializer.deserialize(f, context) - - # set the origin of the object - if add_origin: - if isinstance(obj, dict): - obj[Serializer.ORIGIN] = digest - elif not isinstance(obj, str): - setattr(obj, Serializer.ORIGIN, digest) - return obj - - def load_ref_if_needed(self, obj, load_origin=True): - if isinstance(obj, SheerkaDataProviderRef): - resolved = self.load_obj(obj.target, load_origin) - return resolved, False - - if not isinstance(obj, str) or not obj.startswith(SheerkaDataProvider.REF_PREFIX): - return obj, False - - resolved = self.load_obj(obj[len(SheerkaDataProvider.REF_PREFIX):], load_origin) - return (obj, False) if resolved is None else (resolved, True) - - def save_ref_if_needed(self, save_ref, obj): - if not save_ref: - return obj - - digest = self.save_obj(obj) - return self.REF_PREFIX + digest - - def get_cache_params(self, category, key): - digest = hashlib.sha3_256(f"{category}:{key}".encode("utf-8")).hexdigest() - cache_path = self.io.get_obj_path(SheerkaDataProvider.CacheFolder, digest) - return digest, cache_path - - def add_to_cache(self, category, key, obj, update=False): - """ - Save obj in the internal cache system - :param category: - :param key: - :param obj: - :param update: - :return: - """ - digest, cache_path = self.get_cache_params(category, key) - - if self.io.exists(cache_path) and not update: - return digest - - self.io.write_binary(cache_path, zlib.compress(obj.encode("utf-8"), 9)) - return digest - - def load_from_cache(self, category, key): - """ - Reload a compress object from the cache - :param category: - :param key: - :return: - """ - digest, cache_path = self.get_cache_params(category, key) - - if not self.io.exists(cache_path): - raise IndexError(f"{category}.{key}") - - with self.io.open(cache_path, "rb") as f: - return zlib.decompress(f.read()).decode("utf-8") - - def remove_from_cache(self, category, key): - """ - - :param category: - :param key: - :return: - """ - digest, cache_path = self.get_cache_params(category, key) - if self.io.exists(cache_path): - self.io.remove(cache_path) - - return digest - - def in_cache(self, category, key): - """ - Returns true if the key is in cache - :param category: - :param key: - :return: - """ - digest, cache_path = self.get_cache_params(category, key) - return self.io.exists(cache_path) - - def get_snapshot(self, file): - head_file = self.io.path_join(file) - if not self.io.exists(head_file): - return None - return self.io.read_text(head_file) - # with open(head_file, "r") as f: - # return f.read() - - def set_snapshot(self, file, digest): - head_file = self.io.path_join(file) - return self.io.write_text(head_file, digest) - # with open(head_file, "w") as f: - # return f.write(digest) - - def load_keys(self): - keys_file = self.io.path_join(SheerkaDataProvider.KeysFile) - if not self.io.exists(keys_file): - keys = {} - else: - with self.io.open(keys_file, "r") as f: - keys = json.load(f) - return keys - - def save_keys(self, keys): - keys_file = self.io.path_join(SheerkaDataProvider.KeysFile) - with self.io.open(keys_file, "w") as f: - json.dump(keys, f) - - def get_next_key(self, entry): - keys = self.load_keys() - - next_key = keys.get(entry, 0) + 1 - keys[entry] = next_key - - self.save_keys(keys) - return str(next_key) - - def set_key(self, entry, value): - keys = self.load_keys() - keys[entry] = value - self.save_keys(keys) - return str(value) - - def dump_state(self, digest=None): - digest = digest or self.get_snapshot(SheerkaDataProvider.HeadFile) - state = self.load_state(digest) - print(json.dumps(state.data, sort_keys=True, default=json_default_converter, indent=True)) - - def dump_obj(self, digest): - obj = self.load_obj(digest) - print(json.dumps(obj.__dict__, sort_keys=True, default=json_default_converter, indent=True)) +# import hashlib +# import json +# import zlib +# import time +# from dataclasses import dataclass +# from datetime import datetime, date +# +# from core.sheerka_logger import get_logger +# from sdp.sheerkaDataProviderIO import SheerkaDataProviderIO +# from sdp.sheerkaSerializer import Serializer, SerializerContext +# +# +# def json_default_converter(o): +# """ +# Default formatter for json +# It's used when the json serializer does not know +# how to serialise a type +# :param o: +# :return: +# """ +# if isinstance(o, (date, datetime)): +# return o.isoformat() +# +# if isinstance(o, SheerkaDataProviderRef): +# return f"##XREF##:{o.target}" +# +# +# class Event(object): +# """ +# Class that represents something that modifies the state of the system +# """ +# +# def __init__(self, message="", user="", date=datetime.now(), parents=None): +# self.version = 1 +# self.user = user +# self.date = date +# self.message = message +# self.parents = parents +# self._digest = None +# +# def __str__(self): +# return f"{self.date.strftime('%d/%m/%Y %H:%M:%S')} {self.message}" +# +# def __repr__(self): +# return f"{self.get_digest()[:12]} {self.message}" +# +# def get_digest(self): +# """ +# Returns the digest of the event +# :return: hexa form of the sha256 +# """ +# +# if self._digest: +# return self._digest +# +# if self.message == "" and self.user == "": +# self._digest = "xxx" # to speed unit tests +# return self._digest +# +# if not isinstance(self.message, str): +# raise NotImplementedError +# +# to_hash = f"Event:{self.user}{self.date}{self.message}{self.parents}".encode("utf-8") +# self._digest = hashlib.sha256(to_hash).hexdigest() +# return self._digest +# +# def to_dict(self): +# return self.__dict__ +# +# def from_dict(self, as_dict): +# self.user = as_dict["user"] +# self.date = datetime.fromisoformat(as_dict["date"]) +# self.message = as_dict["message"] +# self.parents = as_dict["parents"] +# self._digest = as_dict["_digest"] # freeze the digest +# +# +# class ObjToUpdate: +# """ +# Internal key value class; +# You give it an obj, and it tries to figure out what is the key of the obj +# Note that you can force the key if you want +# It was first create to make the difference between an object that has a key and {key, value} +# """ +# +# def __init__(self, obj, key=None, digest=None): +# self.obj = obj +# self.has_key = None +# self.has_digest = None +# self._key = None +# self._digest = None +# if key is not None: +# self.set_key(key) +# if digest is not None: +# self.set_digest(digest) +# +# def get_key(self): +# if self.has_key is None: +# key = SheerkaDataProvider.get_obj_key(self.obj) +# if key is None: +# self.has_key = False +# return None +# else: +# self.has_key = True +# self._key = key +# return key +# elif not self.has_key: +# return None +# else: +# return self._key +# +# def get_digest(self): +# if self.has_digest is None: +# digest = SheerkaDataProvider.get_obj_digest(self.obj) +# if digest is None: +# self.has_digest = False +# return None +# else: +# self.has_digest = True +# self._digest = digest +# return digest +# elif not self.has_digest: +# return None +# else: +# return self._digest +# +# def set_digest(self, digest): +# self.has_digest = True +# self._digest = digest +# +# def set_key(self, key): +# self.has_key = True +# self._key = key +# +# +# class State: +# """ +# Class that represents the state of the system (dictionary of all known entries) +# """ +# +# def __init__(self): +# self.version = 1 +# self.date = None +# self.parents = [] +# self.events = [] +# self.data = {} +# +# @staticmethod +# def check_duplicate(items, obj: ObjToUpdate, key): +# digest = obj.get_digest() +# if digest is None: +# return +# +# if not hasattr(items, "__iter__"): +# items = [items] +# +# for item in items: +# item_digest = SheerkaDataProvider.get_obj_digest(item) +# if item_digest == digest: +# raise SheerkaDataProviderDuplicateKeyError(key, obj.obj) +# +# def update(self, entry, obj: ObjToUpdate, append=True): +# """ +# adds obj to entry +# :param entry: +# :param obj: +# :param append: if True, duplicate keys will create lists +# :return: +# """ +# obj_to_use = {obj.get_key(): obj.obj} if obj.has_key else obj.obj +# +# if entry not in self.data: +# self.data[entry] = obj_to_use +# +# elif not append: +# if isinstance(obj_to_use, dict): +# self.data[entry].update(obj_to_use) +# else: +# self.data[entry] = obj_to_use +# +# elif isinstance(self.data[entry], list): +# self.check_duplicate(self.data[entry], obj, entry) +# self.data[entry].append(obj.obj) +# +# elif isinstance(obj_to_use, dict): +# for k in obj_to_use: +# if k not in self.data[entry]: +# self.data[entry][k] = obj_to_use[k] +# elif isinstance(self.data[entry][k], list): +# self.check_duplicate(self.data[entry][k], obj, entry + "." + k) +# self.data[entry][k].append(obj_to_use[k]) +# else: +# self.check_duplicate(self.data[entry][k], obj, entry + "." + k) +# self.data[entry][k] = [self.data[entry][k], obj_to_use[k]] +# +# elif isinstance(self.data[entry], dict): +# raise SheerkaDataProviderError(f"Cannot found key on '{obj.obj}' while all other elements have.", obj.obj) +# +# else: +# self.check_duplicate(self.data[entry], obj, entry) +# self.data[entry] = [self.data[entry], obj_to_use] +# +# def modify(self, entry, key, obj, obj_key): +# # if the key changes, make sure to remove the previous entry +# append = False +# if obj_key != key: +# self.remove(entry, lambda k, o: k == key) # modify from on object to another +# append = True +# +# self.update(entry, ObjToUpdate(obj, obj_key), append=append) +# +# def modify_in_list(self, entry, key, obj, obj_key, obj_origin, load_ref_if_needed, save_ref_if_needed): +# found = False +# to_remove = None +# new_digest = None +# +# def _get_item_origin(o): +# if hasattr(o, Serializer.ORIGIN): +# return getattr(o, Serializer.ORIGIN) +# +# if isinstance(o, dict) and Serializer.ORIGIN in o: +# return o[Serializer.ORIGIN] +# +# if hasattr(o, "get_digest"): +# return o.get_digest() +# +# if isinstance(o, str): +# return o +# +# return None +# +# for i in range(len(self.data[entry][key])): +# item, is_ref = load_ref_if_needed(self.data[entry][key][i]) +# item_origin = _get_item_origin(item) +# if item_origin is None: +# continue +# if item_origin == obj_origin: +# obj = save_ref_if_needed(is_ref, obj) +# if is_ref: +# new_digest = obj[len(SheerkaDataProvider.REF_PREFIX):] +# if obj_key == key: +# self.data[entry][key][i] = obj +# else: +# to_remove = i +# self.update(entry, ObjToUpdate(obj, obj_key), append=True) +# found = True +# break +# +# if not found: +# raise (SheerkaDataProviderError(f"Cannot modify '{entry}.{key}'. Item '{obj_origin}' not found.", obj)) +# +# if to_remove is not None: +# del self.data[entry][key][to_remove] +# +# return new_digest +# +# def remove(self, entry, filter): +# if filter is None: +# del (self.data[entry]) +# +# elif isinstance(self.data[entry], dict): +# keys_to_remove = [] +# for key, element in self.data[entry].items(): +# if filter(key, element): +# keys_to_remove.append(key) +# for key in keys_to_remove: +# del (self.data[entry][key]) +# +# elif not isinstance(self.data[entry], list): +# if filter(self.data[entry]): +# del (self.data[entry]) +# +# else: +# for element in self.data[entry]: +# if filter(element): +# self.data[entry].remove(element) +# +# def get_digest(self): +# as_json = json.dumps(self.__dict__, default=json_default_converter) +# return hashlib.sha256(as_json.encode("utf-8")).hexdigest() +# +# def contains(self, entry, key): +# """ +# if key is None, returns True if entry exists +# if key has a value +# returns True if entry is an dict and contains key +# :param entry: +# :param key: +# :return: +# """ +# if entry not in self.data: +# return False +# if key is None: +# return entry in self.data +# if not isinstance(self.data[entry], dict): +# return False +# return key in self.data[entry] +# +# +# class SheerkaDataProviderError(Exception): +# def __init__(self, message, obj): +# Exception.__init__(self, message) +# self.obj = obj +# +# +# class SheerkaDataProviderDuplicateKeyError(Exception): +# def __init__(self, key, obj): +# Exception.__init__(self, "Duplicate object.") +# self.key = key +# self.obj = obj +# +# +# @dataclass +# class SheerkaDataProviderResult: +# """ +# Object that is returned after adding, setting or modifying an entry +# """ +# obj: object # obj that was given to store/modify +# entry: str # entry where the object is put +# key: str # key to use to retrieve the object +# digest: str # digest used to store the reference +# already_exists: bool = False # the same object was already persisted +# +# +# @dataclass +# class SheerkaDataProviderRef: +# """ +# Object that tells where an object is store (target is the digest of the reference) +# """ +# key: str # key of the object +# target: str # digest of the reference +# original_target: str = None # when the object is modified, previous digest +# +# def get_digest(self): +# return self.original_target +# +# def get_key(self): +# return self.key +# +# +# class SheerkaDataProvider: +# """Manages the state of the system""" +# +# EventFolder = "events" +# StateFolder = "state" +# ObjectsFolder = "objects" +# CacheFolder = "cache" +# HeadFile = "HEAD" +# LastEventFile = "LAST_EVENT" +# KeysFile = "keys" +# REF_PREFIX = "##REF##:" +# +# def __init__(self, root=None, sheerka=None): +# self.log = get_logger(__name__) +# self.init_log = get_logger("init." + __name__) +# self.init_log.debug("Initializing sdp.") +# +# self.sheerka = sheerka +# self.io = SheerkaDataProviderIO.get(root) +# self.first_time = self.io.first_time +# +# self.serializer = Serializer() +# +# @staticmethod +# def get_obj_key(obj): +# """ +# Tries to find the key of an object +# Look for .key, .get_key() +# :param obj: +# :return: String version of that is found, None otherwise +# """ +# return str(obj.get_key()) if hasattr(obj, "get_key") \ +# else str(obj.key) if hasattr(obj, "key") \ +# else None +# +# @staticmethod +# def get_obj_digest(obj): +# """ +# Tries to find the key of an object +# Look for .digest, .get_digest() +# :param obj: +# :return: digest, None otherwise +# """ +# if isinstance(obj, str) and obj.startswith(SheerkaDataProvider.REF_PREFIX): +# return obj[len(SheerkaDataProvider.REF_PREFIX):] +# +# return obj.digest if hasattr(obj, "digest") \ +# else obj.get_digest() if hasattr(obj, "get_digest") \ +# else None +# +# @staticmethod +# def get_obj_origin(obj): +# """ +# Get the digest used to save obj if set +# """ +# if isinstance(obj, dict) and Serializer.ORIGIN in obj: +# return obj[Serializer.ORIGIN] +# +# if hasattr(obj, Serializer.ORIGIN): +# return getattr(obj, Serializer.ORIGIN) +# +# if isinstance(obj, SheerkaDataProviderRef): +# return obj.original_target +# +# return None +# +# @staticmethod +# def get_stream_digest(stream): +# sha256_hash = hashlib.sha256() +# for byte_block in iter(lambda: stream.read(4096), b""): +# sha256_hash.update(byte_block) +# +# stream.seek(0) +# return sha256_hash.hexdigest() +# +# @staticmethod +# def is_reference(obj): +# return isinstance(obj, str) and obj.startswith(SheerkaDataProvider.REF_PREFIX) +# +# def reset(self): +# self.first_time = self.io.first_time +# if hasattr(self.io, "reset"): +# self.io.reset() +# +# def add(self, event_digest: str, entry, obj, allow_multiple=True, use_ref=False): +# """ +# Adds obj to the entry 'entry' +# :param event_digest: digest of the event that triggers the modification of the state +# :param entry: entry of the state to update +# :param obj: obj to insert or add +# :param allow_multiple: if set to true, the same key can be added several times. +# All entries will be put in a list +# :param use_ref: if True the actual object is saved under 'objects' folder, +# only a reference is saved in the state +# :return: (entry, key) to retrieve the object +# """ +# +# original_obj = obj.copy() if isinstance(obj, dict) else obj +# +# snapshot = self.get_snapshot(SheerkaDataProvider.HeadFile) +# state = self.load_state(snapshot) +# +# self.log.debug(f"Adding obj '{obj}' in entry '{entry}' (allow_multiple={allow_multiple}, use_ref={use_ref})") +# +# if not isinstance(obj, ObjToUpdate): +# obj = ObjToUpdate(obj) +# +# # check uniqueness, cannot add the same key twice if allow_multiple == False +# key = obj.get_key() +# self.log.debug(f"key found : '{key}'") if key else self.log.debug("No key found") +# if not allow_multiple: +# if isinstance(obj.obj, dict): +# for k in obj.obj: +# if state.contains(entry, k): +# raise IndexError(f"{entry}.{k}") +# else: +# if state.contains(entry, key): +# raise IndexError(f"{entry}.{key}" if key else entry) +# +# state.parents = [] if snapshot is None else [snapshot] +# state.events = [event_digest] +# state.date = datetime.now() +# +# if use_ref: +# obj.set_digest(self.save_obj(obj.obj)) +# obj.obj = self.REF_PREFIX + obj.get_digest() +# +# state.update(entry, obj) +# +# new_snapshot = self.save_state(state) +# self.set_snapshot(SheerkaDataProvider.HeadFile, new_snapshot) +# return SheerkaDataProviderResult(original_obj, entry, obj.get_key(), obj.get_digest()) +# +# def add_with_auto_key(self, event_digest: str, entry, obj): +# """ +# Add obj to entry. An autogenerated key created for obj +# :param event_digest: +# :param entry: +# :param obj: +# :return: +# """ +# +# original_obj = obj.copy() if isinstance(obj, dict) else obj +# +# next_key = self.get_next_key(entry) +# if hasattr(obj, "set_key"): +# obj.set_key(next_key) +# res = self.add(event_digest, entry, ObjToUpdate(obj, next_key)) +# return SheerkaDataProviderResult(original_obj, res.entry, res.key, res.digest) +# +# def add_unique(self, event_digest: str, entry, obj): +# """Add an entry and make sure it's unique""" +# +# original_obj = obj.copy() if isinstance(obj, dict) else obj +# +# snapshot = self.get_snapshot(SheerkaDataProvider.HeadFile) +# state = self.load_state(snapshot) +# +# state.parents = [] if snapshot is None else [snapshot] +# state.events = [event_digest] +# state.date = datetime.now() +# if entry not in state.data: +# state.data[entry] = {obj} +# already_exist = False +# else: +# already_exist = obj in state.data[entry] +# if not already_exist: +# state.data[entry].add(obj) +# +# new_snapshot = self.save_state(state) +# self.set_snapshot(SheerkaDataProvider.HeadFile, new_snapshot) +# return SheerkaDataProviderResult( +# original_obj, +# entry, +# None, +# None, +# already_exist) +# +# def set(self, event_digest, entry, obj, use_ref=False, is_ref=False): +# """ +# Add or replace an entry. The entry is reinitialized. +# If the previous value was dict, all keys are lost +# :param event_digest: +# :param entry: +# :param obj: +# :param use_ref: Do not save obj in State (save it under objects), use_ref in State +# :param is_ref: obj is supposed to be a reference +# :return: +# """ +# +# original_obj = obj.copy() if isinstance(obj, dict) else obj +# +# if use_ref and is_ref: +# raise SheerkaDataProviderError("Cannot use use_ref and is_ref at the same time", None) +# +# if is_ref and not isinstance(obj, dict): +# raise SheerkaDataProviderError("is_ref can only be used with dictionaries", obj) +# +# snapshot = self.get_snapshot(SheerkaDataProvider.HeadFile) +# state = self.load_state(snapshot) +# +# state.parents = [] if snapshot is None else [snapshot] +# state.events = [event_digest] +# state.date = datetime.now() +# +# key = self.get_obj_key(obj) +# obj = self.save_ref_if_needed(use_ref, obj) +# +# if is_ref: +# for k, v in obj.items(): +# obj[k] = self.REF_PREFIX + v +# +# state.data[entry] = obj if key is None else {key: obj} +# +# new_snapshot = self.save_state(state) +# self.set_snapshot(SheerkaDataProvider.HeadFile, new_snapshot) +# return SheerkaDataProviderResult(original_obj, entry, key, self.get_obj_digest(obj)) +# +# def modify(self, event_digest, entry, key, obj): +# """ +# Replace an element +# If the key is not provided, has the same effect than set eg, the entry is reset +# :param event_digest: +# :param entry: +# :param key: key of the object to update +# :param obj: new data +# :return: +# """ +# +# original_obj = obj.copy() if isinstance(obj, dict) else obj +# +# if key is None: +# raise SheerkaDataProviderError("Key is mandatory.", None) +# +# snapshot = self.get_snapshot(SheerkaDataProvider.HeadFile) +# state = self.load_state(snapshot) +# +# if entry not in state.data: +# raise IndexError(entry) +# +# if key is not None and key not in state.data[entry]: +# raise IndexError(f"{entry}.{key}") +# +# state.parents = [] if snapshot is None else [snapshot] +# state.events = [event_digest] +# state.date = datetime.now() +# +# # Gets obj original key, it will help to know if the key has changed +# obj_key = self.get_obj_key(obj) or key +# digest = None +# +# if isinstance(state.data[entry][key], list): +# obj_origin = self.get_obj_origin(obj) +# if obj_origin is None: +# raise (SheerkaDataProviderError(f"Multiple entries under '{entry}.{key}'", obj)) +# +# digest = state.modify_in_list( +# entry, +# key, +# obj, +# obj_key, +# obj_origin, +# self.load_ref_if_needed, +# self.save_ref_if_needed) +# +# else: +# was_saved_as_reference = self.is_reference(state.data[entry][key]) +# if was_saved_as_reference: +# obj = self.save_ref_if_needed(True, obj) +# digest = self.get_obj_digest(obj) +# +# state.modify(entry, key, obj, obj_key) +# +# new_snapshot = self.save_state(state) +# self.set_snapshot(SheerkaDataProvider.HeadFile, new_snapshot) +# return SheerkaDataProviderResult(original_obj, entry, obj_key, digest) +# +# def list(self, entry, filter=None): +# """ +# Lists elements of entry 'entry' +# :param entry: name of the entry to list +# :param filter: filter to use +# :return: list of elements +# """ +# snapshot = self.get_snapshot(SheerkaDataProvider.HeadFile) +# state = self.load_state(snapshot) +# if entry not in state.data: +# return [] +# +# elements = state.data[entry] +# +# if isinstance(elements, dict): +# # manage when elements have a key +# filter_to_use = (lambda k, o: True) if filter is None else filter +# for key, element in elements.items(): +# if filter_to_use(key, element): +# if isinstance(element, list): +# yield [self.load_ref_if_needed(e)[0] for e in element] +# else: +# yield self.load_ref_if_needed(element)[0] +# else: +# # manage when no key is defined for the elements +# if not isinstance(elements, list) and not isinstance(elements, set): +# elements = [elements] +# +# filter_to_use = (lambda o: True) if filter is None else filter +# for element in elements: +# if filter_to_use(element): +# yield self.load_ref_if_needed(element)[0] +# +# def remove(self, event_digest, entry, filter=None, silent_remove=True): +# """ +# Removes elements under the entry 'entry' +# :param event_digest: event that triggers the deletion +# :param entry: +# :param filter: filter to use +# :param silent_remove: Do not throw exception if entry does not exist +# :return: new sha256 of the state +# TODO: Remove by key +# """ +# snapshot = self.get_snapshot(SheerkaDataProvider.HeadFile) +# state = self.load_state(snapshot) +# +# if entry not in state.data: +# if silent_remove: +# return snapshot +# else: +# raise IndexError(entry) +# +# state.parents = [] if snapshot is None else [snapshot] +# state.events = [event_digest] +# state.date = datetime.now() +# state.remove(entry, filter) +# +# new_snapshot = self.save_state(state) +# self.set_snapshot(SheerkaDataProvider.HeadFile, new_snapshot) +# return new_snapshot +# +# def get(self, entry, key=None, load_origin=True): +# """ +# Retrieve an element by its key +# :param entry: +# :param key: +# :param load_origin: if True, adds the origin (parent digest) to the object +# :return: +# """ +# snapshot = self.get_snapshot(SheerkaDataProvider.HeadFile) +# state = self.load_state(snapshot) +# +# if entry not in state.data: +# raise IndexError(entry) +# +# if key is not None and key not in state.data[entry]: +# raise IndexError(f"{entry}.{key}") +# +# item = state.data[entry] if key is None else state.data[entry][key] +# if isinstance(item, list): +# return [self.load_ref_if_needed(i, load_origin)[0] for i in item] +# +# return self.load_ref_if_needed(item, load_origin)[0] +# +# def get_safe(self, entry, key=None, load_origin=True): +# """ +# Retrieve an element by its key. Return None if the element does not exist +# :param entry: +# :param key: +# :param load_origin: if True, adds the origin (parent digest) to the object +# :return: +# """ +# snapshot = self.get_snapshot(SheerkaDataProvider.HeadFile) +# state = self.load_state(snapshot) +# +# if entry not in state.data: +# return None +# +# if key is not None and key not in state.data[entry]: +# return None +# +# item = state.data[entry] if key is None else state.data[entry][key] +# if isinstance(item, list): +# return [self.load_ref_if_needed(i, load_origin)[0] for i in item] +# +# return self.load_ref_if_needed(item, load_origin)[0] +# +# def get_ref(self, entry, key=None): +# """ +# Returns the reference of an object if the object exists +# This function allows to retrieve obj.##origin## without loading the object +# :param entry: +# :param key: +# :return: +# """ +# +# snapshot = self.get_snapshot(SheerkaDataProvider.HeadFile) +# state = self.load_state(snapshot) +# +# if entry not in state.data: +# raise IndexError(entry) +# +# if key is not None and key not in state.data[entry]: +# raise IndexError(f"{entry}.{key}") +# +# item = state.data[entry] if key is None else state.data[entry][key] +# if isinstance(item, list): +# res = [] +# for element in item: +# if not self.is_reference(element): +# raise SheerkaDataProviderError("Not a reference", f"{entry}.{key}") +# res.append(self.get_obj_digest(element)) +# return res +# +# if not self.is_reference(item): +# raise SheerkaDataProviderError("Not a reference", f"{entry}.{key}") +# +# return self.get_obj_digest(item) +# +# def exists(self, entry, key=None, digest=None): +# """ +# Returns true if the entry is defined +# :param key: +# :param entry: +# :param digest: digest of the object, when several entries share the same key +# :return: +# """ +# snapshot = self.get_snapshot(SheerkaDataProvider.HeadFile) +# state = self.load_state(snapshot) +# exist = entry in state.data +# +# if not exist or key is None: +# return exist +# +# items = state.data[entry] +# exist = key in items +# if not exist or digest is None: +# return exist +# +# items = items[key] +# if not isinstance(items, list): +# items = [items] +# +# for item in items: +# item_digest = SheerkaDataProvider.get_obj_digest(item) +# if item_digest == digest: +# return True +# +# return False +# +# def save_event(self, event: Event): +# """ +# return an event, given its digest +# :param event: +# :return: digest of the event +# """ +# parent = self.get_snapshot(SheerkaDataProvider.LastEventFile) +# event.parents = [parent] if parent else None +# digest = event.get_digest() # must be call after setting the parents +# +# target_path = self.io.get_obj_path(SheerkaDataProvider.EventFolder, digest) +# if self.io.exists(target_path): +# return digest +# +# self.io.write_binary(target_path, self.serializer.serialize(event, None).read()) +# self.set_snapshot(SheerkaDataProvider.LastEventFile, digest) +# +# return digest +# +# def load_event(self, digest=None): +# """ +# return an event, given its digest +# :param digest: +# :return: +# """ +# digest = digest or self.get_snapshot(SheerkaDataProvider.LastEventFile) +# if digest is None: +# return None +# +# target_path = self.io.get_obj_path(SheerkaDataProvider.EventFolder, digest) +# +# with self.io.open(target_path, "rb") as f: +# return self.serializer.deserialize(f, None) +# +# def load_events(self, page_size, start=0): +# """ +# Load multiple events in the same command +# :param start: +# :param page_size: +# :return: +# """ +# +# digest = None +# if start: +# for i in range(start): +# event = self.load_event(digest) +# if event is None or event.parents is None: +# return +# digest = event.parents[0] +# +# count = 0 +# while count < page_size or page_size <= 0: +# event = self.load_event(digest) +# if event is None: +# return +# +# yield event +# +# if event.parents is None: +# return +# +# digest = event.parents[0] +# count += 1 +# +# def get_result_file_path(self, digest, is_admin): +# ext = "_admin_result" if is_admin else "_result" +# return self.io.get_obj_path(SheerkaDataProvider.EventFolder, digest) + ext +# +# def has_result(self, digest, is_admin=False): +# """ +# Check is a result file was created for a specific event +# :param digest: +# :param is_admin: True is the result is an internal admin result file +# :return: +# """ +# target_path = self.get_result_file_path(digest, is_admin) +# return self.io.exists(target_path) +# +# def save_result(self, execution_context, is_admin=False): +# """ +# Save the execution context associated with an event +# To make a long story short, +# for every single user input, there is an event (which is the first thing that is created) +# and a result (the ExecutionContext created by sheerka.evaluate_user_input() +# :param execution_context: +# :param is_admin: True is the result is an internal admin result file +# :return: +# """ +# start = time.time() +# message = execution_context.event.message +# digest = execution_context.event.get_digest() +# self.log.debug(f"Saving execution context. digest={digest}, message={message}") +# target_path = self.get_result_file_path(digest, is_admin) +# if self.io.exists(target_path): +# return digest +# +# context = SerializerContext(sheerka=self.sheerka) +# length = self.io.write_binary(target_path, self.serializer.serialize(execution_context, context).read()) +# elapsed = time.time() - start +# self.log.debug(f"Saved execution context. message={message}, length={length}, elapsed={elapsed}") +# return digest +# +# def load_result(self, digest, is_admin=False): +# """ +# Load and deserialize a result file +# :param digest: +# :param is_admin: True is the result is an internal admin result file +# :return: +# """ +# target_path = self.get_result_file_path(digest, is_admin) +# +# with self.io.open(target_path, "rb") as f: +# context = SerializerContext(sheerka=self.sheerka) +# return self.serializer.deserialize(f, context) +# +# def save_state(self, state: State): +# digest = state.get_digest() +# self.log.debug(f"Saving new state. digest={digest}") +# target_path = self.io.get_obj_path(SheerkaDataProvider.StateFolder, digest) +# if self.io.exists(target_path): +# return digest +# +# context = SerializerContext(sheerka=self.sheerka) +# self.io.write_binary(target_path, self.serializer.serialize(state, context).read()) +# return digest +# +# def load_state(self, digest): +# if digest is None: +# return State() +# +# target_path = self.io.get_obj_path(SheerkaDataProvider.StateFolder, digest) +# with self.io.open(target_path, "rb") as f: +# context = SerializerContext(sheerka=self.sheerka) +# return self.serializer.deserialize(f, context) +# +# def save_obj(self, obj): +# self.log.debug(f"Saving '{obj}' as reference...") +# context = SerializerContext(user_name="kodjo", sheerka=self.sheerka) +# stream = self.serializer.serialize(obj, context) +# digest = obj.get_digest() if hasattr(obj, "get_digest") else self.get_stream_digest(stream) +# +# target_path = self.io.get_obj_path(SheerkaDataProvider.ObjectsFolder, digest) +# if self.io.exists(target_path): +# self.log.debug(f"...already saved. digest is {digest}") +# return digest +# +# self.io.write_binary(target_path, stream.read()) +# +# self.log.debug(f"...digest={digest}.") +# return digest +# +# def load_obj(self, digest, add_origin=True): +# if digest is None: +# return None +# +# target_path = self.io.get_obj_path(SheerkaDataProvider.ObjectsFolder, digest) +# if not self.io.exists(target_path): +# return None +# +# with self.io.open(target_path, "rb") as f: +# context = SerializerContext(origin=digest, sheerka=self.sheerka) +# obj = self.serializer.deserialize(f, context) +# +# # set the origin of the object +# if add_origin: +# if isinstance(obj, dict): +# obj[Serializer.ORIGIN] = digest +# elif not isinstance(obj, str): +# setattr(obj, Serializer.ORIGIN, digest) +# return obj +# +# def load_ref_if_needed(self, obj, load_origin=True): +# if isinstance(obj, SheerkaDataProviderRef): +# resolved = self.load_obj(obj.target, load_origin) +# return resolved, False +# +# if not isinstance(obj, str) or not obj.startswith(SheerkaDataProvider.REF_PREFIX): +# return obj, False +# +# resolved = self.load_obj(obj[len(SheerkaDataProvider.REF_PREFIX):], load_origin) +# return (obj, False) if resolved is None else (resolved, True) +# +# def save_ref_if_needed(self, save_ref, obj): +# if not save_ref: +# return obj +# +# digest = self.save_obj(obj) +# return self.REF_PREFIX + digest +# +# def get_cache_params(self, category, key): +# digest = hashlib.sha3_256(f"{category}:{key}".encode("utf-8")).hexdigest() +# cache_path = self.io.get_obj_path(SheerkaDataProvider.CacheFolder, digest) +# return digest, cache_path +# +# def add_to_cache(self, category, key, obj, update=False): +# """ +# Save obj in the internal cache system +# :param category: +# :param key: +# :param obj: +# :param update: +# :return: +# """ +# digest, cache_path = self.get_cache_params(category, key) +# +# if self.io.exists(cache_path) and not update: +# return digest +# +# self.io.write_binary(cache_path, zlib.compress(obj.encode("utf-8"), 9)) +# return digest +# +# def load_from_cache(self, category, key): +# """ +# Reload a compress object from the cache +# :param category: +# :param key: +# :return: +# """ +# digest, cache_path = self.get_cache_params(category, key) +# +# if not self.io.exists(cache_path): +# raise IndexError(f"{category}.{key}") +# +# with self.io.open(cache_path, "rb") as f: +# return zlib.decompress(f.read()).decode("utf-8") +# +# def remove_from_cache(self, category, key): +# """ +# +# :param category: +# :param key: +# :return: +# """ +# digest, cache_path = self.get_cache_params(category, key) +# if self.io.exists(cache_path): +# self.io.remove(cache_path) +# +# return digest +# +# def in_cache(self, category, key): +# """ +# Returns true if the key is in cache +# :param category: +# :param key: +# :return: +# """ +# digest, cache_path = self.get_cache_params(category, key) +# return self.io.exists(cache_path) +# +# def get_snapshot(self, file): +# head_file = self.io.path_join(file) +# if not self.io.exists(head_file): +# return None +# return self.io.read_text(head_file) +# # with open(head_file, "r") as f: +# # return f.read() +# +# def set_snapshot(self, file, digest): +# head_file = self.io.path_join(file) +# return self.io.write_text(head_file, digest) +# # with open(head_file, "w") as f: +# # return f.write(digest) +# +# def load_keys(self): +# keys_file = self.io.path_join(SheerkaDataProvider.KeysFile) +# if not self.io.exists(keys_file): +# keys = {} +# else: +# with self.io.open(keys_file, "r") as f: +# keys = json.load(f) +# return keys +# +# def save_keys(self, keys): +# keys_file = self.io.path_join(SheerkaDataProvider.KeysFile) +# with self.io.open(keys_file, "w") as f: +# json.dump(keys, f) +# +# def get_next_key(self, entry): +# keys = self.load_keys() +# +# next_key = keys.get(entry, 0) + 1 +# keys[entry] = next_key +# +# self.save_keys(keys) +# return str(next_key) +# +# def set_key(self, entry, value): +# keys = self.load_keys() +# keys[entry] = value +# self.save_keys(keys) +# return str(value) +# +# def dump_state(self, digest=None): +# digest = digest or self.get_snapshot(SheerkaDataProvider.HeadFile) +# state = self.load_state(digest) +# print(json.dumps(state.data, sort_keys=True, default=json_default_converter, indent=True)) +# +# def dump_obj(self, digest): +# obj = self.load_obj(digest) +# print(json.dumps(obj.__dict__, sort_keys=True, default=json_default_converter, indent=True))