Implemented dictionary-based file system to speed up the tests
This commit is contained in:
+45
-70
@@ -1,9 +1,11 @@
|
||||
from os import path
|
||||
import os
|
||||
# from os import path
|
||||
# import os
|
||||
from datetime import datetime, date
|
||||
import hashlib
|
||||
import json
|
||||
import zlib
|
||||
|
||||
from sdp.sheerkaDataProviderIO import SheerkaDataProviderIO
|
||||
from sdp.sheerkaSerializer import Serializer, SerializerContext
|
||||
import logging
|
||||
|
||||
@@ -278,23 +280,11 @@ class SheerkaDataProvider:
|
||||
def __init__(self, root=None):
    """Initialize the data provider on top of a storage backend.

    :param root: storage root. ``None`` lets the backend pick its default
        (``~/.sheerka``); ``"mem://"`` selects the in-memory test backend.
    """
    log.debug("Initializing sdp.")

    # All file-system details (root resolution, folder creation, first-run
    # detection) now live in the IO backend.
    self.io = SheerkaDataProviderIO.get(root)
    # NOTE(review): kept for legacy helpers that still join on root directly
    # — the backend resolves the effective root; confirm no caller needs the
    # old pre-resolution value.
    self.root = self.io.root
    self.first_time = self.io.first_time

    self.serializer = Serializer()
|
||||
|
||||
def get_obj_path(self, object_type, digest):
    """Return the storage path of an object: <root>/<type>/<digest[:24]>/<digest>."""
    # Objects are sharded into sub-folders keyed by a digest prefix.
    bucket = digest[:24]
    return path.join(self.root, object_type, bucket, digest)
