Refactored sdp serializers
This commit is contained in:
@@ -0,0 +1,5 @@
|
||||
# How to serialize?

- 1 byte : type-of-object code
- int : version of the encoder
- data : can be the JSON representation of the object
|
||||
@@ -0,0 +1,510 @@
|
||||
from os import path
|
||||
import os
|
||||
from datetime import datetime, date
|
||||
import hashlib
|
||||
import json
|
||||
import zlib
|
||||
from sdp.sheerkaSerializer import Serializer
|
||||
|
||||
|
||||
def json_default_converter(o):
    """
    Fallback converter handed to ``json.dumps`` via ``default=``.

    Dates and datetimes are rendered as ISO-8601 strings; any other
    unknown type falls through (returns None, serialized as null).

    :param o: object json could not serialize natively
    :return: serializable replacement, or None
    """
    return o.isoformat() if isinstance(o, (date, datetime)) else None
|
||||
|
||||
|
||||
class Event(object):
    """
    Something that modifies the state of the system.

    Carries the author, the timestamp and a free-form text message; the
    sha256 of those three fields is the event's identity on disk.
    """

    def __init__(self, message="", user="kodjo", date=None):
        """
        :param message: free-form description of the change
        :param user: author of the event
        :param date: timestamp; defaults to now() at call time
        """
        # BUG FIX: the old signature used ``date=datetime.now()``, which is
        # evaluated once at import time, so every default-dated event shared
        # the same timestamp.  A ``None`` sentinel gives each call its own.
        self.version = 1
        self.user = user
        self.date = datetime.now() if date is None else date
        self.message = message

    def get_digest(self):
        """
        Returns the digest of the event.

        :return: hex form of the sha256 of user + date + message
        :raises NotImplementedError: if message is not a plain string
        """
        if not isinstance(self.message, str):
            raise NotImplementedError

        return hashlib.sha256(f"{self.user}{self.date}{self.message}".encode("utf-8")).hexdigest()

    def to_json(self):
        """Serialize all attributes (including version) as a JSON document."""
        return json.dumps(self.__dict__, default=json_default_converter)

    def from_json(self, json_message):
        """
        Populate self from a dict parsed out of :meth:`to_json` output.

        :param json_message: dict with "user", "date" (ISO string), "message"
        """
        self.user = json_message["user"]
        self.date = datetime.fromisoformat(json_message["date"])
        self.message = json_message["message"]
|
||||
|
||||
|
||||
class State:
    """
    Snapshot of the whole system: a dictionary of all known entries.

    ``data`` maps an entry name to either a keyed dict of objects, a list,
    or a single scalar value.  ``parents`` and ``events`` link the snapshot
    to its history through content-addressed digests.
    """

    def __init__(self):
        self.version = 1   # layout version of the state structure
        self.date = None   # timestamp of the snapshot, set on each mutation
        self.parents = []  # digest(s) of the parent snapshot(s)
        self.events = []   # digest(s) of the event(s) that produced this state
        self.data = {}     # entry name -> dict | list | scalar

    def update(self, entry, obj, append=True):
        """
        Insert or merge ``obj`` under ``entry``.

        Objects exposing ``get_key()`` are wrapped into a one-element dict
        keyed by that key.  Existing dicts are merged, existing lists are
        appended to, and scalars are promoted to a two-element list --
        unless ``append`` is False, in which case the entry is replaced.
        """
        obj_to_use = {str(obj.get_key()): obj} if hasattr(obj, "get_key") else obj

        if entry not in self.data:
            self.data[entry] = obj_to_use
        elif isinstance(obj_to_use, dict):
            if append:
                self.data[entry].update(obj_to_use)
            else:
                self.data[entry] = obj_to_use
        elif isinstance(self.data[entry], list):
            if append:
                self.data[entry].append(obj_to_use)
            else:
                self.data[entry] = obj_to_use
        else:
            if append:
                self.data[entry] = [self.data[entry], obj_to_use]
            else:
                self.data[entry] = obj_to_use

    def remove(self, entry, filter):
        """
        Remove elements under ``entry`` matching ``filter``.

        ``filter`` is called with (key, element) for dict entries and with
        (element) otherwise; ``filter=None`` drops the whole entry.
        (The parameter name shadows the builtin but is kept for
        backward compatibility with keyword callers.)
        """
        if filter is None:
            del self.data[entry]

        elif isinstance(self.data[entry], dict):
            keys_to_remove = [key for key, element in self.data[entry].items()
                              if filter(key, element)]
            for key in keys_to_remove:
                del self.data[entry][key]

        elif not isinstance(self.data[entry], list):
            if filter(self.data[entry]):
                del self.data[entry]

        else:
            # BUG FIX: the old code called list.remove() while iterating the
            # same list, which skipped the element following each match (and
            # removed the first equal element, not the matched one).
            # Rebuilding the list removes every matching element exactly once.
            self.data[entry] = [element for element in self.data[entry]
                                if not filter(element)]

    def get_digest(self):
        """
        Returns the digest of the state.

        :return: hex form of the sha256 of the JSON form of all attributes
        """
        as_json = json.dumps(self.__dict__, default=json_default_converter)
        return hashlib.sha256(as_json.encode("utf-8")).hexdigest()

    def contains(self, entry, key):
        """Return True when ``entry`` is a keyed dict holding ``key``."""
        if entry not in self.data:
            return False
        if not isinstance(self.data[entry], dict):
            return False
        return key in self.data[entry]
