# Sheerka-Old/sdp/sheerkaDataProvider.py
from datetime import datetime, date
import hashlib
import json
import zlib
from sdp.sheerkaDataProviderIO import SheerkaDataProviderIO
from sdp.sheerkaSerializer import Serializer, SerializerContext
from core.sheerka_logger import get_logger
def json_default_converter(o):
    """
    Fallback serializer handed to ``json.dumps(default=...)``.

    Dates and datetimes are rendered in ISO-8601 form.  Every other type
    deliberately falls through and yields ``None`` (serialized as JSON
    ``null``) -- best-effort behaviour that State.get_digest relies on for
    non-JSON-native values such as sets.
    :param o: object the json serializer could not handle natively
    :return: ISO-8601 string for date/datetime, None otherwise
    """
    if isinstance(o, (date, datetime)):
        return o.isoformat()
    return None
class Event(object):
    """
    Class that represents something that modifies the state of the system

    An event is identified by the sha256 of ``"Event:<user><date><message>"``;
    the digest is computed lazily and cached in ``_digest``.
    """

    def __init__(self, message="", user="", date=None):
        """
        :param message: description / user input that triggered the event
        :param user: author of the event
        :param date: timestamp of the event; defaults to the current time
        """
        self.version = 1
        self.user = user
        # BUG FIX: the original signature used `date=datetime.now()`, which is
        # evaluated ONCE at class-definition time, so every Event created
        # without an explicit date shared the exact same timestamp.  Resolve
        # the default at call time instead.
        self.date = datetime.now() if date is None else date
        self.message = message
        self._digest = None  # lazily computed sha256 cache

    def get_digest(self):
        """
        Returns the digest of the event
        :return: hexa form of the sha256
        :raises NotImplementedError: when message is not a plain string
        """
        if self._digest:
            return self._digest
        if self.message == "" and self.user == "":
            self._digest = "xxx"  # to speed unit tests
            return self._digest
        if not isinstance(self.message, str):
            raise NotImplementedError
        self._digest = hashlib.sha256(f"Event:{self.user}{self.date}{self.message}".encode("utf-8")).hexdigest()
        return self._digest

    def to_dict(self):
        """Dictionary view of the event (note: includes the cached _digest)."""
        return self.__dict__

    def from_dict(self, as_dict):
        """
        Populate the event from a dict produced by to_dict()/json.
        :param as_dict: dict with "user", "date" (ISO-8601 string), "message"
        """
        self.user = as_dict["user"]
        self.date = datetime.fromisoformat(as_dict["date"])
        self.message = as_dict["message"]
        # BUG FIX: the identifying fields just changed, so any previously
        # cached digest is stale -- drop it.
        self._digest = None
class ObjToUpdate:
    """
    Internal key/value wrapper that lazily discovers and caches the key and
    digest of the wrapped object.
    It's created to distinguish from {key, value}.

    ``has_key`` / ``has_digest`` are tri-state: None = not probed yet,
    False = probed and absent, True = value cached in ``_key``/``_digest``.
    """

    def __init__(self, obj, key=None, digest=None):
        self.obj = obj
        self.has_key = None
        self.has_digest = None
        self._key = None
        self._digest = None
        if key is not None:
            self.set_key(key)
        if digest is not None:
            self.set_digest(digest)

    def get_key(self):
        """Return the cached key, probing SheerkaDataProvider once if needed."""
        if self.has_key is not None:
            # already probed: serve the cache (or None when absent)
            return self._key if self.has_key else None
        probed = SheerkaDataProvider.get_obj_key(self.obj)
        self.has_key = probed is not None
        if self.has_key:
            self._key = probed
        return probed

    def get_digest(self):
        """Return the cached digest, probing SheerkaDataProvider once if needed."""
        if self.has_digest is not None:
            return self._digest if self.has_digest else None
        probed = SheerkaDataProvider.get_obj_digest(self.obj)
        self.has_digest = probed is not None
        if self.has_digest:
            self._digest = probed
        return probed

    def set_digest(self, digest):
        """Force the digest cache (skips any future probe)."""
        self.has_digest = True
        self._digest = digest

    def set_key(self, key):
        """Force the key cache (skips any future probe)."""
        self.has_key = True
        self._key = key