|
||||
|
||||
@staticmethod
|
||||
def get_obj_key(obj):
|
||||
"""
|
||||
@@ -620,15 +610,11 @@ class SheerkaDataProvider:
|
||||
:return: digest of the event
|
||||
"""
|
||||
digest = event.get_digest()
|
||||
target_path = path.join(self.root, SheerkaDataProvider.EventFolder, digest[:24], digest)
|
||||
if path.exists(target_path):
|
||||
target_path = self.io.get_obj_path(SheerkaDataProvider.EventFolder, digest)
|
||||
if self.io.exists(target_path):
|
||||
return digest
|
||||
|
||||
if not path.exists(path.dirname(target_path)):
|
||||
os.makedirs(path.dirname(target_path))
|
||||
|
||||
with open(target_path, "wb") as f:
|
||||
f.write(self.serializer.serialize(event, None).read())
|
||||
self.io.write_binary(target_path, self.serializer.serialize(event, None).read())
|
||||
|
||||
return digest
|
||||
|
||||
@@ -638,31 +624,27 @@ class SheerkaDataProvider:
|
||||
:param digest:
|
||||
:return:
|
||||
"""
|
||||
target_path = path.join(self.root, SheerkaDataProvider.EventFolder, digest[:24], digest)
|
||||
with open(target_path, "rb") as f:
|
||||
target_path = self.io.get_obj_path(SheerkaDataProvider.EventFolder, digest)
|
||||
|
||||
with self.io.open(target_path, "rb") as f:
|
||||
return self.serializer.deserialize(f, None)
|
||||
|
||||
def save_state(self, state: State):
    """Persist *state* in the state store.

    Idempotent: when an object with the same digest already exists,
    nothing is written.

    :param state: the state object to serialize
    :return: digest of the state
    """
    digest = state.get_digest()
    log.debug(f"Saving new state. digest={digest}")

    target_path = self.io.get_obj_path(SheerkaDataProvider.StateFolder, digest)
    if self.io.exists(target_path):
        return digest

    # The backend creates any missing parent folders itself.
    self.io.write_binary(target_path, self.serializer.serialize(state, None).read())
    return digest
|
||||
|
||||
def load_state(self, digest):
    """Load a state from the state store.

    :param digest: digest of the state to load; ``None`` yields a fresh,
        empty ``State``.
    :return: the deserialized state
    """
    if digest is None:
        return State()

    target_path = self.io.get_obj_path(SheerkaDataProvider.StateFolder, digest)
    with self.io.open(target_path, "rb") as f:
        return self.serializer.deserialize(f, None)
|
||||
|
||||
def save_obj(self, obj):
|
||||
@@ -670,16 +652,12 @@ class SheerkaDataProvider:
|
||||
stream = self.serializer.serialize(obj, SerializerContext(user_name="kodjo"))
|
||||
digest = obj.get_digest() if hasattr(obj, "get_digest") else self.get_stream_digest(stream)
|
||||
|
||||
target_path = path.join(self.root, SheerkaDataProvider.ObjectsFolder, digest[:24], digest)
|
||||
if path.exists(target_path):
|
||||
target_path = self.io.get_obj_path(SheerkaDataProvider.ObjectsFolder, digest)
|
||||
if self.io.exists(target_path):
|
||||
log.debug(f"...already saved. digest is {digest}")
|
||||
return digest
|
||||
|
||||
if not path.exists(path.dirname(target_path)):
|
||||
os.makedirs(path.dirname(target_path))
|
||||
|
||||
with open(target_path, "wb") as f:
|
||||
f.write(stream.read())
|
||||
self.io.write_binary(target_path, stream.read())
|
||||
|
||||
log.debug(f"...digest={digest}.")
|
||||
return digest
|
||||
@@ -688,11 +666,11 @@ class SheerkaDataProvider:
|
||||
if digest is None:
|
||||
return None
|
||||
|
||||
target_path = path.join(self.root, SheerkaDataProvider.ObjectsFolder, digest[:24], digest)
|
||||
if not path.exists(target_path):
|
||||
target_path = self.io.get_obj_path(SheerkaDataProvider.ObjectsFolder, digest)
|
||||
if not self.io.exists(target_path):
|
||||
return None
|
||||
|
||||
with open(target_path, "rb") as f:
|
||||
with self.io.open(target_path, "rb") as f:
|
||||
obj = self.serializer.deserialize(f, SerializerContext(origin=digest))
|
||||
|
||||
# set the origin of the object
|
||||
@@ -721,7 +699,7 @@ class SheerkaDataProvider:
|
||||
|
||||
def get_cache_params(self, category, key):
    """Derive the cache digest and storage path for a (category, key) pair.

    :param category: cache namespace
    :param key: entry key within the namespace
    :return: tuple ``(digest, cache_path)``
    """
    # The digest is content-independent: it only identifies the slot.
    digest = hashlib.sha3_256(f"{category}:{key}".encode("utf-8")).hexdigest()
    cache_path = self.io.get_obj_path(SheerkaDataProvider.CacheFolder, digest)
    return digest, cache_path
|
||||
|
||||
def add_to_cache(self, category, key, obj, update=False):
|
||||
@@ -735,15 +713,10 @@ class SheerkaDataProvider:
|
||||
"""
|
||||
digest, cache_path = self.get_cache_params(category, key)
|
||||
|
||||
if path.exists(cache_path) and not update:
|
||||
if self.io.exists(cache_path) and not update:
|
||||
return digest
|
||||
|
||||
if not path.exists(path.dirname(cache_path)):
|
||||
os.makedirs(path.dirname(cache_path))
|
||||
|
||||
with open(cache_path, "wb") as f:
|
||||
f.write(zlib.compress(obj.encode("utf-8"), 9))
|
||||
|
||||
self.io.write_binary(cache_path, zlib.compress(obj.encode("utf-8"), 9))
|
||||
return digest
|
||||
|
||||
def load_from_cache(self, category, key):
|
||||
@@ -755,10 +728,10 @@ class SheerkaDataProvider:
|
||||
"""
|
||||
digest, cache_path = self.get_cache_params(category, key)
|
||||
|
||||
if not path.exists(cache_path):
|
||||
if not self.io.exists(cache_path):
|
||||
raise IndexError(f"{category}.{key}")
|
||||
|
||||
with open(cache_path, "rb") as f:
|
||||
with self.io.open(cache_path, "rb") as f:
|
||||
return zlib.decompress(f.read()).decode("utf-8")
|
||||
|
||||
def remove_from_cache(self, category, key):
|
||||
@@ -769,8 +742,8 @@ class SheerkaDataProvider:
|
||||
:return:
|
||||
"""
|
||||
digest, cache_path = self.get_cache_params(category, key)
|
||||
if path.exists(cache_path):
|
||||
os.remove(cache_path)
|
||||
if self.io.exists(cache_path):
|
||||
self.io.remove(cache_path)
|
||||
|
||||
return digest
|
||||
|
||||
@@ -782,32 +755,34 @@ class SheerkaDataProvider:
|
||||
:return:
|
||||
"""
|
||||
digest, cache_path = self.get_cache_params(category, key)
|
||||
return path.exists(cache_path)
|
||||
return self.io.exists(cache_path)
|
||||
|
||||
def get_snapshot(self):
    """Return the digest recorded in the HEAD file, or None when no
    snapshot has been taken yet."""
    head_file = self.io.path_join(SheerkaDataProvider.HeadFile)
    if not self.io.exists(head_file):
        return None
    return self.io.read_text(head_file)
|
||||
|
||||
def set_snapshot(self, digest):
    """Record *digest* as the current snapshot in the HEAD file.

    :param digest: digest to store
    :return: whatever the backend's ``write_text`` returns (``None`` for
        the file backend)
    """
    head_file = self.io.path_join(SheerkaDataProvider.HeadFile)
    return self.io.write_text(head_file, digest)
|
||||
|
||||
def load_keys(self):
    """Load the keys mapping from the keys file.

    :return: the stored dict, or an empty dict when the file is absent
    """
    keys_file = self.io.path_join(SheerkaDataProvider.KeysFile)
    if not self.io.exists(keys_file):
        keys = {}
    else:
        with self.io.open(keys_file, "r") as f:
            keys = json.load(f)
    return keys
|
||||
|
||||
def save_keys(self, keys):
    """Persist the keys mapping to the keys file as JSON.

    :param keys: dict to store
    """
    keys_file = self.io.path_join(SheerkaDataProvider.KeysFile)
    # Text-mode write: the dictionary backend captures the buffered JSON
    # when the stream is closed by the ``with`` block.
    with self.io.open(keys_file, "w") as f:
        json.dump(keys, f)
|
||||
|
||||
def get_next_key(self, entry):
|
||||
|
||||
@@ -0,0 +1,189 @@
|
||||
import io
|
||||
from os import path
|
||||
import os
|
||||
from fs.memoryfs import MemoryFS
|
||||
import logging
|
||||
|
||||
|
||||
class SheerkaDataProviderIO:
    """Abstract storage backend for SheerkaDataProvider.

    Concrete subclasses implement the primitive file operations; the
    provider only talks to this interface, which is what lets the tests
    swap the real file system for an in-memory dictionary.
    """

    def __init__(self, root):
        # Root folder (or prefix) under which every object path is built.
        self.root = root

    def exists(self, file_path):
        """Return True when *file_path* holds stored content."""
        # Was a silent `pass` (-> None): a missing override would make
        # every existence check falsy without any error. Fail loudly.
        raise NotImplementedError

    def open(self, file_path, mode):
        """Open *file_path* and return a file-like object usable as a context manager."""
        raise NotImplementedError

    def read_text(self, file_path):
        """Return the full text content of *file_path*."""
        raise NotImplementedError

    def read_binary(self, file_path):
        """Return the full binary content of *file_path*."""
        raise NotImplementedError

    def write_text(self, file_path, content):
        """Store text *content* at *file_path*, creating parents as needed."""
        raise NotImplementedError

    def write_binary(self, file_path, content):
        """Store binary *content* at *file_path*, creating parents as needed."""
        raise NotImplementedError

    def remove(self, file_path):
        """Delete the entry at *file_path*."""
        raise NotImplementedError

    @staticmethod
    def get(root):
        """Factory: pick the backend matching *root*.

        ``"mem://"`` selects the dictionary-backed test implementation;
        anything else (including ``None``) is treated as a real folder.
        """
        if root == "mem://":
            return SheerkaDataProviderDictionaryIO()
        else:
            return SheerkaDataProviderFileIO(root)

    def get_obj_path(self, object_type, digest):
        """Path of an object: <root>/<object_type>/<digest[:24]>/<digest>."""
        return path.join(self.root, object_type, digest[:24], digest)

    def path_join(self, *paths):
        """Join *paths* under the backend root."""
        return path.join(self.root, *paths)
|
||||
|
||||
|
||||
class SheerkaDataProviderFileIO(SheerkaDataProviderIO):
    """Backend that stores objects as real files under a root folder."""

    log = logging.getLogger("FileIO")

    def __init__(self, root):
        # Fall back to ~/.sheerka when no root is given; always absolutize.
        if root is None:
            resolved = path.abspath(path.join(path.expanduser("~"), ".sheerka"))
        else:
            resolved = path.abspath(root)
        super().__init__(resolved)

        self.log.debug("root is set to '" + self.root + "'")

        # first_time tells the provider whether the store was just created.
        self.first_time = not path.exists(self.root)
        if self.first_time:
            self.log.debug("root folder not found. Creating it.")
            os.makedirs(self.root)

    def open(self, file_path, mode):
        """Plain built-in open()."""
        return open(file_path, mode)

    def read_text(self, file_path):
        """Read the whole file as text."""
        with open(file_path) as handle:
            return handle.read()

    def read_binary(self, file_path):
        """Read the whole file as bytes."""
        with open(file_path, "rb") as handle:
            return handle.read()

    def write_text(self, file_path, content):
        """Write text, creating the parent folder when missing."""
        self._write(file_path, content, "w")

    def write_binary(self, file_path, content):
        """Write bytes, creating the parent folder when missing."""
        self._write(file_path, content, "wb")

    def exists(self, file_path):
        return path.exists(file_path)

    def remove(self, file_path):
        os.remove(file_path)

    @staticmethod
    def _write(file_path, content, mode):
        # Create the parent folder lazily, then dump the content in one go.
        parent = path.dirname(file_path)
        if not path.exists(parent):
            os.makedirs(parent)
        with open(file_path, mode) as handle:
            handle.write(content)
|
||||
|
||||
|
||||
class SheerkaDataProviderMemoryIO(SheerkaDataProviderIO):
    """Backend storing everything in a PyFilesystem in-memory file system."""

    log = logging.getLogger("MemoryIO")

    def __init__(self):
        super().__init__("")  # paths are relative to the MemoryFS root
        self.mem_fs = MemoryFS()
        self.log.debug("Initializing memory file.")
        # A fresh MemoryFS is always empty, so this store is always "new".
        self.first_time = True

    def open(self, file_path, mode):
        return self.mem_fs.open(file_path, mode)

    def exists(self, file_path):
        return self.mem_fs.exists(file_path)

    def read_text(self, file_path):
        return self.mem_fs.readtext(file_path)

    def read_binary(self, file_path):
        return self.mem_fs.readbytes(file_path)

    def write_binary(self, file_path, content):
        self._ensure_parent_folder(file_path)
        self.mem_fs.writebytes(file_path, content)

    def write_text(self, file_path, content):
        self._ensure_parent_folder(file_path)
        self.mem_fs.writetext(file_path, content)

    def remove(self, file_path):
        self.mem_fs.remove(file_path)

    def _ensure_parent_folder(self, file_path):
        # MemoryFS, like the real FS, refuses to write into a missing folder.
        parent = path.dirname(file_path)
        if not self.mem_fs.exists(parent):
            self.mem_fs.makedirs(parent)
|
||||
|
||||
|
||||
class SheerkaDataProviderDictionaryIO(SheerkaDataProviderIO):
    """In-memory backend: file paths map straight to their content in a dict."""

    log = logging.getLogger("DictionaryIO")

    def __init__(self):
        super().__init__("")  # paths act as plain dictionary keys
        self.cache = {}
        self.log.debug("Initializing dictionary file.")
        # A fresh dict is always empty, so this store is always "new".
        self.first_time = True

    def exists(self, file_path):
        # The (empty) root always exists; anything else must be a cache key.
        return file_path == "" or file_path in self.cache

    def read_text(self, file_path):
        return self.cache[file_path]

    def read_binary(self, file_path):
        return self.cache[file_path]

    def write_binary(self, file_path, content):
        self.cache[file_path] = content

    def write_text(self, file_path, content):
        self.cache[file_path] = content

    def remove(self, file_path):
        self.cache.pop(file_path)  # raises KeyError if absent, like del

    def open(self, file_path, mode):
        binary = "b" in mode
        if "w" not in mode:
            # Read mode: wrap the cached content in an in-memory stream.
            content = self.cache[file_path]
            return io.BytesIO(content) if binary else io.StringIO(content)

        # Write mode: hand out an empty buffer whose close() copies the
        # accumulated content back into the cache (see on_close below).
        stream = io.BytesIO() if binary else io.StringIO()
        stream.close = on_close(self, file_path, stream)(stream.close)
        return stream
|
||||
|
||||
|
||||
def on_close(dictionary_io, file_path, stream):
    """Build a decorator that flushes *stream* into the backing dictionary.

    The dictionary IO hands out plain in-memory streams for write mode;
    wrapping their ``close`` is how the buffered content finally lands in
    ``dictionary_io.cache`` under *file_path*. (A StringIO/BytesIO subclass
    overriding close would be a tidier alternative.)

    :param dictionary_io: owner of the ``cache`` dict to write into
    :param file_path: key under which the content is stored
    :param stream: the in-memory stream whose content is captured
    :return: decorator for the stream's original ``close``
    """
    def wrap(original_close):
        def close_and_store(*args, **kwargs):
            # Rewind first: the cursor sits at the end after writing.
            stream.seek(0)
            dictionary_io.cache[file_path] = stream.read()
            original_close(*args, **kwargs)
        return close_and_store
    return wrap
|
||||
Reference in New Issue
Block a user