"""File-backed store for events, immutable state snapshots, objects and a small cache."""
from os import path
import os
from datetime import datetime, date
import hashlib
import json
import zlib

from sdp.sheerkaSerializer import Serializer, SerializerContext


def json_default_converter(o):
    """
    Default formatter for json
    It's used when the json serializer does not know how to serialise a type

    Dates and datetimes are rendered in ISO format. Any other unknown type yields
    None (serialised as ``null``): State.get_digest relies on this to be able to
    hash states holding arbitrary objects.
    :param o: object the json encoder could not handle
    :return: ISO string for date / datetime, None otherwise
    """
    if isinstance(o, (date, datetime)):
        return o.isoformat()
    return None


class Event(object):
    """
    Class that represents something that modifies the state of the system
    """

    def __init__(self, message="", user="kodjo", date=None):
        """
        :param message: description of the event
        :param user: name of the user that triggers the event
        :param date: date of the event, defaults to the moment the event is created
        """
        self.version = 1
        self.user = user
        # The default must be computed per call: a 'datetime.now()' default in the
        # signature is evaluated only once, when the module is imported, which gives
        # the same date (and therefore the same digest) to unrelated events.
        self.date = datetime.now() if date is None else date
        self.message = message

    def get_digest(self):
        """
        Returns the digest of the event
        :return: hexa form of the sha256
        :raise NotImplementedError: when the message is not a string
        """
        if not isinstance(self.message, str):
            raise NotImplementedError

        as_text = f"Event:{self.user}{self.date}{self.message}"
        return hashlib.sha256(as_text.encode("utf-8")).hexdigest()

    def to_dict(self):
        """
        :return: the attributes of the event (the live __dict__, not a copy)
        """
        return self.__dict__

    def from_dict(self, as_dict):
        """
        Initializes user, date and message from a dictionary
        :param as_dict: dictionary with the keys 'user', 'date' (ISO formatted string) and 'message'
        :return:
        """
        self.user = as_dict["user"]
        self.date = datetime.fromisoformat(as_dict["date"])
        self.message = as_dict["message"]


class ObjWithKey:
    """
    Internal key value class to hold the key (and the value) when it is detected
    It's created to distinguish from {key, value}
    """

    def __init__(self, key, obj):
        self.key = key
        self.obj = obj