|
||||
|
||||
|
||||
class SheerkaDataProvider:
    """Manages the state of the system.

    Content-addressed store rooted at ``self.root`` (default ``~/.sheerka``):
    events and state snapshots are serialized under their sha256 digest,
    sharded into folders named after the first 24 hex characters.  The HEAD
    file points at the current snapshot; the ``keys`` file holds per-entry
    auto-increment counters.
    """

    # Folder / file names used under self.root.
    EventFolder = "events"
    StateFolder = "state"
    CacheFolder = "cache"
    HeadFile = "HEAD"
    KeysFile = "keys"

    def __init__(self, root=None):
        # Normalize the store location to an absolute path and create the
        # folder on first use; root=None selects the default ~/.sheerka.

        self.root = path.abspath(path.join(path.expanduser("~"), ".sheerka")) \
            if root is None \
            else path.abspath(root)

        if not path.exists(self.root):
            os.makedirs(self.root)

        self.serializer = Serializer()

    def add(self, event: Event, entry, obj):
        """
        Adds obj to the entry 'entry'
        :param event: event that triggers the update of the state
        :param entry: entry of the state to update
        :param obj: obj to insert or add (single object, or dict of keyed objects)
        :return: new sha256 of the state (also becomes the new HEAD)
        :raises IndexError: when a key being added already exists under entry
        """
        event_digest = self.save_event(event)
        snapshot = self.get_snapshot()
        state = self.load_state(snapshot)

        # check uniqueness, cannot add the same key twice
        obj_key = str(obj.get_key()) if hasattr(obj, "get_key") else None
        if state.contains(entry, obj_key):
            raise IndexError(f"{entry}.{obj_key}")
        elif isinstance(obj, dict):
            for k in obj:
                if state.contains(entry, k):
                    raise IndexError(f"{entry}.{k}")

        # Chain the new snapshot to its parent and record the triggering event.
        state.parents = [] if snapshot is None else [snapshot]
        state.events = [event_digest]
        state.date = datetime.now()
        state.update(entry, obj if obj_key is None else {obj_key: obj})

        new_snapshot = self.save_state(state)
        self.set_snapshot(new_snapshot)
        return new_snapshot

    def add_with_auto_key(self, event: Event, entry, obj):
        """
        Add obj to entry. An autogenerated (auto-increment) key is created for obj.
        :param event: event that triggers the update
        :param entry: entry of the state to update
        :param obj: object to insert; its set_key() is called if it has one
        :return: new sha256 of the state
        """
        next_key = self.get_next_key(entry)
        if hasattr(obj, "set_key"):
            obj.set_key(next_key)
        return self.add(event, entry, {next_key: obj})

    def add_unique(self, event: Event, entry, obj):
        """Add an entry and make sure it's unique.

        The entry is stored as a set, so duplicates are silently collapsed.
        NOTE(review): obj must be hashable for this to work -- confirm callers.
        """
        event_digest = self.save_event(event)
        snapshot = self.get_snapshot()
        state = self.load_state(snapshot)

        state.parents = [] if snapshot is None else [snapshot]
        state.events = [event_digest]
        state.date = datetime.now()
        if entry not in state.data:
            state.data[entry] = {obj}
        else:
            state.data[entry].add(obj)

        new_snapshot = self.save_state(state)
        self.set_snapshot(new_snapshot)
        return new_snapshot

    def set(self, event: Event, entry, obj):
        """
        Add or replace an element (no uniqueness check, append=False semantics)
        :param event: event that triggers the update
        :param entry: entry of the state to update
        :param obj: object to store; keyed via get_key() when available
        :return: new sha256 of the state
        """
        event_digest = self.save_event(event)
        snapshot = self.get_snapshot()
        state = self.load_state(snapshot)

        state.parents = [] if snapshot is None else [snapshot]
        state.events = [event_digest]
        state.date = datetime.now()
        obj_key = str(obj.get_key()) if hasattr(obj, "get_key") else None
        state.update(entry, obj if obj_key is None else {obj_key: obj}, append=False)

        new_snapshot = self.save_state(state)
        self.set_snapshot(new_snapshot)
        return new_snapshot

    def modify(self, event: Event, entry, key, obj):
        """
        Updates an existing element when elements are saved by key
        :param event: event that triggers the update
        :param entry: entry of the state to update
        :param key: key of the object to update; None replaces the whole entry
        :param obj: new data
        :return: new sha256 of the state
        :raises IndexError: when entry (or entry.key) does not exist
        """
        event_digest = self.save_event(event)
        snapshot = self.get_snapshot()
        state = self.load_state(snapshot)

        if entry not in state.data:
            raise IndexError(entry)

        if key is not None and key not in state.data[entry]:
            raise IndexError(f"{entry}.{key}")

        state.parents = [] if snapshot is None else [snapshot]
        state.events = [event_digest]
        state.date = datetime.now()
        if key is None:
            state.data[entry] = obj
        else:
            state.update(entry, {key: obj})

        new_snapshot = self.save_state(state)
        self.set_snapshot(new_snapshot)
        return new_snapshot

    def list(self, entry, filter=None):
        """
        Lists elements of entry 'entry' (generator)
        :param entry: name of the entry to list
        :param filter: predicate; called with (key, element) for dict entries,
                       with (element) otherwise; None means "accept all"
        :return: yields matching elements ([] when the entry does not exist)
        """
        snapshot = self.get_snapshot()
        state = self.load_state(snapshot)
        if entry not in state.data:
            return []

        elements = state.data[entry]

        if isinstance(elements, dict):
            # manage when elements have a key
            filter_to_use = (lambda k, o: True) if filter is None else filter
            for key, element in elements.items():
                if filter_to_use(key, element):
                    yield element
        else:
            # manage when no key is defined for the elements
            if not isinstance(elements, list) and not isinstance(elements, set):
                elements = [elements]

            filter_to_use = (lambda o: True) if filter is None else filter
            for element in elements:
                if filter_to_use(element):
                    yield element

    def remove(self, event: Event, entry, filter=None):
        """
        Removes elements under the entry 'entry'
        :param event: event that triggers the deletion
        :param entry: entry of the state to prune
        :param filter: predicate selecting what to remove; None drops the entry
        :return: new sha256 of the state
        :raises IndexError: when the entry does not exist
        TODO: Remove by key
        """
        snapshot = self.get_snapshot()
        state = self.load_state(snapshot)

        if entry not in state.data:
            raise IndexError(entry)

        event_digest = self.save_event(event)

        state.parents = [] if snapshot is None else [snapshot]
        state.events = [event_digest]
        state.date = datetime.now()
        state.remove(entry, filter)

        new_snapshot = self.save_state(state)
        self.set_snapshot(new_snapshot)
        return new_snapshot

    def get(self, entry, key=None):
        """
        Retrieve an element by its key
        :param entry: entry of the state to read
        :param key: key of the element; None returns the whole entry
        :return: the element (or the whole entry value)
        :raises IndexError: when entry (or entry.key) does not exist
        """
        snapshot = self.get_snapshot()
        state = self.load_state(snapshot)

        if entry not in state.data:
            raise IndexError(entry)

        if key is not None and key not in state.data[entry]:
            raise IndexError(f"{entry}.{key}")

        return state.data[entry] if key is None else state.data[entry][key]

    def get_safe(self, entry, key=None):
        """
        Retrieve an element by its key. Return None if the element does not exist
        :param entry: entry of the state to read
        :param key: key of the element; None returns the whole entry
        :return: the element, or None when entry/key is missing
        """
        snapshot = self.get_snapshot()
        state = self.load_state(snapshot)

        if entry not in state.data:
            return None

        if key is not None and key not in state.data[entry]:
            return None

        return state.data[entry] if key is None else state.data[entry][key]

    def exists(self, entry):
        """
        Returns true if the entry is defined in the current snapshot
        :param entry: entry name to check
        :return: bool
        """
        snapshot = self.get_snapshot()
        state = self.load_state(snapshot)
        return entry in state.data

    def save_event(self, event: Event):
        """
        Persist the event under its digest (idempotent: an already-saved
        event is not rewritten).
        :param event: event to persist
        :return: digest of the event
        """
        digest = event.get_digest()
        # Shard by the first 24 hex chars to keep folders small.
        target_path = path.join(self.root, SheerkaDataProvider.EventFolder, digest[:24], digest)
        if path.exists(target_path):
            return digest

        if not path.exists(path.dirname(target_path)):
            os.makedirs(path.dirname(target_path))

        with open(target_path, "wb") as f:
            f.write(self.serializer.serialize(event).read())

        return digest

    def load_event(self, digest):
        """
        Return an event, given its digest
        :param digest: sha256 (hex) of the event
        :return: the deserialized event
        """
        target_path = path.join(self.root, SheerkaDataProvider.EventFolder, digest[:24], digest)
        with open(target_path, "rb") as f:
            return self.serializer.deserialize(f)

    def save_state(self, state: State):
        """Persist the state snapshot under its digest (idempotent)."""
        digest = state.get_digest()
        target_path = path.join(self.root, SheerkaDataProvider.StateFolder, digest[:24], digest)
        if path.exists(target_path):
            return digest

        if not path.exists(path.dirname(target_path)):
            os.makedirs(path.dirname(target_path))

        with open(target_path, "wb") as f:
            f.write(self.serializer.serialize(state).read())

        return digest

    def load_state(self, digest):
        """Return the state for ``digest``; a fresh empty State when None."""
        if digest is None:
            return State()

        target_path = path.join(self.root, SheerkaDataProvider.StateFolder, digest[:24], digest)
        with open(target_path, "rb") as f:
            return self.serializer.deserialize(f)

    def get_cache_params(self, category, key):
        """Return (digest, path) of the cache slot for category:key.

        Uses sha3_256 (unlike the sha256 used for events/states).
        """
        digest = hashlib.sha3_256(f"{category}:{key}".encode("utf-8")).hexdigest()
        cache_path = path.join(self.root, SheerkaDataProvider.CacheFolder, digest[:24], digest)
        return digest, cache_path

    def add_to_cache(self, category, key, obj, update=False):
        """
        Save obj in the internal cache system (zlib-compressed, level 9)
        :param category: namespace of the cache entry
        :param key: key within the category
        :param obj: string payload to cache (must support .encode)
        :param update: when False, an existing entry is left untouched
        :return: digest of the cache slot
        """
        digest, cache_path = self.get_cache_params(category, key)

        if path.exists(cache_path) and not update:
            return digest

        if not path.exists(path.dirname(cache_path)):
            os.makedirs(path.dirname(cache_path))

        with open(cache_path, "wb") as f:
            f.write(zlib.compress(obj.encode("utf-8"), 9))

        return digest

    def load_from_cache(self, category, key):
        """
        Reload a compressed object from the cache
        :param category: namespace of the cache entry
        :param key: key within the category
        :return: the decompressed string payload
        :raises IndexError: when the entry is not cached
        """
        digest, cache_path = self.get_cache_params(category, key)

        if not path.exists(cache_path):
            raise IndexError(f"{category}.{key}")

        with open(cache_path, "rb") as f:
            return zlib.decompress(f.read()).decode("utf-8")

    def remove_from_cache(self, category, key):
        """
        Remove a cache entry if present (no error when absent)
        :param category: namespace of the cache entry
        :param key: key within the category
        :return: digest of the (removed) cache slot
        """
        digest, cache_path = self.get_cache_params(category, key)
        if path.exists(cache_path):
            os.remove(cache_path)

        return digest

    def in_cache(self, category, key):
        """
        Returns true if the key is in cache
        :param category: namespace of the cache entry
        :param key: key within the category
        :return: bool
        """
        digest, cache_path = self.get_cache_params(category, key)
        return path.exists(cache_path)

    def get_snapshot(self):
        """Return the digest stored in HEAD, or None when no snapshot exists."""
        head_file = path.join(self.root, SheerkaDataProvider.HeadFile)
        if not path.exists(head_file):
            return None
        with open(head_file, "r") as f:
            return f.read()

    def set_snapshot(self, digest):
        """Point HEAD at ``digest`` (returns the number of chars written)."""
        head_file = path.join(self.root, SheerkaDataProvider.HeadFile)
        with open(head_file, "w") as f:
            return f.write(digest)

    def load_keys(self):
        """Load the per-entry auto-increment counters ({} when missing)."""
        keys_file = path.join(self.root, SheerkaDataProvider.KeysFile)
        if not path.exists(keys_file):
            keys = {}
        else:
            with open(keys_file, "r") as f:
                keys = json.load(f)
        return keys

    def save_keys(self, keys):
        """Persist the per-entry auto-increment counters as JSON."""
        keys_file = path.join(self.root, SheerkaDataProvider.KeysFile)
        with open(keys_file, "w") as f:
            json.dump(keys, f)

    def get_next_key(self, entry):
        """Increment and persist the counter for ``entry``; return it as str."""
        keys = self.load_keys()

        next_key = keys.get(entry, 0) + 1
        keys[entry] = next_key

        self.save_keys(keys)
        return str(next_key)

    def set_key(self, entry, value):
        """Force the counter for ``entry`` to ``value``; return it as str."""
        keys = self.load_keys()
        keys[entry] = value
        self.save_keys(keys)
        return str(value)
