Refactored sheerka execution flow + Enhanced log management
This commit is contained in:
+14
-12
@@ -5,10 +5,8 @@ import zlib
|
||||
|
||||
from sdp.sheerkaDataProviderIO import SheerkaDataProviderIO
|
||||
from sdp.sheerkaSerializer import Serializer, SerializerContext
|
||||
import logging
|
||||
from core.sheerka_logger import get_logger
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
init_log = logging.getLogger(__name__ + ".init")
|
||||
|
||||
def json_default_converter(o):
|
||||
"""
|
||||
@@ -276,7 +274,9 @@ class SheerkaDataProvider:
|
||||
REF_PREFIX = "##REF##:"
|
||||
|
||||
def __init__(self, root=None):
|
||||
init_log.debug("Initializing sdp.")
|
||||
self.log = get_logger(__name__)
|
||||
self.init_log = get_logger("init." + __name__)
|
||||
self.init_log.debug("Initializing sdp.")
|
||||
|
||||
self.io = SheerkaDataProviderIO.get(root)
|
||||
self.first_time = self.io.first_time
|
||||
@@ -323,7 +323,6 @@ class SheerkaDataProvider:
|
||||
|
||||
return None
|
||||
|
||||
|
||||
@staticmethod
|
||||
def get_stream_digest(stream):
|
||||
sha256_hash = hashlib.sha256()
|
||||
@@ -353,14 +352,14 @@ class SheerkaDataProvider:
|
||||
snapshot = self.get_snapshot()
|
||||
state = self.load_state(snapshot)
|
||||
|
||||
log.debug(f"Adding obj '{obj}' in entry '{entry}' (allow_multiple={allow_multiple}, use_ref={use_ref})")
|
||||
self.log.debug(f"Adding obj '{obj}' in entry '{entry}' (allow_multiple={allow_multiple}, use_ref={use_ref})")
|
||||
|
||||
if not isinstance(obj, ObjToUpdate):
|
||||
obj = ObjToUpdate(obj)
|
||||
|
||||
# check uniqueness, cannot add the same key twice if allow_multiple == False
|
||||
key = obj.get_key()
|
||||
log.debug(f"key found : '{key}'") if key else log.debug("No key found")
|
||||
self.log.debug(f"key found : '{key}'") if key else self.log.debug("No key found")
|
||||
if not allow_multiple:
|
||||
if isinstance(obj.obj, dict):
|
||||
for k in obj.obj:
|
||||
@@ -505,7 +504,10 @@ class SheerkaDataProvider:
|
||||
filter_to_use = (lambda k, o: True) if filter is None else filter
|
||||
for key, element in elements.items():
|
||||
if filter_to_use(key, element):
|
||||
yield self.load_ref_if_needed(element)[0]
|
||||
if isinstance(element, list):
|
||||
yield [self.load_ref_if_needed(e)[0] for e in element]
|
||||
else:
|
||||
yield self.load_ref_if_needed(element)[0]
|
||||
else:
|
||||
# manage when no key is defined for the elements
|
||||
if not isinstance(elements, list) and not isinstance(elements, set):
|
||||
@@ -643,7 +645,7 @@ class SheerkaDataProvider:
|
||||
|
||||
def save_state(self, state: State):
|
||||
digest = state.get_digest()
|
||||
log.debug(f"Saving new state. digest={digest}")
|
||||
self.log.debug(f"Saving new state. digest={digest}")
|
||||
target_path = self.io.get_obj_path(SheerkaDataProvider.StateFolder, digest)
|
||||
if self.io.exists(target_path):
|
||||
return digest
|
||||
@@ -660,18 +662,18 @@ class SheerkaDataProvider:
|
||||
return self.serializer.deserialize(f, None)
|
||||
|
||||
def save_obj(self, obj):
|
||||
log.debug(f"Saving '{obj}' as reference...")
|
||||
self.log.debug(f"Saving '{obj}' as reference...")
|
||||
stream = self.serializer.serialize(obj, SerializerContext(user_name="kodjo"))
|
||||
digest = obj.get_digest() if hasattr(obj, "get_digest") else self.get_stream_digest(stream)
|
||||
|
||||
target_path = self.io.get_obj_path(SheerkaDataProvider.ObjectsFolder, digest)
|
||||
if self.io.exists(target_path):
|
||||
log.debug(f"...already saved. digest is {digest}")
|
||||
self.log.debug(f"...already saved. digest is {digest}")
|
||||
return digest
|
||||
|
||||
self.io.write_binary(target_path, stream.read())
|
||||
|
||||
log.debug(f"...digest={digest}.")
|
||||
self.log.debug(f"...digest={digest}.")
|
||||
return digest
|
||||
|
||||
def load_obj(self, digest, add_origin=True):
|
||||
|
||||
Reference in New Issue
Block a user