202 lines
5.3 KiB
Python
202 lines
5.3 KiB
Python
import copy
|
|
|
|
from core.handlers import handlers
|
|
from core.utils import has_tag, is_dictionary, is_list, is_object, is_set, is_tuple, is_primitive, importable_name, \
|
|
get_class, get_full_qualified_name, is_enum
|
|
|
|
TAG_ID = "__id__"
|
|
TAG_OBJECT = "__object__"
|
|
TAG_TUPLE = "__tuple__"
|
|
TAG_SET = "__set__"
|
|
TAG_REF = "__ref__"
|
|
TAG_ENUM = "__enum__"
|
|
|
|
|
|
class Serializer:
|
|
def __init__(self, ref_helper=None):
|
|
self.ref_helper = ref_helper
|
|
|
|
self.ids = {}
|
|
self.objs = []
|
|
self.id_count = 0
|
|
|
|
def serialize(self, obj, use_refs=None):
|
|
"""
|
|
From object to dictionary
|
|
:param obj:
|
|
:param use_refs: Sometimes it easier / quicker to use pickle !
|
|
:return:
|
|
"""
|
|
if use_refs:
|
|
use_refs = set("root." + path for path in use_refs)
|
|
|
|
return self._serialize(obj, use_refs or set(), "root")
|
|
|
|
def deserialize(self, obj: dict):
|
|
"""
|
|
From dictionary to object (or primitive)
|
|
:param obj:
|
|
:return:
|
|
"""
|
|
if has_tag(obj, TAG_REF):
|
|
return self.ref_helper.load_ref(obj[TAG_REF])
|
|
|
|
if has_tag(obj, TAG_ID):
|
|
return self._restore_id(obj)
|
|
|
|
if has_tag(obj, TAG_TUPLE):
|
|
return tuple([self.deserialize(v) for v in obj[TAG_TUPLE]])
|
|
|
|
if has_tag(obj, TAG_SET):
|
|
return set([self.deserialize(v) for v in obj[TAG_SET]])
|
|
|
|
if has_tag(obj, TAG_ENUM):
|
|
return self._deserialize_enum(obj)
|
|
|
|
if has_tag(obj, TAG_OBJECT):
|
|
return self._deserialize_obj_instance(obj)
|
|
|
|
if (handler := handlers.get_handler(obj)) is not None:
|
|
return handler.deserialize(obj)
|
|
|
|
if is_list(obj):
|
|
return [self.deserialize(v) for v in obj]
|
|
|
|
if is_dictionary(obj):
|
|
return {k: self.deserialize(v) for k, v in obj.items()}
|
|
|
|
return obj
|
|
|
|
def _serialize(self, obj, use_refs: set | None, path):
|
|
if use_refs is not None and path in use_refs:
|
|
digest = self.ref_helper.save_ref(obj)
|
|
return {TAG_REF: digest}
|
|
|
|
if is_primitive(obj):
|
|
return obj
|
|
|
|
if is_tuple(obj):
|
|
return {TAG_TUPLE: [self._serialize(v, use_refs, path) for v in obj]}
|
|
|
|
if is_set(obj):
|
|
return {TAG_SET: [self._serialize(v, use_refs, path) for v in obj]}
|
|
|
|
if is_list(obj):
|
|
return [self._serialize(v, use_refs, path) for v in obj]
|
|
|
|
if is_dictionary(obj):
|
|
return {k: self._serialize(v, use_refs, path) for k, v in obj.items()}
|
|
|
|
if is_enum(obj):
|
|
return self._serialize_enum(obj, use_refs, path)
|
|
|
|
if is_object(obj):
|
|
return self._serialize_obj_instance(obj, use_refs, path)
|
|
|
|
raise Exception(f"Cannot serialize '{obj}'")
|
|
|
|
def _serialize_enum(self, obj, use_refs: set | None, path):
|
|
# check if the object was already seen
|
|
if (seen := self._check_already_seen(obj)) is not None:
|
|
return seen
|
|
|
|
data = {}
|
|
class_name = get_full_qualified_name(obj)
|
|
data[TAG_ENUM] = class_name + "." + obj.name
|
|
return data
|
|
|
|
def _serialize_obj_instance(self, obj, use_refs: set | None, path):
|
|
# check if the object was already seen
|
|
if (seen := self._check_already_seen(obj)) is not None:
|
|
return seen
|
|
|
|
# try to manage use_refs
|
|
current_obj_use_refs = getattr(obj, "use_refs")() if hasattr(obj, "use_refs") else None
|
|
if current_obj_use_refs:
|
|
use_refs.update(f"{path}.{sub_path}" for sub_path in current_obj_use_refs)
|
|
|
|
if (handler := handlers.get_handler(obj)) is not None:
|
|
return handler.serialize(obj)
|
|
|
|
# flatten
|
|
data = {}
|
|
cls = obj.__class__ if hasattr(obj, '__class__') else type(obj)
|
|
class_name = importable_name(cls)
|
|
data[TAG_OBJECT] = class_name
|
|
|
|
if hasattr(obj, "__dict__"):
|
|
for k, v in obj.__dict__.items():
|
|
data[k] = self._serialize(v, use_refs, f"{path}.{k}")
|
|
|
|
return data
|
|
|
|
def _check_already_seen(self, obj):
|
|
_id = self._exist(obj)
|
|
if _id is not None:
|
|
return {TAG_ID: _id}
|
|
|
|
# else:
|
|
self.ids[id(obj)] = self.id_count
|
|
self.objs.append(obj)
|
|
self.id_count = self.id_count + 1
|
|
|
|
return None
|
|
|
|
def _deserialize_enum(self, obj):
|
|
cls_name, enum_name = obj[TAG_ENUM].rsplit(".", 1)
|
|
cls = get_class(cls_name)
|
|
obj = getattr(cls, enum_name)
|
|
self.objs.append(obj)
|
|
return obj
|
|
|
|
def _deserialize_obj_instance(self, obj):
|
|
|
|
cls = get_class(obj[TAG_OBJECT])
|
|
instance = cls.__new__(cls)
|
|
self.objs.append(instance)
|
|
|
|
for k, v in obj.items():
|
|
value = self.deserialize(v)
|
|
setattr(instance, k, value)
|
|
|
|
return instance
|
|
|
|
def _restore_id(self, obj):
|
|
try:
|
|
return self.objs[obj[TAG_ID]]
|
|
except IndexError:
|
|
pass
|
|
|
|
def _exist(self, obj):
|
|
try:
|
|
v = self.ids[id(obj)]
|
|
return v
|
|
except KeyError:
|
|
return None
|
|
|
|
|
|
class DebugSerializer(Serializer):
|
|
def __init__(self, ref_helper=None):
|
|
super().__init__(ref_helper)
|
|
|
|
def _deserialize_obj_instance(self, obj):
|
|
data = {TAG_OBJECT: obj[TAG_OBJECT]}
|
|
self.objs.append(data)
|
|
|
|
for k, v in obj.items():
|
|
value = self.deserialize(v)
|
|
data[k] = value
|
|
|
|
return data
|
|
|
|
def _deserialize_enum(self, obj):
|
|
cls_name, enum_name = obj[TAG_ENUM].rsplit(".", 1)
|
|
self.objs.append(enum_name)
|
|
return enum_name
|
|
|
|
def _restore_id(self, obj):
|
|
try:
|
|
return copy.deepcopy(self.objs[obj[TAG_ID]])
|
|
except IndexError:
|
|
pass
|