class State:
    """
    Class that represents the state of the system (dictionary of all known entries)

    ``data`` maps an entry name to either a bare object, a ``{key: obj}``
    dict, or a list when several objects share the same entry/key.
    ``parents``/``events`` hold the digests chaining this snapshot to its
    history.
    """

    def __init__(self):
        self.version = 1
        self.date = None     # datetime of the last mutation
        self.parents = []    # digest(s) of the parent snapshot(s)
        self.events = []     # digest(s) of the event(s) that produced this state
        self.data = {}       # entry -> obj | {key: obj} | [obj, ...]

    @staticmethod
    def check_duplicate(items, obj: "ObjToUpdate", key):
        """
        Raise SheerkaDataProviderDuplicateKeyError if obj's digest already
        appears in ``items`` (a single item or any iterable of items).
        Objects without a digest are never considered duplicates.
        """
        digest = obj.get_digest()
        if digest is None:
            return
        if not hasattr(items, "__iter__"):
            items = [items]
        for item in items:
            item_digest = SheerkaDataProvider.get_obj_digest(item)
            if item_digest == digest:
                raise SheerkaDataProviderDuplicateKeyError(key, obj.obj)

    def update(self, entry, obj: "ObjToUpdate", append=True):
        """
        adds obj to entry
        :param entry:
        :param obj:
        :param append: if True, duplicate keys will create lists
        :return:
        :raises SheerkaDataProviderError: keyless obj added to a keyed entry
        :raises SheerkaDataProviderDuplicateKeyError: same digest already stored
        """
        # when the object carries a key, it is stored as {key: obj}
        obj_to_use = {obj.get_key(): obj.obj} if obj.has_key else obj.obj
        if entry not in self.data:
            self.data[entry] = obj_to_use
        elif not append:
            # overwrite mode: merge dicts, replace anything else
            if isinstance(obj_to_use, dict):
                self.data[entry].update(obj_to_use)
            else:
                self.data[entry] = obj_to_use
        elif isinstance(self.data[entry], list):
            self.check_duplicate(self.data[entry], obj, entry)
            self.data[entry].append(obj.obj)
        elif isinstance(obj_to_use, dict):
            # keyed entry: append to (or create / promote to list) the per-key slot
            for k in obj_to_use:
                if k not in self.data[entry]:
                    self.data[entry][k] = obj_to_use[k]
                elif isinstance(self.data[entry][k], list):
                    self.check_duplicate(self.data[entry][k], obj, entry + "." + k)
                    self.data[entry][k].append(obj_to_use[k])
                else:
                    self.check_duplicate(self.data[entry][k], obj, entry + "." + k)
                    self.data[entry][k] = [self.data[entry][k], obj_to_use[k]]
        elif isinstance(self.data[entry], dict):
            # existing entries are keyed but the new object has no key
            raise SheerkaDataProviderError(f"Cannot found key on '{obj.obj}' while all other elements have.", obj.obj)
        else:
            self.check_duplicate(self.data[entry], obj, entry)
            self.data[entry] = [self.data[entry], obj_to_use]

    def modify(self, entry, key, obj, obj_key):
        """
        Replace the object stored under entry/key with obj (filed under obj_key).
        If the key changed, the old slot is removed first.
        """
        # if the key changes, make sure to remove the previous entry
        append = False
        if obj_key != key:
            self.remove(entry, lambda k, o: k == key)  # modify from on object to another
            append = True
        self.update(entry, ObjToUpdate(obj, obj_key), append=append)

    def modify_in_list(self, entry, key, obj, obj_key, obj_origin, load_ref_if_needed, save_ref_if_needed):
        """
        Replace, inside the list stored under entry/key, the item whose digest
        equals obj_origin.  References are resolved before comparison and the
        replacement is re-saved as a reference when the original was one.
        :raises SheerkaDataProviderError: when obj_origin matches no item
        """
        found = False
        to_remove = None
        for i in range(len(self.data[entry][key])):
            item, is_ref = load_ref_if_needed(self.data[entry][key][i])
            if not hasattr(item, "get_digest"):
                continue
            if item.get_digest() == obj_origin:
                obj = save_ref_if_needed(is_ref, obj)
                if obj_key == key:
                    self.data[entry][key][i] = obj
                else:
                    # key changed: re-file under the new key, delete the old slot after
                    to_remove = i
                    self.update(entry, ObjToUpdate(obj, obj_key), append=True)
                found = True
                break
        if not found:
            raise SheerkaDataProviderError(f"Cannot modify '{entry}.{key}'. Item '{obj_origin}' not found.", obj)
        if to_remove is not None:
            del self.data[entry][key][to_remove]

    def remove(self, entry, filter):
        """
        Remove elements under entry.
        Filter arity follows the entry shape: ``filter(key, element)`` for
        dict entries, ``filter(element)`` for list/scalar entries; ``None``
        drops the whole entry.
        """
        if filter is None:
            del self.data[entry]
        elif isinstance(self.data[entry], dict):
            keys_to_remove = [key for key, element in self.data[entry].items() if filter(key, element)]
            for key in keys_to_remove:
                del self.data[entry][key]
        elif not isinstance(self.data[entry], list):
            if filter(self.data[entry]):
                del self.data[entry]
        else:
            # BUG FIX: the original called list.remove() while iterating the
            # same list, which skips the element right after each removal
            # (e.g. only one of two consecutive matches was deleted).
            # Rebuild the list with the survivors instead.
            self.data[entry] = [element for element in self.data[entry] if not filter(element)]

    def get_digest(self):
        """Digest of the whole state: sha256 of its JSON serialization."""
        as_json = json.dumps(self.__dict__, default=json_default_converter)
        return hashlib.sha256(as_json.encode("utf-8")).hexdigest()

    def contains(self, entry, key):
        """
        if key is None, returns True if entry exists
        if key has a value
        returns True if entry is an dict and contains key
        :param entry:
        :param key:
        :return:
        """
        if entry not in self.data:
            return False
        if key is None:
            return entry in self.data
        if not isinstance(self.data[entry], dict):
            return False
        return key in self.data[entry]
