2315 lines
78 KiB
Python
2315 lines
78 KiB
Python
# import hashlib
|
|
# import json
|
|
# import os
|
|
# import shutil
|
|
# from datetime import date, datetime
|
|
# from os import path
|
|
#
|
|
# import core.utils
|
|
# import pytest
|
|
# from sdp.sheerkaDataProvider import SheerkaDataProvider, Event, SheerkaDataProviderError, \
|
|
# SheerkaDataProviderDuplicateKeyError, SheerkaDataProviderResult, SheerkaDataProviderRef
|
|
# from sdp.sheerkaSerializer import JsonSerializer, Serializer, PickleSerializer
|
|
#
|
|
# tests_root = path.abspath("../../build/tests")
|
|
# evt_digest = "3a571cb6034ef6fc8d7fe91948d0d29728eed74de02bac7968b0e9facca2c2d7"
|
|
#
|
|
#
|
|
# def read_json_file(sdp, file_name):
|
|
# with sdp.io.open(file_name, "r") as f:
|
|
# return json.load(f)
|
|
#
|
|
#
|
|
# class ObjWithKey:
|
|
# """
|
|
# Object where the key can be resolved using get_key()
|
|
# Not suitable for Json dump as there is no to_dict() method
|
|
# """
|
|
#
|
|
# def __init__(self, a, b):
|
|
# self.a = a
|
|
# self.b = b
|
|
#
|
|
# def __eq__(self, obj):
|
|
# return isinstance(obj, ObjWithKey) and \
|
|
# self.a == obj.a and \
|
|
# self.b == obj.b
|
|
#
|
|
# def __repr__(self):
|
|
# return f"ObjWithKey({self.a}, {self.b})"
|
|
#
|
|
# def get_key(self):
|
|
# return self.a
|
|
#
|
|
#
|
|
# class ObjSetKey:
|
|
# """
|
|
# Object where the key can be automatically set thanks to set_key()
|
|
# Not suitable for Json dump as there is no to_dict() method
|
|
# """
|
|
#
|
|
# def __init__(self, value, key=None):
|
|
# self.value = value
|
|
# self.key = key
|
|
#
|
|
# def __eq__(self, obj):
|
|
# return isinstance(obj, ObjSetKey) and \
|
|
# self.key == obj.key and \
|
|
# self.value == obj.value
|
|
#
|
|
# def __repr__(self):
|
|
# return f"ObjSetKey({self.key}, {self.value})"
|
|
#
|
|
# def set_key(self, key):
|
|
# self.key = key
|
|
#
|
|
#
|
|
# class ObjNoKey:
|
|
# """
|
|
# Object with no key, they won't be ordered
|
|
# Not suitable for Json dump as there is no to_dict() method
|
|
# """
|
|
#
|
|
# def __init__(self, a, b):
|
|
# self.a = a
|
|
# self.b = b
|
|
#
|
|
# def __hash__(self):
|
|
# return hash((self.a, self.b))
|
|
#
|
|
# def __eq__(self, obj):
|
|
# return isinstance(obj, ObjNoKey) and \
|
|
# self.a == obj.a and \
|
|
# self.b == obj.b
|
|
#
|
|
# def __repr__(self):
|
|
# return f"ObjNoKey({self.a}, {self.b})"
|
|
#
|
|
#
|
|
# class ObjDumpJson:
|
|
# """
|
|
# Object where the key can be resolved using get_key()
|
|
# that can be used to dump as Json
|
|
# """
|
|
#
|
|
# def __init__(self, key=None, value=None):
|
|
# self.key = key
|
|
# self.value = value
|
|
#
|
|
# def __eq__(self, obj):
|
|
# return isinstance(obj, ObjDumpJson) and \
|
|
# self.key == obj.key and \
|
|
# self.value == obj.value
|
|
#
|
|
# def __repr__(self):
|
|
# return f"ObjDumpJson({self.key}, {self.value})"
|
|
#
|
|
# def get_key(self):
|
|
# return self.key
|
|
#
|
|
# def get_digest(self):
|
|
# """
|
|
# Returns the digest of the object
|
|
# :return: hexadecimal form of the SHA-256 hash
|
|
# """
|
|
# return hashlib.sha256(f"Concept:{self.key}{self.value}".encode("utf-8")).hexdigest()
|
|
#
|
|
# def to_dict(self):
|
|
# return self.__dict__
|
|
#
|
|
# def from_dict(self, as_dict):
|
|
# self.value = as_dict["value"]
|
|
# self.key = as_dict["key"]
|
|
#
|
|
#
|
|
# class ObjDumpJsonNoDigest:
|
|
# """
|
|
# Object where the key can be resolved using get_key()
|
|
# that can be used to dump as Json,
|
|
# But with no builtin digest computation
|
|
# """
|
|
#
|
|
# def __init__(self, key=None, value=None):
|
|
# self.key = key
|
|
# self.value = value
|
|
#
|
|
# def __eq__(self, obj):
|
|
# return isinstance(obj, ObjDumpJsonNoDigest) and \
|
|
# self.key == obj.key and \
|
|
# self.value == obj.value
|
|
#
|
|
# def __repr__(self):
|
|
# return f"ObjDumpJsonNoDigest({self.key}, {self.value})"
|
|
#
|
|
# def get_key(self):
|
|
# return self.key
|
|
#
|
|
# def to_dict(self):
|
|
# return self.__dict__
|
|
#
|
|
# def from_dict(self, as_dict):
|
|
# self.value = as_dict["value"]
|
|
# self.key = as_dict["key"]
|
|
#
|
|
#
|
|
# class ObjWithDigestNoKey:
|
|
# """
|
|
# Object that can compute its digest.
|
|
# It can be used to test objects sharing the same entry (but that are different)
|
|
# Not suitable for Json dump as there is no to_dict() method
|
|
# """
|
|
#
|
|
# def __init__(self, a, b):
|
|
# self.a = a
|
|
# self.b = b
|
|
#
|
|
# def __hash__(self):
|
|
# return hash((self.a, self.b))
|
|
#
|
|
# def __eq__(self, obj):
|
|
# return isinstance(obj, ObjWithDigestNoKey) and \
|
|
# self.a == obj.a and \
|
|
# self.b == obj.b
|
|
#
|
|
# def __repr__(self):
|
|
# return f"ObjWithDigestNoKey({self.a}, {self.b})"
|
|
#
|
|
# def get_digest(self):
|
|
# return str(self.a) + str(self.b)
|
|
#
|
|
#
|
|
# class ObjWithDigestWithKey:
|
|
# """
|
|
# Object with a key that can compute its digest.
|
|
# It can be used to test objects sharing the same key (but that are different)
|
|
# Not suitable for Json dump as there is no to_dict() method
|
|
# """
|
|
#
|
|
# def __init__(self, a, b):
|
|
# self.a = a
|
|
# self.b = b
|
|
#
|
|
# def __hash__(self):
|
|
# return hash((self.a, self.b))
|
|
#
|
|
# def __eq__(self, obj):
|
|
# return isinstance(obj, ObjWithDigestWithKey) and \
|
|
# self.a == obj.a and \
|
|
# self.b == obj.b
|
|
#
|
|
# def __repr__(self):
|
|
# return f"ObjWithDigestWithKey({self.a}, {self.b})"
|
|
#
|
|
# def get_key(self):
|
|
# return self.a
|
|
#
|
|
# def get_digest(self):
|
|
# return str(self.a) + str(self.b)
|
|
#
|
|
#
|
|
# @pytest.fixture(autouse=True)
|
|
# def init_test():
|
|
# if path.exists(tests_root):
|
|
# shutil.rmtree(tests_root)
|
|
#
|
|
# if not path.exists(tests_root):
|
|
# os.makedirs(tests_root)
|
|
# current_pwd = os.getcwd()
|
|
# os.chdir(tests_root)
|
|
#
|
|
# yield None
|
|
#
|
|
# os.chdir(current_pwd)
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root, expected", [
|
|
# (".sheerka", path.abspath(path.join(tests_root, ".sheerka"))),
|
|
# ("mem://", "")
|
|
# ])
|
|
# def test_i_can_init_the_data_provider(root, expected):
|
|
# sdp = SheerkaDataProvider(root)
|
|
#
|
|
# assert sdp.io.root == expected
|
|
# assert sdp.io.exists(sdp.io.root)
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_save_and_load_an_event(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# event = Event("hello world", date=date(year=2007, month=9, day=10), user="kodjo")
|
|
#
|
|
# evt_digest = sdp.save_event(event)
|
|
# evt = sdp.load_event(evt_digest)
|
|
#
|
|
# assert evt.version == 1
|
|
# assert evt.date == datetime(year=2007, month=9, day=10)
|
|
# assert evt.user_id == "kodjo"
|
|
# assert evt.message == "hello world"
|
|
# assert evt.parents is None
|
|
# assert sdp.io.exists(path.join(sdp.io.root, SheerkaDataProvider.EventFolder, evt_digest[0:24], evt_digest))
|
|
#
|
|
# # I can get the last event
|
|
# evt = sdp.load_event()
|
|
# assert evt.message == "hello world"
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_get_event_history(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# event = Event("hello world", date=date(year=2007, month=9, day=10), user="kodjo")
|
|
# event2 = Event("hello world 2", date=date(year=2007, month=9, day=10), user="kodjo")
|
|
#
|
|
# evt_digest1 = sdp.save_event(event)
|
|
# evt_digest2 = sdp.save_event(event2)
|
|
#
|
|
# evt = sdp.load_event(evt_digest2)
|
|
# assert evt.version == 1
|
|
# assert evt.date == datetime(year=2007, month=9, day=10)
|
|
# assert evt.user_id == "kodjo"
|
|
# assert evt.message == "hello world 2"
|
|
# assert evt.parents == [evt_digest1]
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_load_events(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
#
|
|
# for i in range(15):
|
|
# sdp.save_event(Event(f"Hello {i}"))
|
|
#
|
|
# events = list(sdp.load_events(10)) # first ten
|
|
# assert len(events) == 10
|
|
# assert events[0].message == "Hello 14"
|
|
# assert events[9].message == "Hello 5"
|
|
#
|
|
# events = list(sdp.load_events(10, 5)) # skip first 5, then take 10
|
|
# assert len(events) == 10
|
|
# assert events[0].message == "Hello 9"
|
|
# assert events[9].message == "Hello 0"
|
|
#
|
|
# events = list(sdp.load_events(20, 10)) # skip first 10, take 20 (but only 5 remaining)
|
|
# assert len(events) == 5
|
|
# assert events[0].message == "Hello 4"
|
|
# assert events[4].message == "Hello 0"
|
|
#
|
|
# events = list(sdp.load_events(1, 20)) # skip first 20, take one
|
|
# assert len(events) == 0
|
|
#
|
|
# events = list(sdp.load_events(0)) # all
|
|
# assert len(events) == 15
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_load_events_when_no_event(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
#
|
|
# events = list(sdp.load_events(1))
|
|
# assert len(events) == 0
|
|
#
|
|
# events = list(sdp.load_events(1, 5))
|
|
# assert len(events) == 0
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_add_an_string(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# obj = "foo => bar"
|
|
#
|
|
# result = sdp.add(evt_digest, "entry", obj)
|
|
# last_commit = sdp.get_snapshot(SheerkaDataProvider.HeadFile)
|
|
# state = sdp.load_state(last_commit)
|
|
# loaded = sdp.get(result.entry, result.key)
|
|
#
|
|
# assert result.obj == obj
|
|
# assert result.entry == "entry"
|
|
# assert result.key is None
|
|
# assert result.digest is None
|
|
# assert loaded == obj
|
|
#
|
|
# assert sdp.io.exists(path.join(sdp.io.root, SheerkaDataProvider.StateFolder, last_commit[0:24], last_commit))
|
|
# assert sdp.io.exists(path.join(sdp.io.root, SheerkaDataProvider.HeadFile))
|
|
#
|
|
# assert state.date is not None
|
|
# assert state.parents == []
|
|
# assert state.events == [evt_digest]
|
|
# assert state.data == {"entry": "foo => bar"}
|
|
#
|
|
# assert sdp.io.read_text(path.join(sdp.io.root, SheerkaDataProvider.HeadFile)) == last_commit
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_add_several_strings_if_allow_multiple_is_true(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
#
|
|
# sdp.add(evt_digest, "entry", "foo")
|
|
# sdp.add(evt_digest, "entry", "foo")
|
|
# result = sdp.add(evt_digest, "entry", "bar")
|
|
# loaded = sdp.get(result.entry, result.key)
|
|
#
|
|
# assert result.obj == "bar"
|
|
# assert result.entry == "entry"
|
|
# assert result.key is None
|
|
# assert result.digest is None
|
|
# assert loaded == ["foo", "foo", "bar"]
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_cannot_add_several_strings_if_allow_multiple_is_false(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
#
|
|
# with pytest.raises(IndexError) as index_error:
|
|
# sdp.add(evt_digest, "entry", "foo", False)
|
|
# sdp.add(evt_digest, "entry", "foo", False)
|
|
# assert index_error.value.args[0] == "entry"
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_add_an_object_with_no_key(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# obj = ObjNoKey("a", "b")
|
|
#
|
|
# result = sdp.add(evt_digest, "entry", obj)
|
|
# last_commit = sdp.get_snapshot(SheerkaDataProvider.HeadFile)
|
|
# state = sdp.load_state(last_commit)
|
|
# loaded = sdp.get(result.entry, result.key)
|
|
#
|
|
# assert result.obj == obj
|
|
# assert result.entry == "entry"
|
|
# assert result.key is None
|
|
# assert result.digest is None
|
|
#
|
|
# assert sdp.io.exists(path.join(sdp.io.root, SheerkaDataProvider.StateFolder, last_commit[0:24], last_commit))
|
|
# assert sdp.io.exists(path.join(sdp.io.root, SheerkaDataProvider.HeadFile))
|
|
#
|
|
# assert state.date is not None
|
|
# assert state.parents == []
|
|
# assert state.events == [evt_digest]
|
|
# assert state.data == {"entry": ObjNoKey("a", "b")}
|
|
#
|
|
# assert sdp.io.read_text(path.join(sdp.io.root, SheerkaDataProvider.HeadFile)) == last_commit
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_add_several_obj_no_key_if_allow_multiple_is_true(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
#
|
|
# sdp.add(evt_digest, "entry", ObjNoKey("a", "b"))
|
|
# sdp.add(evt_digest, "entry", ObjNoKey("a", "b"))
|
|
# result = sdp.add(evt_digest, "entry", ObjNoKey("c", "d"))
|
|
# loaded = sdp.get(result.entry, result.key)
|
|
#
|
|
# assert result.obj == ObjNoKey("c", "d")
|
|
# assert result.entry == "entry"
|
|
# assert result.key is None
|
|
# assert result.digest is None
|
|
# assert loaded == [ObjNoKey("a", "b"), ObjNoKey("a", "b"), ObjNoKey("c", "d")]
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_cannot_add_several_obj_no_key_if_allow_multiple_is_false(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
#
|
|
# with pytest.raises(IndexError) as index_error:
|
|
# sdp.add(evt_digest, "entry", ObjNoKey("a", "b"), False)
|
|
# sdp.add(evt_digest, "entry", ObjNoKey("c", "d"), False)
|
|
# assert index_error.value.args[0] == "entry"
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_add_a_dict(root):
|
|
# """
|
|
# Adding a dictionary.
|
|
# Note that there is no key when adding a dictionary
|
|
#
|
|
# If you add {'my_key': 'my_value'}
|
|
# 'my_key' is not considered as the key of the entry
|
|
#
|
|
# Because if you add {'my_key': 'my_value', 'my_key2': 'my_value2'}
|
|
# There are now multiple keys.
|
|
#
|
|
# So for dictionary entries, the key is not managed
|
|
# """
|
|
# sdp = SheerkaDataProvider(root)
|
|
# obj = {"my_key": "my_value"}
|
|
#
|
|
# result = sdp.add(evt_digest, "entry", obj)
|
|
# last_commit = sdp.get_snapshot(SheerkaDataProvider.HeadFile)
|
|
# state = sdp.load_state(last_commit)
|
|
# loaded = sdp.get(result.entry, result.key)
|
|
#
|
|
# loaded_value = sdp.get(result.entry, "my_key") # we can retrieve by key
|
|
#
|
|
# assert result.obj == obj
|
|
# assert result.entry == "entry"
|
|
# assert result.key is None # we return None as a dict may contain several entries
|
|
# assert result.digest is None
|
|
#
|
|
# assert loaded == obj
|
|
# assert loaded_value == "my_value"
|
|
#
|
|
# assert sdp.io.exists(path.join(sdp.io.root, SheerkaDataProvider.StateFolder, last_commit[0:24], last_commit))
|
|
# assert sdp.io.exists(path.join(sdp.io.root, SheerkaDataProvider.HeadFile))
|
|
#
|
|
# assert state.date is not None
|
|
# assert state.parents == []
|
|
# assert state.events == [evt_digest]
|
|
# assert state.data == {"entry": obj}
|
|
#
|
|
# assert sdp.io.read_text(path.join(sdp.io.root, SheerkaDataProvider.HeadFile)) == last_commit
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_add_multiple_entries_at_once_with_dict(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# obj = {"my_key1": "value1", "my_key2": "value2"}
|
|
#
|
|
# result = sdp.add(evt_digest, "entry", obj)
|
|
# loaded = sdp.get(result.entry, result.key)
|
|
# loaded_value1 = sdp.get(result.entry, "my_key1")
|
|
# loaded_value2 = sdp.get(result.entry, "my_key2")
|
|
#
|
|
# assert result.obj == obj
|
|
# assert result.entry == "entry"
|
|
# assert result.key is None # we return None as a dict may contain several entries
|
|
# assert result.digest is None
|
|
#
|
|
# assert loaded == {"my_key1": "value1", "my_key2": "value2"}
|
|
# assert loaded_value1 == "value1"
|
|
# assert loaded_value2 == "value2"
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_add_same_key_with_dict_if_allow_multiple_is_true(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
#
|
|
# sdp.add(evt_digest, "entry", {"my_key": "my_value"})
|
|
# result = sdp.add(evt_digest, "entry", {"my_key": "my_value"})
|
|
# loaded1 = sdp.get(result.entry, result.key)
|
|
#
|
|
# result = sdp.add(evt_digest, "entry", {"my_key": "my_value2"})
|
|
# loaded2 = sdp.get(result.entry, result.key)
|
|
#
|
|
# assert result.entry == "entry"
|
|
# assert result.key is None
|
|
# assert loaded1 == {"my_key": ["my_value", "my_value"]}
|
|
# assert loaded2 == {"my_key": ["my_value", "my_value", "my_value2"]}
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_cannot_add_same_key_with_dict_if_allow_multiple_is_false(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
#
|
|
# with pytest.raises(IndexError) as index_error:
|
|
# sdp.add(evt_digest, "entry", {"my_key": "my_value"}, False)
|
|
# sdp.add(evt_digest, "entry", {"my_key": "my_value2"}, False)
|
|
# assert index_error.value.args[0] == "entry.my_key"
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_add_object_with_different_key_if_allow_multiple_is_false(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
#
|
|
# sdp.add(evt_digest, "entry", {"my_key": "a"}, False)
|
|
# sdp.add(evt_digest, "entry", {"my_key2": "b"}, False)
|
|
#
|
|
# assert sdp.get("entry", "my_key") == "a"
|
|
# assert sdp.get("entry", "my_key2") == "b"
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_add_obj_with_key(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# obj1 = ObjWithKey("key1", "b")
|
|
# obj2 = ObjSetKey("c", key="key2")
|
|
#
|
|
# result1 = sdp.add(evt_digest, "entry", obj1) # test when key is taken from obj.get_key()
|
|
# result2 = sdp.add(evt_digest, "entry2", obj2) # test when key is taken from obj.key
|
|
# last_commit = sdp.get_snapshot(SheerkaDataProvider.HeadFile)
|
|
# state = sdp.load_state(last_commit)
|
|
#
|
|
# loaded1 = sdp.get(result1.entry, result1.key)
|
|
# loaded2 = sdp.get(result2.entry, result2.key)
|
|
#
|
|
# assert result1.obj == obj1
|
|
# assert result1.entry == "entry"
|
|
# assert result1.key == "key1"
|
|
# assert result1.digest is None
|
|
#
|
|
# assert result2.obj == obj2
|
|
# assert result2.entry == "entry2"
|
|
# assert result2.key == "key2"
|
|
# assert result2.digest is None
|
|
#
|
|
# assert loaded1 == ObjWithKey("key1", "b")
|
|
# assert loaded2 == ObjSetKey("c", key="key2")
|
|
#
|
|
# assert sdp.io.exists(path.join(sdp.io.root, SheerkaDataProvider.StateFolder, last_commit[0:24], last_commit))
|
|
# assert sdp.io.exists(path.join(sdp.io.root, SheerkaDataProvider.HeadFile))
|
|
#
|
|
# assert state.date is not None
|
|
# assert len(state.parents) == 1
|
|
# assert state.events == [evt_digest]
|
|
# assert state.data == {"entry": {"key1": obj1}, "entry2": {"key2": obj2}}
|
|
#
|
|
# assert sdp.io.read_text(path.join(sdp.io.root, SheerkaDataProvider.HeadFile)) == last_commit
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_add_objects_with_same_key_if_allow_multiple_is_true(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
#
|
|
# sdp.add(evt_digest, "entry", ObjWithKey("my_key", "b"))
|
|
# result = sdp.add(evt_digest, "entry", ObjSetKey("c", key="my_key"))
|
|
# loaded1 = sdp.get(result.entry, result.key)
|
|
#
|
|
# result = sdp.add(evt_digest, "entry", ObjSetKey("c", key="my_key"))
|
|
# sdp.add(evt_digest, "entry", ObjSetKey("c", key="my_key2")) # to prove that it does not mix everything up
|
|
# loaded2 = sdp.get(result.entry, result.key)
|
|
#
|
|
# assert loaded1 == [ObjWithKey("my_key", "b"), ObjSetKey("c", key="my_key")]
|
|
# assert loaded2 == [ObjWithKey("my_key", "b"), ObjSetKey("c", key="my_key"), ObjSetKey("c", key="my_key")]
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_cannot_add_object_with_same_key_if_allow_multiple_is_false(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
#
|
|
# with pytest.raises(IndexError) as index_error:
|
|
# sdp.add(evt_digest, "entry", ObjWithKey("my_key", "b"), False)
|
|
# sdp.add(evt_digest, "entry", ObjSetKey("c", key="my_key"), False)
|
|
# assert index_error.value.args[0] == "entry.my_key"
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_add_obj_with_key_to_a_list(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
#
|
|
# sdp.add(evt_digest, "entry", "foo")
|
|
# sdp.add(evt_digest, "entry", "bar") # entry is now a list
|
|
# sdp.add(evt_digest, "entry", ObjWithKey("a", "b")) # this entry must not be treated as an object with a key
|
|
#
|
|
# loaded = sdp.get("entry")
|
|
# assert loaded == ["foo", "bar", ObjWithKey("a", "b")]
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_add_a_reference(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.serializer.register(PickleSerializer(lambda o: isinstance(o, ObjWithDigestWithKey)))
|
|
# obj1 = ObjWithDigestWithKey(1, "foo")
|
|
# result1 = sdp.add(evt_digest, "entry", obj1, use_ref=True)
|
|
# result3 = sdp.add(evt_digest, "entry_by_ref", SheerkaDataProviderRef(obj1.b, obj1.get_digest()))
|
|
#
|
|
# # another object
|
|
# obj2 = ObjWithDigestWithKey(2, "bar")
|
|
# sdp.add(evt_digest, "entry", obj2, use_ref=True)
|
|
# sdp.add(evt_digest, "entry_by_ref", SheerkaDataProviderRef(obj2.b, obj2.get_digest()))
|
|
#
|
|
# assert result1.obj == obj1
|
|
# assert result1.entry == "entry"
|
|
# assert result1.key == str(obj1.get_key())
|
|
# assert result1.digest == obj1.get_digest()
|
|
#
|
|
# assert result3.obj == SheerkaDataProviderRef(obj1.b, obj1.get_digest())
|
|
# assert result3.entry == "entry_by_ref"
|
|
# assert result3.key == "foo"
|
|
# assert result3.digest is None
|
|
#
|
|
# state = sdp.load_state(sdp.get_snapshot(SheerkaDataProvider.HeadFile))
|
|
# assert state.data == {
|
|
# "entry": {
|
|
# "1": '##REF##:' + obj1.get_digest(),
|
|
# "2": '##REF##:' + obj2.get_digest(),
|
|
# },
|
|
# "entry_by_ref": {
|
|
# "foo": SheerkaDataProviderRef(obj1.b, obj1.get_digest()),
|
|
# "bar": SheerkaDataProviderRef(obj2.b, obj2.get_digest())
|
|
# },
|
|
# }
|
|
#
|
|
# # make sure that I can load back
|
|
# loaded1 = sdp.get("entry_by_ref", "foo")
|
|
# assert loaded1 == ObjWithDigestWithKey(1, "foo")
|
|
# assert getattr(loaded1, Serializer.ORIGIN) == obj1.get_digest()
|
|
#
|
|
# loaded2 = sdp.get("entry_by_ref", "bar")
|
|
# assert loaded2 == ObjWithDigestWithKey(2, "bar")
|
|
# assert getattr(loaded2, Serializer.ORIGIN) == obj2.get_digest()
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_have_multiple_is_ref_to_the_same_key(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.serializer.register(PickleSerializer(lambda o: isinstance(o, ObjWithDigestWithKey)))
|
|
# ref_result1 = sdp.add(evt_digest, "entry", ObjWithDigestWithKey(1, "foo"), use_ref=True)
|
|
# ref_result2 = sdp.add(evt_digest, "entry", ObjWithDigestWithKey(2, "bar"), use_ref=True)
|
|
#
|
|
# sdp.add(evt_digest, "entry_ref", SheerkaDataProviderRef("1", ref_result1.digest))
|
|
# sdp.add(evt_digest, "entry_ref", SheerkaDataProviderRef("1", ref_result2.digest))
|
|
#
|
|
# state = sdp.load_state(sdp.get_snapshot(SheerkaDataProvider.HeadFile))
|
|
# assert state.data == {'entry': {'1': '##REF##:1foo', '2': '##REF##:2bar'},
|
|
# 'entry_ref': {'1': [SheerkaDataProviderRef("1", ref_result1.digest),
|
|
# SheerkaDataProviderRef("1", ref_result2.digest)]},
|
|
# }
|
|
#
|
|
# loaded = sdp.get("entry_ref", "1")
|
|
# assert len(loaded) == 2
|
|
# assert loaded[0] == ObjWithDigestWithKey(1, "foo")
|
|
# assert loaded[1] == ObjWithDigestWithKey(2, "bar")
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_cannot_add_obj_with_no_key_when_then_entry_has_keys(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
#
|
|
# with pytest.raises(SheerkaDataProviderError) as error:
|
|
# sdp.add(evt_digest, "entry", ObjWithKey("a", "b"))
|
|
# sdp.add(evt_digest, "entry", "foo")
|
|
#
|
|
# assert error.value.obj == "foo"
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_add_string_using_auto_generated_key(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# key_file = path.join(sdp.io.root, SheerkaDataProvider.KeysFile)
|
|
#
|
|
# result1 = sdp.add_with_auto_key(evt_digest, "entry1", "foo")
|
|
# result2 = sdp.add_with_auto_key(evt_digest, "entry1", "bar")
|
|
# result3 = sdp.add_with_auto_key(evt_digest, "entry2", "baz")
|
|
#
|
|
# state = sdp.load_state(sdp.get_snapshot(SheerkaDataProvider.HeadFile))
|
|
#
|
|
# assert sdp.io.exists(key_file)
|
|
# assert read_json_file(sdp, key_file) == {"entry1": 2, "entry2": 1}
|
|
# assert state.data == {"entry1": {"1": "foo", "2": "bar"}, "entry2": {"1": "baz"}}
|
|
# assert result1.obj == "foo"
|
|
# assert result2.obj == "bar"
|
|
# assert result3.obj == "baz"
|
|
# assert result1.entry == "entry1"
|
|
# assert result2.entry == "entry1"
|
|
# assert result3.entry == "entry2"
|
|
# assert result1.digest is None
|
|
# assert result2.digest is None
|
|
# assert result3.digest is None
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_cannot_add_the_same_digest_twice_in_the_same_entry(root):
|
|
# """
|
|
# If get_digest() is implemented, checks for duplicates
|
|
# :return:
|
|
# """
|
|
# sdp = SheerkaDataProvider(root)
|
|
#
|
|
# with pytest.raises(SheerkaDataProviderDuplicateKeyError) as error:
|
|
# sdp.add(evt_digest, "entry", ObjWithDigestNoKey("a", "b"))
|
|
# sdp.add(evt_digest, "entry", ObjWithDigestNoKey("a", "b"))
|
|
#
|
|
# assert error.value.obj.get_digest() == ObjWithDigestNoKey("a", "b").get_digest()
|
|
# assert error.value.key == "entry"
|
|
# assert error.value.args[0] == "Duplicate object."
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_cannot_add_the_same_digest_twice_in_the_same_entry2(root):
|
|
# """
|
|
# If get_digest() is implemented, checks for duplicates in list when no key
|
|
# :return:
|
|
# """
|
|
# sdp = SheerkaDataProvider(root)
|
|
#
|
|
# with pytest.raises(SheerkaDataProviderDuplicateKeyError) as error:
|
|
# sdp.add(evt_digest, "entry", ObjWithDigestNoKey("a", "b"))
|
|
# sdp.add(evt_digest, "entry", ObjWithDigestNoKey("a", "c"))
|
|
# sdp.add(evt_digest, "entry", ObjWithDigestNoKey("a", "b"))
|
|
#
|
|
# assert error.value.obj.get_digest() == ObjWithDigestNoKey("a", "b").get_digest()
|
|
# assert error.value.key == "entry"
|
|
# assert error.value.args[0] == "Duplicate object."
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_cannot_add_the_same_digest_twice_in_the_same_entry3(root):
|
|
# """
|
|
# If get_digest() is implemented, checks for duplicates when the key is provided
|
|
# :return:
|
|
# """
|
|
# sdp = SheerkaDataProvider(root)
|
|
#
|
|
# with pytest.raises(SheerkaDataProviderDuplicateKeyError) as error:
|
|
# sdp.add(evt_digest, "entry", ObjWithDigestWithKey("a", "b"))
|
|
# sdp.add(evt_digest, "entry", ObjWithDigestWithKey("a", "b"))
|
|
#
|
|
# assert error.value.obj.get_digest() == ObjWithDigestWithKey("a", "b").get_digest()
|
|
# assert error.value.key == "entry.a"
|
|
# assert error.value.args[0] == "Duplicate object."
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_cannot_add_the_same_digest_twice_in_the_same_entry4(root):
|
|
# """
|
|
# If get_digest() is implemented, checks for duplicates in list when the key is provided
|
|
# :return:
|
|
# """
|
|
# sdp = SheerkaDataProvider(root)
|
|
#
|
|
# with pytest.raises(SheerkaDataProviderDuplicateKeyError) as error:
|
|
# sdp.add(evt_digest, "entry", ObjWithDigestWithKey("a", "b"))
|
|
# sdp.add(evt_digest, "entry", ObjWithDigestWithKey("a", "c"))
|
|
# sdp.add(evt_digest, "entry", ObjWithDigestWithKey("a", "b"))
|
|
#
|
|
# assert error.value.obj.get_digest() == ObjWithDigestWithKey("a", "b").get_digest()
|
|
# assert error.value.key == "entry.a"
|
|
# assert error.value.args[0] == "Duplicate object."
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_get_and_set_key(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# key_file = path.join(sdp.io.root, SheerkaDataProvider.KeysFile)
|
|
# sdp.set_key("entry1", 1000)
|
|
#
|
|
# sdp.get_next_key("entry1")
|
|
# sdp.get_next_key("entry1")
|
|
# sdp.get_next_key("entry1")
|
|
# sdp.get_next_key("entry2")
|
|
# sdp.get_next_key("entry2")
|
|
#
|
|
# assert sdp.io.exists(key_file)
|
|
# assert read_json_file(sdp, key_file) == {"entry1": 1003, "entry2": 2}
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_add_object_using_auto_generated_key(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# key_file = path.join(sdp.io.root, SheerkaDataProvider.KeysFile)
|
|
#
|
|
# result1 = sdp.add_with_auto_key(evt_digest, "entry1", ObjNoKey("a", "b"))
|
|
# result2 = sdp.add_with_auto_key(evt_digest, "entry1", ObjNoKey("a", "b"))
|
|
#
|
|
# state = sdp.load_state(sdp.get_snapshot(SheerkaDataProvider.HeadFile))
|
|
#
|
|
# assert sdp.io.exists(key_file)
|
|
# assert read_json_file(sdp, key_file) == {"entry1": 2}
|
|
# assert state.data == {"entry1": {"1": ObjNoKey("a", "b"), "2": ObjNoKey("a", "b")}}
|
|
#
|
|
# assert result1.obj == ObjNoKey("a", "b")
|
|
# assert result2.obj == ObjNoKey("a", "b")
|
|
# assert result1.entry == "entry1"
|
|
# assert result2.entry == "entry1"
|
|
# assert result1.key == "1"
|
|
# assert result2.key == "2"
|
|
# assert result1.digest is None
|
|
# assert result2.digest is None
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_object_key_is_updated_when_possible_using_auto_generated_key(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# key_file = path.join(sdp.io.root, SheerkaDataProvider.KeysFile)
|
|
#
|
|
# result1 = sdp.add_with_auto_key(evt_digest, "entry1", ObjSetKey("foo"))
|
|
# result2 = sdp.add_with_auto_key(evt_digest, "entry1", ObjSetKey("foo"))
|
|
#
|
|
# state = sdp.load_state(sdp.get_snapshot(SheerkaDataProvider.HeadFile))
|
|
#
|
|
# assert sdp.io.exists(key_file)
|
|
# assert read_json_file(sdp, key_file) == {"entry1": 2}
|
|
# assert state.data == {"entry1": {"1": ObjSetKey("foo", "1"), "2": ObjSetKey("foo", "2")}}
|
|
#
|
|
# assert result1.obj == ObjSetKey("foo", "1")
|
|
# assert result2.obj == ObjSetKey("foo", "2")
|
|
# assert result1.entry == "entry1"
|
|
# assert result2.entry == "entry1"
|
|
# assert result1.key == "1"
|
|
# assert result2.key == "2"
|
|
# assert result1.digest is None
|
|
# assert result2.digest is None
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_set_objects_with_key(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.add(evt_digest, "entry", ObjWithKey(1, "foo"))
|
|
# result = sdp.set(evt_digest, "entry", ObjWithKey(2, "foo"))
|
|
#
|
|
# state = sdp.load_state(sdp.get_snapshot(SheerkaDataProvider.HeadFile))
|
|
# assert state.data == {"entry": {"2": ObjWithKey(2, "foo")}}
|
|
# assert result.entry == "entry"
|
|
# assert result.key == "2"
|
|
# assert result.digest is None
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_set_objects_with_no_key(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.add(evt_digest, "entry", ObjNoKey(1, "foo"))
|
|
# result = sdp.set(evt_digest, "entry", ObjNoKey(2, "foo"))
|
|
#
|
|
# state = sdp.load_state(sdp.get_snapshot(SheerkaDataProvider.HeadFile))
|
|
# assert state.data == {"entry": ObjNoKey(2, "foo")}
|
|
# assert result.entry == "entry"
|
|
# assert result.key is None
|
|
# assert result.digest is None
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_set_from_list_to_dict(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.set(evt_digest, "entry", [ObjNoKey(1, "foo"), ObjNoKey(2, "foo")])
|
|
# result = sdp.set(evt_digest, "entry", {"1": ObjNoKey(1, "foo"), "2": ObjNoKey(2, "foo")})
|
|
#
|
|
# state = sdp.load_state(sdp.get_snapshot(SheerkaDataProvider.HeadFile))
|
|
# assert state.data == {"entry": {"1": ObjNoKey(1, "foo"), "2": ObjNoKey(2, "foo")}}
|
|
# assert result.entry == "entry"
|
|
# assert result.key is None
|
|
# assert result.digest is None
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_set_using_reference(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.serializer.register(PickleSerializer(lambda o: isinstance(o, ObjWithKey)))
|
|
# sdp.add(evt_digest, "entry", ObjWithKey(1, "foo"))
|
|
# result = sdp.set(evt_digest, "entry", ObjWithKey(2, "foo"), use_ref=True)
|
|
#
|
|
# state = sdp.load_state(sdp.get_snapshot(SheerkaDataProvider.HeadFile))
|
|
# assert state.data == {"entry": {"2": '##REF##:43f07065c7bad051cdd726bdfa4de7f8d754c31486c65ddb31d6b6548dec3db9'}}
|
|
#
|
|
# assert result.obj == ObjWithKey(2, "foo")
|
|
# assert result.entry == "entry"
|
|
# assert result.key == "2"
|
|
# assert result.digest == "43f07065c7bad051cdd726bdfa4de7f8d754c31486c65ddb31d6b6548dec3db9"
|
|
#
|
|
# assert sdp.io.exists(sdp.io.get_obj_path(SheerkaDataProvider.ObjectsFolder,
|
|
# "43f07065c7bad051cdd726bdfa4de7f8d754c31486c65ddb31d6b6548dec3db9"))
|
|
#
|
|
# # sanity check, make sure that I can load back
|
|
# loaded = sdp.get(result.entry, result.key)
|
|
# assert loaded == ObjWithKey(2, "foo")
|
|
# assert getattr(loaded, Serializer.ORIGIN) == "43f07065c7bad051cdd726bdfa4de7f8d754c31486c65ddb31d6b6548dec3db9"
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_set_a_reference(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.serializer.register(PickleSerializer(lambda o: isinstance(o, ObjWithDigestWithKey)))
|
|
# obj = ObjWithDigestWithKey(1, "foo")
|
|
# sdp.add(evt_digest, "entry", obj, use_ref=True)
|
|
# sdp.set(evt_digest, "entry_by_value", {obj.b: obj.get_digest()}, is_ref=True)
|
|
#
|
|
# state = sdp.load_state(sdp.get_snapshot(SheerkaDataProvider.HeadFile))
|
|
# assert state.data == {
|
|
# "entry": {"1": '##REF##:' + obj.get_digest()},
|
|
# "entry_by_value": {"foo": '##REF##:' + obj.get_digest()},
|
|
# }
|
|
#
|
|
# # sanity check, make sure that I can load back
|
|
# loaded = sdp.get("entry_by_value", "foo")
|
|
# assert loaded == ObjWithDigestWithKey(1, "foo")
|
|
# assert getattr(loaded, Serializer.ORIGIN) == obj.get_digest()
|
|
#
|
|
#
|
|
# def test_i_cannot_set_using_use_ref_and_is_ref():
|
|
# sdp = SheerkaDataProvider("mem://")
|
|
#
|
|
# with pytest.raises(SheerkaDataProviderError):
|
|
# sdp.set(evt_digest, "entry", ObjWithDigestWithKey("a", "b"), use_ref=True, is_ref=True)
|
|
#
|
|
#
|
|
# def test_i_cannot_set_using_is_ref_if_obj_is_not_a_dictionary():
|
|
# sdp = SheerkaDataProvider("mem://")
|
|
#
|
|
# with pytest.raises(SheerkaDataProviderError):
|
|
# sdp.set(evt_digest, "entry", ObjWithDigestWithKey("a", "b"), is_ref=True)
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_add_an_object_with_a_key_as_a_reference(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# obj = ObjDumpJson("my_key", "value1")
|
|
# obj_serializer = JsonSerializer(core.utils.get_full_qualified_name(obj))
|
|
# sdp.serializer.register(obj_serializer)
|
|
#
|
|
# result = sdp.add(evt_digest, "entry", obj, use_ref=True)
|
|
# state = sdp.load_state(sdp.get_snapshot(SheerkaDataProvider.HeadFile))
|
|
# digest = state.data["entry"]["my_key"][len(SheerkaDataProvider.REF_PREFIX):]
|
|
#
|
|
# assert result.obj == obj
|
|
# assert result.entry == "entry"
|
|
# assert result.key == obj.key
|
|
# assert result.digest == obj.get_digest()
|
|
# assert digest == result.digest
|
|
# assert state.data == {'entry': {'my_key': f"{SheerkaDataProvider.REF_PREFIX}{digest}"}}
|
|
#
|
|
# loaded = sdp.load_obj(digest)
|
|
# assert loaded == obj
|
|
# assert getattr(loaded, Serializer.ORIGIN) == digest
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_add_a_dictionary_as_a_reference(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# obj = {"my_key": "value1"}
|
|
#
|
|
# # No need to register a serializer for dictionaries
|
|
#
|
|
# result = sdp.add(evt_digest, "entry", obj, use_ref=True)
|
|
# state = sdp.load_state(sdp.get_snapshot(SheerkaDataProvider.HeadFile))
|
|
# digest = state.data["entry"][len(SheerkaDataProvider.REF_PREFIX):]
|
|
#
|
|
# assert result.obj == obj
|
|
# assert result.entry == "entry"
|
|
# assert result.key is None # we return None as dict may contain several entries
|
|
# assert result.digest == "1790cae3f354ecb6b419faaa2ee2c374ff33efb8cddafda9960924036ac04c1f" # a digest is created
|
|
# assert digest == result.digest
|
|
#
|
|
# assert state.data == {'entry': f"{SheerkaDataProvider.REF_PREFIX}{digest}"}
|
|
#
|
|
# loaded = sdp.load_obj(digest)
|
|
# assert loaded["my_key"] == obj["my_key"]
|
|
# assert loaded[Serializer.ORIGIN] == digest
|
|
# assert len(loaded) == 2
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_add_an_object_with_no_builtin_digest_as_a_reference(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# obj = ObjDumpJsonNoDigest("a", "b")
|
|
#
|
|
# obj_serializer = JsonSerializer(core.utils.get_full_qualified_name(obj))
|
|
# sdp.serializer.register(obj_serializer)
|
|
#
|
|
# result = sdp.add(evt_digest, "entry", obj, use_ref=True)
|
|
# state = sdp.load_state(sdp.get_snapshot(SheerkaDataProvider.HeadFile))
|
|
# digest = state.data["entry"][obj.get_key()][len(SheerkaDataProvider.REF_PREFIX):]
|
|
#
|
|
# assert result.obj == obj
|
|
# assert result.entry == "entry"
|
|
# assert result.key == obj.get_key()
|
|
# assert result.digest is not None
|
|
# assert digest == result.digest
|
|
#
|
|
# assert state.data == {'entry': {obj.key: f"{SheerkaDataProvider.REF_PREFIX}{result.digest}"}}
|
|
#
|
|
# loaded = sdp.load_obj(digest)
|
|
# assert getattr(loaded, Serializer.ORIGIN) == digest
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_add_unique(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# result = sdp.add_unique(evt_digest, "entry", ObjNoKey(1, "foo"))
|
|
# assert result == SheerkaDataProviderResult(ObjNoKey(1, "foo"), "entry", None, None, False)
|
|
#
|
|
# result = sdp.add_unique(evt_digest, "entry", ObjNoKey(1, "foo"))
|
|
# assert result == SheerkaDataProviderResult(ObjNoKey(1, "foo"), "entry", None, None, True)
|
|
#
|
|
# result = sdp.add_unique(evt_digest, "entry", ObjNoKey(2, "bar"))
|
|
# assert result == SheerkaDataProviderResult(ObjNoKey(2, "bar"), "entry", None, None, False)
|
|
#
|
|
# result = sdp.add_unique(evt_digest, "entry", ObjNoKey(2, "bar"))
|
|
# assert result == SheerkaDataProviderResult(ObjNoKey(2, "bar"), "entry", None, None, True)
|
|
#
|
|
# state = sdp.load_state(sdp.get_snapshot(SheerkaDataProvider.HeadFile))
|
|
# assert state.data == {"entry": {ObjNoKey(1, "foo"), ObjNoKey(2, "bar")}}
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_keep_state_history(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
#
|
|
# event1 = Event("cmd add 'foo => bar'")
|
|
# event_digest1 = sdp.save_event(event1)
|
|
# obj1 = "foo => bar"
|
|
# sdp.add(event_digest1, "entry1", obj1)
|
|
# state_digest1 = sdp.get_snapshot(SheerkaDataProvider.HeadFile)
|
|
#
|
|
# event2 = Event("cmd add 'foo => baz'")
|
|
# event_digest2 = sdp.save_event(event2)
|
|
# obj2 = "foo => baz"
|
|
# sdp.add(event_digest2, "entry2", obj2)
|
|
# state_digest2 = sdp.get_snapshot(SheerkaDataProvider.HeadFile)
|
|
#
|
|
# state2 = sdp.load_state(state_digest2)
|
|
#
|
|
# assert sdp.io.exists(path.join(sdp.io.root, SheerkaDataProvider.EventFolder, event_digest1[0:24], event_digest1))
|
|
# assert sdp.io.exists(path.join(sdp.io.root, SheerkaDataProvider.StateFolder, state_digest1[0:24], state_digest1))
|
|
# assert sdp.io.exists(path.join(sdp.io.root, SheerkaDataProvider.EventFolder, event_digest2[0:24], event_digest2))
|
|
# assert sdp.io.exists(path.join(sdp.io.root, SheerkaDataProvider.StateFolder, state_digest2[0:24], state_digest2))
|
|
# assert state2.date is not None
|
|
# assert state2.parents == [state_digest1]
|
|
# assert state2.events == [event_digest2]
|
|
# assert state2.data == {"entry1": "foo => bar", "entry2": "foo => baz"}
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_list_elements_when_there_is_nothing_to_list(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
#
|
|
# result = sdp.list("entry")
|
|
#
|
|
# assert list(result) == []
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_list_when_no_key(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.serializer.register(PickleSerializer(lambda obj: isinstance(obj, str)))
|
|
#
|
|
# sdp.add(evt_digest, "entry1", "foo")
|
|
# sdp.add(evt_digest, "entry1", "bar")
|
|
# sdp.add(evt_digest, "entry1", "baz", use_ref=True)
|
|
# sdp.add(evt_digest, "entry2", "xyz")
|
|
#
|
|
# result = sdp.list("entry1")
|
|
#
|
|
# assert list(result) == ["foo", "bar", "baz"]
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_list_when_key(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.serializer.register(PickleSerializer(lambda obj: isinstance(obj, ObjWithKey)))
|
|
#
|
|
# sdp.add(evt_digest, "entry1", {"1": "foo"})
|
|
# sdp.add(evt_digest, "entry1", {"2": "bar"})
|
|
# sdp.add(evt_digest, "entry1", ObjWithKey("3", "value"), use_ref=True)
|
|
# sdp.add(evt_digest, "entry2", {"4": "xxx"})
|
|
#
|
|
# result = sdp.list("entry1")
|
|
#
|
|
# assert list(result) == ["foo", "bar", ObjWithKey("3", "value")]
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_list_when_one_element(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.add(evt_digest, "entry1", "foo")
|
|
# sdp.add(evt_digest, "entry2", "baz")
|
|
#
|
|
# result = sdp.list("entry1")
|
|
#
|
|
# assert list(result) == ["foo"]
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_list_when_multiple_entries_under_the_same_key(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.add(evt_digest, "entry", ObjWithKey("a", "b"))
|
|
# sdp.add(evt_digest, "entry", ObjWithKey("a", "c"))
|
|
#
|
|
# result = sdp.list("entry")
|
|
# assert list(result) == [[ObjWithKey("a", "b"), ObjWithKey("a", "c")]]
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_list_when_multiple_entries_under_the_same_key_when_reference(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.serializer.register(PickleSerializer(lambda obj: isinstance(obj, ObjWithKey)))
|
|
#
|
|
# sdp.add(evt_digest, "entry", ObjWithKey("a", "b"), use_ref=True)
|
|
# sdp.add(evt_digest, "entry", ObjWithKey("a", "c"), use_ref=True)
|
|
#
|
|
# result = sdp.list("entry")
|
|
# assert list(result) == [[ObjWithKey("a", "b"), ObjWithKey("a", "c")]]
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_list_when_multiple_entries_under_the_same_entry(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.add(evt_digest, "entry", ObjNoKey("a", "b"))
|
|
# sdp.add(evt_digest, "entry", ObjNoKey("a", "c"))
|
|
#
|
|
# result = sdp.list("entry")
|
|
# assert list(result) == [ObjNoKey("a", "b"), ObjNoKey("a", "c")]
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_list_when_multiple_entries_under_the_same_entry_when_reference(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.serializer.register(PickleSerializer(lambda obj: isinstance(obj, ObjNoKey)))
|
|
#
|
|
# sdp.add(evt_digest, "entry", ObjNoKey("a", "b"), use_ref=True)
|
|
# sdp.add(evt_digest, "entry", ObjNoKey("a", "c"), use_ref=True)
|
|
#
|
|
# result = sdp.list("entry")
|
|
# assert list(result) == [ObjNoKey("a", "b"), ObjNoKey("a", "c")]
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_filter_on_key_for_dict(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.add(evt_digest, "entry1", {"1": "foo"})
|
|
# sdp.add(evt_digest, "entry1", {"2": "bar"})
|
|
#
|
|
# result = sdp.list("entry1", lambda k, o: k == "1")
|
|
#
|
|
# assert list(result) == ["foo"]
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_filter_on_key_for_objects(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.add(evt_digest, "entry1", ObjWithKey("a1", "b1"))
|
|
# sdp.add(evt_digest, "entry1", ObjWithKey("a2", "b2"))
|
|
#
|
|
# result = sdp.list("entry1", lambda k, o: k == "a1")
|
|
#
|
|
# assert list(result) == [ObjWithKey("a1", "b1")]
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_filter_on_attribute_for_dict(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.add(evt_digest, "entry1", {"1": {"a": "a1", "b": "b1"}})
|
|
# sdp.add(evt_digest, "entry1", {"2": {"a": "a2", "b": "b2"}})
|
|
#
|
|
# result = sdp.list("entry1", lambda k, o: o["a"] == "a2")
|
|
#
|
|
# assert list(result) == [{"a": "a2", "b": "b2"}]
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_filter_on_attribute_for_object(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.add(evt_digest, "entry1", ObjWithKey("a1", "b1"))
|
|
# sdp.add(evt_digest, "entry1", ObjWithKey("a2", "b2"))
|
|
#
|
|
# result = sdp.list("entry1", lambda k, o: o.b == "b2")
|
|
#
|
|
# assert list(result) == [ObjWithKey("a2", "b2")]
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_filter_a_list(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.add(evt_digest, "entry1", "foo")
|
|
# sdp.add(evt_digest, "entry1", "bar")
|
|
#
|
|
# result = sdp.list("entry1", lambda o: o == "bar")
|
|
#
|
|
# assert list(result) == ["bar"]
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_filter_a_list_of_object(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.add(evt_digest, "entry1", ObjNoKey("a1", "b1"))
|
|
# sdp.add(evt_digest, "entry1", ObjNoKey("a2", "b2"))
|
|
#
|
|
# result = sdp.list("entry1", lambda o: o.b == "b1")
|
|
#
|
|
# assert list(result) == [ObjNoKey("a1", "b1")]
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_remove_all_elements(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.add(evt_digest, "entry1", "foo")
|
|
# sdp.add(evt_digest, "entry1", "bar")
|
|
#
|
|
# state_digest = sdp.remove(evt_digest, "entry1")
|
|
# result = sdp.list("entry1")
|
|
#
|
|
# assert sdp.io.read_text(path.join(sdp.io.root, SheerkaDataProvider.HeadFile)) == state_digest
|
|
# assert list(result) == []
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_remove_a_element(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.add(evt_digest, "entry1", "foo")
|
|
# sdp.add(evt_digest, "entry1", "bar")
|
|
#
|
|
# sdp.remove(evt_digest, "entry1", lambda o: o == "foo")
|
|
# result = sdp.list("entry1")
|
|
#
|
|
# assert list(result) == ["bar"]
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_remove_dict_by_key(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.add(evt_digest, "entry1", {"1": ObjNoKey("a1", "b1")})
|
|
# sdp.add(evt_digest, "entry1", {"2": ObjNoKey("a2", "b2")})
|
|
#
|
|
# sdp.remove(evt_digest, "entry1", lambda k, o: k == "2")
|
|
# result = sdp.list("entry1")
|
|
#
|
|
# assert list(result) == [ObjNoKey("a1", "b1")]
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_remove_when_only_one_element(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.add(evt_digest, "entry1", "foo")
|
|
#
|
|
# sdp.remove(evt_digest, "entry1", lambda o: o == "foo")
|
|
# result = sdp.list("entry1")
|
|
#
|
|
# assert list(result) == []
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_cannot_remove_if_entry_does_not_exist(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# with pytest.raises(IndexError) as e:
|
|
# sdp.remove(evt_digest, "entry", silent_remove=False)
|
|
# assert str(e.value) == "entry"
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_no_exception_is_raise_when_remove_in_silent_mode(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.remove(evt_digest, "entry", silent_remove=True) # default
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_cannot_modify_an_entry_without_a_key(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
#
|
|
# with pytest.raises(SheerkaDataProviderError) as error:
|
|
# sdp.modify(evt_digest, "entry", None, "baz")
|
|
#
|
|
# assert error.value.args[0] == "Key is mandatory."
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_modify_dict_with_a_key(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.add(evt_digest, "entry", {"key1": "foo"})
|
|
# sdp.add(evt_digest, "entry", {"key2": "bar"})
|
|
#
|
|
# result = sdp.modify(evt_digest, "entry", "key1", "baz")
|
|
#
|
|
# state = sdp.load_state(sdp.get_snapshot(SheerkaDataProvider.HeadFile))
|
|
# assert state.data == {"entry": {"key1": "baz", "key2": "bar"}}
|
|
# assert result.obj == "baz"
|
|
# assert result.entry == "entry"
|
|
# assert result.key == "key1"
|
|
# assert result.digest is None
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_modify_an_object_with_a_key(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.add(evt_digest, "entry", ObjWithKey("key1", "foo"))
|
|
# sdp.add(evt_digest, "entry", ObjWithKey("key2", "bar"))
|
|
#
|
|
# result = sdp.modify(evt_digest, "entry", "key1", ObjWithKey("key1", "baz"))
|
|
#
|
|
# state = sdp.load_state(sdp.get_snapshot(SheerkaDataProvider.HeadFile))
|
|
#
|
|
# assert state.data == {"entry": {"key1": ObjWithKey("key1", "baz"), "key2": ObjWithKey("key2", "bar")}}
|
|
# assert result.obj == ObjWithKey("key1", "baz")
|
|
# assert result.entry == "entry"
|
|
# assert result.key == "key1"
|
|
# assert result.digest is None
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_modify_an_object_while_changing_the_key(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.add(evt_digest, "entry", ObjWithKey("key1", "foo"))
|
|
# sdp.add(evt_digest, "entry", ObjWithKey("key2", "bar"))
|
|
#
|
|
# result = sdp.modify(evt_digest, "entry", "key1", ObjWithKey("key3", "baz"))
|
|
#
|
|
# state = sdp.load_state(sdp.get_snapshot(SheerkaDataProvider.HeadFile))
|
|
# assert state.data == {"entry": {"key2": ObjWithKey("key2", "bar"), "key3": ObjWithKey("key3", "baz")}}
|
|
# assert result.obj == ObjWithKey("key3", "baz")
|
|
# assert result.entry == "entry"
|
|
# assert result.key == "key3"
|
|
# assert result.digest is None
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_modify_an_object_while_changing_the_key_to_an_existing_key(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.add(evt_digest, "entry", ObjWithKey("key1", "foo"))
|
|
# sdp.add(evt_digest, "entry", ObjWithKey("key2", "bar"))
|
|
#
|
|
# result = sdp.modify(evt_digest, "entry", "key2", ObjWithKey("key1", "bar"))
|
|
#
|
|
# state = sdp.load_state(sdp.get_snapshot(SheerkaDataProvider.HeadFile))
|
|
# assert state.data == {"entry": {"key1": [ObjWithKey("key1", "foo"), ObjWithKey("key1", "bar")]}}
|
|
# assert result.obj == ObjWithKey("key1", "bar")
|
|
# assert result.entry == "entry"
|
|
# assert result.key == "key1"
|
|
# assert result.digest is None
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_modify_an_object_while_changing_the_key_to_an_existing_when_list(root):
|
|
# """
|
|
# In this example, the item to modify is within a list, and its key has changed
|
|
# and in the new key, there is already a list
|
|
# :return:
|
|
# """
|
|
# sdp = SheerkaDataProvider(root)
|
|
#
|
|
# sdp.add(evt_digest, "entry", ObjDumpJson("key1", "value11"))
|
|
# sdp.add(evt_digest, "entry", ObjDumpJson("key1", "value12"))
|
|
# sdp.add(evt_digest, "entry", ObjDumpJson("key2", "value21"))
|
|
# sdp.add(evt_digest, "entry", ObjDumpJson("key2", "value22"))
|
|
#
|
|
# new_value = ObjDumpJson("key1", "value13")
|
|
# setattr(new_value, Serializer.ORIGIN, ObjDumpJson("key2", "value21").get_digest())
|
|
# result = sdp.modify(evt_digest, "entry", "key2", new_value)
|
|
#
|
|
# state = sdp.load_state(sdp.get_snapshot(SheerkaDataProvider.HeadFile))
|
|
# assert state.data == {"entry": {
|
|
# "key1": [ObjDumpJson("key1", "value11"), ObjDumpJson("key1", "value12"), ObjDumpJson("key1", "value13")],
|
|
# "key2": [ObjDumpJson("key2", "value22")]
|
|
# }}
|
|
# assert result.obj == new_value
|
|
# assert result.entry == "entry"
|
|
# assert result.key == "key1"
|
|
# assert result.digest is None
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_modify_an_object_while_changing_the_key_to_an_existing_when_nothing(root):
|
|
# """
|
|
# In this example, the item to modify is within a list, and its key has changed
|
|
# and in the new key, there is nothing (the new key does not exist)
|
|
# :return:
|
|
# """
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.serializer.register(PickleSerializer(lambda o: isinstance(o, ObjDumpJson)))
|
|
#
|
|
# sdp.add(evt_digest, "entry", ObjDumpJson("key2", "value21"))
|
|
# sdp.add(evt_digest, "entry", ObjDumpJson("key2", "value22"))
|
|
#
|
|
# new_value = ObjDumpJson("key1", "value13")
|
|
# setattr(new_value, Serializer.ORIGIN, ObjDumpJson("key2", "value21").get_digest())
|
|
# result = sdp.modify(evt_digest, "entry", "key2", new_value)
|
|
#
|
|
# state = sdp.load_state(sdp.get_snapshot(SheerkaDataProvider.HeadFile))
|
|
# assert state.data == {"entry": {
|
|
# "key1": ObjDumpJson("key1", "value13"),
|
|
# "key2": [ObjDumpJson("key2", "value22")]
|
|
# }}
|
|
# assert result.obj == new_value
|
|
# assert result.entry == "entry"
|
|
# assert result.key == "key1"
|
|
# assert result.digest is None
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_modify_an_object_while_changing_the_key_to_an_existing_when_one_item(root):
|
|
# """
|
|
# In this example, the item to modify is within a list, and its key has changed
|
|
# and in the new key, there is only one element
|
|
# :return:
|
|
# """
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.serializer.register(PickleSerializer(lambda o: isinstance(o, ObjDumpJson)))
|
|
#
|
|
# sdp.add(evt_digest, "entry", ObjDumpJson("key1", "value11"))
|
|
# sdp.add(evt_digest, "entry", ObjDumpJson("key2", "value21"))
|
|
# sdp.add(evt_digest, "entry", ObjDumpJson("key2", "value22"))
|
|
#
|
|
# new_value = ObjDumpJson("key1", "value13")
|
|
# setattr(new_value, Serializer.ORIGIN, ObjDumpJson("key2", "value21").get_digest())
|
|
# result = sdp.modify(evt_digest, "entry", "key2", new_value)
|
|
#
|
|
# state = sdp.load_state(sdp.get_snapshot(SheerkaDataProvider.HeadFile))
|
|
# assert state.data == {"entry": {
|
|
# "key1": [ObjDumpJson("key1", "value11"), ObjDumpJson("key1", "value13")],
|
|
# "key2": [ObjDumpJson("key2", "value22")]
|
|
# }}
|
|
# assert result.obj == new_value
|
|
# assert result.entry == "entry"
|
|
# assert result.key == "key1"
|
|
# assert result.digest is None
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_modify_a_object_saved_by_ref(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.serializer.register(PickleSerializer(lambda o: isinstance(o, ObjWithKey)))
|
|
# sdp.add(evt_digest, "entry", ObjWithKey("key1", "foo"))
|
|
# sdp.add(evt_digest, "entry", ObjWithKey("key2", "bar"), use_ref=True)
|
|
#
|
|
# result = sdp.modify(evt_digest, "entry", "key2", ObjWithKey("key2", "baz"))
|
|
# state = sdp.load_state(sdp.get_snapshot(SheerkaDataProvider.HeadFile))
|
|
# assert state.data == {"entry": {
|
|
# "key1": ObjWithKey("key1", "foo"),
|
|
# "key2": "##REF##:041d3cca905b51bc2c66251e73e56b836aae7b9435ee3d7eb05d44bb67ff575e"}}
|
|
# assert result.obj == ObjWithKey("key2", "baz")
|
|
# assert result.entry == "entry"
|
|
# assert result.key == "key2"
|
|
# assert result.digest == "041d3cca905b51bc2c66251e73e56b836aae7b9435ee3d7eb05d44bb67ff575e"
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_modify_an_object_saved_by_ref_in_a_list(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.serializer.register(PickleSerializer(lambda o: isinstance(o, ObjDumpJsonNoDigest)))
|
|
#
|
|
# sdp.add(evt_digest, "entry", ObjDumpJsonNoDigest("key1", "value11"), use_ref=True)
|
|
# sdp.add(evt_digest, "entry", ObjDumpJsonNoDigest("key1", "value12"), use_ref=True)
|
|
# result = sdp.add(evt_digest, "entry", ObjDumpJsonNoDigest("key2", "value21"), use_ref=True)
|
|
# sdp.add(evt_digest, "entry", ObjDumpJsonNoDigest("key2", "value22"), use_ref=True)
|
|
#
|
|
# new_value = ObjDumpJsonNoDigest("key1", "value13")
|
|
# setattr(new_value, Serializer.ORIGIN, result.digest)
|
|
# result = sdp.modify(evt_digest, "entry", "key2", new_value)
|
|
#
|
|
# state = sdp.load_state(sdp.get_snapshot(SheerkaDataProvider.HeadFile))
|
|
# assert state.data == {"entry": {
|
|
# 'key1': ['##REF##:f80a0c0aceb1a7a3d238c0cff2d86d6bd3a62e0c1a65c5b505f43b10c4604bd8',
|
|
# '##REF##:239a8238d188c37afa10b1bcc312ca8a0e78f6e75d688ca65d08e16717ff68b0',
|
|
# '##REF##:9d0a2bf9d4081de0b14837ea46bc7a1cfb6b7562f7ae86255ea9bd0ac53a6437'],
|
|
# 'key2': ['##REF##:df8a38b07f469f2ff8001ea6a70f77f4f9ce85d69c530091fcaf4b380f1500d3']
|
|
# }}
|
|
# assert result.obj == new_value
|
|
# assert result.entry == "entry"
|
|
# assert result.key == "key1"
|
|
# assert result.digest is not None
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_modify_a_data_provider_ref(root):
|
|
# # first, create a valid entry
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.serializer.register(PickleSerializer(lambda o: isinstance(o, ObjWithDigestWithKey)))
|
|
# obj = ObjWithDigestWithKey("1", "foo")
|
|
# sdp.add(evt_digest, "entry", obj, use_ref=True)
|
|
# sdp.add(evt_digest, "entry_ref", SheerkaDataProviderRef(obj.b, obj.get_digest()))
|
|
#
|
|
# state = sdp.load_state(sdp.get_snapshot(SheerkaDataProvider.HeadFile))
|
|
# assert state.data == {
|
|
# "entry": {"1": "##REF##:1foo"},
|
|
# "entry_ref": {"foo": SheerkaDataProviderRef(obj.b, obj.get_digest())}}
|
|
#
|
|
# # modify this entry
|
|
# obj_new = ObjWithDigestWithKey("1", "bar")
|
|
# sdp.modify(evt_digest, "entry", obj_new.a, obj_new)
|
|
# result = sdp.modify(evt_digest, "entry_ref", "foo", SheerkaDataProviderRef(obj.b, obj_new.get_digest()))
|
|
#
|
|
# state = sdp.load_state(sdp.get_snapshot(SheerkaDataProvider.HeadFile))
|
|
# assert state.data == {
|
|
# "entry": {"1": "##REF##:1bar"},
|
|
# "entry_ref": {"foo": SheerkaDataProviderRef(obj.b, obj_new.get_digest())}}
|
|
#
|
|
# assert result.obj == SheerkaDataProviderRef(obj.b, obj_new.get_digest())
|
|
# assert result.entry == "entry_ref"
|
|
# assert result.key == "foo"
|
|
# assert result.digest is None # digest is not set as what is saved (the digest) is not saved by ref
|
|
#
|
|
# # sanity check, I can load the modified entry
|
|
# loaded = sdp.get("entry_ref", "foo")
|
|
# assert loaded == ObjWithDigestWithKey("1", "bar")
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_modify_is_ref_when_in_list(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.serializer.register(PickleSerializer(lambda o: isinstance(o, ObjWithDigestWithKey)))
|
|
# ref_result1 = sdp.add(evt_digest, "entry", ObjWithDigestWithKey(1, "foo"), use_ref=True)
|
|
# ref_result2 = sdp.add(evt_digest, "entry", ObjWithDigestWithKey(2, "bar"), use_ref=True)
|
|
#
|
|
# sdp.add(evt_digest, "entry_ref", SheerkaDataProviderRef("1", ref_result1.digest))
|
|
# sdp.add(evt_digest, "entry_ref", SheerkaDataProviderRef("1", ref_result2.digest))
|
|
#
|
|
# ref_result3 = sdp.add(evt_digest, "entry", ObjWithDigestWithKey(3, "baz"), use_ref=True)
|
|
#
|
|
# result = sdp.modify(
|
|
# evt_digest,
|
|
# "entry_ref",
|
|
# "1",
|
|
# SheerkaDataProviderRef("1", ref_result3.digest, ref_result2.digest))
|
|
#
|
|
# state = sdp.load_state(sdp.get_snapshot(SheerkaDataProvider.HeadFile))
|
|
# assert state.data == {'entry': {'1': '##REF##:1foo', '2': '##REF##:2bar', '3': '##REF##:3baz'},
|
|
# 'entry_ref': {'1': [
|
|
# SheerkaDataProviderRef("1", ref_result1.digest),
|
|
# SheerkaDataProviderRef("1", ref_result3.digest, ref_result2.digest)]}}
|
|
#
|
|
# loaded = sdp.get("entry_ref", "1")
|
|
# assert len(loaded) == 2
|
|
# assert loaded[0] == ObjWithDigestWithKey(1, "foo")
|
|
# assert loaded[1] == ObjWithDigestWithKey(3, "baz")
|
|
#
|
|
# assert result.obj == SheerkaDataProviderRef("1", ref_result3.digest, ref_result2.digest)
|
|
# assert result.entry == "entry_ref"
|
|
# assert result.key == "1"
|
|
# assert result.digest is None # digest is not set as what is saved (the digest) is not saved by ref
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_cannot_modify_an_entry_that_does_not_exist(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
#
|
|
# with pytest.raises(IndexError) as e:
|
|
# sdp.modify(evt_digest, "entry", "key", "foo")
|
|
#
|
|
# assert str(e.value) == "entry"
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_cannot_modify_a_key_that_does_not_exist(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.add(evt_digest, "entry1", {"1": "foo"})
|
|
#
|
|
# with pytest.raises(IndexError) as e:
|
|
# sdp.modify(evt_digest, "entry1", "2", "bar")
|
|
# assert str(e.value) == "entry1.2"
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_cannot_modify_a_list_when_origin_is_unknown(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
#
|
|
# sdp.add(evt_digest, "entry", ObjWithKey("key", "value1"))
|
|
# sdp.add(evt_digest, "entry", ObjWithKey("key", "value2")) # same key
|
|
#
|
|
# state = sdp.load_state(sdp.get_snapshot(SheerkaDataProvider.HeadFile))
|
|
#
|
|
# with pytest.raises(SheerkaDataProviderError) as error:
|
|
# sdp.modify(evt_digest, "entry", "key", ObjWithKey("key", "value2"))
|
|
#
|
|
# assert error.value.obj == ObjWithKey("key", "value2")
|
|
# assert error.value.args[0] == "Multiple entries under 'entry.key'"
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_modify_a_list_when_the_origin_is_known(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
#
|
|
# sdp.add(evt_digest, "entry", ObjDumpJson("key", "value1"))
|
|
# sdp.add(evt_digest, "entry", ObjDumpJson("key", "value2")) # same key
|
|
#
|
|
# new_value = ObjDumpJson("key", "value3")
|
|
# setattr(new_value, Serializer.ORIGIN, ObjDumpJson("key", "value1").get_digest())
|
|
#
|
|
# sdp.modify(evt_digest, "entry", "key", new_value)
|
|
#
|
|
# state = sdp.load_state(sdp.get_snapshot(SheerkaDataProvider.HeadFile))
|
|
# assert state.data == {"entry": {"key": [ObjDumpJson("key", "value3"), ObjDumpJson("key", "value2")]}}
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_modify_a_list_when_the_origin_is_known_2(root):
|
|
# """
|
|
# This time, we check that the origin is automatically set when the object was saved as a reference
|
|
# We also check that all objects are still persisted as reference
|
|
# :return:
|
|
# """
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.serializer.register(PickleSerializer(lambda o: isinstance(o, ObjDumpJson)))
|
|
#
|
|
# sdp.add(evt_digest, "entry", ObjDumpJson("key", "value1"), use_ref=True)
|
|
# sdp.add(evt_digest, "entry", ObjDumpJson("key", "value2"), use_ref=True) # same key
|
|
#
|
|
# objs = sdp.get("entry", "key") # origin is automatically set to the loaded objects
|
|
# objs[0].value = "value3"
|
|
#
|
|
# sdp.modify(evt_digest, "entry", "key", objs[0])
|
|
#
|
|
# state = sdp.load_state(sdp.get_snapshot(SheerkaDataProvider.HeadFile))
|
|
# assert state.data == {"entry": {"key": [
|
|
# "##REF##:621771a3af6a331e9abb3a63fb25e0cac4b13df0b292dfa30db6bd89031bfad0",
|
|
# "##REF##:5fe085e8366d35c5f04a18b2d3dada376128b246e07c66de5872830b00f5f517"]}}
|
|
#
|
|
# # checks that all objects are (still) persisted
|
|
# sdp.io.exists(
|
|
# sdp.io.get_obj_path(sdp.ObjectsFolder, "621771a3af6a331e9abb3a63fb25e0cac4b13df0b292dfa30db6bd89031bfad0"))
|
|
# sdp.io.exists(
|
|
# sdp.io.get_obj_path(sdp.ObjectsFolder, "5fe085e8366d35c5f04a18b2d3dada376128b246e07c66de5872830b00f5f517"))
|
|
# sdp.io.exists(
|
|
# sdp.io.get_obj_path(sdp.ObjectsFolder, "1aac9e0d5c74c3bb989fd0f9def792bba36c5595d32f61be7cbb1a38dcf75327"))
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_get_the_entire_entry(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.add(evt_digest, "entry1", "foo")
|
|
# sdp.add(evt_digest, "entry1", "bar")
|
|
#
|
|
# result = sdp.get("entry1")
|
|
# result_safe = sdp.get_safe("entry1")
|
|
#
|
|
# assert result == ["foo", "bar"]
|
|
# assert result_safe == ["foo", "bar"]
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_get_an_entry_with_on_object(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.add(evt_digest, "entry1", "foo")
|
|
#
|
|
# result = sdp.get("entry1")
|
|
# result_safe = sdp.get_safe("entry1")
|
|
#
|
|
# assert result == "foo"
|
|
# assert result_safe == "foo"
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_get_an_entry_by_key(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.add(evt_digest, "entry1", {"1": "foo"})
|
|
# sdp.add(evt_digest, "entry1", {"2": "bar"})
|
|
#
|
|
# result = sdp.get("entry1", "2")
|
|
# result_safe = sdp.get_safe("entry1", "2")
|
|
#
|
|
# assert result == "bar"
|
|
# assert result_safe == "bar"
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_get_object_saved_by_reference(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# obj = ObjDumpJson("my_key", "value1")
|
|
# sdp.serializer.register(JsonSerializer(core.utils.get_full_qualified_name(obj)))
|
|
#
|
|
# result = sdp.add(evt_digest, "entry", obj, use_ref=True)
|
|
# loaded = sdp.get(result.entry, result.key)
|
|
#
|
|
# assert loaded == obj
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_get_objects_from_list_when_saved_by_reference(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.serializer.register(PickleSerializer(lambda o: isinstance(o, ObjDumpJson)))
|
|
#
|
|
# sdp.add(evt_digest, "entry", ObjDumpJson("key", "value1"), use_ref=True)
|
|
# sdp.add(evt_digest, "entry", ObjDumpJson("key", "value2"), use_ref=True) # same key
|
|
#
|
|
# objs = sdp.get("entry", "key")
|
|
#
|
|
# assert objs[0] == ObjDumpJson("key", "value1")
|
|
# assert objs[1] == ObjDumpJson("key", "value2")
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_cannot_get_an_entry_that_does_not_exist(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
#
|
|
# assert sdp.get_safe("entry") is None
|
|
# with pytest.raises(IndexError) as e:
|
|
# sdp.get("entry")
|
|
# assert str(e.value) == "entry"
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_cannot_get_a_key_that_does_not_exist(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.add(evt_digest, "entry1", {"1": "foo"})
|
|
#
|
|
# assert sdp.get_safe("entry1", "2") is None
|
|
# with pytest.raises(IndexError) as e:
|
|
# sdp.get("entry1", "2")
|
|
# assert str(e.value) == "entry1.2"
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_save_and_retrieve_cache(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# txt = "foo bar baz foo bar baz foo bar baz"
|
|
# key = "key_to_use"
|
|
# category = "cache_category"
|
|
#
|
|
# assert not sdp.in_cache(category, key)
|
|
# digest = sdp.add_to_cache(category, key, txt)
|
|
# assert sdp.io.exists(path.join(sdp.io.root, SheerkaDataProvider.CacheFolder, digest[0:24], digest))
|
|
# assert sdp.in_cache(category, key)
|
|
#
|
|
# from_cache = sdp.load_from_cache(category, key)
|
|
# assert from_cache == txt
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_cache_is_not_updated_by_default(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# txt = "foo bar baz foo bar baz foo bar baz"
|
|
# txt2 = "foo foo foo foo foo foo foo foo foo"
|
|
# key = "key_to_use"
|
|
# category = "cache_category"
|
|
#
|
|
# sdp.add_to_cache(category, key, txt)
|
|
# sdp.add_to_cache(category, key, txt2)
|
|
#
|
|
# from_cache = sdp.load_from_cache(category, key)
|
|
# assert from_cache == txt
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_update_cache(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# txt = "foo bar baz foo bar baz foo bar baz"
|
|
# txt2 = "foo foo foo foo foo foo foo foo foo"
|
|
# key = "key_to_use"
|
|
# category = "cache_category"
|
|
#
|
|
# sdp.add_to_cache(category, key, txt)
|
|
# sdp.add_to_cache(category, key, txt2, update=True)
|
|
#
|
|
# from_cache = sdp.load_from_cache(category, key)
|
|
# assert from_cache == txt2
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_remove_from_cache(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# txt = "foo bar baz foo bar baz foo bar baz"
|
|
# key = "key_to_use"
|
|
# category = "cache_category"
|
|
#
|
|
# sdp.add_to_cache(category, key, txt)
|
|
# digest = sdp.remove_from_cache(category, key)
|
|
# assert not sdp.io.exists(path.join(sdp.io.root, SheerkaDataProvider.CacheFolder, digest[0:24], digest))
|
|
# assert not sdp.in_cache(category, key)
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_test_than_an_entry_exists(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
#
|
|
# assert not sdp.exists("entry")
|
|
# sdp.add(evt_digest, "entry", "value")
|
|
# assert sdp.exists("entry")
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_test_if_a_key_exists(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# obj = ObjWithDigestWithKey("key", "value")
|
|
#
|
|
# assert not sdp.exists("entry")
|
|
# assert not sdp.exists("entry", obj.get_key())
|
|
#
|
|
# sdp.add(evt_digest, "entry", obj)
|
|
# assert not sdp.exists("entry", "wrong_key")
|
|
# assert sdp.exists("entry", obj.get_key())
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_test_that_the_object_exists(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# obj = ObjWithDigestWithKey("key", "value")
|
|
#
|
|
# assert not sdp.exists("entry")
|
|
# assert not sdp.exists("entry", obj.get_key())
|
|
# assert not sdp.exists("entry", obj.get_key(), obj.get_digest())
|
|
#
|
|
# # test for a single item under the key
|
|
# sdp.add(evt_digest, "entry", obj)
|
|
# assert not sdp.exists("entry", obj.get_key(), "wrong_digest")
|
|
# assert sdp.exists("entry", obj.get_key(), obj.get_digest())
|
|
#
|
|
# # test for a list item under the key
|
|
# sdp.add(evt_digest, "entry2", ObjWithDigestWithKey("key", "value2"))
|
|
# assert not sdp.exists("entry2", obj.get_key(), obj.get_digest())
|
|
#
|
|
# sdp.add(evt_digest, "entry2", ObjWithDigestWithKey("key", "value3"))
|
|
# assert not sdp.exists("entry2", obj.get_key(), obj.get_digest())
|
|
#
|
|
# sdp.add(evt_digest, "entry2", obj)
|
|
# assert sdp.exists("entry2", obj.get_key(), obj.get_digest())
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_test_than_the_object_exists_when_using_references(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# sdp.serializer.register(PickleSerializer(lambda o: isinstance(o, ObjWithDigestWithKey)))
|
|
# obj = ObjWithDigestWithKey("key", "value")
|
|
#
|
|
# assert not sdp.exists("entry")
|
|
# assert not sdp.exists("entry", obj.get_key())
|
|
# assert not sdp.exists("entry", obj.get_key(), obj.get_digest())
|
|
#
|
|
# # test for a single item under the key
|
|
# sdp.add(evt_digest, "entry", obj, use_ref=True)
|
|
# assert not sdp.exists("entry", obj.get_key(), "wrong_digest")
|
|
# assert sdp.exists("entry", obj.get_key(), obj.get_digest())
|
|
#
|
|
# # test for a list item under the key
|
|
# sdp.add(evt_digest, "entry2", ObjWithDigestWithKey("key", "value2"), use_ref=True)
|
|
# assert not sdp.exists("entry2", obj.get_key(), obj.get_digest())
|
|
#
|
|
# sdp.add(evt_digest, "entry2", ObjWithDigestWithKey("key", "value3"), use_ref=True)
|
|
# assert not sdp.exists("entry2", obj.get_key(), obj.get_digest())
|
|
#
|
|
# sdp.add(evt_digest, "entry2", obj, use_ref=True)
|
|
# assert sdp.exists("entry2", obj.get_key(), obj.get_digest())
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_save_and_load_object_ref_with_history(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# obj = ObjDumpJson("my_key", "value1")
|
|
# sdp.serializer.register(JsonSerializer(core.utils.get_full_qualified_name(obj)))
|
|
#
|
|
# result = sdp.add(evt_digest, "entry", obj, use_ref=True)
|
|
# loaded = sdp.get(result.entry, result.key)
|
|
# history = getattr(loaded, Serializer.HISTORY)
|
|
#
|
|
# assert result.obj == obj
|
|
# assert result.entry == "entry"
|
|
# assert result.key == obj.key
|
|
# assert result.digest == obj.get_digest()
|
|
#
|
|
# assert loaded.key == obj.key
|
|
# assert loaded.value == obj.value
|
|
#
|
|
# assert history[Serializer.USERNAME] == "kodjo"
|
|
# assert history[Serializer.MODIFICATION_DATE] != ""
|
|
# assert history[Serializer.PARENTS] == []
|
|
#
|
|
# assert sdp.io.exists(sdp.io.get_obj_path(sdp.ObjectsFolder, obj.get_digest()))
|
|
#
|
|
# # save a second time with no modification
|
|
# previous_modification_time = history[Serializer.MODIFICATION_DATE]
|
|
# previous_parents = history[Serializer.PARENTS]
|
|
#
|
|
# sdp.modify(evt_digest, "entry", result.key, loaded)
|
|
# loaded = sdp.get(result.entry, result.key)
|
|
# history = getattr(loaded, Serializer.HISTORY)
|
|
#
|
|
# assert history[Serializer.MODIFICATION_DATE] == previous_modification_time
|
|
# assert history[Serializer.PARENTS] == previous_parents
|
|
#
|
|
# # save again, but with a modification
|
|
# previous_digest = loaded.get_digest()
|
|
# loaded.value = "value2"
|
|
#
|
|
# sdp.modify(evt_digest, "entry", result.key, loaded)
|
|
# loaded2 = sdp.get(result.entry, result.key)
|
|
# history2 = getattr(loaded2, Serializer.HISTORY)
|
|
#
|
|
# assert loaded2.key == loaded.key
|
|
# assert loaded2.value == loaded.value
|
|
#
|
|
# assert history2[Serializer.USERNAME] == "kodjo"
|
|
# assert history2[Serializer.MODIFICATION_DATE] != ""
|
|
# assert history2[Serializer.PARENTS] == [previous_digest]
|
|
#
|
|
# state = sdp.load_state(sdp.get_snapshot(SheerkaDataProvider.HeadFile))
|
|
# assert state.data == {"entry": {
|
|
# "my_key": '##REF##:e6bf5b56428cfce0f08c94f2c3625dc3b3a8180d7229eaa9f8aa967fb16e5256'}}
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_can_add_obj_with_same_key_and_get_them_back(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# obj1 = ObjDumpJson("key", "value1")
|
|
# obj2 = ObjDumpJson("key", "value2")
|
|
# sdp.serializer.register(JsonSerializer(core.utils.get_full_qualified_name(obj1)))
|
|
#
|
|
# result = sdp.add(evt_digest, "entry", obj1, use_ref=True)
|
|
# sdp.add(evt_digest, "entry", obj2, use_ref=True)
|
|
#
|
|
# loaded = sdp.get(result.entry, result.key)
|
|
#
|
|
# assert len(loaded) == 2
|
|
# assert loaded[0] == obj1
|
|
# assert loaded[1] == obj2
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_get_safe_dictionary_without_origin(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# obj = {"my_key": "value1"}
|
|
#
|
|
# obj_serializer = JsonSerializer(core.utils.get_full_qualified_name(obj))
|
|
# sdp.serializer.register(obj_serializer)
|
|
#
|
|
# result = sdp.add(evt_digest, "entry", obj, use_ref=True)
|
|
# from_db = sdp.get(result.entry, result.key)
|
|
#
|
|
# assert len(from_db) == 2
|
|
# assert from_db["my_key"] == obj["my_key"]
|
|
# assert Serializer.ORIGIN in from_db
|
|
#
|
|
# from_db_no_origin = sdp.get_safe(result.entry, result.key, load_origin=False)
|
|
# assert len(from_db_no_origin) == 1
|
|
# assert from_db_no_origin["my_key"] == obj["my_key"]
|
|
# assert Serializer.ORIGIN not in from_db_no_origin
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_get_dictionary_without_origin(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# obj = {"my_key": "value1"}
|
|
#
|
|
# obj_serializer = JsonSerializer(core.utils.get_full_qualified_name(obj))
|
|
# sdp.serializer.register(obj_serializer)
|
|
#
|
|
# result = sdp.add(evt_digest, "entry", obj, use_ref=True)
|
|
# from_db = sdp.get(result.entry, result.key)
|
|
#
|
|
# assert len(from_db) == 2
|
|
# assert from_db["my_key"] == obj["my_key"]
|
|
# assert Serializer.ORIGIN in from_db
|
|
#
|
|
# from_db_no_origin = sdp.get(result.entry, result.key, load_origin=False)
|
|
# assert len(from_db_no_origin) == 1
|
|
# assert from_db_no_origin["my_key"] == obj["my_key"]
|
|
# assert Serializer.ORIGIN not in from_db_no_origin
|
|
#
|
|
#
|
|
# @pytest.mark.parametrize("root", [
|
|
# ".sheerka",
|
|
# "mem://"
|
|
# ])
|
|
# def test_i_get_safe_object_without_origin(root):
|
|
# sdp = SheerkaDataProvider(root)
|
|
# obj = ObjDumpJson("my_key", "value1")
|
|
#
|
|
# obj_serializer = JsonSerializer(core.utils.get_full_qualified_name(obj))
|
|
# sdp.serializer.register(obj_serializer)
|
|
#
|
|
# result = sdp.add(evt_digest, "entry", obj, use_ref=True)
|
|
# from_db = sdp.get(result.entry, result.key)
|
|
#
|
|
# assert from_db == obj
|
|
# assert hasattr(from_db, Serializer.ORIGIN)
|
|
#
|
|
# from_db_no_origin = sdp.get_safe(result.entry, result.key, load_origin=False)
|
|
# assert from_db_no_origin == obj
|
|
# assert not hasattr(from_db_no_origin, Serializer.ORIGIN)
|
|
#
|
|
#
|
|
# def test_i_can_get_ref():
|
|
# sdp = SheerkaDataProvider("mem://")
|
|
# obj = ObjDumpJson("my_key", "value1")
|
|
#
|
|
# obj_serializer = JsonSerializer(core.utils.get_full_qualified_name(obj))
|
|
# sdp.serializer.register(obj_serializer)
|
|
#
|
|
# result = sdp.add(evt_digest, "entry", obj, use_ref=True)
|
|
#
|
|
# ref = sdp.get_ref(result.entry, result.key)
|
|
# assert ref == "076f0df0f110c304982242a88088efacce71f361e49f065db75919a7f72c2821"
|
|
#
|
|
#
|
|
# def test_i_can_get_ref_when_list():
|
|
# sdp = SheerkaDataProvider("mem://")
|
|
#
|
|
# obj_serializer = JsonSerializer(core.utils.get_full_qualified_name(ObjDumpJson))
|
|
# sdp.serializer.register(obj_serializer)
|
|
#
|
|
# sdp.add(evt_digest, "entry", ObjDumpJson("my_key", "value1"), use_ref=True)
|
|
# result = sdp.add(evt_digest, "entry", ObjDumpJson("my_key", "value2"), use_ref=True)
|
|
#
|
|
# ref = sdp.get_ref(result.entry, result.key)
|
|
# assert ref == [
|
|
# "076f0df0f110c304982242a88088efacce71f361e49f065db75919a7f72c2821",
|
|
# "e6bf5b56428cfce0f08c94f2c3625dc3b3a8180d7229eaa9f8aa967fb16e5256"
|
|
# ]
|
|
#
|
|
#
|
|
# def test_i_cannot_get_ref_if_the_saved_item_is_not_a_ref():
|
|
# sdp = SheerkaDataProvider("mem://")
|
|
# obj = ObjDumpJson("my_key", "value1")
|
|
# result = sdp.add(evt_digest, "entry", obj, use_ref=False)
|
|
#
|
|
# with pytest.raises(SheerkaDataProviderError) as e:
|
|
# sdp.get_ref(result.entry, result.key)
|
|
#
|
|
# assert e.value.args[0] == "Not a reference"
|
|
# assert e.value.obj == f"{result.entry}.{result.key}"
|
|
#
|
|
#
|
|
# def test_i_cannot_get_ref_if_the_item_does_not_exist():
|
|
# sdp = SheerkaDataProvider("mem://")
|
|
# with pytest.raises(IndexError):
|
|
# sdp.get_ref("fake", "fake")
|