class State:
    """
    Class that represents the state of the system (dictionary of all known entries)
    """

    def __init__(self):
        self.version = 1
        self.date = None
        self.parents = []
        self.events = []
        self.data = {}

    def update(self, entry, obj, append=True):
        """
        adds obj to entry
        :param entry: name of the entry to update
        :param obj: object to add, an ObjWithKey is stored as {key: obj}
        :param append: if True, duplicate keys will create lists
        :return:
        :raise SheerkaDataProviderError: when obj has no key while the entry is a dictionary
        """
        has_key = isinstance(obj, ObjWithKey)
        obj_to_use = {obj.key: obj.obj} if has_key else obj

        if entry not in self.data:
            self.data[entry] = obj_to_use
            return

        current = self.data[entry]

        if not append:
            # replace mode : merge the keys when possible, otherwise overwrite the entry
            if isinstance(obj_to_use, dict):
                current.update(obj_to_use)
            else:
                self.data[entry] = obj_to_use
        elif isinstance(current, list):
            current.append(obj.obj if has_key else obj)  # do not use obj_to_use !
        elif isinstance(obj_to_use, dict):
            for k, value in obj_to_use.items():
                if k not in current:
                    current[k] = value
                elif isinstance(current[k], list):
                    current[k].append(value)
                else:
                    # second value for this key : the single value becomes a list
                    current[k] = [current[k], value]
        elif isinstance(current, dict):
            raise SheerkaDataProviderError(f"Cannot found key on '{obj}' while all other elements have.", obj)
        else:
            self.data[entry] = [current, obj_to_use]

    def modify(self, entry, key, obj, obj_key):
        """
        Replaces the element stored under entry.key
        :param entry: name of the entry
        :param key: key under which the element is currently stored
        :param obj: new value
        :param obj_key: key of the new value (may differ from key)
        :return:
        """
        # if the key changes, make sure to remove the previous entry
        key_has_changed = obj_key != key
        if key_has_changed:
            self.remove(entry, lambda k, o: k == key)  # modify from one object to another

        self.update(entry, ObjWithKey(obj_key, obj), append=key_has_changed)

    def modify_in_list(self, entry, key, obj, obj_key, obj_origin, load_ref_if_needed, save_ref_if_needed):
        """
        Replaces one element of the list stored under entry.key
        The element to replace is the one whose get_digest() equals obj_origin
        :param entry: name of the entry
        :param key: key under which the list is stored
        :param obj: new value
        :param obj_key: key of the new value, if it differs from key the element is moved
        :param obj_origin: digest of the element to replace
        :param load_ref_if_needed: callable, item -> (resolved item, is_ref)
        :param save_ref_if_needed: callable, (is_ref, obj) -> object to store
        :return:
        :raise SheerkaDataProviderError: when no element matches obj_origin
        """
        items = self.data[entry][key]

        for index, stored in enumerate(items):
            item, is_ref = load_ref_if_needed(stored)
            if not hasattr(item, "get_digest"):
                continue
            if item.get_digest() != obj_origin:
                continue

            obj = save_ref_if_needed(is_ref, obj)
            if obj_key == key:
                items[index] = obj
            else:
                # the key has changed : add under the new key first, then drop the old element
                self.update(entry, ObjWithKey(obj_key, obj), append=True)
                del items[index]
            return

        raise SheerkaDataProviderError(f"Cannot modify '{entry}.{key}'. Item '{obj_origin}' not found.", obj)

    def remove(self, entry, filter):
        """
        Removes elements of the entry
        :param entry: name of the entry
        :param filter: None to remove the whole entry,
            predicate (key, element) when the entry is a dictionary,
            predicate (element) otherwise
        :return:
        """
        if filter is None:
            del self.data[entry]
            return

        current = self.data[entry]

        if isinstance(current, dict):
            keys_to_remove = [key for key, element in current.items() if filter(key, element)]
            for key in keys_to_remove:
                del current[key]
        elif isinstance(current, list):
            # Rebuild in place : removing from a list while iterating over it skips elements
            current[:] = [element for element in current if not filter(element)]
        elif isinstance(current, set):
            # Sets are filtered element by element, like SheerkaDataProvider.list() does
            current.difference_update([element for element in current if filter(element)])
        elif filter(current):
            del self.data[entry]

    def get_digest(self):
        """
        Returns the digest of the state
        Types unknown to json are converted by json_default_converter
        :return: hexa form of the sha256 of the json representation
        """
        as_json = json.dumps(self.__dict__, default=json_default_converter)
        return hashlib.sha256(as_json.encode("utf-8")).hexdigest()

    def contains(self, entry, key):
        """
        if key is None, returns True if entry exists
        if key has a value returns True if entry is a dict and contains key
        :param entry:
        :param key:
        :return:
        """
        if entry not in self.data:
            return False

        if key is None:
            return True

        if not isinstance(self.data[entry], dict):
            return False

        return key in self.data[entry]


class SheerkaDataProviderError(Exception):
    """
    Error raised by the data provider, obj is the object that causes the error
    """

    def __init__(self, message, obj):
        Exception.__init__(self, message)
        self.obj = obj


