# File: Sheerka-Old/sdp/sheerkaSerializer.py
# (Python, 225 lines, 6.3 KiB)

import json
import pickle
import datetime
import struct
import io
from dataclasses import dataclass
import logging
import core.utils
from core.concept import Concept
log = logging.getLogger(__name__)
def json_default_converter(o):
    """
    Fallback converter handed to ``json.dumps`` through its ``default`` argument.

    The json encoder calls it for every object it does not know how to serialise.

    :param o: object the json encoder could not serialise by itself
    :return: ISO 8601 string for ``datetime.date`` / ``datetime.datetime`` values
    :raises TypeError: for any other type, so that an unsupported value is
        reported instead of being silently written as ``null``
    """
    # datetime.datetime is a subclass of datetime.date: one check covers both
    if isinstance(o, datetime.date):
        return o.isoformat()

    raise TypeError(f"Object of type {type(o).__name__} is not JSON serializable")
@dataclass
class SerializerContext:
    """
    Additional information handed to the serializers along with the object.

    Both fields are optional and default to None.
    """

    # user name; ObjectSerializer.dump stores it in the history entry it writes
    user_name: str = None
    # not read anywhere in this file -- presumably where the object comes from (TODO confirm with callers)
    origin: str = None
class Serializer:
    """
    Entry point of the serialization layer.

    Keeps the list of registered serializers and delegates the work to the
    first suitable one. Every produced stream starts with a small binary header
    (serializer name + serializer version) which ``deserialize`` reads back to
    find the serializer that must load the payload.
    """

    # struct layout of the header: one char (serializer name) followed by an
    # unsigned short (serializer version). No byte-order prefix, so the
    # platform's native byte order and alignment are used.
    HEADER_FORMAT = "cH"
    # size of the header in bytes, derived from the format so that both always agree
    HEADER_SIZE = struct.calcsize(HEADER_FORMAT)

    USERNAME = "user_name"  # key to store the user that has committed the snapshot
    MODIFICATION_DATE = "modification_date"  # key to store the date of the serialization
    PARENTS = "parents"  # key to store the list of origins the object derives from
    ORIGIN = "origin"  # name of the attribute read on the object to fill PARENTS
    HISTORY = "##history##"  # key (and attribute name) under which the history entry is kept

    def __init__(self):
        log.debug("Initializing serializers")
        # registered serializers, in registration order (first match wins)
        self._cache = []

        # add builtin serializers
        self.register(EventSerializer())
        self.register(StateSerializer())
        self.register(ConceptSerializer())

    def register(self, serializer):
        """
        Add a serializer to the list of all known serializers

        :param serializer: serializer to add; it is appended after the ones already registered
        :return: None
        """
        log.debug("Adding serializer %s", serializer)
        self._cache.append(serializer)

    def serialize(self, obj, context):
        """
        Get the stream representation of an object

        The header (name and version of the chosen serializer) is written first,
        then the serializer dumps the object after it.

        :param obj: object to serialize
        :param context: additional info forwarded to the serializer
        :return: whatever the serializer's ``dump`` returns (the builtin ones
            return the stream, rewound to its beginning)
        :raises TypeError: when no registered serializer matches the object
        """
        # stop at the first serializer that accepts the object
        serializer = next((s for s in self._cache if s.matches(obj)), None)
        if serializer is None:
            raise TypeError(f"Don't know how to serialize {type(obj)}")

        stream = io.BytesIO()
        header = struct.pack(Serializer.HEADER_FORMAT, bytes(serializer.name, "utf-8"), serializer.version)
        stream.write(header)
        return serializer.dump(stream, obj, context)

    def deserialize(self, stream, context):
        """
        Loads an object from its stream representation

        :param stream: binary stream positioned on the header written by ``serialize``
        :param context: additional info forwarded to the serializer
        :return: the object created by the serializer's ``load``
        :raises TypeError: when no registered serializer has the name and the
            version found in the header
        """
        raw_header = stream.read(Serializer.HEADER_SIZE)
        raw_name, version = struct.unpack(Serializer.HEADER_FORMAT, raw_header)
        name = raw_name.decode("utf-8")  # decoded once, not once per registered serializer

        serializer = next((s for s in self._cache if s.name == name and s.version == version), None)
        if serializer is None:
            raise TypeError(f"Don't know how serializer name={raw_name}, version={version}")

        return serializer.load(stream, context)
