from os import path
import os
from datetime import datetime, date
import hashlib
import json
import zlib

from sdp.sheerkaSerializer import Serializer, SerializerContext


def json_default_converter(o):
    """
    Default formatter for json
    It's used when the json serializer does not know how to serialise a type

    :param o: object the json encoder could not serialise natively
    :return: ISO formatted string for dates and datetimes, None for anything else
    """
    if isinstance(o, (date, datetime)):
        return o.isoformat()

    # Every other type is emitted as json 'null'. State.get_digest passes this converter
    # to json.dumps, so raising here would make states holding custom objects unhashable.
    return None


class Event(object):
    """
    Class that represents something that modifies the state of the system
    """

    def __init__(self, message="", user="kodjo", date=None):
        """
        :param message: description of the event
        :param user: name of the user that triggers the event
        :param date: date of the event; when omitted, the creation time of the instance is used
        """
        self.version = 1
        self.user = user
        # 'now' is resolved per instance: a datetime.now() placed in the signature would be
        # evaluated only once, when the function is defined, and shared by every event
        self.date = datetime.now() if date is None else date
        self.message = message

    def get_digest(self):
        """
        Returns the digest of the event

        :return: hexa form of the sha256
        :raises NotImplementedError: when the message is not a string
        """
        if not isinstance(self.message, str):
            raise NotImplementedError

        payload = f"Event:{self.user}{self.date}{self.message}"
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    def to_dict(self):
        """
        :return: the attributes of the event (the instance dictionary itself, not a copy)
        """
        return self.__dict__

    def from_dict(self, as_dict):
        """
        Restores the event from the dictionary form produced by to_dict
        (with the date given as an ISO formatted string)

        :param as_dict: dictionary with the keys "user", "date" and "message" ("version" is optional)
        """
        if "version" in as_dict:
            self.version = as_dict["version"]
        self.user = as_dict["user"]
        self.date = datetime.fromisoformat(as_dict["date"])
        self.message = as_dict["message"]


class State:
    """
    Class that represents the state of the system (dictionary of all known entries)
    """

    def __init__(self):
        self.version = 1
        self.date = None  # date of the last modification
        self.parents = []  # digests of the states this one derives from
        self.events = []  # digests of the events that produced this state
        self.data = {}  # entry name -> dict (keyed elements), list, set or single element

    def update(self, entry, obj, append=True):
        """
        Adds obj under the entry 'entry'

        :param entry: name of the entry to update
        :param obj: element to store; when it exposes get_key() it is stored as {str(key): obj}
        :param append: True to merge with what is already stored, False to replace it
        """
        obj_to_use = {str(obj.get_key()): obj} if hasattr(obj, "get_key") else obj

        # first value of the entry, or explicit replacement
        if entry not in self.data or not append:
            self.data[entry] = obj_to_use
            return

        if isinstance(obj_to_use, dict):
            self.data[entry].update(obj_to_use)
        elif isinstance(self.data[entry], list):
            self.data[entry].append(obj_to_use)
        else:
            # a second element under a single-element entry turns the entry into a list
            self.data[entry] = [self.data[entry], obj_to_use]

    def remove(self, entry, filter):
        """
        Removes the elements of the entry 'entry' that match the filter

        :param entry: name of the entry (a KeyError is raised when it does not exist)
        :param filter: None to drop the whole entry, otherwise a predicate called with
            (key, element) for keyed entries and with (element) for the other ones
        """
        if filter is None:
            del self.data[entry]
            return

        current = self.data[entry]
        if isinstance(current, dict):
            # collect first: a dict cannot be resized while it is iterated
            for key in [k for k, element in current.items() if filter(k, element)]:
                del current[key]
        elif isinstance(current, list):
            # filter in place; removing from the list while looping over it skips elements
            current[:] = [element for element in current if not filter(element)]
        elif isinstance(current, set):
            # sets are filtered element by element, like SheerkaDataProvider.list does
            current.difference_update([element for element in current if filter(element)])
        elif filter(current):
            del self.data[entry]

    def get_digest(self):
        """
        :return: hexa form of the sha256 of the json representation of the state
        """
        as_json = json.dumps(self.__dict__, default=json_default_converter)
        return hashlib.sha256(as_json.encode("utf-8")).hexdigest()

    def contains(self, entry, key):
        """
        :param entry: name of the entry
        :param key: key to look for
        :return: True when the entry exists, is a dictionary and holds the key
        """
        if entry not in self.data:
            return False

        if not isinstance(self.data[entry], dict):
            return False

        return key in self.data[entry]