|
||||
|
||||
@@ -0,0 +1,167 @@
|
||||
import json
|
||||
import pickle
|
||||
import datetime
|
||||
import struct
|
||||
import io
|
||||
|
||||
|
||||
def json_default_converter(o):
    """
    Fallback converter handed to ``json.dumps`` via ``default=``.

    Dates and datetimes become ISO-8601 strings; anything else falls
    through (None, serialized as null).

    :param o: object json could not serialize natively
    :return: serializable replacement, or None
    """
    return o.isoformat() if isinstance(o, (datetime.date, datetime.datetime)) else None
|
||||
|
||||
|
||||
class Serializer:
    """
    Front door of the serialization system.

    Every stream starts with a fixed header: one byte identifying the
    serializer ("type of object" code) followed by an unsigned short
    carrying the encoder version.  The registered serializer matching the
    header then writes/reads the payload.
    """

    # 1-byte serializer code + unsigned short version.
    HEADER_FORMAT = "cH"
    # BUG FIX: the header length was hard-coded as 4 in deserialize();
    # derive it from the format so the header can evolve safely.
    HEADER_SIZE = struct.calcsize(HEADER_FORMAT)

    def __init__(self):
        self._cache = []

        # add builtin serializers
        self._cache.append(EventSerializer())
        self._cache.append(PickleSerializer())

    def register(self, serializer):
        """
        Add ``serializer`` to the list of all known serializers.

        :param serializer: object exposing name/version/match/dump/load
        :return: None
        """
        self._cache.append(serializer)

    def serialize(self, obj):
        """
        Get the stream representation of an object.

        :param obj: object to serialize; a registered serializer must match it
        :return: binary stream (header + payload), positioned at 0
        :raises TypeError: when no registered serializer matches ``obj``
        """
        serializer = next((s for s in self._cache if s.match(obj)), None)

        if serializer is None:
            raise TypeError(f"Don't know how to serialize {type(obj)}")

        stream = io.BytesIO()
        header = struct.pack(Serializer.HEADER_FORMAT, bytes(serializer.name, "utf-8"), serializer.version)
        stream.write(header)

        return serializer.dump(stream, obj)

    def deserialize(self, stream):
        """
        Loads an object from its stream representation.

        :param stream: binary stream positioned at the header
        :return: the deserialized object
        :raises TypeError: when no serializer matches the header name/version
        """
        raw_name, version = struct.unpack(Serializer.HEADER_FORMAT,
                                          stream.read(Serializer.HEADER_SIZE))
        name = raw_name.decode("utf-8")
        serializer = next((s for s in self._cache
                           if s.name == name and s.version == version), None)

        if serializer is None:
            raise TypeError(f"Don't know serializer name={name}, version={version}")

        return serializer.load(stream)
