from datetime import datetime, date
import hashlib
import json
import zlib

from sdp.sheerkaDataProviderIO import SheerkaDataProviderIO
from sdp.sheerkaSerializer import Serializer, SerializerContext
from core.sheerka_logger import get_logger


def json_default_converter(o):
    """
    Fallback converter passed as ``default=`` to ``json.dumps``.

    It is called for values the json encoder does not know how to serialise.
    ``date`` / ``datetime`` values are converted to their ISO-8601 text.
    Any other type falls through and returns None, which json encodes as ``null``
    (so the content of such values is not reflected in the output).
    :param o: value the json encoder could not serialise
    :return: ISO-8601 string for dates and datetimes, None otherwise
    """
    if isinstance(o, (date, datetime)):
        return o.isoformat()
    return None


class Event(object):
    """
    Class that represents something that modifies the state of the system
    """

    def __init__(self, message="", user="", date=None):
        """
        :param message: description of the modification (get_digest requires a str)
        :param user: user that triggered the event
        :param date: when the event occurred; defaults to the moment the Event is created
        """
        self.version = 1
        self.user = user
        # Evaluated per call: a ``datetime.now()`` default argument would be frozen at
        # import time, giving every event created without a date the same timestamp.
        self.date = datetime.now() if date is None else date
        self.message = message
        self._digest = None  # lazily computed and cached by get_digest()

    def get_digest(self):
        """
        Returns the digest of the event (computed once, then cached)
        :return: hexa form of the sha256 of user, date and message,
                 or "xxx" when both message and user are empty
        :raises NotImplementedError: if message is not a str
        """
        if self._digest:
            return self._digest

        if self.message == "" and self.user == "":
            self._digest = "xxx"  # to speed unit tests
            return self._digest

        if not isinstance(self.message, str):
            raise NotImplementedError

        payload = f"Event:{self.user}{self.date}{self.message}"
        self._digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
        return self._digest

    def to_dict(self):
        """Return the instance attribute dict (the live ``__dict__``, not a copy)."""
        return self.__dict__

    def from_dict(self, as_dict):
        """
        Reload user, date and message from a dict
        :param as_dict: mapping with "user", "message" and an ISO-8601 "date" string
        """
        self.user = as_dict["user"]
        self.date = datetime.fromisoformat(as_dict["date"])
        self.message = as_dict["message"]
        # The fields the digest is built from just changed: drop any cached value.
        self._digest = None


class ObjToUpdate:
    """
    Internal key value class to hold the key (and the value) when it is detected
    It's created to distinguish from {key, value}

    The key and digest are resolved lazily (via SheerkaDataProvider.get_obj_key /
    get_obj_digest) and memoised; has_key / has_digest stay None until resolved.
    """

    def __init__(self, obj, key=None, digest=None):
        self.obj = obj
        self.has_key = None  # None = not looked up yet, then True / False
        self.has_digest = None  # None = not looked up yet, then True / False
        self._key = None
        self._digest = None

        if key is not None:
            self.set_key(key)

        if digest is not None:
            self.set_digest(digest)

    def get_key(self):
        """Return the key of obj (resolved on first call), or None if it has none."""
        if self.has_key is None:
            self._key = SheerkaDataProvider.get_obj_key(self.obj)
            self.has_key = self._key is not None
        return self._key if self.has_key else None

    def get_digest(self):
        """Return the digest of obj (resolved on first call), or None if it has none."""
        if self.has_digest is None:
            self._digest = SheerkaDataProvider.get_obj_digest(self.obj)
            self.has_digest = self._digest is not None
        return self._digest if self.has_digest else None

    def set_digest(self, digest):
        """Force the digest, bypassing lookup on obj."""
        self.has_digest = True
        self._digest = digest

    def set_key(self, key):
        """Force the key, bypassing lookup on obj."""
        self.has_key = True
        self._key = key