class BaseSerializer:
    """
    Common base of the serializers.

    A serializer is identified by a name and a version. ``Serializer.serialize``
    writes both in the header of the stream and ``Serializer.deserialize`` uses
    them to pick the serializer that reads the payload back.
    ``matches``, ``dump`` and ``load`` are placeholders here: they do nothing
    and return None.
    """

    def __init__(self, name, version):
        """
        Create a serializer, given a name and a version

        :param name: name of the serializer
        :param version: version of the serializer
        :return:
        """
        self.version = version
        self.name = name

    def matches(self, obj):
        """
        Tells whether self can serialize obj

        :param obj: candidate object
        :return: None in this base implementation
        """
        return None

    def dump(self, stream, obj, context):
        """
        Writes the byte representation of the object

        :param stream: stream to write to
        :param obj: object to serialize
        :param context: additional info needed to dump
        :return: None in this base implementation
        """
        return None

    def load(self, stream, context):
        """
        Creates the object from a stream of bytes

        :param stream: stream to read from
        :param context: additional info needed to load
        :return: None in this base implementation
        """
        return None

    def __repr__(self):
        pieces = (type(self).__name__, " (", self.name, ", version=", str(self.version), ")")
        return "".join(pieces)
class EventSerializer(BaseSerializer):
    """
    Json serializer (name "E", version 1) for ``sdp.sheerkaDataProvider.Event`` objects.

    The context is not used by this serializer.
    """

    def __init__(self):
        super().__init__("E", 1)

    def matches(self, obj):
        """
        :param obj: candidate object
        :return: True when the fully qualified name of obj is the one of the Event class
        """
        qualified_name = core.utils.get_full_qualified_name(obj)
        return qualified_name == "sdp.sheerkaDataProvider.Event"

    def dump(self, stream, obj, context):
        """
        Appends the utf-8 encoded json form of ``obj.to_dict()`` to the stream

        :param stream: stream to write to
        :param obj: event to serialize
        :param context: not used
        :return: the stream, rewound to its beginning
        """
        payload = json.dumps(obj.to_dict(), default=json_default_converter)
        stream.write(payload.encode("utf-8"))
        stream.seek(0)
        return stream

    def load(self, stream, context):
        """
        Reads the rest of the stream as json and builds an Event from it

        :param stream: stream positioned on the json payload
        :param context: not used
        :return: a new Event filled through its ``from_dict`` method
        """
        as_dict = json.loads(stream.read().decode("utf-8"))
        event_class = core.utils.get_class("sdp.sheerkaDataProvider.Event")
        event = event_class()
        event.from_dict(as_dict)
        return event