class SheerkaDataProvider:
    """Manages the state of the system"""

    EventFolder = "events"
    StateFolder = "state"
    ObjectsFolder = "objects"
    CacheFolder = "cache"
    HeadFile = "HEAD"
    KeysFile = "keys"
    REF_PREFIX = "##REF##:"

    def __init__(self, root=None):
        """
        :param root: root folder of the storage, defaults to '~/.sheerka'. Created if missing
        """
        if root is None:
            root = path.join(path.expanduser("~"), ".sheerka")
        self.root = path.abspath(root)

        # exist_ok avoids the race between the existence test and the creation
        os.makedirs(self.root, exist_ok=True)

        self.serializer = Serializer()

    def get_obj_path(self, object_type, digest):
        """
        :param object_type: name of the sub folder (events, state, objects, cache...)
        :param digest: digest of the object
        :return: <root>/<object_type>/<first 24 chars of digest>/<digest>
        """
        return path.join(self.root, object_type, digest[:24], digest)

    @staticmethod
    def get_obj_key(obj):
        """
        Tries to find the key of an object
        Look for .key, .get_key()
        :param obj:
        :return: String version of that is found, None otherwise
        """
        if hasattr(obj, "key"):
            return str(obj.key)
        if hasattr(obj, "get_key"):
            return str(obj.get_key())
        return None

    @staticmethod
    def get_stream_digest(stream):
        """
        Computes the sha256 of a binary stream, the stream is rewound to 0 afterwards
        :param stream: binary stream, read from its current position
        :return: hexa form of the sha256
        """
        sha256_hash = hashlib.sha256()
        for byte_block in iter(lambda: stream.read(4096), b""):
            sha256_hash.update(byte_block)

        stream.seek(0)
        return sha256_hash.hexdigest()

    @staticmethod
    def is_reference(obj):
        """
        :param obj:
        :return: True if obj is a string that starts with REF_PREFIX
        """
        return isinstance(obj, str) and obj.startswith(SheerkaDataProvider.REF_PREFIX)

    @staticmethod
    def _ensure_parent_dir(target_path):
        """Creates the folder that will contain target_path, if needed"""
        os.makedirs(path.dirname(target_path), exist_ok=True)

    def _load_head(self):
        """Returns (snapshot, state) for the current HEAD, state is empty when there is no HEAD"""
        snapshot = self.get_snapshot()
        return snapshot, self.load_state(snapshot)

    def _commit(self, state, snapshot, event_digest):
        """Stamps state (parent, event, date), saves it, moves HEAD to it and returns its digest"""
        state.parents = [] if snapshot is None else [snapshot]
        state.events = [event_digest]
        state.date = datetime.now()  # must be set before save_state, the date is part of the digest

        new_snapshot = self.save_state(state)
        self.set_snapshot(new_snapshot)
        return new_snapshot

    def _resolve(self, item):
        """Resolves item if it's a reference, or every element of item if it's a list"""
        if isinstance(item, list):
            return [self.load_ref_if_needed(i)[0] for i in item]
        return self.load_ref_if_needed(item)[0]

    def add(self, event: Event, entry, obj, allow_multiple=True, use_ref=False):
        """
        Adds obj to the entry 'entry'
        :param event: events that triggers the update of the state
        :param entry: entry of the state to update
        :param obj: obj to insert or add
        :param allow_multiple: if set to true, the same key can be added several times.
            All entries will be put in a list
        :param use_ref: if True the actual object is saved under 'objects' folder,
            only a reference is saved in the state
        :return: (entry, key) to retrieve the object
        :raise IndexError: when allow_multiple is False and the key (or the entry) already exists
        """
        event_digest = self.save_event(event)
        snapshot, state = self._load_head()

        # check uniqueness, cannot add the same key twice if allow_multiple == False
        key = self.get_obj_key(obj)
        if not allow_multiple:
            if isinstance(obj, dict):
                for k in obj:
                    if state.contains(entry, k):
                        raise IndexError(f"{entry}.{k}")
            elif state.contains(entry, key):
                raise IndexError(f"{entry}.{key}" if key else entry)

        if use_ref:
            digest = self.save_obj(obj)
            obj = ObjWithKey(key, self.REF_PREFIX + digest) if key else self.REF_PREFIX + digest

        state.update(entry, obj if (isinstance(obj, ObjWithKey) or key is None) else ObjWithKey(key, obj))

        self._commit(state, snapshot, event_digest)
        return entry, key

    def add_with_auto_key(self, event: Event, entry, obj):
        """
        Add obj to entry. An autogenerated key created for obj
        If obj has a set_key() method, it is called with the new key
        :param event:
        :param entry:
        :param obj:
        :return: (entry, key) to retrieve the object
        """
        next_key = self.get_next_key(entry)
        if hasattr(obj, "set_key"):
            obj.set_key(next_key)

        self.add(event, entry, ObjWithKey(next_key, obj))
        return entry, next_key

    def add_unique(self, event: Event, entry, obj):
        """
        Add an entry and make sure it's unique
        The elements of the entry are kept in a set
        :param event:
        :param entry:
        :param obj: must be hashable
        :return: (entry, None)
        """
        event_digest = self.save_event(event)
        snapshot, state = self._load_head()

        if entry not in state.data:
            state.data[entry] = {obj}
        else:
            state.data[entry].add(obj)

        self._commit(state, snapshot, event_digest)
        return entry, None

    def set(self, event: Event, entry, obj, use_ref=False):
        """
        Add or replace an entry. The entry is reinitialized.
        If the previous value was dict, all keys are lost
        :param event:
        :param entry:
        :param obj:
        :param use_ref: if True, obj is saved under 'objects' and only a reference is kept in the state
        :return: (entry, key)
        """
        event_digest = self.save_event(event)
        snapshot, state = self._load_head()

        key = self.get_obj_key(obj)
        obj = self.save_ref_if_needed(use_ref, obj)
        state.data[entry] = obj if key is None else {key: obj}

        self._commit(state, snapshot, event_digest)
        return entry, key

    def modify(self, event: Event, entry, key, obj):
        """
        Replace an element
        :param event:
        :param entry:
        :param key: key of the object to update, mandatory
        :param obj: new data
        :return: (entry, key of the new data)
        :raise SheerkaDataProviderError: when key is None, or when several elements are stored
            under the key and obj does not carry its origin
        :raise IndexError: when entry or entry.key does not exist
        """
        if key is None:
            raise SheerkaDataProviderError("Key is mandatory.", None)

        event_digest = self.save_event(event)
        snapshot, state = self._load_head()

        if entry not in state.data:
            raise IndexError(entry)

        if key not in state.data[entry]:
            raise IndexError(f"{entry}.{key}")

        # Gets obj original key, it will help to know if the key has changed
        obj_key = self.get_obj_key(obj) or key

        current = state.data[entry][key]
        if isinstance(current, list):
            # several elements under the same key : the origin tells which one to replace
            if not hasattr(obj, Serializer.ORIGIN):
                raise SheerkaDataProviderError(f"Multiple entries under '{entry}.{key}'", obj)

            obj_origin = getattr(obj, Serializer.ORIGIN)
            state.modify_in_list(entry, key, obj, obj_key, obj_origin,
                                 self.load_ref_if_needed, self.save_ref_if_needed)
        else:
            # keep storing a reference if the previous value was a reference
            obj = self.save_ref_if_needed(self.is_reference(current), obj)
            state.modify(entry, key, obj, obj_key)

        self._commit(state, snapshot, event_digest)
        return entry, obj_key

    def list(self, entry, filter=None):
        """
        Lists elements of entry 'entry'
        :param entry: name of the entry to list
        :param filter: filter to use, (key, element) when the elements have a key, (element) otherwise
        :return: generator of elements (references are resolved)
        """
        snapshot, state = self._load_head()
        if entry not in state.data:
            return

        elements = state.data[entry]

        if isinstance(elements, dict):
            # manage when elements have a key
            filter_to_use = (lambda k, o: True) if filter is None else filter
            for key, element in elements.items():
                if filter_to_use(key, element):
                    yield self.load_ref_if_needed(element)[0]
        else:
            # manage when no key is defined for the elements
            if not isinstance(elements, list) and not isinstance(elements, set):
                elements = [elements]

            filter_to_use = (lambda o: True) if filter is None else filter
            for element in elements:
                if filter_to_use(element):
                    yield self.load_ref_if_needed(element)[0]

    def remove(self, event: Event, entry, filter=None):
        """
        Removes elements under the entry 'entry'
        :param event: event that triggers the deletion
        :param entry:
        :param filter: filter to use
        :return: new sha256 of the state
        :raise IndexError: when entry does not exist
        TODO: Remove by key
        """
        snapshot, state = self._load_head()
        if entry not in state.data:
            raise IndexError(entry)

        event_digest = self.save_event(event)
        state.remove(entry, filter)

        return self._commit(state, snapshot, event_digest)

    def get(self, entry, key=None):
        """
        Retrieve an element by its key
        :param entry:
        :param key:
        :return: the element, or the list of elements when several are stored (references are resolved)
        :raise IndexError: when entry or entry.key does not exist
        """
        _, state = self._load_head()
        if entry not in state.data:
            raise IndexError(entry)

        if key is not None and key not in state.data[entry]:
            raise IndexError(f"{entry}.{key}")

        return self._resolve(state.data[entry] if key is None else state.data[entry][key])

    def get_safe(self, entry, key=None):
        """
        Retrieve an element by its key. Return None if the element does not exist
        :param entry:
        :param key:
        :return: same as get(), None when entry or entry.key does not exist
        """
        _, state = self._load_head()
        if entry not in state.data:
            return None

        if key is not None and key not in state.data[entry]:
            return None

        # like get(), references stored inside a list are resolved too
        return self._resolve(state.data[entry] if key is None else state.data[entry][key])

    def exists(self, entry):
        """
        Returns true if the entry is defined
        :param entry:
        :return:
        """
        _, state = self._load_head()
        return entry in state.data

    def save_event(self, event: Event):
        """
        Saves an event under the events folder, nothing is written if it already exists
        :param event:
        :return: digest of the event
        """
        digest = event.get_digest()
        target_path = self.get_obj_path(SheerkaDataProvider.EventFolder, digest)
        if path.exists(target_path):
            return digest

        self._ensure_parent_dir(target_path)
        with open(target_path, "wb") as f:
            f.write(self.serializer.serialize(event, None).read())

        return digest

    def load_event(self, digest):
        """
        return an event, given its digest
        :param digest:
        :return:
        """
        target_path = self.get_obj_path(SheerkaDataProvider.EventFolder, digest)
        with open(target_path, "rb") as f:
            return self.serializer.deserialize(f, None)

    def save_state(self, state: State):
        """
        Saves a state under the state folder, nothing is written if it already exists
        :param state:
        :return: digest of the state
        """
        digest = state.get_digest()
        target_path = self.get_obj_path(SheerkaDataProvider.StateFolder, digest)
        if path.exists(target_path):
            return digest

        self._ensure_parent_dir(target_path)
        with open(target_path, "wb") as f:
            f.write(self.serializer.serialize(state, None).read())

        return digest

    def load_state(self, digest):
        """
        return a state, given its digest
        :param digest: digest of the state, None for a new empty state
        :return:
        """
        if digest is None:
            return State()

        target_path = self.get_obj_path(SheerkaDataProvider.StateFolder, digest)
        with open(target_path, "rb") as f:
            return self.serializer.deserialize(f, None)

    def save_obj(self, obj):
        """
        Saves an object under the objects folder, nothing is written if it already exists
        :param obj:
        :return: obj.get_digest() when available, otherwise the sha256 of the serialized object
        """
        stream = self.serializer.serialize(obj, SerializerContext(user_name="kodjo"))
        digest = obj.get_digest() if hasattr(obj, "get_digest") else self.get_stream_digest(stream)

        target_path = self.get_obj_path(SheerkaDataProvider.ObjectsFolder, digest)
        if path.exists(target_path):
            return digest

        self._ensure_parent_dir(target_path)
        with open(target_path, "wb") as f:
            f.write(stream.read())

        return digest

    def load_obj(self, digest):
        """
        return an object, given its digest
        :param digest:
        :return: the object, None if digest is None or if the object is not found
        """
        if digest is None:
            return None

        target_path = self.get_obj_path(SheerkaDataProvider.ObjectsFolder, digest)
        if not path.exists(target_path):
            return None

        with open(target_path, "rb") as f:
            obj = self.serializer.deserialize(f, SerializerContext(origin=digest))

        # set the origin of the object
        if not isinstance(obj, str):
            setattr(obj, Serializer.ORIGIN, digest)

        return obj

    def load_ref_if_needed(self, obj):
        """
        Resolves obj if it is a reference
        :param obj:
        :return: (resolved object, True) when obj is a reference that can be loaded, (obj, False) otherwise
        """
        if not self.is_reference(obj):
            return obj, False

        resolved = self.load_obj(obj[len(SheerkaDataProvider.REF_PREFIX):])
        if resolved is None:
            # dangling reference, the raw string is given back
            return obj, False

        return resolved, True

    def save_ref_if_needed(self, save_ref, obj):
        """
        :param save_ref: if True, obj is saved under the objects folder
        :param obj:
        :return: the reference to obj if save_ref, obj itself otherwise
        """
        if not save_ref:
            return obj

        return self.REF_PREFIX + self.save_obj(obj)

    def get_cache_params(self, category, key):
        """
        :param category:
        :param key:
        :return: (sha3_256 of 'category:key', path of the cache file)
        """
        digest = hashlib.sha3_256(f"{category}:{key}".encode("utf-8")).hexdigest()
        cache_path = self.get_obj_path(SheerkaDataProvider.CacheFolder, digest)
        return digest, cache_path

    def add_to_cache(self, category, key, obj, update=False):
        """
        Save obj in the internal cache system
        :param category:
        :param key:
        :param obj: string to save, it is stored utf-8 encoded and zlib compressed
        :param update: if True, an existing cache entry is overwritten
        :return: digest of the cache entry
        """
        digest, cache_path = self.get_cache_params(category, key)
        if path.exists(cache_path) and not update:
            return digest

        self._ensure_parent_dir(cache_path)
        with open(cache_path, "wb") as f:
            f.write(zlib.compress(obj.encode("utf-8"), 9))

        return digest

    def load_from_cache(self, category, key):
        """
        Reload a compress object from the cache
        :param category:
        :param key:
        :return: the cached string
        :raise IndexError: when category.key is not in cache
        """
        _, cache_path = self.get_cache_params(category, key)
        if not path.exists(cache_path):
            raise IndexError(f"{category}.{key}")

        with open(cache_path, "rb") as f:
            return zlib.decompress(f.read()).decode("utf-8")

    def remove_from_cache(self, category, key):
        """
        Removes category.key from the cache, does nothing if it is not cached
        :param category:
        :param key:
        :return: digest of the cache entry
        """
        digest, cache_path = self.get_cache_params(category, key)
        if path.exists(cache_path):
            os.remove(cache_path)

        return digest

    def in_cache(self, category, key):
        """
        Returns true if the key is in cache
        :param category:
        :param key:
        :return:
        """
        _, cache_path = self.get_cache_params(category, key)
        return path.exists(cache_path)

    def get_snapshot(self):
        """
        :return: content of the HEAD file (digest of the current state), None if there is no HEAD file
        """
        head_file = path.join(self.root, SheerkaDataProvider.HeadFile)
        if not path.exists(head_file):
            return None

        with open(head_file, "r", encoding="utf-8") as f:
            return f.read()

    def set_snapshot(self, digest):
        """
        Writes digest in the HEAD file
        :param digest: digest of the new current state
        :return: number of characters written
        """
        head_file = path.join(self.root, SheerkaDataProvider.HeadFile)
        with open(head_file, "w", encoding="utf-8") as f:
            return f.write(digest)

    def load_keys(self):
        """
        :return: dictionary stored as json in the keys file, empty dictionary if the file does not exist
        """
        keys_file = path.join(self.root, SheerkaDataProvider.KeysFile)
        if not path.exists(keys_file):
            return {}

        with open(keys_file, "r", encoding="utf-8") as f:
            return json.load(f)

    def save_keys(self, keys):
        """
        Saves the keys dictionary as json in the keys file
        :param keys:
        :return:
        """
        keys_file = path.join(self.root, SheerkaDataProvider.KeysFile)
        with open(keys_file, "w", encoding="utf-8") as f:
            json.dump(keys, f)

    def get_next_key(self, entry):
        """
        Increments and saves the key counter of the entry (first key is 1)
        :param entry:
        :return: the new key, as a string
        """
        keys = self.load_keys()
        next_key = keys.get(entry, 0) + 1
        keys[entry] = next_key
        self.save_keys(keys)
        return str(next_key)

    def set_key(self, entry, value):
        """
        Forces the key counter of the entry
        :param entry:
        :param value:
        :return: value, as a string
        """
        keys = self.load_keys()
        keys[entry] = value
        self.save_keys(keys)
        return str(value)