|
||||
|
||||
|
||||
class BaseSerializer:
    """
    Base contract for concrete serializers.

    A serializer is identified by a one-character ``name`` and an integer
    ``version`` (both written into the stream header by ``Serializer``)
    and implements ``match``/``dump``/``load``.
    """

    def __init__(self, name, version):
        """
        Create a serializer, given a name and a version.

        :param name: one-character code written into the stream header
        :param version: encoder version written into the stream header
        """
        self.name = name
        self.version = version

    @staticmethod
    def match(obj):
        """
        Returns true if self can serialize obj.

        :param obj: candidate object
        :return: False in the base class; subclasses override
        """
        return False

    def dump(self, stream, obj):
        """
        Write the byte representation of ``obj`` into ``stream``.

        :param stream: binary stream to write to (header already written)
        :param obj: object to serialize
        :return: the stream, rewound to position 0
        """
        # BUG FIX: the previous silent ``pass`` returned None and caused
        # confusing failures far from the real cause; fail loudly instead.
        raise NotImplementedError

    def load(self, stream):
        """
        From a stream of bytes, create the object.

        :param stream: binary stream positioned right after the header
        :return: the deserialized object
        """
        raise NotImplementedError

    @staticmethod
    def get_class(kls):
        """
        Resolve a fully qualified dotted name (e.g. ``pkg.mod.Class``)
        to the class object.
        """
        parts = kls.split('.')
        module = ".".join(parts[:-1])
        m = __import__(module)
        # __import__ returns the top-level package; walk down the dotted path.
        for comp in parts[1:]:
            m = getattr(m, comp)
        return m

    @staticmethod
    def get_full_qualified_name(obj):
        """Return ``module.ClassName`` for obj's class (bare name for builtins)."""
        module = obj.__class__.__module__
        if module is None or module == str.__class__.__module__:
            return obj.__class__.__name__  # Avoid reporting __builtin__
        else:
            return module + '.' + obj.__class__.__name__