class SheerkaDataProviderError(Exception):
    """Generic data-provider error, carrying the offending object in ``obj``."""

    def __init__(self, message, obj):
        super().__init__(message)
        self.obj = obj
class SheerkaDataProviderDuplicateKeyError(Exception):
    """Raised when an object with the same digest already exists under ``key``."""

    def __init__(self, key, obj):
        super().__init__("Duplicate object.")
        self.key = key
        self.obj = obj
class SheerkaDataProvider:
    """Manages the state of the system

    Append-only, content-addressed store (git-like): every mutation loads the
    snapshot pointed to by HEAD, applies the change to a State, saves the new
    state under its sha256 digest and moves HEAD.  Physical storage is
    delegated to SheerkaDataProviderIO; (de)serialization to Serializer.
    """
    # storage sub-folder / file names (resolved through the IO layer)
    EventFolder = "events"       # serialized Event objects (+ "_result" files)
    StateFolder = "state"        # serialized State snapshots, keyed by digest
    ObjectsFolder = "objects"    # objects stored by reference
    CacheFolder = "cache"        # zlib-compressed cache entries
    HeadFile = "HEAD"            # text file holding the current snapshot digest
    KeysFile = "keys"            # JSON counters backing get_next_key()
    # marker prepended to a digest when the state stores only a reference
    REF_PREFIX = "##REF##:"

    def __init__(self, root=None):
        """
        :param root: storage root forwarded to the IO layer (None = IO default)
        """
        self.log = get_logger(__name__)
        self.init_log = get_logger("init." + __name__)
        self.init_log.debug("Initializing sdp.")
        self.io = SheerkaDataProviderIO.get(root)
        # NOTE(review): presumably True when the storage was freshly created
        # -- confirm against SheerkaDataProviderIO
        self.first_time = self.io.first_time
        self.serializer = Serializer()

    @staticmethod
    def get_obj_key(obj):
        """
        Tries to find the key of an object
        Look for .key, .get_key()
        :param obj:
        :return: String version of what is found, None otherwise
        """
        return str(obj.key) if hasattr(obj, "key") \
            else str(obj.get_key()) if hasattr(obj, "get_key") \
            else None

    @staticmethod
    def get_obj_digest(obj):
        """
        Tries to find the digest of an object
        Look for .digest, .get_digest()
        A reference string ("##REF##:<digest>") yields the embedded digest.
        :param obj:
        :return: digest, None otherwise
        """
        if isinstance(obj, str) and obj.startswith(SheerkaDataProvider.REF_PREFIX):
            return obj[len(SheerkaDataProvider.REF_PREFIX):]
        return obj.digest if hasattr(obj, "digest") \
            else obj.get_digest() if hasattr(obj, "get_digest") \
            else None

    @staticmethod
    def get_obj_origin(obj):
        """
        Get the digest used to save obj if set
        (the Serializer.ORIGIN key/attribute stamped by load_obj)
        """
        if isinstance(obj, dict) and Serializer.ORIGIN in obj:
            return obj[Serializer.ORIGIN]
        if hasattr(obj, Serializer.ORIGIN):
            return getattr(obj, Serializer.ORIGIN)
        return None

    @staticmethod
    def get_stream_digest(stream):
        """sha256 of a binary stream, read in 4 KiB chunks; rewinds the stream."""
        sha256_hash = hashlib.sha256()
        for byte_block in iter(lambda: stream.read(4096), b""):
            sha256_hash.update(byte_block)
        stream.seek(0)
        return sha256_hash.hexdigest()

    @staticmethod
    def is_reference(obj):
        """True when obj is a "##REF##:<digest>" reference string."""
        return isinstance(obj, str) and obj.startswith(SheerkaDataProvider.REF_PREFIX)

    def add(self, event_digest: str, entry, obj, allow_multiple=True, use_ref=False):
        """
        Adds obj to the entry 'entry'
        :param event_digest: digest of the event that triggers the modification of the state
        :param entry: entry of the state to update
        :param obj: obj to insert or add
        :param allow_multiple: if set to true, the same key can be added several times.
        All entries will be put in a list
        :param use_ref: if True the actual object is saved under 'objects' folder,
        only a reference is saved in the state
        :return: (entry, key) to retrieve the object
        :raises IndexError: when allow_multiple is False and the key already exists
        """
        snapshot = self.get_snapshot()
        state = self.load_state(snapshot)
        self.log.debug(f"Adding obj '{obj}' in entry '{entry}' (allow_multiple={allow_multiple}, use_ref={use_ref})")
        if not isinstance(obj, ObjToUpdate):
            obj = ObjToUpdate(obj)
        # check uniqueness, cannot add the same key twice if allow_multiple == False
        key = obj.get_key()
        # conditional-expression used purely for its logging side effect
        self.log.debug(f"key found : '{key}'") if key else self.log.debug("No key found")
        if not allow_multiple:
            if isinstance(obj.obj, dict):
                for k in obj.obj:
                    if state.contains(entry, k):
                        raise IndexError(f"{entry}.{k}")
            else:
                if state.contains(entry, key):
                    raise IndexError(f"{entry}.{key}" if key else entry)
        # new state: child of the current snapshot, produced by this event
        state.parents = [] if snapshot is None else [snapshot]
        state.events = [event_digest]
        state.date = datetime.now()
        if use_ref:
            # store the real object aside and keep only "##REF##:<digest>"
            obj.set_digest(self.save_obj(obj.obj))
            obj.obj = self.REF_PREFIX + obj.get_digest()
        state.update(entry, obj)
        new_snapshot = self.save_state(state)
        self.set_snapshot(new_snapshot)
        return entry, key

    def add_with_auto_key(self, event_digest: str, entry, obj):
        """
        Add obj to entry. An autogenerated key created for obj
        (next value of the per-entry counter in the keys file).
        :param event_digest:
        :param entry:
        :param obj:
        :return: (entry, generated key)
        """
        next_key = self.get_next_key(entry)
        if hasattr(obj, "set_key"):
            obj.set_key(next_key)
        self.add(event_digest, entry, ObjToUpdate(obj, next_key))
        return entry, next_key

    def add_unique(self, event_digest: str, entry, obj):
        """Add an entry and make sure it's unique

        The entry is stored as a set, so obj must be hashable.
        NOTE(review): sets are not JSON-native; State.get_digest serializes
        them through json_default_converter -- verify digests stay meaningful.
        :return: (entry, None), or (None, None) when obj was already present
        """
        snapshot = self.get_snapshot()
        state = self.load_state(snapshot)
        state.parents = [] if snapshot is None else [snapshot]
        state.events = [event_digest]
        state.date = datetime.now()
        if entry not in state.data:
            state.data[entry] = {obj}
            already_exist = False
        else:
            already_exist = obj in state.data[entry]
            if not already_exist:
                state.data[entry].add(obj)
        new_snapshot = self.save_state(state)
        self.set_snapshot(new_snapshot)
        return (None if already_exist else entry), None

    def set(self, event_digest, entry, obj, use_ref=False):
        """
        Add or replace an entry. The entry is reinitialized.
        If the previous value was dict, all keys are lost
        :param event_digest:
        :param entry:
        :param obj:
        :param use_ref: save obj under 'objects' and keep only a reference
        :return: (entry, key)
        """
        snapshot = self.get_snapshot()
        state = self.load_state(snapshot)
        state.parents = [] if snapshot is None else [snapshot]
        state.events = [event_digest]
        state.date = datetime.now()
        key = self.get_obj_key(obj)
        obj = self.save_ref_if_needed(use_ref, obj)
        state.data[entry] = obj if key is None else {key: obj}
        new_snapshot = self.save_state(state)
        self.set_snapshot(new_snapshot)
        return entry, key

    def modify(self, event_digest, entry, key, obj):
        """
        Replace an element
        If the key is not provided, has the same effect than set eg, the entry is reset
        :param event_digest:
        :param entry:
        :param key: key of the object to update
        :param obj: new data
        :return: (entry, new key of the object)
        :raises SheerkaDataProviderError: key missing, or ambiguous list target
        :raises IndexError: entry/key does not exist
        """
        if key is None:
            raise SheerkaDataProviderError("Key is mandatory.", None)
        snapshot = self.get_snapshot()
        state = self.load_state(snapshot)
        if entry not in state.data:
            raise IndexError(entry)
        # NOTE(review): `key is not None` is always True here (checked above)
        if key is not None and key not in state.data[entry]:
            raise IndexError(f"{entry}.{key}")
        state.parents = [] if snapshot is None else [snapshot]
        state.events = [event_digest]
        state.date = datetime.now()
        # Gets obj original key, it will help to know if the key has changed
        obj_key = self.get_obj_key(obj) or key
        if isinstance(state.data[entry][key], list):
            # several objects share the key: obj must carry its saved origin
            # digest so the right list item can be located
            obj_origin = self.get_obj_origin(obj)
            if obj_origin is None:
                raise (SheerkaDataProviderError(f"Multiple entries under '{entry}.{key}'", obj))
            state.modify_in_list(entry, key, obj, obj_key, obj_origin, self.load_ref_if_needed, self.save_ref_if_needed)
        else:
            # keep the storage form (reference vs inline) of the replaced item
            obj = self.save_ref_if_needed(self.is_reference(state.data[entry][key]), obj)
            state.modify(entry, key, obj, obj_key)
        new_snapshot = self.save_state(state)
        self.set_snapshot(new_snapshot)
        return entry, obj_key

    def list(self, entry, filter=None):
        """
        Lists elements of entry 'entry' (generator; references are resolved)
        :param entry: name of the entry to list
        :param filter: filter to use -- called as filter(key, element) for
        keyed (dict) entries and filter(element) otherwise
        :return: yields elements (or lists of elements for multi-valued keys)
        """
        snapshot = self.get_snapshot()
        state = self.load_state(snapshot)
        if entry not in state.data:
            return []
        elements = state.data[entry]
        if isinstance(elements, dict):
            # manage when elements have a key
            filter_to_use = (lambda k, o: True) if filter is None else filter
            for key, element in elements.items():
                if filter_to_use(key, element):
                    if isinstance(element, list):
                        yield [self.load_ref_if_needed(e)[0] for e in element]
                    else:
                        yield self.load_ref_if_needed(element)[0]
        else:
            # manage when no key is defined for the elements
            if not isinstance(elements, list) and not isinstance(elements, set):
                elements = [elements]
            filter_to_use = (lambda o: True) if filter is None else filter
            for element in elements:
                if filter_to_use(element):
                    yield self.load_ref_if_needed(element)[0]

    def remove(self, event_digest, entry, filter=None):
        """
        Removes elements under the entry 'entry'
        :param event_digest: event that triggers the deletion
        :param entry:
        :param filter: filter to use (same arity convention as State.remove;
        None removes the whole entry)
        :return: new sha256 of the state
        :raises IndexError: when the entry does not exist
        TODO: Remove by key
        """
        snapshot = self.get_snapshot()
        state = self.load_state(snapshot)
        if entry not in state.data:
            raise IndexError(entry)
        state.parents = [] if snapshot is None else [snapshot]
        state.events = [event_digest]
        state.date = datetime.now()
        state.remove(entry, filter)
        new_snapshot = self.save_state(state)
        self.set_snapshot(new_snapshot)
        return new_snapshot

    def get(self, entry, key=None, load_origin=True):
        """
        Retrieve an element by its key
        :param entry:
        :param key:
        :param load_origin: stamp loaded references with their origin digest
        :return: the element (references resolved); a list when multi-valued
        :raises IndexError: when entry or key does not exist
        """
        snapshot = self.get_snapshot()
        state = self.load_state(snapshot)
        if entry not in state.data:
            raise IndexError(entry)
        if key is not None and key not in state.data[entry]:
            raise IndexError(f"{entry}.{key}")
        item = state.data[entry] if key is None else state.data[entry][key]
        if isinstance(item, list):
            return [self.load_ref_if_needed(i, load_origin)[0] for i in item]
        return self.load_ref_if_needed(item, load_origin)[0]

    def get_safe(self, entry, key=None, load_origin=True):
        """
        Retrieve an element by its key. Return None if the element does not exist
        :param entry:
        :param key:
        :param load_origin: stamp loaded references with their origin digest
        :return: the element, or None when entry/key is missing
        """
        snapshot = self.get_snapshot()
        state = self.load_state(snapshot)
        if entry not in state.data:
            return None
        if key is not None and key not in state.data[entry]:
            return None
        item = state.data[entry] if key is None else state.data[entry][key]
        if isinstance(item, list):
            return [self.load_ref_if_needed(i, load_origin)[0] for i in item]
        return self.load_ref_if_needed(item, load_origin)[0]

    def exists(self, entry, key=None, digest=None):
        """
        Returns true if the entry is defined
        :param key:
        :param entry:
        :param digest: digest of the object, when several entries share the same key
        :return:
        """
        snapshot = self.get_snapshot()
        state = self.load_state(snapshot)
        exist = entry in state.data
        if not exist or key is None:
            return exist
        items = state.data[entry]
        exist = key in items
        if not exist or digest is None:
            return exist
        # key matched: disambiguate by digest among the stored item(s)
        items = items[key]
        if not isinstance(items, list):
            items = [items]
        for item in items:
            item_digest = SheerkaDataProvider.get_obj_digest(item)
            if item_digest == digest:
                return True
        return False

    def save_event(self, event: Event):
        """
        Persist an event under the 'events' folder, keyed by its digest
        (no-op when the same event was already saved).
        :param event:
        :return: digest of the event
        """
        digest = event.get_digest()
        target_path = self.io.get_obj_path(SheerkaDataProvider.EventFolder, digest)
        if self.io.exists(target_path):
            return digest
        self.io.write_binary(target_path, self.serializer.serialize(event, None).read())
        return digest

    def load_event(self, digest):
        """
        return an event, given its digest
        :param digest:
        :return:
        """
        target_path = self.io.get_obj_path(SheerkaDataProvider.EventFolder, digest)
        with self.io.open(target_path, "rb") as f:
            return self.serializer.deserialize(f, None)

    def save_result(self, execution_context):
        """
        Save the execution context associated with an event
        To make a long story short,
        for every single user input, there is an event (which is the first thing that is created)
        and a result (the ExecutionContext created by sheerka.evaluate_user_input()
        :param execution_context:
        :return: digest of the associated event
        """
        digest = execution_context.event.get_digest()
        self.log.debug(f"Saving execution context. digest={digest}")
        # results live next to their event, suffixed with "_result"
        target_path = self.io.get_obj_path(SheerkaDataProvider.EventFolder, digest) + "_result"
        if self.io.exists(target_path):
            return digest
        self.io.write_binary(target_path, self.serializer.serialize(execution_context, None).read())
        return digest

    def load_result(self, digest):
        """Reload the execution context saved for the event with this digest."""
        target_path = self.io.get_obj_path(SheerkaDataProvider.EventFolder, digest) + "_result"
        with self.io.open(target_path, "rb") as f:
            return self.serializer.deserialize(f, None)

    def save_state(self, state: State):
        """Persist a State snapshot keyed by its digest; idempotent."""
        digest = state.get_digest()
        self.log.debug(f"Saving new state. digest={digest}")
        target_path = self.io.get_obj_path(SheerkaDataProvider.StateFolder, digest)
        if self.io.exists(target_path):
            return digest
        self.io.write_binary(target_path, self.serializer.serialize(state, None).read())
        return digest

    def load_state(self, digest):
        """Reload a State snapshot; a None digest yields a fresh empty State."""
        if digest is None:
            return State()
        target_path = self.io.get_obj_path(SheerkaDataProvider.StateFolder, digest)
        with self.io.open(target_path, "rb") as f:
            return self.serializer.deserialize(f, None)

    def save_obj(self, obj):
        """
        Serialize obj under the 'objects' folder, keyed by its digest.
        :return: digest of the saved object
        """
        self.log.debug(f"Saving '{obj}' as reference...")
        # NOTE(review): hard-coded user name looks like a leftover -- confirm
        stream = self.serializer.serialize(obj, SerializerContext(user_name="kodjo"))
        # prefer the object's own digest; fall back to hashing the stream
        digest = obj.get_digest() if hasattr(obj, "get_digest") else self.get_stream_digest(stream)
        target_path = self.io.get_obj_path(SheerkaDataProvider.ObjectsFolder, digest)
        if self.io.exists(target_path):
            self.log.debug(f"...already saved. digest is {digest}")
            return digest
        self.io.write_binary(target_path, stream.read())
        self.log.debug(f"...digest={digest}.")
        return digest

    def load_obj(self, digest, add_origin=True):
        """
        Reload an object saved by save_obj.
        :param digest: digest of the object (None returns None)
        :param add_origin: stamp the object with its origin digest, so that
        modify() can later locate it inside a list
        :return: the object, or None when not found
        """
        if digest is None:
            return None
        target_path = self.io.get_obj_path(SheerkaDataProvider.ObjectsFolder, digest)
        if not self.io.exists(target_path):
            return None
        with self.io.open(target_path, "rb") as f:
            obj = self.serializer.deserialize(f, SerializerContext(origin=digest))
        # set the origin of the object (plain strings cannot carry attributes)
        if add_origin:
            if isinstance(obj, dict):
                obj[Serializer.ORIGIN] = digest
            elif not isinstance(obj, str):
                setattr(obj, Serializer.ORIGIN, digest)
        return obj

    def load_ref_if_needed(self, obj, load_origin=True):
        """
        Resolve obj when it is a "##REF##:<digest>" reference string.
        :return: (resolved object, True) for a resolved reference,
        (obj unchanged, False) otherwise
        """
        if not isinstance(obj, str):
            return obj, False
        if not obj.startswith(SheerkaDataProvider.REF_PREFIX):
            return obj, False
        resolved = self.load_obj(obj[len(SheerkaDataProvider.REF_PREFIX):], load_origin)
        if resolved is None:
            # dangling reference: hand back the raw string
            return obj, False
        return resolved, True

    def save_ref_if_needed(self, save_ref, obj):
        """Save obj aside and return its "##REF##:<digest>" string when save_ref is True."""
        if not save_ref:
            return obj
        digest = self.save_obj(obj)
        return self.REF_PREFIX + digest

    def get_cache_params(self, category, key):
        """
        Digest and storage path of a cache entry.
        NOTE(review): uses sha3_256 unlike the sha256 used elsewhere --
        presumably to keep cache names in a separate namespace; confirm.
        """
        digest = hashlib.sha3_256(f"{category}:{key}".encode("utf-8")).hexdigest()
        cache_path = self.io.get_obj_path(SheerkaDataProvider.CacheFolder, digest)
        return digest, cache_path

    def add_to_cache(self, category, key, obj, update=False):
        """
        Save obj in the internal cache system (zlib-compressed, level 9)
        :param category:
        :param key:
        :param obj: string payload to cache
        :param update: overwrite an existing entry when True
        :return: digest of the cache entry
        """
        digest, cache_path = self.get_cache_params(category, key)
        if self.io.exists(cache_path) and not update:
            return digest
        self.io.write_binary(cache_path, zlib.compress(obj.encode("utf-8"), 9))
        return digest

    def load_from_cache(self, category, key):
        """
        Reload a compressed object from the cache
        :param category:
        :param key:
        :return: the decompressed string payload
        :raises IndexError: when the entry is not cached
        """
        digest, cache_path = self.get_cache_params(category, key)
        if not self.io.exists(cache_path):
            raise IndexError(f"{category}.{key}")
        with self.io.open(cache_path, "rb") as f:
            return zlib.decompress(f.read()).decode("utf-8")

    def remove_from_cache(self, category, key):
        """
        Drop a cache entry if present (no error when absent).
        :param category:
        :param key:
        :return: digest of the (removed) cache entry
        """
        digest, cache_path = self.get_cache_params(category, key)
        if self.io.exists(cache_path):
            self.io.remove(cache_path)
        return digest

    def in_cache(self, category, key):
        """
        Returns true if the key is in cache
        :param category:
        :param key:
        :return:
        """
        digest, cache_path = self.get_cache_params(category, key)
        return self.io.exists(cache_path)

    def get_snapshot(self):
        """Digest of the current snapshot (HEAD), or None before the first save."""
        head_file = self.io.path_join(SheerkaDataProvider.HeadFile)
        if not self.io.exists(head_file):
            return None
        return self.io.read_text(head_file)

    def set_snapshot(self, digest):
        """Move HEAD to the given snapshot digest."""
        head_file = self.io.path_join(SheerkaDataProvider.HeadFile)
        return self.io.write_text(head_file, digest)

    def load_keys(self):
        """Load the per-entry auto-key counters (empty dict when file absent)."""
        keys_file = self.io.path_join(SheerkaDataProvider.KeysFile)
        if not self.io.exists(keys_file):
            keys = {}
        else:
            with self.io.open(keys_file, "r") as f:
                keys = json.load(f)
        return keys

    def save_keys(self, keys):
        """Persist the per-entry auto-key counters as JSON."""
        keys_file = self.io.path_join(SheerkaDataProvider.KeysFile)
        with self.io.open(keys_file, "w") as f:
            json.dump(keys, f)

    def get_next_key(self, entry):
        """Increment and persist the counter for entry; return it as a string."""
        keys = self.load_keys()
        next_key = keys.get(entry, 0) + 1
        keys[entry] = next_key
        self.save_keys(keys)
        return str(next_key)

    def set_key(self, entry, value):
        """Force the counter for entry to value; return it as a string."""
        keys = self.load_keys()
        keys[entry] = value
        self.save_keys(keys)
        return str(value)