class State:
    """
    Class that represents the state of the system (dictionary of all known entries)

    ``data`` maps an entry name to either a single value, a list of values,
    or a dict of key -> value (where value may itself be a list of values).
    """

    def __init__(self):
        self.version = 1
        self.date = None
        self.parents = []
        self.events = []
        self.data = {}

    @staticmethod
    def check_duplicate(items, obj: ObjToUpdate, key):
        """
        Raise if an element of items has the same digest as obj
        :param items: a list of stored elements, or a single stored element
        :param obj: object about to be added
        :param key: key reported in the exception
        :raises SheerkaDataProviderDuplicateKeyError: when a digest matches
        """
        digest = obj.get_digest()
        if digest is None:
            return

        # Only lists hold multiple elements (see update()). A single stored element
        # may itself be iterable (a "##REF##:" str, a dict...) and must not be iterated.
        if not isinstance(items, list):
            items = [items]

        for item in items:
            if SheerkaDataProvider.get_obj_digest(item) == digest:
                raise SheerkaDataProviderDuplicateKeyError(key, obj.obj)

    def update(self, entry, obj: ObjToUpdate, append=True):
        """
        adds obj to entry
        :param entry: name of the entry to update
        :param obj: object to add; stored as {key: obj.obj} when it has a key
        :param append: if True, duplicate keys will create lists;
                       if False, the existing value(s) are replaced / merged
        :return:
        :raises SheerkaDataProviderDuplicateKeyError: same digest already stored
        :raises SheerkaDataProviderError: keyless obj added to a keyed entry
        """
        # get_key() must run before has_key is read: has_key stays None until the
        # key has been looked up, which would wrongly be treated as "no key".
        obj_key = obj.get_key()
        obj_to_use = {obj_key: obj.obj} if obj.has_key else obj.obj

        if entry not in self.data:
            self.data[entry] = obj_to_use
            return

        current = self.data[entry]

        if not append:
            if isinstance(obj_to_use, dict):
                current.update(obj_to_use)
            else:
                self.data[entry] = obj_to_use
        elif isinstance(current, list):
            self.check_duplicate(current, obj, entry)
            current.append(obj.obj)
        elif isinstance(obj_to_use, dict):
            for k, value in obj_to_use.items():
                if k not in current:
                    current[k] = value
                elif isinstance(current[k], list):
                    self.check_duplicate(current[k], obj, f"{entry}.{k}")
                    current[k].append(value)
                else:
                    # second element under the same key: promote to a list
                    self.check_duplicate(current[k], obj, f"{entry}.{k}")
                    current[k] = [current[k], value]
        elif isinstance(current, dict):
            raise SheerkaDataProviderError(f"Cannot found key on '{obj.obj}' while all other elements have.", obj.obj)
        else:
            # second keyless element: promote to a list
            self.check_duplicate(current, obj, entry)
            self.data[entry] = [current, obj_to_use]

    def modify(self, entry, key, obj, obj_key):
        """
        Replace the value stored under entry[key] with obj
        :param entry: name of the entry
        :param key: current key of the value
        :param obj: new value
        :param obj_key: key of the new value (may differ from key)
        """
        append = False
        if obj_key != key:
            # if the key changes, make sure to remove the previous entry
            self.remove(entry, lambda k, o: k == key)
            # modify from one object to another
            append = True

        self.update(entry, ObjToUpdate(obj, obj_key), append=append)

    def modify_in_list(self, entry, key, obj, obj_key, obj_origin, load_ref_if_needed, save_ref_if_needed):
        """
        Replace, in the list stored under entry[key], the element whose digest is obj_origin
        :param entry: name of the entry
        :param key: key of the list
        :param obj: new value
        :param obj_key: key of the new value; if it differs from key, the value is moved
        :param obj_origin: digest of the element to replace
        :param load_ref_if_needed: callable(item) -> (resolved_item, is_ref)
        :param save_ref_if_needed: callable(is_ref, obj) -> obj or reference
        :raises SheerkaDataProviderError: if no element matches obj_origin
        """
        items = self.data[entry][key]
        found = False
        to_remove = None
        for index, stored in enumerate(items):
            item, is_ref = load_ref_if_needed(stored)
            if not hasattr(item, "get_digest"):
                continue

            if item.get_digest() == obj_origin:
                # keep the same storage mode (reference or inline) as the replaced item
                obj = save_ref_if_needed(is_ref, obj)
                if obj_key == key:
                    items[index] = obj
                else:
                    to_remove = index
                    self.update(entry, ObjToUpdate(obj, obj_key), append=True)
                found = True
                break

        if not found:
            raise SheerkaDataProviderError(f"Cannot modify '{entry}.{key}'. Item '{obj_origin}' not found.", obj)

        if to_remove is not None:
            del self.data[entry][key][to_remove]

    def remove(self, entry, filter):
        """
        Remove elements of entry
        :param entry: name of the entry
        :param filter: None to drop the whole entry;
                       otherwise filter(key, element) for dict entries
                       and filter(element) for other entries
        """
        if filter is None:
            del self.data[entry]
            return

        current = self.data[entry]
        if isinstance(current, dict):
            keys_to_remove = [k for k, element in current.items() if filter(k, element)]
            for k in keys_to_remove:
                del current[k]
        elif not isinstance(current, list):
            if filter(current):
                del self.data[entry]
        else:
            # Rebuild in place instead of list.remove() while iterating, which skips the
            # element following each removal and removes by equality, not position.
            current[:] = [element for element in current if not filter(element)]

    def get_digest(self):
        """Return the sha256 (hex) of the json dump of the whole state."""
        as_json = json.dumps(self.__dict__, default=json_default_converter)
        return hashlib.sha256(as_json.encode("utf-8")).hexdigest()

    def contains(self, entry, key):
        """
        if key is None, returns True if entry exists
        if key has a value returns True if entry is an dict and contains key
        :param entry:
        :param key:
        :return: bool
        """
        if entry not in self.data:
            return False

        if key is None:
            return True

        current = self.data[entry]
        return isinstance(current, dict) and key in current