|
||||
|
||||
|
||||
class EventSerializer(BaseSerializer):
    """JSON serializer for ``sdp.sheerkaDataProvider.Event`` (code "E", v1)."""

    def __init__(self):
        BaseSerializer.__init__(self, "E", 1)

    @staticmethod
    def match(obj):
        """True only for instances of the project's Event class."""
        return BaseSerializer.get_full_qualified_name(obj) == "sdp.sheerkaDataProvider.Event"

    def dump(self, stream, obj):
        """Append the event's JSON form (UTF-8) and rewind the stream."""
        payload = obj.to_json().encode("utf-8")
        stream.write(payload)
        stream.seek(0)
        return stream

    def load(self, stream):
        """Rebuild an Event from the JSON payload left in ``stream``."""
        raw = stream.read().decode("utf-8")
        event = BaseSerializer.get_class("sdp.sheerkaDataProvider.Event")()
        event.from_json(json.loads(raw))
        return event
|
||||
|
||||
|
||||
class PickleSerializer(BaseSerializer):
    """Pickle-based serializer for ``sdp.sheerkaDataProvider.State`` (code "P", v1)."""

    def __init__(self):
        BaseSerializer.__init__(self, "P", 1)

    @staticmethod
    def match(obj):
        """True only for instances of the project's State class."""
        return BaseSerializer.get_full_qualified_name(obj) == "sdp.sheerkaDataProvider.State"

    def dump(self, stream, obj):
        """Append the pickled object and rewind the stream.

        NOTE(review): pickle payloads execute code on load; acceptable only
        because these files are written locally by this process -- confirm.
        """
        payload = pickle.dumps(obj)
        stream.write(payload)
        stream.seek(0)
        return stream

    def load(self, stream):
        """Unpickle the remaining bytes of ``stream``."""
        raw = stream.read()
        return pickle.loads(raw)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user