class ObjectSerializer(BaseSerializer):
    """
    Json serializer for objects exposing ``to_dict`` / ``from_dict``.

    On top of the object's own data, a history entry (user, modification date,
    parents) is stored under the ``Serializer.HISTORY`` key.
    """

    def __init__(self, fully_qualified_name, name="O", version=1):
        """
        :param fully_qualified_name: fully qualified name of the class handled by this serializer
        :param name: name of the serializer
        :param version: version of the serializer
        """
        BaseSerializer.__init__(self, name, version)
        self.fully_qualified_name = fully_qualified_name

    def matches(self, obj):
        """
        :param obj: candidate object
        :return: True when the fully qualified name of obj is the one given at construction
        """
        return core.utils.get_full_qualified_name(obj) == self.fully_qualified_name

    def dump(self, stream, obj, context):
        """
        Appends the utf-8 encoded json form of the object, plus its history entry, to the stream

        :param stream: stream to write to
        :param obj: object to serialize, must provide ``to_dict()``
        :param context: provides ``user_name``, recorded in the history entry
        :return: the stream, rewound to its beginning
        """
        # Work on a copy: the dict returned by to_dict() belongs to the object
        # (it may even be part of its internal state) and must not receive
        # the history entry as a side effect of serializing.
        as_json = dict(obj.to_dict())

        history = {
            Serializer.USERNAME: context.user_name,
            Serializer.MODIFICATION_DATE: datetime.datetime.now().isoformat(),
            # the origin of the serialized object, when it has one, becomes the parent of the snapshot
            Serializer.PARENTS: [getattr(obj, Serializer.ORIGIN)] if hasattr(obj, Serializer.ORIGIN) else [],
        }
        as_json[Serializer.HISTORY] = history

        stream.write(json.dumps(as_json, default=json_default_converter).encode("utf-8"))
        stream.seek(0)
        return stream

    def load(self, stream, context):
        """
        Reads the rest of the stream as json and builds the object from it

        :param stream: stream positioned on the json payload
        :param context: not used
        :return: a new instance of the handled class, filled through ``from_dict``
            (which receives the whole message, history entry included); the
            history entry is also set on the instance as the attribute named
            ``Serializer.HISTORY``
        :raises KeyError: when the payload has no history entry
        """
        json_stream = stream.read().decode("utf-8")
        json_message = json.loads(json_stream)

        obj = core.utils.get_class(self.fully_qualified_name)()
        obj.from_dict(json_message)
        setattr(obj, Serializer.HISTORY, json_message[Serializer.HISTORY])
        return obj
class PickleSerializer(BaseSerializer):
    """
    Serializer relying on pickle; the objects it handles are selected by a predicate.

    The context is not used by this serializer.
    """

    def __init__(self, predicate, name="P", version=1):
        """
        :param predicate: callable taking an object, telling whether this serializer handles it
        :param name: name of the serializer
        :param version: version of the serializer
        """
        super().__init__(name, version)
        self.predicate = predicate

    def matches(self, obj):
        """
        :param obj: candidate object
        :return: the result of the predicate for obj
        """
        accepts = self.predicate
        return accepts(obj)

    def dump(self, stream, obj, context):
        """
        Appends the pickled form of the object to the stream

        :param stream: stream to write to
        :param obj: object to pickle
        :param context: not used
        :return: the stream, rewound to its beginning
        """
        pickled = pickle.dumps(obj)
        stream.write(pickled)
        stream.seek(0)
        return stream

    def load(self, stream, context):
        """
        Unpickles the rest of the stream

        :param stream: stream positioned on the pickled payload
        :param context: not used
        :return: the unpickled object
        """
        # NOTE(review): unpickling can execute arbitrary code; this must only
        # be fed with streams coming from a trusted store.
        remaining = stream.read()
        return pickle.loads(remaining)
class StateSerializer(PickleSerializer):
    """
    Pickle serializer (name "S", version 1) for ``sdp.sheerkaDataProvider.State`` objects.
    """

    def __init__(self):
        def is_state(obj):
            # the class is recognised by its fully qualified name
            return core.utils.get_full_qualified_name(obj) == "sdp.sheerkaDataProvider.State"

        super().__init__(is_state, "S", 1)
class ConceptSerializer(ObjectSerializer):
    """
    Object serializer (name "C", version 1) for ``core.concept.Concept``.
    """

    def __init__(self):
        super().__init__("core.concept.Concept", "C", 1)

    def matches(self, obj):
        """
        Uses an isinstance check instead of the comparison of fully qualified names

        :param obj: candidate object
        :return: True when obj is a Concept (instances of subclasses included)
        """
        return isinstance(obj, Concept)
#
# class SheerkaSerializer(ObjectSerializer):
# def __init__(self):
# ObjectSerializer.__init__(self, "core.sheerka.Sheerka", "C", 1)