class SheerkaDataProviderError(Exception):
    """Generic data provider error, carrying the object involved."""

    def __init__(self, message, obj):
        super().__init__(message)
        self.obj = obj


class SheerkaDataProviderDuplicateKeyError(Exception):
    """Raised when an object with the same digest is already stored under a key."""

    def __init__(self, key, obj):
        super().__init__("Duplicate object.")
        self.key = key
        self.obj = obj


class SheerkaDataProvider:
    """
    Manages the state of the system

    Each modification loads the state pointed to by HEAD, stamps it with the parent
    snapshot, the triggering event and the current time, saves it under its digest
    and moves HEAD to the new digest.
    """

    EventFolder = "events"
    StateFolder = "state"
    ObjectsFolder = "objects"
    CacheFolder = "cache"
    HeadFile = "HEAD"
    KeysFile = "keys"

    REF_PREFIX = "##REF##:"

    def __init__(self, root=None):
        self.log = get_logger(__name__)
        self.init_log = get_logger("init." + __name__)
        self.init_log.debug("Initializing sdp.")
        self.io = SheerkaDataProviderIO.get(root)
        self.first_time = self.io.first_time
        self.serializer = Serializer()

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _stamp_state(state, snapshot, event_digest):
        """Record the parent snapshot, the triggering event and the modification time."""
        state.parents = [] if snapshot is None else [snapshot]
        state.events = [event_digest]
        state.date = datetime.now()

    def _commit_state(self, state):
        """Save state, move HEAD to it and return the new snapshot digest."""
        new_snapshot = self.save_state(state)
        self.set_snapshot(new_snapshot)
        return new_snapshot

    def _resolve_item(self, item, load_origin):
        """Resolve references in item (or in each element when item is a list)."""
        if isinstance(item, list):
            return [self.load_ref_if_needed(i, load_origin)[0] for i in item]
        return self.load_ref_if_needed(item, load_origin)[0]

    # ------------------------------------------------------------ object helpers

    @staticmethod
    def get_obj_key(obj):
        """
        Tries to find the key of an object
        Look for .key, .get_key()
        :param obj:
        :return: String version of that is found, None otherwise
        """
        if hasattr(obj, "key"):
            return str(obj.key)
        if hasattr(obj, "get_key"):
            return str(obj.get_key())
        return None

    @staticmethod
    def get_obj_digest(obj):
        """
        Tries to find the digest of an object
        Look for a reference string, then .digest, then .get_digest()
        :param obj:
        :return: digest, None otherwise
        """
        if SheerkaDataProvider.is_reference(obj):
            return obj[len(SheerkaDataProvider.REF_PREFIX):]
        if hasattr(obj, "digest"):
            return obj.digest
        if hasattr(obj, "get_digest"):
            return obj.get_digest()
        return None

    @staticmethod
    def get_obj_origin(obj):
        """
        Get the digest used to save obj if set (dict item or attribute named Serializer.ORIGIN)
        """
        if isinstance(obj, dict) and Serializer.ORIGIN in obj:
            return obj[Serializer.ORIGIN]

        if hasattr(obj, Serializer.ORIGIN):
            return getattr(obj, Serializer.ORIGIN)

        return None

    @staticmethod
    def get_stream_digest(stream):
        """Return the sha256 (hex) of a binary stream, then rewind it to position 0."""
        sha256_hash = hashlib.sha256()
        for byte_block in iter(lambda: stream.read(4096), b""):
            sha256_hash.update(byte_block)
        stream.seek(0)
        return sha256_hash.hexdigest()

    @staticmethod
    def is_reference(obj):
        """True if obj is a reference string (REF_PREFIX followed by a digest)."""
        return isinstance(obj, str) and obj.startswith(SheerkaDataProvider.REF_PREFIX)

    # ------------------------------------------------------------ state updates

    def add(self, event_digest: str, entry, obj, allow_multiple=True, use_ref=False):
        """
        Adds obj to the entry 'entry'
        :param event_digest: digest of the event that triggers the modification of the state
        :param entry: entry of the state to update
        :param obj: obj to insert or add (may already be wrapped in an ObjToUpdate)
        :param allow_multiple: if set to true, the same key can be added several times.
                               All entries will be put in a list
        :param use_ref: if True the actual object is saved under 'objects' folder,
                        only a reference is saved in the state
        :return: (entry, key) to retrieve the object
        :raises IndexError: allow_multiple is False and the key already exists
        :raises SheerkaDataProviderDuplicateKeyError: same digest already stored
        """
        snapshot = self.get_snapshot()
        state = self.load_state(snapshot)
        self.log.debug(f"Adding obj '{obj}' in entry '{entry}' (allow_multiple={allow_multiple}, use_ref={use_ref})")

        if not isinstance(obj, ObjToUpdate):
            obj = ObjToUpdate(obj)

        key = obj.get_key()
        if key:
            self.log.debug(f"key found : '{key}'")
        else:
            self.log.debug("No key found")

        # check uniqueness, cannot add the same key twice if allow_multiple == False
        if not allow_multiple:
            if isinstance(obj.obj, dict):
                for k in obj.obj:
                    if state.contains(entry, k):
                        raise IndexError(f"{entry}.{k}")
            elif state.contains(entry, key):
                raise IndexError(f"{entry}.{key}" if key else entry)

        self._stamp_state(state, snapshot, event_digest)

        if use_ref:
            obj.set_digest(self.save_obj(obj.obj))
            obj.obj = self.REF_PREFIX + obj.get_digest()

        state.update(entry, obj)
        self._commit_state(state)
        return entry, key

    def add_with_auto_key(self, event_digest: str, entry, obj):
        """
        Add obj to entry. An autogenerated key created for obj
        :param event_digest:
        :param entry:
        :param obj: if it has a set_key() method, it receives the generated key
        :return: (entry, generated key)
        """
        next_key = self.get_next_key(entry)
        if hasattr(obj, "set_key"):
            obj.set_key(next_key)

        self.add(event_digest, entry, ObjToUpdate(obj, next_key))
        return entry, next_key

    def add_unique(self, event_digest: str, entry, obj):
        """
        Add obj to the set stored in entry and make sure it's unique
        A new state is saved even if obj was already present.
        :return: (entry, None) if obj was added, (None, None) if it already existed
        """
        snapshot = self.get_snapshot()
        state = self.load_state(snapshot)
        self._stamp_state(state, snapshot, event_digest)

        if entry not in state.data:
            state.data[entry] = {obj}
            already_exist = False
        else:
            already_exist = obj in state.data[entry]
            if not already_exist:
                state.data[entry].add(obj)

        self._commit_state(state)
        return (None if already_exist else entry), None

    def set(self, event_digest, entry, obj, use_ref=False):
        """
        Add or replace an entry. The entry is reinitialized.
        If the previous value was dict, all keys are lost
        :param event_digest:
        :param entry:
        :param obj:
        :param use_ref: if True, obj is saved under 'objects' and only a reference is stored
        :return: (entry, key of obj or None)
        """
        snapshot = self.get_snapshot()
        state = self.load_state(snapshot)
        self._stamp_state(state, snapshot, event_digest)

        key = self.get_obj_key(obj)
        obj = self.save_ref_if_needed(use_ref, obj)
        state.data[entry] = obj if key is None else {key: obj}

        self._commit_state(state)
        return entry, key

    def modify(self, event_digest, entry, key, obj):
        """
        Replace an element
        :param event_digest:
        :param entry:
        :param key: key of the object to update (mandatory)
        :param obj: new data; when several objects share the key, obj must carry
                    its origin digest (see get_obj_origin) to identify which one
        :return: (entry, new key of the object)
        :raises SheerkaDataProviderError: key is None, or ambiguous/unknown list item
        :raises IndexError: entry or entry.key does not exist
        """
        if key is None:
            raise SheerkaDataProviderError("Key is mandatory.", None)

        snapshot = self.get_snapshot()
        state = self.load_state(snapshot)
        if entry not in state.data:
            raise IndexError(entry)

        if key not in state.data[entry]:
            raise IndexError(f"{entry}.{key}")

        self._stamp_state(state, snapshot, event_digest)

        # Gets obj original key, it will help to know if the key has changed
        obj_key = self.get_obj_key(obj) or key

        current = state.data[entry][key]
        if isinstance(current, list):
            obj_origin = self.get_obj_origin(obj)
            if obj_origin is None:
                raise SheerkaDataProviderError(f"Multiple entries under '{entry}.{key}'", obj)
            state.modify_in_list(entry, key, obj, obj_key, obj_origin,
                                 self.load_ref_if_needed, self.save_ref_if_needed)
        else:
            # keep the same storage mode (reference or inline) as the replaced value
            obj = self.save_ref_if_needed(self.is_reference(current), obj)
            state.modify(entry, key, obj, obj_key)

        self._commit_state(state)
        return entry, obj_key

    def list(self, entry, filter=None):
        """
        Lists elements of entry 'entry' (generator; references are resolved)
        :param entry: name of the entry to list
        :param filter: filter(key, element) for keyed entries, filter(element) otherwise
        :return: iterator over elements (a list of elements for keys holding several)
        """
        state = self.load_state(self.get_snapshot())
        if entry not in state.data:
            return

        elements = state.data[entry]
        if isinstance(elements, dict):
            # manage when elements have a key
            keep = filter if filter is not None else (lambda k, o: True)
            for key, element in elements.items():
                if keep(key, element):
                    if isinstance(element, list):
                        yield [self.load_ref_if_needed(e)[0] for e in element]
                    else:
                        yield self.load_ref_if_needed(element)[0]
        else:
            # manage when no key is defined for the elements
            if not isinstance(elements, (list, set)):
                elements = [elements]

            keep = filter if filter is not None else (lambda o: True)
            for element in elements:
                if keep(element):
                    yield self.load_ref_if_needed(element)[0]

    def remove(self, event_digest, entry, filter=None):
        """
        Removes elements under the entry 'entry'
        :param event_digest: event that triggers the deletion
        :param entry:
        :param filter: see State.remove (None removes the whole entry)
        :return: new sha256 of the state
        :raises IndexError: entry does not exist
        TODO: Remove by key
        """
        snapshot = self.get_snapshot()
        state = self.load_state(snapshot)
        if entry not in state.data:
            raise IndexError(entry)

        self._stamp_state(state, snapshot, event_digest)
        state.remove(entry, filter)
        return self._commit_state(state)

    # ------------------------------------------------------------------ queries

    def get(self, entry, key=None, load_origin=True):
        """
        Retrieve an element by its key
        :param entry:
        :param key: None to get the whole entry
        :param load_origin: passed to load_ref_if_needed when resolving references
        :return: element (or list of elements when several share the key)
        :raises IndexError: entry or entry.key does not exist
        """
        state = self.load_state(self.get_snapshot())
        if entry not in state.data:
            raise IndexError(entry)

        if key is not None and key not in state.data[entry]:
            raise IndexError(f"{entry}.{key}")

        item = state.data[entry] if key is None else state.data[entry][key]
        return self._resolve_item(item, load_origin)

    def get_safe(self, entry, key=None, load_origin=True):
        """
        Retrieve an element by its key. Return None if the element does not exist
        :param entry:
        :param key: None to get the whole entry
        :param load_origin: passed to load_ref_if_needed when resolving references
        :return: element, list of elements, or None
        """
        state = self.load_state(self.get_snapshot())
        if entry not in state.data:
            return None

        if key is not None and key not in state.data[entry]:
            return None

        item = state.data[entry] if key is None else state.data[entry][key]
        return self._resolve_item(item, load_origin)

    def exists(self, entry, key=None, digest=None):
        """
        Returns true if the entry is defined
        :param entry:
        :param key: if given, also require entry to contain key
        :param digest: digest of the object, when several entries share the same key
        :return: bool
        """
        state = self.load_state(self.get_snapshot())
        if entry not in state.data:
            return False
        if key is None:
            return True

        items = state.data[entry]
        if key not in items:
            return False
        if digest is None:
            return True

        items = items[key]
        if not isinstance(items, list):
            items = [items]

        return any(SheerkaDataProvider.get_obj_digest(item) == digest for item in items)

    # ------------------------------------------------------------- persistence

    def save_event(self, event: Event):
        """
        Save an event under the events folder (no-op if already saved)
        :param event:
        :return: digest of the event
        """
        digest = event.get_digest()
        target_path = self.io.get_obj_path(SheerkaDataProvider.EventFolder, digest)
        if self.io.exists(target_path):
            return digest

        self.io.write_binary(target_path, self.serializer.serialize(event, None).read())
        return digest

    def load_event(self, digest):
        """
        return an event, given its digest
        :param digest:
        :return:
        """
        target_path = self.io.get_obj_path(SheerkaDataProvider.EventFolder, digest)
        with self.io.open(target_path, "rb") as f:
            return self.serializer.deserialize(f, None)

    def save_state(self, state: State):
        """Save state under the state folder (no-op if already saved); return its digest."""
        digest = state.get_digest()
        self.log.debug(f"Saving new state. digest={digest}")
        target_path = self.io.get_obj_path(SheerkaDataProvider.StateFolder, digest)
        if self.io.exists(target_path):
            return digest

        self.io.write_binary(target_path, self.serializer.serialize(state, None).read())
        return digest

    def load_state(self, digest):
        """Load a state by digest; a fresh empty State when digest is None."""
        if digest is None:
            return State()

        target_path = self.io.get_obj_path(SheerkaDataProvider.StateFolder, digest)
        with self.io.open(target_path, "rb") as f:
            return self.serializer.deserialize(f, None)

    def save_obj(self, obj):
        """
        Save obj under the objects folder (no-op if already saved)
        :return: obj.get_digest() if available, otherwise the sha256 of its serialized form
        """
        self.log.debug(f"Saving '{obj}' as reference...")
        # NOTE(review): user_name is hard-coded — confirm this is intended.
        stream = self.serializer.serialize(obj, SerializerContext(user_name="kodjo"))
        digest = obj.get_digest() if hasattr(obj, "get_digest") else self.get_stream_digest(stream)

        target_path = self.io.get_obj_path(SheerkaDataProvider.ObjectsFolder, digest)
        if self.io.exists(target_path):
            self.log.debug(f"...already saved. digest is {digest}")
            return digest

        self.io.write_binary(target_path, stream.read())
        self.log.debug(f"...digest={digest}.")
        return digest

    def load_obj(self, digest, add_origin=True):
        """
        Load an object saved by save_obj
        :param digest:
        :param add_origin: if True, record digest on the object (key/attribute Serializer.ORIGIN)
        :return: the object, or None if digest is None or nothing is stored under it
        """
        if digest is None:
            return None

        target_path = self.io.get_obj_path(SheerkaDataProvider.ObjectsFolder, digest)
        if not self.io.exists(target_path):
            return None

        with self.io.open(target_path, "rb") as f:
            obj = self.serializer.deserialize(f, SerializerContext(origin=digest))

        # set the origin of the object
        if add_origin:
            if isinstance(obj, dict):
                obj[Serializer.ORIGIN] = digest
            elif not isinstance(obj, str):
                setattr(obj, Serializer.ORIGIN, digest)

        return obj

    def load_ref_if_needed(self, obj, load_origin=True):
        """
        Resolve obj if it is a reference string
        :return: (resolved object, True), or (obj, False) if not a reference or not found
        """
        if not self.is_reference(obj):
            return obj, False

        resolved = self.load_obj(obj[len(SheerkaDataProvider.REF_PREFIX):], load_origin)
        if resolved is None:
            return obj, False

        return resolved, True

    def save_ref_if_needed(self, save_ref, obj):
        """Return obj unchanged, or save it and return its reference string when save_ref."""
        if not save_ref:
            return obj

        return self.REF_PREFIX + self.save_obj(obj)

    # -------------------------------------------------------------------- cache

    def get_cache_params(self, category, key):
        """Return (sha3_256 hex of "category:key", path of the matching cache file)."""
        digest = hashlib.sha3_256(f"{category}:{key}".encode("utf-8")).hexdigest()
        cache_path = self.io.get_obj_path(SheerkaDataProvider.CacheFolder, digest)
        return digest, cache_path

    def add_to_cache(self, category, key, obj, update=False):
        """
        Save obj (a str) zlib-compressed in the internal cache system
        :param category:
        :param key:
        :param obj: text to cache (utf-8 encoded before compression)
        :param update: overwrite an existing cache file
        :return: digest of the cache entry
        """
        digest, cache_path = self.get_cache_params(category, key)
        if self.io.exists(cache_path) and not update:
            return digest

        self.io.write_binary(cache_path, zlib.compress(obj.encode("utf-8"), 9))
        return digest

    def load_from_cache(self, category, key):
        """
        Reload a compressed text from the cache
        :param category:
        :param key:
        :return: the decompressed utf-8 text
        :raises IndexError: nothing cached for category/key
        """
        digest, cache_path = self.get_cache_params(category, key)
        if not self.io.exists(cache_path):
            raise IndexError(f"{category}.{key}")

        with self.io.open(cache_path, "rb") as f:
            return zlib.decompress(f.read()).decode("utf-8")

    def remove_from_cache(self, category, key):
        """
        Delete the cache file for category/key if it exists
        :param category:
        :param key:
        :return: digest of the cache entry
        """
        digest, cache_path = self.get_cache_params(category, key)
        if self.io.exists(cache_path):
            self.io.remove(cache_path)
        return digest

    def in_cache(self, category, key):
        """
        Returns true if the key is in cache
        :param category:
        :param key:
        :return:
        """
        _, cache_path = self.get_cache_params(category, key)
        return self.io.exists(cache_path)

    # ------------------------------------------------------------- HEAD / keys

    def get_snapshot(self):
        """Return the digest stored in HEAD, or None if there is no HEAD yet."""
        head_file = self.io.path_join(SheerkaDataProvider.HeadFile)
        if not self.io.exists(head_file):
            return None

        return self.io.read_text(head_file)

    def set_snapshot(self, digest):
        """Write digest into HEAD."""
        head_file = self.io.path_join(SheerkaDataProvider.HeadFile)
        return self.io.write_text(head_file, digest)

    def load_keys(self):
        """Return the json dict of last generated key per entry ({} if no keys file)."""
        keys_file = self.io.path_join(SheerkaDataProvider.KeysFile)
        if not self.io.exists(keys_file):
            return {}

        with self.io.open(keys_file, "r") as f:
            return json.load(f)

    def save_keys(self, keys):
        """Write the key dict as json into the keys file."""
        keys_file = self.io.path_join(SheerkaDataProvider.KeysFile)
        with self.io.open(keys_file, "w") as f:
            json.dump(keys, f)

    def get_next_key(self, entry):
        """Increment, persist and return (as str) the key counter of entry."""
        keys = self.load_keys()
        # int(): set_key() may have stored the value as a str
        next_key = int(keys.get(entry, 0)) + 1
        keys[entry] = next_key
        self.save_keys(keys)
        return str(next_key)

    def set_key(self, entry, value):
        """Force the key counter of entry to value; return it as str."""
        keys = self.load_keys()
        keys[entry] = value
        self.save_keys(keys)
        return str(value)