class SheerkaDataProvider:
    """Manages the state of the system"""

    EventFolder = "events"
    StateFolder = "state"
    ObjectsFolder = "objects"
    CacheFolder = "cache"
    HeadFile = "HEAD"
    KeysFile = "keys"

    def __init__(self, root=None):
        """
        :param root: folder where everything is persisted, defaults to '~/.sheerka'.
            The folder is created when it does not exist
        """
        if root is None:
            root = path.join(path.expanduser("~"), ".sheerka")
        self.root = path.abspath(root)
        os.makedirs(self.root, exist_ok=True)

        self.serializer = Serializer()

    def get_obj_path(self, object_type, digest):
        """
        :param object_type: name of the folder (EventFolder, StateFolder, ObjectsFolder...)
        :param digest: digest of the object
        :return: path of the file where the object is (or will be) persisted
        """
        # the first 24 characters of the digest are used as an intermediate folder
        return path.join(self.root, object_type, digest[:24], digest)

    @staticmethod
    def _ensure_parent_dir(target_path):
        """Creates the folder that will contain target_path when it is missing"""
        os.makedirs(path.dirname(target_path), exist_ok=True)

    def _stamp_state(self, state, snapshot, event):
        """Saves the event and records it, the parent snapshot and the current date on the state"""
        event_digest = self.save_event(event)
        state.parents = [] if snapshot is None else [snapshot]
        state.events = [event_digest]
        state.date = datetime.now()

    def _publish_state(self, state):
        """Persists the state, makes it the current snapshot and returns its digest"""
        new_snapshot = self.save_state(state)
        self.set_snapshot(new_snapshot)
        return new_snapshot

    def add(self, event: Event, entry, obj):
        """
        Adds obj to the entry 'entry'

        :param event: events that triggers the update of the state
        :param entry: entry of the state to update
        :param obj: obj to insert or add
        :return: new sha256 of the state
        :raises IndexError: when a key of obj is already defined under the entry
        """
        snapshot = self.get_snapshot()
        state = self.load_state(snapshot)

        # check uniqueness, cannot add the same key twice
        # (done before the event is saved, so a rejected add leaves nothing on disk)
        obj_key = str(obj.get_key()) if hasattr(obj, "get_key") else None
        if state.contains(entry, obj_key):
            raise IndexError(f"{entry}.{obj_key}")
        elif isinstance(obj, dict):
            for k in obj:
                if state.contains(entry, k):
                    raise IndexError(f"{entry}.{k}")

        self._stamp_state(state, snapshot, event)
        state.update(entry, obj if obj_key is None else {obj_key: obj})
        return self._publish_state(state)

    def add_with_auto_key(self, event: Event, entry, obj):
        """
        Add obj to entry.
        An autogenerated key created for obj

        :param event: events that triggers the update of the state
        :param entry: entry of the state to update
        :param obj: obj to insert; its set_key() method, if any, receives the generated key
        :return: new sha256 of the state
        """
        next_key = self.get_next_key(entry)
        if hasattr(obj, "set_key"):
            obj.set_key(next_key)

        return self.add(event, entry, {next_key: obj})

    def add_unique(self, event: Event, entry, obj):
        """
        Add an entry and make sure it's unique
        (the elements of the entry are kept in a set)

        :param event: events that triggers the update of the state
        :param entry: entry of the state to update
        :param obj: obj to add
        :return: new sha256 of the state
        """
        snapshot = self.get_snapshot()
        state = self.load_state(snapshot)

        self._stamp_state(state, snapshot, event)
        if entry not in state.data:
            state.data[entry] = {obj}
        else:
            state.data[entry].add(obj)

        return self._publish_state(state)

    def set(self, event: Event, entry, obj):
        """
        Add or replace an element

        :param event: events that triggers the update of the state
        :param entry: entry of the state to set
        :param obj: new content of the entry
        :return: new sha256 of the state
        """
        snapshot = self.get_snapshot()
        state = self.load_state(snapshot)

        self._stamp_state(state, snapshot, event)
        obj_key = str(obj.get_key()) if hasattr(obj, "get_key") else None
        state.update(entry, obj if obj_key is None else {obj_key: obj}, append=False)

        return self._publish_state(state)

    def modify(self, event: Event, entry, key, obj):
        """
        Updates an existing element
        when element are saved by key

        :param event: events that triggers the update of the state
        :param entry: entry of the state to update
        :param key: key of the object to update (None to replace the whole entry)
        :param obj: new data
        :return: new sha256 of the state
        :raises IndexError: when the entry, or the key under the entry, does not exist
        """
        snapshot = self.get_snapshot()
        state = self.load_state(snapshot)

        # validate before the event is saved, so a rejected update leaves nothing on disk
        if entry not in state.data:
            raise IndexError(entry)

        if key is not None and key not in state.data[entry]:
            raise IndexError(f"{entry}.{key}")

        self._stamp_state(state, snapshot, event)
        if key is None:
            state.data[entry] = obj
        else:
            state.update(entry, {key: obj})

        return self._publish_state(state)

    def list(self, entry, filter=None):
        """
        Lists elements of entry 'entry'

        :param entry: name of the entry to list
        :param filter: filter to use, called with (key, element) for keyed entries
            and with (element) for the other ones
        :return: generator over the matching elements (empty when the entry does not exist)
        """
        snapshot = self.get_snapshot()
        state = self.load_state(snapshot)

        if entry not in state.data:
            return

        elements = state.data[entry]

        if isinstance(elements, dict):
            # manage when elements have a key
            filter_to_use = (lambda k, o: True) if filter is None else filter
            for key, element in elements.items():
                if filter_to_use(key, element):
                    yield element
        else:
            # manage when no key is defined for the elements
            if not isinstance(elements, (list, set)):
                elements = [elements]

            filter_to_use = (lambda o: True) if filter is None else filter
            for element in elements:
                if filter_to_use(element):
                    yield element

    def remove(self, event: Event, entry, filter=None):
        """
        Removes elements under the entry 'entry'

        :param event: event that triggers the deletion
        :param entry: entry of the state to update
        :param filter: filter to use (None to remove the whole entry)
        :return: new sha256 of the state
        :raises IndexError: when the entry does not exist
        TODO: Remove by key
        """
        snapshot = self.get_snapshot()
        state = self.load_state(snapshot)

        if entry not in state.data:
            raise IndexError(entry)

        self._stamp_state(state, snapshot, event)
        state.remove(entry, filter)

        return self._publish_state(state)

    def get(self, entry, key=None):
        """
        Retrieve an element by its key

        :param entry: name of the entry
        :param key: key of the element (None to get the whole entry)
        :return: the element, or the content of the entry when no key is given
        :raises IndexError: when the entry, or the key under the entry, does not exist
        """
        snapshot = self.get_snapshot()
        state = self.load_state(snapshot)

        if entry not in state.data:
            raise IndexError(entry)

        if key is not None and key not in state.data[entry]:
            raise IndexError(f"{entry}.{key}")

        return state.data[entry] if key is None else state.data[entry][key]

    def get_safe(self, entry, key=None):
        """
        Retrieve an element by its key.
        Return None if the element does not exist

        :param entry: name of the entry
        :param key: key of the element (None to get the whole entry)
        :return: the element, the content of the entry when no key is given, or None
        """
        snapshot = self.get_snapshot()
        state = self.load_state(snapshot)

        if entry not in state.data:
            return None

        if key is not None and key not in state.data[entry]:
            return None

        return state.data[entry] if key is None else state.data[entry][key]

    def exists(self, entry):
        """
        Returns true if the entry is defined

        :param entry: name of the entry
        :return: True when the current state has this entry
        """
        snapshot = self.get_snapshot()
        state = self.load_state(snapshot)
        return entry in state.data

    def save_event(self, event: Event):
        """
        Persists an event under its digest
        (nothing is written when a file already exists for this digest)

        :param event: event to save
        :return: digest of the event
        """
        digest = event.get_digest()
        target_path = self.get_obj_path(SheerkaDataProvider.EventFolder, digest)
        if path.exists(target_path):
            return digest

        self._ensure_parent_dir(target_path)
        with open(target_path, "wb") as f:
            f.write(self.serializer.serialize(event, None).read())

        return digest

    def load_event(self, digest):
        """
        return an event, given its digest

        :param digest: digest of the event
        :return: what the serializer rebuilds from the persisted file
        """
        target_path = self.get_obj_path(SheerkaDataProvider.EventFolder, digest)
        with open(target_path, "rb") as f:
            return self.serializer.deserialize(f, None)

    def save_state(self, state: State):
        """
        Persists a state under its digest
        (nothing is written when a file already exists for this digest)

        :param state: state to save
        :return: digest of the state
        """
        digest = state.get_digest()
        target_path = self.get_obj_path(SheerkaDataProvider.StateFolder, digest)
        if path.exists(target_path):
            return digest

        self._ensure_parent_dir(target_path)
        with open(target_path, "wb") as f:
            f.write(self.serializer.serialize(state, None).read())

        return digest

    def load_state(self, digest):
        """
        return a state, given its digest

        :param digest: digest of the state, None when no snapshot exists yet
        :return: a new empty State when digest is None,
            otherwise what the serializer rebuilds from the persisted file
        """
        if digest is None:
            return State()

        target_path = self.get_obj_path(SheerkaDataProvider.StateFolder, digest)
        with open(target_path, "rb") as f:
            return self.serializer.deserialize(f, None)

    def save_obj(self, obj):
        """
        Persists an object under its digest
        (nothing is written when a file already exists for this digest)

        :param obj: object to save, it must expose get_digest(). When it has the attributes
            'key' and 'key_name' and its key is None, a new key is generated for it first
        :return: digest of the object
        """
        if hasattr(obj, "key") and hasattr(obj, "key_name") and obj.key is None:
            obj.key = self.get_next_key(obj.key_name)

        digest = obj.get_digest()
        target_path = self.get_obj_path(SheerkaDataProvider.ObjectsFolder, digest)
        if path.exists(target_path):
            return digest

        self._ensure_parent_dir(target_path)
        with open(target_path, "wb") as f:
            f.write(self.serializer.serialize(obj, SerializerContext("kodjo", digest)).read())

        return digest

    def load_obj(self, digest):
        """
        return an object, given its digest

        :param digest: digest of the object
        :return: what the serializer rebuilds from the persisted file
        """
        if digest is None:
            # NOTE(review): an empty State is returned here, like in load_state.
            # It looks like a copy/paste, kept as is because callers may rely on it - confirm
            return State()

        target_path = self.get_obj_path(SheerkaDataProvider.ObjectsFolder, digest)
        with open(target_path, "rb") as f:
            return self.serializer.deserialize(f, SerializerContext("kodjo", digest))

    def get_cache_params(self, category, key):
        """
        :param category: category of the cached element
        :param key: key of the cached element
        :return: tuple (digest, path of the cache file)
        """
        digest = hashlib.sha3_256(f"{category}:{key}".encode("utf-8")).hexdigest()
        cache_path = path.join(self.root, SheerkaDataProvider.CacheFolder, digest[:24], digest)
        return digest, cache_path

    def add_to_cache(self, category, key, obj, update=False):
        """
        Save obj in the internal cache system

        :param category: category of the element
        :param key: key of the element
        :param obj: string to cache (it is utf-8 encoded, then compressed)
        :param update: True to overwrite the value that is already in cache
        :return: digest of the cache entry
        """
        digest, cache_path = self.get_cache_params(category, key)

        if path.exists(cache_path) and not update:
            return digest

        self._ensure_parent_dir(cache_path)
        with open(cache_path, "wb") as f:
            f.write(zlib.compress(obj.encode("utf-8"), 9))

        return digest

    def load_from_cache(self, category, key):
        """
        Reload a compress object from the cache

        :param category: category of the element
        :param key: key of the element
        :return: the cached string
        :raises IndexError: when nothing is cached for this category and key
        """
        digest, cache_path = self.get_cache_params(category, key)

        if not path.exists(cache_path):
            raise IndexError(f"{category}.{key}")

        with open(cache_path, "rb") as f:
            return zlib.decompress(f.read()).decode("utf-8")

    def remove_from_cache(self, category, key):
        """
        Removes an element from the cache (does nothing when it is not cached)

        :param category: category of the element
        :param key: key of the element
        :return: digest of the cache entry
        """
        digest, cache_path = self.get_cache_params(category, key)

        if path.exists(cache_path):
            os.remove(cache_path)

        return digest

    def in_cache(self, category, key):
        """
        Returns true if the key is in cache

        :param category: category of the element
        :param key: key of the element
        :return: True when a cache file exists for this category and key
        """
        digest, cache_path = self.get_cache_params(category, key)
        return path.exists(cache_path)

    def get_snapshot(self):
        """
        :return: digest of the current state (content of the HEAD file), None when there is no HEAD file
        """
        head_file = path.join(self.root, SheerkaDataProvider.HeadFile)
        if not path.exists(head_file):
            return None

        with open(head_file, "r") as f:
            return f.read()

    def set_snapshot(self, digest):
        """
        Makes digest the current state by writing it into the HEAD file

        :param digest: digest of the new current state
        :return: number of characters written
        """
        head_file = path.join(self.root, SheerkaDataProvider.HeadFile)
        with open(head_file, "w") as f:
            return f.write(digest)

    def load_keys(self):
        """
        :return: dictionary entry -> last generated key, empty when the keys file does not exist
        """
        keys_file = path.join(self.root, SheerkaDataProvider.KeysFile)
        if not path.exists(keys_file):
            return {}

        with open(keys_file, "r") as f:
            return json.load(f)

    def save_keys(self, keys):
        """
        Persists the key counters as json

        :param keys: dictionary entry -> last generated key
        """
        keys_file = path.join(self.root, SheerkaDataProvider.KeysFile)
        with open(keys_file, "w") as f:
            json.dump(keys, f)

    def get_next_key(self, entry):
        """
        Generates and persists the next key of an entry (the first key is "1")

        :param entry: name of the entry
        :return: the new key, as a string
        """
        keys = self.load_keys()

        next_key = keys.get(entry, 0) + 1
        keys[entry] = next_key
        self.save_keys(keys)
        return str(next_key)

    def set_key(self, entry, value):
        """
        Forces the last generated key of an entry

        :param entry: name of the entry
        :param value: value to record for the entry
        :return: the value, as a string
        """
        keys = self.load_keys()
        keys[entry] = value
        self.save_keys(keys)
        return str(value)