e41094f908
Fixed #12 Fixed #13 Fixed #14
1523 lines
65 KiB
Python
1523 lines
65 KiB
Python
from dataclasses import dataclass
|
|
|
|
import pytest
|
|
|
|
from base import BaseTest
|
|
from caching.Cache import Cache
|
|
from caching.DictionaryCache import DictionaryCache
|
|
from caching.IncCache import IncCache
|
|
from caching.ListCache import ListCache
|
|
from caching.ListIfNeededCache import ListIfNeededCache
|
|
from common.global_symbols import NotFound, Removed
|
|
from helpers import get_metadata, get_metadatas
|
|
from ontologies.Exceptions import OntologyAlreadyExists, OntologyManagerCannotPopLatest, OntologyManagerFrozen, \
|
|
OntologyManagerNotFrozen, OntologyNotFound
|
|
from ontologies.SheerkaOntologyManager import Ontology, SheerkaOntologyManager
|
|
|
|
|
|
@dataclass
|
|
class DummyObj:
|
|
key: str
|
|
value: object
|
|
|
|
|
|
class TestSheerkaOntology(BaseTest):
|
|
@pytest.fixture()
|
|
def manager(self, sheerka):
|
|
return SheerkaOntologyManager(sheerka, "mem://")
|
|
|
|
@staticmethod
|
|
def init_by_id_and_by_name(manager):
|
|
"""
|
|
initialize a manager with concept caches for by id and by name
|
|
:param manager:
|
|
:type manager:
|
|
:return:
|
|
:rtype:
|
|
"""
|
|
cache = Cache().auto_configure("by_id")
|
|
manager.register_concept_cache("by_id", cache, lambda obj: obj.id, use_ref=True)
|
|
cache = ListIfNeededCache().auto_configure("by_key")
|
|
manager.register_concept_cache("by_key", cache, lambda obj: obj.key, use_ref=True)
|
|
manager.freeze()
|
|
return manager
|
|
|
|
def test_i_can_create_ontology_manager(self, manager):
|
|
assert len(manager.ontologies) == 1
|
|
assert manager.ontologies_names == [SheerkaOntologyManager.ROOT_ONTOLOGY_NAME]
|
|
|
|
# current cache manager and current sdp point to the top of the list
|
|
assert id(manager.current_cache_manager()) == id(manager.ontologies[0].cache_manager)
|
|
assert id(manager.current_sdp()) == id(manager.ontologies[0].cache_manager.sdp)
|
|
|
|
# No cache defined by default
|
|
assert manager.current_cache_manager().caches == {}
|
|
|
|
def test_i_can_register_a_cache_and_get_data(self, context, manager):
|
|
cache = Cache()
|
|
manager.register_cache("test", cache)
|
|
|
|
assert id(cache._sdp) == id(manager.current_sdp()) # sdp is automatically added to the new Cache
|
|
|
|
manager.put("test", "key", "value")
|
|
assert manager.get("test", "key") == "value"
|
|
assert manager.current_sdp().get("test", "key") == NotFound # not yet committed
|
|
|
|
manager.commit(context)
|
|
assert manager.get("test", "key") == "value"
|
|
assert manager.current_sdp().get("test", "key") == "value"
|
|
|
|
def test_i_can_no_longer_register_cache_once_ontology_is_frozen(self, manager):
|
|
manager.freeze()
|
|
|
|
with pytest.raises(OntologyManagerFrozen):
|
|
manager.register_cache("test", Cache())
|
|
|
|
with pytest.raises(OntologyManagerFrozen):
|
|
manager.register_concept_cache("test", Cache(), lambda obj: obj.key, True)
|
|
|
|
def test_i_cannot_push_ontology_if_not_frozen(self, manager):
|
|
with pytest.raises(OntologyManagerNotFrozen):
|
|
manager.push_ontology("new_ontology")
|
|
|
|
def test_i_can_push_ontology_from_simple_caches(self, manager):
|
|
"""
|
|
Once registered, the same cache are created every time a new ontology is added or pushed
|
|
:param manager:
|
|
:type manager:
|
|
:return:
|
|
:rtype:
|
|
"""
|
|
manager.register_cache("Cache", Cache(), persist=True, use_ref=True)
|
|
manager.register_cache("DictionaryCache", DictionaryCache(), persist=True, use_ref=False)
|
|
manager.register_cache("ListIfNeededCache", ListIfNeededCache(), persist=False, use_ref=False)
|
|
manager.register_cache("ListCache", ListCache(), persist=False, use_ref=True)
|
|
manager.register_cache("IncCache", IncCache(), False)
|
|
manager.freeze()
|
|
|
|
manager.push_ontology("new_ontology")
|
|
|
|
assert len(manager.ontologies) == 2
|
|
ref_cache_manager = manager.ontologies[1].cache_manager
|
|
assert ref_cache_manager.sdp.name == "__default__"
|
|
|
|
cache_manager_0 = manager.ontologies[0].cache_manager
|
|
assert len(cache_manager_0.caches) == 5
|
|
assert cache_manager_0.concept_caches == []
|
|
|
|
# check that the definition of the newly created caches are the same thant the ref
|
|
for cache_name in ("Cache", "DictionaryCache", "ListIfNeededCache", "ListCache", "IncCache"):
|
|
cache_def = cache_manager_0.caches[cache_name]
|
|
assert cache_def.persist == ref_cache_manager.caches[cache_name].persist
|
|
assert cache_def.use_ref == ref_cache_manager.caches[cache_name].use_ref
|
|
assert type(cache_def.cache) == type(ref_cache_manager.caches[cache_name].cache)
|
|
assert cache_def.cache._sdp.name == "new_ontology"
|
|
|
|
def test_i_can_push_multiple_ontologies(self, manager):
|
|
manager.freeze()
|
|
manager.push_ontology("ontology 1")
|
|
manager.push_ontology("ontology 2")
|
|
|
|
assert len(manager.ontologies) == 3
|
|
assert manager.ontologies[0].name == "ontology 2"
|
|
assert manager.ontologies[0].depth == 2
|
|
|
|
assert manager.ontologies[1].name == "ontology 1"
|
|
assert manager.ontologies[1].depth == 1
|
|
|
|
assert manager.ontologies[2].name == SheerkaOntologyManager.ROOT_ONTOLOGY_NAME
|
|
assert manager.ontologies[2].depth == 0
|
|
|
|
def test_i_can_push_ontology_from_concept_caches(self, manager):
|
|
"""
|
|
Same test than before, but for cache definitions
|
|
:param manager:
|
|
:type manager:
|
|
:return:
|
|
:rtype:
|
|
"""
|
|
self.init_by_id_and_by_name(manager)
|
|
|
|
manager.push_ontology("new_ontology")
|
|
assert len(manager.ontologies) == 2
|
|
|
|
ref_cache_manager = manager.ontologies[1].cache_manager
|
|
cache_manager_0 = manager.ontologies[0].cache_manager
|
|
|
|
assert len(cache_manager_0.caches) == 2
|
|
assert cache_manager_0.concept_caches == ref_cache_manager.concept_caches
|
|
assert cache_manager_0.sdp.name == "new_ontology"
|
|
|
|
for cache_name in ("by_key", "by_id"):
|
|
cache_def = cache_manager_0.caches[cache_name]
|
|
assert cache_def.persist == ref_cache_manager.caches[cache_name].persist
|
|
assert cache_def.use_ref == ref_cache_manager.caches[cache_name].use_ref
|
|
assert type(cache_def.cache) == type(ref_cache_manager.caches[cache_name].cache)
|
|
assert cache_def.cache._sdp.name == "new_ontology"
|
|
|
|
def test_i_can_get_database_value(self, context, manager):
|
|
manager.register_cache("cache_name", Cache().auto_configure("cache_name"))
|
|
|
|
with manager.current_sdp().get_transaction(context.event) as transaction:
|
|
transaction.add("cache_name", "key", "value")
|
|
|
|
assert not manager.current_cache_manager().has("cache_name", "key")
|
|
assert manager.get("cache_name", "key") == "value"
|
|
assert manager.current_cache_manager().has("cache_name", "key")
|
|
|
|
def test_i_cannot_pop_ontology_when_not_frozen(self, context, manager):
|
|
with pytest.raises(OntologyManagerNotFrozen):
|
|
manager.pop_ontology(context)
|
|
|
|
def test_i_cannot_pop_the_latest_cache_manager(self, context, manager):
|
|
"""
|
|
Cannot pop when there is only one ontology left
|
|
:param manager:
|
|
:type manager:
|
|
:param context:
|
|
:type context:
|
|
:return:
|
|
:rtype:
|
|
"""
|
|
manager.freeze()
|
|
with pytest.raises(OntologyManagerCannotPopLatest):
|
|
manager.pop_ontology(context)
|
|
|
|
def test_i_can_pop_ontology(self, context, manager):
|
|
manager.freeze()
|
|
|
|
manager.push_ontology("ontology1")
|
|
manager.push_ontology("ontology2")
|
|
manager.push_ontology("ontology3")
|
|
assert len(manager.ontologies) == 4
|
|
|
|
manager.pop_ontology(context)
|
|
assert len(manager.ontologies) == 3
|
|
|
|
manager.pop_ontology(context)
|
|
manager.pop_ontology(context)
|
|
with pytest.raises(OntologyManagerCannotPopLatest):
|
|
manager.pop_ontology(context)
|
|
|
|
def test_i_can_revert_ontology(self, context, manager):
|
|
manager.freeze()
|
|
|
|
manager.push_ontology("ontology1")
|
|
ontology = manager.push_ontology("ontology2")
|
|
manager.push_ontology("ontology3")
|
|
manager.push_ontology("ontology4")
|
|
|
|
manager.revert_ontology(context, ontology)
|
|
assert len(manager.ontologies) == 2
|
|
assert manager.current_ontology().name == "ontology1"
|
|
|
|
def test_i_cannot_revert_when_ontology_is_already_popped(self, context, manager):
|
|
manager.freeze()
|
|
|
|
manager.push_ontology("ontology1")
|
|
ontology = manager.push_ontology("ontology2")
|
|
manager.pop_ontology(context)
|
|
|
|
with pytest.raises(OntologyNotFound) as err:
|
|
manager.revert_ontology(context, ontology)
|
|
|
|
assert err.value.ontology == ontology
|
|
|
|
def test_i_cannot_revert_when_ontology_that_does_not_exists(self, context, manager):
|
|
manager.freeze()
|
|
manager.push_ontology("ontology1")
|
|
|
|
ontology = Ontology("fake", 0, manager.current_cache_manager(), None)
|
|
with pytest.raises(OntologyNotFound) as err:
|
|
manager.revert_ontology(context, ontology)
|
|
|
|
assert err.value.ontology == ontology
|
|
|
|
def test_i_can_push_ontology_to_override_values(self, context, manager):
|
|
manager.register_cache("cache_name", Cache())
|
|
manager.freeze()
|
|
|
|
# sanity check
|
|
manager.put("cache_name", "key", "value1")
|
|
assert manager.get("cache_name", "key") == "value1"
|
|
|
|
# push an ontology and override the value
|
|
manager.push_ontology("new ontology")
|
|
manager.put("cache_name", "key", "value2")
|
|
assert manager.get("cache_name", "key") == "value2"
|
|
|
|
# The new value is discarded when the ontology is removed
|
|
manager.pop_ontology(context)
|
|
assert manager.get("cache_name", "key") == "value1"
|
|
|
|
def test_i_cannot_get_values_from_parent_ontologies_if_default_parameter_is_not_set(self, context, manager):
|
|
"""
|
|
If this test, the property called `default` of the `Cache` is not defined.
|
|
The `default` must be defined and be a callable in order to make the magic happen
|
|
:param context:
|
|
:type context:
|
|
:param manager:
|
|
:type manager:
|
|
:return:
|
|
:rtype:
|
|
"""
|
|
manager.register_cache("cache_name", Cache())
|
|
manager.freeze()
|
|
manager.put("cache_name", "key", "value1")
|
|
assert manager.get("cache_name", "key") == "value1"
|
|
|
|
# push an ontology and try to get the value
|
|
manager.push_ontology("new ontology")
|
|
assert manager.get("cache_name", "key") is NotFound
|
|
|
|
def test_i_can_get_values_from_parent_ontologies_if_default_parameter_is_set(self, context, manager):
|
|
"""
|
|
`default` and `alt_sdp_get` must be defined in order to allow parent request
|
|
There is no special constraint for that. It's just that I don't want to check every possible cases
|
|
`default` and `alt_sdp_get` will always be defined in Sheerka, so it's useless to code situations where
|
|
there are not.
|
|
:param context:
|
|
:type context:
|
|
:param manager:
|
|
:type manager:
|
|
:return:
|
|
:rtype:
|
|
"""
|
|
manager.register_cache("cache_name", Cache(
|
|
default=lambda sdp, key: sdp.get("cache_name", key),
|
|
alt_sdp_get=lambda sdp, key: sdp.get("cache_name", key)))
|
|
|
|
manager.freeze()
|
|
manager.put("cache_name", "key", "value1")
|
|
assert manager.get("cache_name", "key") == "value1"
|
|
|
|
# push an ontology and try to get the value
|
|
manager.push_ontology("new ontology")
|
|
assert manager.get("cache_name", "key") == "value1"
|
|
|
|
def test_i_can_access_values_after_push_and_pop(self, context, manager):
|
|
manager.register_cache("cache_name", Cache().auto_configure("cache_name"))
|
|
manager.freeze()
|
|
|
|
# put value in DB, but not in cache
|
|
with manager.current_sdp().get_transaction(context.event) as transaction:
|
|
transaction.add("cache_name", "key", "value1")
|
|
assert not manager.current_cache_manager().has("cache_name", "key") # value not in cache
|
|
assert manager.current_sdp().exists("cache_name", "key") # but value is in DB
|
|
|
|
# add an ontology layer, and put in DB again, not in cache
|
|
manager.push_ontology("new ontology")
|
|
with manager.current_sdp().get_transaction(context.event) as transaction:
|
|
transaction.add("cache_name", "key", "value2")
|
|
|
|
# At this point, the value is in DB, but not in cache
|
|
assert not manager.current_cache_manager().has("cache_name", "key")
|
|
assert manager.get("cache_name", "key") == "value2" # taken from the current ontology
|
|
assert manager.current_cache_manager().has("cache_name", "key")
|
|
|
|
# sanity check
|
|
# Let's check sdp values
|
|
assert manager.ontologies[0].cache_manager.sdp.state.data == {'cache_name': {'key': 'value2'}}
|
|
assert manager.ontologies[1].cache_manager.sdp.state.data == {'cache_name': {'key': 'value1'}}
|
|
|
|
# remove a layer
|
|
manager.pop_ontology(context)
|
|
assert not manager.current_cache_manager().has("cache_name", "key") # value is no longer in cache
|
|
assert manager.get("cache_name", "key") == "value1"
|
|
assert manager.current_cache_manager().has("cache_name", "key") # it's now in cache
|
|
|
|
# sanity check
|
|
# Let's check sdp values
|
|
assert manager.ontologies[0].cache_manager.sdp.state.data == {'cache_name': {'key': 'value1'}}
|
|
|
|
def test_i_can_add_ontology(self, context, manager):
|
|
"""
|
|
In this test, I put back (using `SheerkaOntologyManager.add()`) a previously created ontology
|
|
:param context:
|
|
:type context:
|
|
:param manager:
|
|
:type manager:
|
|
:return:
|
|
:rtype:
|
|
"""
|
|
manager.register_cache("cache_name", Cache().auto_configure("cache_name"))
|
|
manager.freeze()
|
|
|
|
# init the system
|
|
# key1 is in the cache and in sdp
|
|
# key2 is only in sdp
|
|
manager.push_ontology("new ontology")
|
|
manager.put("cache_name", "key1", "value1")
|
|
manager.commit(context)
|
|
with manager.current_sdp().get_transaction(context.event) as transaction:
|
|
transaction.add("cache_name", "key2", "value2")
|
|
|
|
# call key3 to check. This time there is no value, but there will be later
|
|
assert manager.get("cache_name", "key3") is NotFound
|
|
|
|
# now remove the ontology
|
|
new_ontology = manager.pop_ontology(context)
|
|
|
|
# add another ontology, with its own values
|
|
manager.push_ontology("another ontology")
|
|
manager.put("cache_name", "key1", "value1_from_another")
|
|
manager.put("cache_name", "key2", "value2_from_another")
|
|
manager.put("cache_name", "key3", "value3_from_another")
|
|
manager.commit(context)
|
|
|
|
# put back the ontology on top
|
|
manager.add_ontology(new_ontology)
|
|
|
|
assert manager.get("cache_name", "key1") == "value1"
|
|
assert manager.get("cache_name", "key2") == "value2"
|
|
assert manager.get("cache_name", "key3") == "value3_from_another"
|
|
|
|
def test_i_can_get_ontology(self, manager):
|
|
manager.freeze()
|
|
|
|
manager.push_ontology("name1")
|
|
manager.push_ontology("name2")
|
|
manager.push_ontology("name3")
|
|
|
|
assert manager.get_ontology("name2").name == "name2"
|
|
assert manager.get_ontology().name == "name3"
|
|
|
|
with pytest.raises(KeyError):
|
|
assert manager.get_ontology("name4")
|
|
|
|
def test_i_can_manage_multiple_ontology_layers(self, context, manager):
|
|
manager.register_cache("cache_name", Cache(default=lambda sdp, key: sdp.get("cache_name", key)))
|
|
manager.freeze()
|
|
|
|
# default layer
|
|
with manager.current_sdp().get_transaction(context.event) as transaction:
|
|
transaction.add("cache_name", "key", "value1")
|
|
|
|
# add an ontology layer
|
|
manager.push_ontology("new ontology")
|
|
with manager.current_sdp().get_transaction(context.event) as transaction:
|
|
transaction.add("cache_name", "key", "value2")
|
|
|
|
# add an ontology layer
|
|
manager.push_ontology("another ontology")
|
|
with manager.current_sdp().get_transaction(context.event) as transaction:
|
|
transaction.add("cache_name", "key", "value3")
|
|
|
|
# add an ontology layer
|
|
manager.push_ontology("fourth ontology")
|
|
with manager.current_sdp().get_transaction(context.event) as transaction:
|
|
transaction.add("cache_name", "key", "value4")
|
|
|
|
assert manager.get("cache_name", "key") == "value4"
|
|
|
|
manager.pop_ontology(context)
|
|
assert manager.get("cache_name", "key") == "value3"
|
|
|
|
manager.pop_ontology(context)
|
|
assert manager.get("cache_name", "key") == "value2"
|
|
|
|
manager.pop_ontology(context)
|
|
assert manager.get("cache_name", "key") == "value1"
|
|
|
|
def test_i_have_access_to_sub_layers_values(self, context, manager):
|
|
manager.register_cache("cache_name", Cache().auto_configure("cache_name"))
|
|
manager.freeze()
|
|
|
|
# default layer
|
|
with manager.current_sdp().get_transaction(context.event) as transaction:
|
|
transaction.add("cache_name", "key", "value")
|
|
|
|
# add ontology layers
|
|
manager.push_ontology("new ontology")
|
|
manager.push_ontology("another ontology")
|
|
|
|
# I can get the low level value
|
|
assert manager.get("cache_name", "key") == "value"
|
|
|
|
# check that the value is copied on the top level cache
|
|
assert manager.current_cache_manager().has("cache_name", "key")
|
|
assert not manager.ontologies[1].cache_manager.has("cache_name", "key") # not the top level
|
|
assert manager.ontologies[2].cache_manager.has("cache_name", "key") # the data comes from it
|
|
|
|
def test_i_can_get_value_from_all_layers(self, context, manager):
|
|
manager.register_cache("cache_name", Cache().auto_configure("cache_name"))
|
|
manager.freeze()
|
|
|
|
manager.put("cache_name", "key", "value")
|
|
|
|
# add ontology layers
|
|
manager.push_ontology("new ontology")
|
|
manager.push_ontology("another ontology")
|
|
|
|
assert manager.get("cache_name", "key") == "value"
|
|
|
|
manager.pop_ontology(context)
|
|
assert manager.get("cache_name", "key") == "value"
|
|
|
|
manager.pop_ontology(context)
|
|
assert manager.get("cache_name", "key") == "value"
|
|
|
|
def test_i_can_only_get_top_layer_values_when_dictionary_cache(self, context, manager):
|
|
manager.register_cache("cache_name", DictionaryCache().auto_configure("cache_name"))
|
|
manager.freeze()
|
|
|
|
manager.put("cache_name", False, {"key": "value"}) # add some values in default layer
|
|
|
|
# add some other values in another layer
|
|
manager.push_ontology("new ontology")
|
|
manager.put("cache_name", False, {"key1": "value1"})
|
|
|
|
assert manager.get("cache_name", "key") is NotFound # other layer are not visible
|
|
assert manager.get("cache_name", "key1") == "value1" # I can only see the current layer
|
|
|
|
# I still can use get all
|
|
assert manager.get_all("cache_name") == {"key": "value", "key1": "value1"}
|
|
|
|
# I can get back my values after pop
|
|
manager.pop_ontology(context)
|
|
assert manager.copy("cache_name") == {"key": "value"}
|
|
|
|
def test_dictionary_caches_values_are_copied_when_a_new_ontology_is_pushed(self, manager):
|
|
manager.register_cache("cache_name", DictionaryCache().auto_configure("cache_name"))
|
|
manager.freeze()
|
|
|
|
manager.put("cache_name", False, {"key": "value"}) # add some values in default layer
|
|
manager.push_ontology("new ontology")
|
|
|
|
assert manager.copy("cache_name") == {"key": "value"}
|
|
assert manager.current_cache_manager().get_inner_cache("cache_name").to_add == set()
|
|
assert manager.current_cache_manager().get_inner_cache("cache_name").to_remove == set()
|
|
|
|
def test_initialized_key_are_correctly_managed_when_multiple_layers(self, context, manager):
|
|
manager.register_cache("c_name", Cache().auto_configure("c_name"))
|
|
manager.freeze()
|
|
|
|
manager.put("c_name", "key", "value")
|
|
with manager.current_sdp().get_transaction(context.event) as transaction:
|
|
transaction.add("c_name", "key2", "value2") # not in cache
|
|
|
|
# add ontology layers
|
|
manager.push_ontology("new ontology")
|
|
manager.push_ontology("another ontology")
|
|
manager.push_ontology("last ontology")
|
|
|
|
manager.get("c_name", "key") # == "value" but we don't care
|
|
assert manager.ontologies[0].cache_manager.caches["c_name"].cache._initialized_keys == {"key"}
|
|
assert manager.ontologies[1].cache_manager.caches["c_name"].cache._initialized_keys == set()
|
|
assert manager.ontologies[2].cache_manager.caches["c_name"].cache._initialized_keys == set()
|
|
assert manager.ontologies[3].cache_manager.caches["c_name"].cache._initialized_keys == set()
|
|
|
|
manager.get("c_name", "key2") # == "value2" but we don't care
|
|
assert manager.ontologies[0].cache_manager.caches["c_name"].cache._initialized_keys == {"key", "key2"}
|
|
assert manager.ontologies[1].cache_manager.caches["c_name"].cache._initialized_keys == set()
|
|
assert manager.ontologies[2].cache_manager.caches["c_name"].cache._initialized_keys == set()
|
|
assert manager.ontologies[3].cache_manager.caches["c_name"].cache._initialized_keys == {"key2"}
|
|
|
|
manager.get("c_name", "no_key") # is NotFound but we don't care
|
|
assert manager.ontologies[0].cache_manager.caches["c_name"].cache._initialized_keys == {"key", "key2", "no_key"}
|
|
assert manager.ontologies[1].cache_manager.caches["c_name"].cache._initialized_keys == set()
|
|
assert manager.ontologies[2].cache_manager.caches["c_name"].cache._initialized_keys == set()
|
|
assert manager.ontologies[3].cache_manager.caches["c_name"].cache._initialized_keys == {"key2", "no_key"}
|
|
|
|
def test_i_cannot_get_a_value_that_does_not_exists(self, manager):
|
|
manager.register_cache("cache_name", Cache().auto_configure("cache_name"))
|
|
manager.freeze()
|
|
|
|
# add ontology layers
|
|
manager.push_ontology("new ontology")
|
|
manager.push_ontology("another ontology")
|
|
|
|
assert manager.get("cache_name", "key") is NotFound
|
|
|
|
def test_i_cannot_get_a_value_that_is_removed(self, manager):
|
|
manager.register_cache("cache_name", Cache())
|
|
manager.freeze()
|
|
|
|
manager.put("cache_name", "key", "value")
|
|
|
|
# add ontology layers
|
|
manager.push_ontology("new ontology")
|
|
manager.put("cache_name", "key", Removed)
|
|
|
|
assert manager.get("cache_name", "key") is NotFound
|
|
|
|
def test_i_can_remove_an_entry(self, manager):
|
|
manager.register_cache("cache_name", Cache())
|
|
manager.freeze()
|
|
|
|
manager.put("cache_name", "key", "value")
|
|
|
|
# add ontology layers
|
|
manager.push_ontology("new ontology")
|
|
manager.delete("cache_name", "key") # remove the entry in this ontology
|
|
|
|
assert manager.get("cache_name", "key") is NotFound
|
|
|
|
# sanity check, the value still exist
|
|
assert manager.ontologies[0].cache_manager.caches["cache_name"].cache.copy() == {}
|
|
assert manager.ontologies[1].cache_manager.caches["cache_name"].cache.copy() == {'key': 'value'}
|
|
|
|
def test_i_cannot_get_value_that_is_removed_in_sub_level(self, manager):
|
|
manager.register_cache("cache_name", Cache().auto_configure("cache_name"))
|
|
manager.freeze()
|
|
|
|
manager.put("cache_name", "key", "value") # value exists
|
|
|
|
# add ontology layer
|
|
manager.push_ontology("new ontology")
|
|
manager.put("cache_name", "key", Removed) # value is removed
|
|
|
|
# add another layer
|
|
manager.push_ontology("another ontology") # no indication
|
|
|
|
assert manager.get("cache_name", "key") is NotFound
|
|
|
|
# check that the cache of the top level ontology is updated
|
|
assert manager.current_cache_manager().caches["cache_name"].cache.copy() == {"key": Removed}
|
|
|
|
def test_i_can_test_if_a_value_exists(self, context, manager):
|
|
manager.register_cache("cache_name", Cache(extend_exists=lambda sdp, key: sdp.exists("cache_name", key)))
|
|
manager.freeze()
|
|
|
|
# default layer
|
|
with manager.current_sdp().get_transaction(context.event) as transaction:
|
|
transaction.add("cache_name", "key", "value")
|
|
|
|
# add ontology layers
|
|
manager.push_ontology("new ontology")
|
|
manager.push_ontology("another ontology")
|
|
|
|
# I can get the low level value
|
|
assert manager.exists("cache_name", "key")
|
|
|
|
# check that the value is not in cache (only in the low level database)
|
|
assert not manager.current_cache_manager().has("cache_name", "key")
|
|
assert not manager.ontologies[1].cache_manager.has("cache_name", "key")
|
|
assert not manager.ontologies[2].cache_manager.has("cache_name", "key")
|
|
|
|
def test_i_can_test_if_the_ontology_has_an_entry(self, context, manager):
|
|
manager.register_cache("cache_name", Cache(extend_exists=lambda sdp, key: sdp.exists("cache_name", key)))
|
|
manager.freeze()
|
|
|
|
# default layer
|
|
with manager.current_sdp().get_transaction(context.event) as transaction:
|
|
transaction.add("cache_name", "key", "value")
|
|
|
|
# add ontology layers
|
|
manager.push_ontology("new ontology")
|
|
|
|
assert manager.exists("cache_name", "key") # it can be seen by the exists()
|
|
assert not manager.exists_in_current("cache_name", "key") # but not by the has
|
|
|
|
# add the entry in the current sdp
|
|
with manager.current_sdp().get_transaction(context.event) as transaction:
|
|
transaction.add("cache_name", "key", "value")
|
|
assert manager.exists_in_current("cache_name", "key")
|
|
|
|
# add the entry only in cache
|
|
manager.put("cache_name", "key2", "value")
|
|
assert manager.exists_in_current("cache_name", "key") # it can be seen
|
|
|
|
def test_i_can_check_that_a_value_does_not_exist(self, manager):
|
|
|
|
manager.register_cache("cache_name", Cache(extend_exists=lambda sdp, key: sdp.exists("cache_name", key)))
|
|
manager.freeze()
|
|
|
|
# add ontology layers
|
|
manager.push_ontology("new ontology")
|
|
manager.push_ontology("another ontology")
|
|
|
|
assert not manager.exists("cache_name", "key")
|
|
|
|
def test_i_can_list_from_multiple_ontologies(self, context, manager):
|
|
|
|
manager.register_cache("cache_name", Cache())
|
|
manager.freeze()
|
|
|
|
# default layer
|
|
with manager.current_sdp().get_transaction(context.event) as transaction:
|
|
transaction.add("cache_name", "key1", DummyObj("key1", "value1"))
|
|
|
|
manager.push_ontology("new ontology")
|
|
with manager.current_sdp().get_transaction(context.event) as transaction:
|
|
transaction.add("cache_name", "key2", DummyObj("key2", "value2"))
|
|
transaction.add("cache_name", "key1", DummyObj("key1", "value11")) # key1 is modified
|
|
|
|
manager.push_ontology("another ontology")
|
|
with manager.current_sdp().get_transaction(context.event) as transaction:
|
|
transaction.add("cache_name", "key3", DummyObj("key3", "value3"))
|
|
|
|
assert manager.list("cache_name") == [DummyObj("key1", "value11"),
|
|
DummyObj("key2", "value2"),
|
|
DummyObj("key3", "value3")]
|
|
|
|
def test_i_can_list_from_multiple_ontologies_even_if_they_are_not_all_filled(self, context, manager):
|
|
|
|
manager.register_cache("cache_name", Cache())
|
|
manager.freeze()
|
|
|
|
# default layer
|
|
with manager.current_sdp().get_transaction(context.event) as transaction:
|
|
transaction.add("cache_name", "key1", DummyObj("key1", "value1"))
|
|
|
|
manager.push_ontology("new ontology")
|
|
# nothing in this ontology
|
|
|
|
manager.push_ontology("another ontology")
|
|
with manager.current_sdp().get_transaction(context.event) as transaction:
|
|
transaction.add("cache_name", "key3", DummyObj("key3", "value3"))
|
|
|
|
assert manager.list("cache_name") == [DummyObj("key1", "value1"),
|
|
DummyObj("key3", "value3")]
|
|
|
|
def test_i_can_list_when_no_items(self, manager):
|
|
|
|
manager.register_cache("cache_name", Cache())
|
|
manager.freeze()
|
|
|
|
manager.push_ontology("new ontology")
|
|
manager.push_ontology("another ontology")
|
|
|
|
assert manager.list("cache_name") == []
|
|
|
|
def test_i_can_put_entry(self, context, manager):
|
|
manager.register_cache("cache_name", Cache())
|
|
manager.freeze()
|
|
|
|
# default ontology
|
|
manager.put("cache_name", "key", "value")
|
|
manager.commit(context)
|
|
|
|
assert manager.get("cache_name", "key") == "value"
|
|
assert manager.ontologies[0].cache_manager.sdp.state.data == {'cache_name': {'key': 'value'}}
|
|
|
|
# add an ontology layer
|
|
manager.push_ontology("new ontology")
|
|
manager.put("cache_name", "key", "value2")
|
|
manager.commit(context)
|
|
assert manager.get("cache_name", "key") == "value2"
|
|
assert manager.ontologies[0].cache_manager.sdp.state.data == {'cache_name': {'key': 'value2'}}
|
|
assert manager.ontologies[1].cache_manager.sdp.state.data == {'cache_name': {'key': 'value'}}
|
|
|
|
def test_i_can_put_in_a_list_cache(self, context, manager):
|
|
# in this test, sub layers have values.
|
|
# We need to check that those values are not lost when adding a new element
|
|
|
|
manager.register_cache("cache_name", ListCache().auto_configure("cache_name"))
|
|
manager.freeze()
|
|
|
|
# default ontology
|
|
with manager.current_sdp().get_transaction(context.event) as transaction:
|
|
transaction.add("cache_name", "key", ["val1"])
|
|
|
|
# add an ontology layer
|
|
manager.push_ontology("new ontology")
|
|
manager.put("cache_name", "key", "val2")
|
|
manager.commit(context)
|
|
|
|
assert manager.get("cache_name", "key") == ["val1", "val2"]
|
|
assert manager.ontologies[0].cache_manager.sdp.state.data == {'cache_name': {'key': ['val1', 'val2']}}
|
|
assert manager.ontologies[1].cache_manager.sdp.state.data == {'cache_name': {'key': ['val1']}}
|
|
|
|
# and I can keep adding in another layer
|
|
manager.push_ontology("another ontology")
|
|
manager.put("cache_name", "key", "val3")
|
|
manager.commit(context)
|
|
|
|
assert manager.get("cache_name", "key") == ["val1", "val2", "val3"]
|
|
assert manager.ontologies[0].cache_manager.sdp.state.data == {'cache_name': {'key': ['val1', 'val2', 'val3']}}
|
|
assert manager.ontologies[1].cache_manager.sdp.state.data == {'cache_name': {'key': ['val1', 'val2']}}
|
|
assert manager.ontologies[2].cache_manager.sdp.state.data == {'cache_name': {'key': ['val1']}}
|
|
|
|
def test_i_can_remove_an_entry_that_is_only_in_db(self, context, manager):
|
|
|
|
manager.register_cache("cache_name", Cache().auto_configure("cache_name"))
|
|
manager.freeze()
|
|
|
|
# default ontology
|
|
# value in DB but not in cache
|
|
with manager.current_sdp().get_transaction(context.event) as transaction:
|
|
transaction.add("cache_name", "key", "value")
|
|
|
|
manager.delete("cache_name", "key")
|
|
manager.commit(context)
|
|
|
|
assert manager.get("cache_name", "key") is NotFound
|
|
|
|
# sanity check, the entry is removed
|
|
assert manager.ontologies[0].cache_manager.caches["cache_name"].cache.copy() == {}
|
|
assert manager.ontologies[0].cache_manager.sdp.state.data == {'cache_name': {}}
|
|
|
|
def test_i_can_remove_when_value_is_in_low_level(self, context, manager):
|
|
# In this test, there is a value in a lower level ontology
|
|
# After calling delete(), the value is no longer accessible, but not deleted
|
|
|
|
manager.register_cache("cache_name", Cache(default=lambda sdp, key: sdp.get("cache_name", key),
|
|
extend_exists=lambda sdp, key: sdp.exists("cache_name", key)))
|
|
manager.freeze()
|
|
|
|
# default ontology
|
|
# value in DB but not in cache
|
|
with manager.current_sdp().get_transaction(context.event) as transaction:
|
|
transaction.add("cache_name", "key", "value")
|
|
|
|
# add an ontology layer
|
|
manager.push_ontology("new ontology")
|
|
manager.delete("cache_name", "key", "value")
|
|
manager.commit(context)
|
|
|
|
assert manager.get("cache_name", "key") is NotFound
|
|
# sanity check, the entry is removed
|
|
assert manager.ontologies[0].cache_manager.caches["cache_name"].cache.copy() == {"key": Removed}
|
|
assert manager.ontologies[0].cache_manager.sdp.state.data == {'cache_name': {"key": Removed}}
|
|
|
|
assert manager.ontologies[1].cache_manager.caches["cache_name"].cache.copy() == {}
|
|
assert manager.ontologies[1].cache_manager.sdp.state.data == {'cache_name': {"key": "value"}}
|
|
|
|
# The entry still exists in lower ontology
|
|
manager.pop_ontology(context)
|
|
assert manager.get("cache_name", "key") == "value"
|
|
|
|
def test_i_can_remove_when_value_is_in_both_low_and_current_level(self, context, manager):
|
|
# In this test, there is a value is in a lower level ontology and in the current ontology
|
|
# After calling delete(), the value is no longer accessible, but not deleted
|
|
|
|
manager.register_cache("cache_name", Cache(default=lambda sdp, key: sdp.get("cache_name", key),
|
|
extend_exists=lambda sdp, key: sdp.exists("cache_name", key)))
|
|
manager.freeze()
|
|
|
|
# default ontology
|
|
# value in DB but not in cache
|
|
with manager.current_sdp().get_transaction(context.event) as transaction:
|
|
transaction.add("cache_name", "key", "value")
|
|
|
|
# add an ontology layer
|
|
manager.push_ontology("new ontology")
|
|
with manager.current_sdp().get_transaction(context.event) as transaction:
|
|
transaction.add("cache_name", "key", "value2")
|
|
|
|
manager.delete("cache_name", "key", "value")
|
|
manager.commit(context)
|
|
|
|
assert manager.get("cache_name", "key") is NotFound
|
|
# sanity check, the entry is removed
|
|
assert manager.ontologies[0].cache_manager.caches["cache_name"].cache.copy() == {"key": Removed}
|
|
assert manager.ontologies[0].cache_manager.sdp.state.data == {'cache_name': {"key": Removed}}
|
|
|
|
assert manager.ontologies[1].cache_manager.caches["cache_name"].cache.copy() == {}
|
|
assert manager.ontologies[1].cache_manager.sdp.state.data == {'cache_name': {"key": "value"}}
|
|
|
|
# The entry still exists in lower ontology
|
|
manager.pop_ontology(context)
|
|
assert manager.get("cache_name", "key") == "value"
|
|
|
|
def test_i_can_remove_when_value_is_not_low_level(self, context, manager):
|
|
# In this test, there is a value is only in the current level
|
|
# The value is deleted
|
|
|
|
manager.register_cache("cache_name", Cache().auto_configure("cache_name"))
|
|
manager.freeze()
|
|
|
|
# add an ontology layer
|
|
# so that the value does not exist in the lower level ontology
|
|
manager.push_ontology("new ontology")
|
|
|
|
# value in DB but not in cache
|
|
with manager.current_sdp().get_transaction(context.event) as transaction:
|
|
transaction.add("cache_name", "key", "value")
|
|
|
|
manager.delete("cache_name", "key")
|
|
manager.commit(context)
|
|
|
|
assert manager.get("cache_name", "key") is NotFound
|
|
|
|
# sanity check, the entry is removed
|
|
assert manager.ontologies[0].cache_manager.caches["cache_name"].cache.copy() == {}
|
|
assert manager.ontologies[0].cache_manager.sdp.state.data == {'cache_name': {}}
|
|
|
|
def test_i_can_remove_list_if_needed_when_value_is_in_low_level(self, context, manager):
|
|
# In this test, there are multiple values in a low level ontology
|
|
# We remove only one,
|
|
# So the top level ontology must be a copy minus the removed value
|
|
|
|
cache = ListIfNeededCache().auto_configure("cache_name")
|
|
|
|
manager.register_cache("cache_name", cache)
|
|
manager.freeze()
|
|
|
|
# default ontology
|
|
# value in DB but not in cache
|
|
with manager.current_sdp().get_transaction(context.event) as transaction:
|
|
transaction.add("cache_name", "key", ["value", "value2"])
|
|
|
|
# add ontology layers
|
|
manager.push_ontology("new ontology")
|
|
manager.push_ontology("another ontology")
|
|
manager.delete("cache_name", "key", "value")
|
|
manager.commit(context)
|
|
|
|
assert manager.get("cache_name", "key") == "value2"
|
|
# sanity check, the entry is removed
|
|
assert manager.ontologies[0].cache_manager.caches["cache_name"].cache.copy() == {"key": "value2"}
|
|
assert manager.ontologies[0].cache_manager.sdp.state.data == {'cache_name': {"key": "value2"}}
|
|
|
|
assert manager.ontologies[1].cache_manager.caches["cache_name"].cache.copy() == {}
|
|
assert manager.ontologies[1].cache_manager.sdp.state.data == {}
|
|
|
|
assert manager.ontologies[2].cache_manager.caches["cache_name"].cache.copy() == {"key": ["value", "value2"]}
|
|
assert manager.ontologies[2].cache_manager.sdp.state.data == {'cache_name': {"key": ["value", "value2"]}}
|
|
|
|
# The entry still exists in lower ontology
|
|
manager.pop_ontology(context)
|
|
manager.pop_ontology(context)
|
|
assert manager.get("cache_name", "key") == ["value", "value2"]
|
|
|
|
def test_i_can_add_concept_default_layer(self, context, manager, next_id):
|
|
self.init_by_id_and_by_name(manager)
|
|
foo = get_metadata("foo").auto_init(next_id)
|
|
|
|
manager.add_concept(foo)
|
|
manager.commit(context)
|
|
|
|
assert manager.get("by_key", foo.key) == foo
|
|
assert manager.get("by_id", foo.id) == foo
|
|
|
|
assert manager.current_sdp().get("by_key", foo.key) == foo
|
|
assert manager.current_sdp().get("by_id", foo.id) == foo
|
|
|
|
def test_i_can_add_concept_in_top_layer(self, context, manager, next_id):
|
|
self.init_by_id_and_by_name(manager)
|
|
foo = get_metadata("foo").auto_init(next_id)
|
|
|
|
# add an ontology layer
|
|
manager.push_ontology("new ontology")
|
|
|
|
manager.add_concept(foo)
|
|
manager.commit(context)
|
|
|
|
assert manager.get("by_key", foo.key) == foo
|
|
assert manager.get("by_id", foo.id) == foo
|
|
|
|
assert manager.current_sdp().get("by_key", foo.key) == foo
|
|
assert manager.current_sdp().get("by_id", foo.id) == foo
|
|
|
|
# sanity check
|
|
assert list(manager.ontologies[0].cache_manager.sdp.state.data.keys()) == ['by_id', 'by_key']
|
|
assert manager.ontologies[1].cache_manager.sdp.state.data == {}
|
|
|
|
def test_i_can_add_the_concepts_with_the_same_key_from_different_layers(self, context, manager, next_id):
|
|
foo1 = get_metadata("foo x", body="x + 1", variables=["x"]).auto_init(next_id)
|
|
foo2 = get_metadata("foo x", body="x + 1", variables=["x"]).auto_init(next_id)
|
|
|
|
cache = ListIfNeededCache().auto_configure("by_key")
|
|
manager.register_concept_cache("by_key", cache, lambda obj: obj.key, use_ref=True)
|
|
manager.freeze()
|
|
|
|
manager.add_concept(foo1)
|
|
manager.commit(context)
|
|
|
|
manager.push_ontology("new ontology")
|
|
manager.add_concept(foo2)
|
|
manager.commit(context)
|
|
|
|
assert manager.current_sdp().get("by_key", foo1.key) == [foo1, foo2]
|
|
|
|
def test_i_can_update_concept_in_default_layer(self, context, manager, next_id):
|
|
self.init_by_id_and_by_name(manager)
|
|
foo = get_metadata("foo", body="body").auto_init(next_id)
|
|
|
|
manager.add_concept(foo)
|
|
|
|
modified = foo.clone(body="new body")
|
|
assert foo != modified
|
|
|
|
manager.update_concept(foo, modified)
|
|
manager.commit(context)
|
|
|
|
assert manager.get("by_key", foo.key) == modified
|
|
assert manager.get("by_id", foo.id) == modified
|
|
|
|
def test_i_can_update_concept_in_top_layer(self, context, manager, next_id):
|
|
self.init_by_id_and_by_name(manager)
|
|
foo = get_metadata("foo").auto_init(next_id)
|
|
|
|
manager.add_concept(foo)
|
|
manager.commit(context)
|
|
|
|
# add an ontology layer
|
|
manager.push_ontology("new ontology")
|
|
|
|
modified = foo.clone(body="new body")
|
|
assert foo != modified
|
|
|
|
manager.update_concept(foo, modified)
|
|
manager.commit(context)
|
|
|
|
assert manager.get("by_key", foo.key) == modified
|
|
assert manager.get("by_id", foo.id) == modified
|
|
|
|
# sanity check.
|
|
# make sure that the previous values are kept
|
|
assert manager.ontologies[0].cache_manager.sdp.get('by_key', foo.key) == modified
|
|
assert manager.ontologies[0].cache_manager.sdp.get('by_id', foo.id) == modified
|
|
assert manager.ontologies[1].cache_manager.sdp.get('by_key', foo.key) == foo
|
|
assert manager.ontologies[1].cache_manager.sdp.get('by_id', foo.id) == foo
|
|
|
|
# so I can get the old values when I pop ontology
|
|
manager.pop_ontology(context)
|
|
assert manager.get("by_key", foo.key) == foo
|
|
assert manager.get("by_id", foo.id) == foo
|
|
|
|
def test_i_can_update_when_concept_in_both_top_and_bottom_layers(self, context, manager, next_id):
|
|
self.init_by_id_and_by_name(manager)
|
|
foo = get_metadata("foo").auto_init(next_id)
|
|
|
|
manager.add_concept(foo)
|
|
manager.commit(context)
|
|
|
|
# add an ontology layer
|
|
# and modify the concept
|
|
# The database is updated, but not the internal cache
|
|
manager.push_ontology("new ontology")
|
|
modified1 = foo.clone(body="new body")
|
|
assert foo != modified1
|
|
with manager.current_sdp().get_transaction(context.event) as transaction:
|
|
transaction.add("by_key", foo.key, modified1)
|
|
transaction.add("by_id", foo.id, modified1)
|
|
|
|
# modify the top layer a second time
|
|
modified2 = foo.clone(body="body", pre="True")
|
|
manager.update_concept(foo, modified2)
|
|
manager.commit(context)
|
|
|
|
assert manager.get("by_key", foo.key) == modified2
|
|
assert manager.get("by_id", foo.id) == modified2
|
|
|
|
# sanity check.
|
|
# make sure that the previous values are kept
|
|
# sanity check
|
|
assert manager.ontologies[0].cache_manager.sdp.get('by_key', foo.key) == modified2
|
|
assert manager.ontologies[0].cache_manager.sdp.get('by_id', foo.id) == modified2
|
|
assert manager.ontologies[1].cache_manager.sdp.get('by_key', foo.key) == foo
|
|
assert manager.ontologies[1].cache_manager.sdp.get('by_id', foo.id) == foo
|
|
|
|
# so I can get the old values when I pop ontology
|
|
manager.pop_ontology(context)
|
|
assert manager.get("by_key", foo.key) == foo
|
|
assert manager.get("by_id", foo.id) == foo
|
|
|
|
def test_i_can_update_when_the_key_changes(self, context, manager, next_id):
|
|
self.init_by_id_and_by_name(manager)
|
|
foo = get_metadata("foo", id="1", key="my_key")
|
|
|
|
# create an entry
|
|
manager.add_concept(foo)
|
|
manager.commit(context)
|
|
|
|
# add a new layer, and modify the concept
|
|
manager.push_ontology("new ontology")
|
|
modified = foo.clone(key="another key")
|
|
manager.update_concept(foo, modified)
|
|
manager.commit(context)
|
|
|
|
assert manager.get("by_id", modified.id) == modified
|
|
assert manager.get("by_key", modified.key) == modified
|
|
assert manager.get("by_key", foo.key) == NotFound
|
|
|
|
# sanity
|
|
assert manager.ontologies[0].cache_manager.sdp.get('by_key', foo.key) == Removed
|
|
assert manager.ontologies[0].cache_manager.sdp.get('by_key', modified.key) == modified
|
|
assert manager.ontologies[0].cache_manager.sdp.get('by_id', foo.id) == modified
|
|
assert manager.ontologies[1].cache_manager.sdp.get('by_key', foo.key) == foo
|
|
assert manager.ontologies[1].cache_manager.sdp.get('by_key', modified.key) == NotFound
|
|
assert manager.ontologies[1].cache_manager.sdp.get('by_id', foo.id) == foo
|
|
|
|
def test_i_can_update_when_key_changes_and_there_are_lists(self, context, manager, next_id):
|
|
self.init_by_id_and_by_name(manager)
|
|
foo, foo2, bar = get_metadatas("foo", get_metadata("foo", body="x"), "bar", next_id=next_id)
|
|
|
|
# create entries
|
|
manager.add_concept(foo)
|
|
manager.add_concept(foo2)
|
|
manager.add_concept(bar)
|
|
manager.commit(context)
|
|
|
|
# add a new layer, and modify the concept
|
|
manager.push_ontology("new ontology")
|
|
modified = foo.clone(key="bar")
|
|
manager.update_concept(foo, modified)
|
|
manager.commit(context)
|
|
|
|
assert manager.get("by_id", modified.id) == modified
|
|
assert manager.get("by_key", modified.key) == [bar, modified]
|
|
assert manager.get("by_key", foo.key) == foo2
|
|
|
|
# sanity check
|
|
assert manager.ontologies[0].cache_manager.sdp.get('by_key', foo.key) == foo2
|
|
assert manager.ontologies[0].cache_manager.sdp.get('by_key', modified.key) == [bar, modified]
|
|
assert manager.ontologies[0].cache_manager.sdp.get('by_id', foo.id) == modified
|
|
assert manager.ontologies[1].cache_manager.sdp.get('by_key', foo.key) == [foo, foo2]
|
|
assert manager.ontologies[1].cache_manager.sdp.get('by_key', modified.key) == bar
|
|
assert manager.ontologies[1].cache_manager.sdp.get('by_id', foo.id) == foo
|
|
|
|
def test_i_can_remove_concept_from_default_layer(self, context, manager, next_id):
|
|
self.init_by_id_and_by_name(manager)
|
|
foo = get_metadata("foo").auto_init(next_id)
|
|
|
|
manager.add_concept(foo)
|
|
manager.commit(context)
|
|
|
|
manager.remove_concept(foo)
|
|
manager.commit(context)
|
|
|
|
assert manager.get("by_id", foo.id) == NotFound
|
|
assert manager.get("by_key", foo.key) == NotFound
|
|
|
|
# sanity check
|
|
assert manager.current_sdp().get("by_key") == {}
|
|
assert manager.current_sdp().get("by_id") == {}
|
|
|
|
def test_i_can_remove_concept_from_top_layer(self, context, manager, next_id):
|
|
self.init_by_id_and_by_name(manager)
|
|
foo = get_metadata("foo").auto_init(next_id)
|
|
|
|
manager.add_concept(foo)
|
|
manager.commit(context)
|
|
|
|
# add a new layer, and remove the concept
|
|
manager.push_ontology("new ontology")
|
|
manager.push_ontology("another ontology")
|
|
|
|
manager.remove_concept(foo)
|
|
manager.commit(context)
|
|
|
|
assert manager.get("by_id", foo.id) == NotFound
|
|
assert manager.get("by_key", foo.key) == NotFound
|
|
|
|
# sanity check
|
|
assert manager.current_sdp().get("by_id") == {foo.id: Removed}
|
|
assert manager.current_sdp().get("by_key") == {foo.key: Removed}
|
|
assert manager.ontologies[1].cache_manager.sdp.get("by_id") == NotFound
|
|
assert manager.ontologies[1].cache_manager.sdp.get("by_key") == NotFound
|
|
assert manager.ontologies[2].cache_manager.sdp.get("by_id") == {foo.id: foo}
|
|
assert manager.ontologies[2].cache_manager.sdp.get("by_key") == {foo.key: foo}
|
|
|
|
# So I can pop
|
|
manager.pop_ontology(context)
|
|
assert manager.get("by_id", foo.id) == foo
|
|
assert manager.get("by_key", foo.key) == foo
|
|
|
|
# and pop again
|
|
manager.pop_ontology(context)
|
|
assert manager.get("by_id", foo.id) == foo
|
|
assert manager.get("by_key", foo.key) == foo
|
|
|
|
def test_i_can_get_all(self, context, manager):
|
|
|
|
manager.register_cache("cache_name", Cache().auto_configure("cache_name"))
|
|
manager.freeze()
|
|
|
|
manager.put("cache_name", "key_to_remove1", "value1")
|
|
manager.put("cache_name", "key_to_remove2", "value1")
|
|
manager.put("cache_name", "key1", "value1")
|
|
manager.put("cache_name", "key2", "value2_in_sdp")
|
|
manager.put("cache_name", "key3", "value3")
|
|
manager.commit(context)
|
|
manager.put("cache_name", "key2", "value2_in_cache") # in cache, but not in remote sdp
|
|
|
|
# add ontology layers
|
|
manager.push_ontology("new ontology")
|
|
manager.put("cache_name", "key1", "value1_from_new_ontology")
|
|
manager.put("cache_name", "key2", "value2_from_new_ontology")
|
|
manager.put("cache_name", "key4", "value4_in_sdp")
|
|
manager.commit(context)
|
|
manager.put("cache_name", "key4", "value4_in_cache")
|
|
manager.put("cache_name", "key_to_remove1", Removed)
|
|
|
|
manager.push_ontology("another ontology")
|
|
with manager.current_sdp().get_transaction(context.event) as transaction:
|
|
# so that value is only in sdp, not in cache
|
|
transaction.add("cache_name", "key5", "value5")
|
|
transaction.add("cache_name", "key_to_remove2", Removed)
|
|
|
|
assert manager.get_all("cache_name") == {
|
|
"key1": "value1_from_new_ontology",
|
|
"key2": "value2_from_new_ontology",
|
|
"key3": "value3",
|
|
"key4": "value4_in_cache",
|
|
"key5": "value5"
|
|
}
|
|
|
|
def test_i_can_keep_track_of_created_concepts_by_ontologies(self, context, manager, next_id):
|
|
self.init_by_id_and_by_name(manager)
|
|
foo = get_metadata("foo").auto_init(next_id)
|
|
manager.add_concept(foo)
|
|
|
|
def from_cache(entry):
|
|
return manager.internal_cache_manager.copy(entry)
|
|
|
|
def from_db(entry):
|
|
return manager.internal_cache_manager.sdp.get(entry)
|
|
|
|
# check that the new concept is tracked
|
|
assert from_cache(SheerkaOntologyManager.CONCEPTS_BY_ONTOLOGY_ENTRY) == {'__default__': {'1001'}}
|
|
assert from_cache(SheerkaOntologyManager.ONTOLOGY_BY_CONCEPT_ENTRY) == {'1001': '__default__'}
|
|
|
|
# add a new ontology and make sure the new concepts are tracked
|
|
manager.push_ontology("new ontology")
|
|
bar = get_metadata("bar").auto_init(next_id)
|
|
manager.add_concept(bar)
|
|
assert from_cache(SheerkaOntologyManager.CONCEPTS_BY_ONTOLOGY_ENTRY) == {'__default__': {'1001'},
|
|
'new ontology': {'1002'}}
|
|
assert from_cache(SheerkaOntologyManager.ONTOLOGY_BY_CONCEPT_ENTRY) == {'1001': '__default__',
|
|
'1002': 'new ontology'}
|
|
|
|
# commit the info and check the DB
|
|
manager.commit(context)
|
|
assert from_db(SheerkaOntologyManager.CONCEPTS_BY_ONTOLOGY_ENTRY) == {'__default__': {'1001'},
|
|
'new ontology': {'1002'}}
|
|
assert from_db(SheerkaOntologyManager.ONTOLOGY_BY_CONCEPT_ENTRY) == {'1001': '__default__',
|
|
'1002': 'new ontology'}
|
|
|
|
# remove a concept a check
|
|
manager.remove_concept(foo)
|
|
assert from_cache(SheerkaOntologyManager.CONCEPTS_BY_ONTOLOGY_ENTRY) == {'new ontology': {'1002'}, }
|
|
assert from_cache(SheerkaOntologyManager.ONTOLOGY_BY_CONCEPT_ENTRY) == {'1002': 'new ontology', }
|
|
|
|
manager.remove_concept(bar)
|
|
assert from_cache(SheerkaOntologyManager.CONCEPTS_BY_ONTOLOGY_ENTRY) == {}
|
|
assert from_cache(SheerkaOntologyManager.ONTOLOGY_BY_CONCEPT_ENTRY) == {}
|
|
|
|
# commit again and check
|
|
manager.commit(context)
|
|
assert from_db(SheerkaOntologyManager.CONCEPTS_BY_ONTOLOGY_ENTRY) == {}
|
|
assert from_db(SheerkaOntologyManager.ONTOLOGY_BY_CONCEPT_ENTRY) == {}
|
|
|
|
@pytest.mark.skip("No rule yet !")
|
|
def test_i_can_keep_track_of_created_rules_by_ontologies(self, manager):
|
|
sheerka, context, rule1 = self.init_format_rules(("rule1", "id.attr == 'value'", "True"))
|
|
|
|
def rules_by_ontology_from_cache():
|
|
res = sheerka.om.internal_cache_manager.copy(SheerkaOntologyManager.RULES_BY_ONTOLOGY_ENTRY)
|
|
del res[SheerkaOntologyManager.ROOT_ONTOLOGY_NAME] # discard builtin rules
|
|
return res
|
|
|
|
def ontologies_from_cache():
|
|
res = sheerka.om.internal_cache_manager.copy(SheerkaOntologyManager.ONTOLOGY_BY_RULE_ENTRY)
|
|
return {k: v for k, v in res.items() if v != SheerkaOntologyManager.ROOT_ONTOLOGY_NAME}
|
|
|
|
def rules_by_ontology_from_db():
|
|
res = sheerka.om.internal_cache_manager.sdp.get(SheerkaOntologyManager.RULES_BY_ONTOLOGY_ENTRY)
|
|
del res[SheerkaOntologyManager.ROOT_ONTOLOGY_NAME] # discard builtin rules
|
|
return res
|
|
|
|
def ontologies_from_db():
|
|
res = sheerka.om.internal_cache_manager.sdp.get(SheerkaOntologyManager.ONTOLOGY_BY_RULE_ENTRY)
|
|
return {k: v for k, v in res.items() if v != SheerkaOntologyManager.ROOT_ONTOLOGY_NAME}
|
|
|
|
assert rules_by_ontology_from_cache() == {"#unit_test#": {rule1.id}}
|
|
assert ontologies_from_cache() == {rule1.id: "#unit_test#"}
|
|
|
|
# add a new rule from a new ontology and check
|
|
sheerka.push_ontology(context, "new ontology")
|
|
rule2 = Rule(ACTION_TYPE_EXEC, "rule2", "id2.attr2 == 'value'", "True")
|
|
sheerka.create_new_rule(context, rule2)
|
|
|
|
assert rules_by_ontology_from_cache() == {"#unit_test#": {rule1.id}, "new ontology": {rule2.id}}
|
|
assert ontologies_from_cache() == {rule1.id: "#unit_test#", rule2.id: "new ontology"}
|
|
|
|
# commit and check the result
|
|
sheerka.om.commit(context)
|
|
assert rules_by_ontology_from_db() == {"#unit_test#": {rule1.id}, "new ontology": {rule2.id}}
|
|
assert ontologies_from_db() == {rule1.id: "#unit_test#", rule2.id: "new ontology"}
|
|
|
|
sheerka.remove_rule(context, rule1)
|
|
assert rules_by_ontology_from_cache() == {"new ontology": {rule2.id}}
|
|
assert ontologies_from_cache() == {rule2.id: "new ontology"}
|
|
|
|
# remove the last rule
|
|
sheerka.remove_rule(context, rule2)
|
|
assert rules_by_ontology_from_cache() == {}
|
|
assert ontologies_from_cache() == {}
|
|
|
|
# commit and check the db
|
|
sheerka.om.commit(context)
|
|
assert rules_by_ontology_from_db() == {}
|
|
assert ontologies_from_db() == {}
|
|
|
|
def test_i_can_keep_track_of_created_concept_on_ontology_pop(self, context, manager, next_id):
|
|
self.init_by_id_and_by_name(manager)
|
|
foo = get_metadata("foo").auto_init(next_id)
|
|
manager.add_concept(foo)
|
|
|
|
def from_cache(entry):
|
|
return manager.internal_cache_manager.copy(entry)
|
|
|
|
manager.push_ontology("new ontology")
|
|
bar, baz = get_metadatas("bar", "baz", next_id=next_id)
|
|
manager.add_concept(bar)
|
|
manager.add_concept(baz)
|
|
|
|
assert from_cache(SheerkaOntologyManager.CONCEPTS_BY_ONTOLOGY_ENTRY) == {'__default__': {'1001'},
|
|
'new ontology': {'1002', '1003'}}
|
|
assert from_cache(SheerkaOntologyManager.ONTOLOGY_BY_CONCEPT_ENTRY) == {'1001': '__default__',
|
|
'1002': 'new ontology',
|
|
'1003': 'new ontology'}
|
|
|
|
manager.pop_ontology(context)
|
|
assert from_cache(SheerkaOntologyManager.CONCEPTS_BY_ONTOLOGY_ENTRY) == {'__default__': {'1001'}}
|
|
assert from_cache(SheerkaOntologyManager.ONTOLOGY_BY_CONCEPT_ENTRY) == {'1001': '__default__'}
|
|
|
|
@pytest.mark.skip("No rule yet !")
|
|
def test_i_can_keep_track_of_created_rules_on_ontology_pop(self, manager):
|
|
sheerka, context, rule1 = self.init_format_rules(("rule1", "id.attr == 'value'", "True"))
|
|
|
|
events_raised = set()
|
|
sheerka.subscribe(EVENT_RULE_ID_DELETED, lambda ctx, r: events_raised.add(r))
|
|
|
|
def rules_by_ontology_from_cache():
|
|
res = sheerka.om.internal_cache_manager.copy(SheerkaOntologyManager.RULES_BY_ONTOLOGY_ENTRY)
|
|
del res[SheerkaOntologyManager.ROOT_ONTOLOGY_NAME] # discard builtin rules
|
|
return res
|
|
|
|
def ontologies_from_cache():
|
|
res = sheerka.om.internal_cache_manager.copy(SheerkaOntologyManager.ONTOLOGY_BY_RULE_ENTRY)
|
|
return {k: v for k, v in res.items() if v != SheerkaOntologyManager.ROOT_ONTOLOGY_NAME}
|
|
|
|
sheerka.push_ontology(context, "new ontology")
|
|
sheerka.create_new_rule(context, Rule(ACTION_TYPE_EXEC, "rule2", "id2.attr2 == 'value'", "True"))
|
|
sheerka.create_new_rule(context, Rule(ACTION_TYPE_EXEC, "rule3", "id3.attr3 == 'value'", "True"))
|
|
|
|
sheerka.pop_ontology(context)
|
|
assert rules_by_ontology_from_cache() == {'#unit_test#': {'11'}}
|
|
assert ontologies_from_cache() == {'11': '#unit_test#'}
|
|
|
|
# check that the 'rule is deleted' events are raised
|
|
assert events_raised == {'12', '13'}
|
|
|
|
def test_i_can_get_call_when_a_cache_is_cleared(self, manager):
|
|
|
|
manager.register_cache("cache_name", Cache().auto_configure("cache_name"))
|
|
manager.freeze()
|
|
|
|
manager.put("cache_name", "key1", "value1")
|
|
manager.put("cache_name", "key2", "value2")
|
|
manager.put("cache_name", "key3", "value3")
|
|
|
|
# add ontology layers
|
|
manager.push_ontology("new ontology")
|
|
manager.clear("cache_name")
|
|
manager.put("cache_name", "key1", "new value1")
|
|
manager.put("cache_name", "key4", "value4")
|
|
|
|
manager.push_ontology("another ontology")
|
|
manager.put("cache_name", "key5", "value5")
|
|
|
|
assert manager.get_all("cache_name") == {
|
|
"key1": "new value1",
|
|
"key4": "value4",
|
|
"key5": "value5"
|
|
}
|
|
|
|
def test_i_can_get_all_when_inc_cache(self, manager):
|
|
|
|
manager.register_cache("cache_name", IncCache().auto_configure("cache_name"))
|
|
manager.freeze()
|
|
|
|
assert manager.get("cache_name", "key1") == 1
|
|
assert manager.get("cache_name", "key1") == 2
|
|
|
|
manager.push_ontology("new ontology")
|
|
assert manager.get("cache_name", "key1") == 3
|
|
assert manager.get("cache_name", "key2") == 1
|
|
assert manager.get("cache_name", "key2") == 2
|
|
|
|
assert manager.get_all("cache_name") == {
|
|
"key1": 3,
|
|
"key2": 2,
|
|
}
|
|
|
|
# a second time, to make sure that nothing was incremented
|
|
assert manager.get_all("cache_name") == {
|
|
"key1": 3,
|
|
"key2": 2,
|
|
}
|
|
|
|
@pytest.mark.parametrize("all_ontologies, expected_in_layer_1", [
|
|
(False, {}),
|
|
(True, {'key1': DummyObj(key='key1', value='value1'),
|
|
'key2': DummyObj(key='key2', value='value2')}),
|
|
])
|
|
def test_i_can_populate(self, context, manager, all_ontologies, expected_in_layer_1):
|
|
|
|
manager.register_cache("cache_name", Cache().auto_configure("cache_name"))
|
|
manager.freeze()
|
|
|
|
manager.put("cache_name", "key1", DummyObj("key1", "value1"))
|
|
manager.put("cache_name", "key2", DummyObj("key2", "value2"))
|
|
manager.commit(context)
|
|
manager.clear("cache_name")
|
|
|
|
manager.push_ontology("new ontology")
|
|
manager.put("cache_name", "key2", DummyObj("key2", "value22"))
|
|
manager.put("cache_name", "key3", DummyObj("key3", "value3"))
|
|
manager.commit(context)
|
|
manager.clear("cache_name")
|
|
|
|
# sanity check
|
|
assert manager.ontologies[0].cache_manager.get_inner_cache("cache_name").copy() == {}
|
|
assert manager.ontologies[1].cache_manager.get_inner_cache("cache_name").copy() == {}
|
|
|
|
manager.populate("cache_name",
|
|
lambda sdp: sdp.list("cache_name"),
|
|
lambda obj: obj.key,
|
|
all_ontologies=all_ontologies)
|
|
|
|
assert manager.ontologies[0].cache_manager.get_inner_cache("cache_name").copy() == {
|
|
'key2': DummyObj(key='key2', value='value22'),
|
|
'key3': DummyObj(key='key3', value='value3')}
|
|
assert manager.ontologies[1].cache_manager.get_inner_cache("cache_name").copy() == expected_in_layer_1
|
|
|
|
def test_i_can_clear_when_multiple_ontology_layers(self, context, manager):
|
|
|
|
manager.register_cache("cache_name", Cache().auto_configure("cache_name"))
|
|
manager.freeze()
|
|
|
|
manager.put("cache_name", "key1", "value1")
|
|
manager.put("cache_name", "key2", "value2")
|
|
assert manager.ontologies[0].cache_manager.get_inner_cache("cache_name").copy() == {'key1': 'value1',
|
|
'key2': 'value2'}
|
|
|
|
# I can clear in another layer
|
|
manager.push_ontology("new ontology")
|
|
manager.clear("cache_name")
|
|
assert manager.get("cache_name", "key1") is NotFound
|
|
assert manager.get("cache_name", "key2") is NotFound
|
|
assert manager.ontologies[0].cache_manager.get_inner_cache("cache_name").copy() == {}
|
|
|
|
manager.put("cache_name", "key1", "new value1")
|
|
assert manager.get("cache_name", "key1") == "new value1"
|
|
assert manager.get("cache_name", "key2") is NotFound
|
|
assert manager.ontologies[0].cache_manager.get_inner_cache("cache_name").copy() == {'key1': "new value1"}
|
|
|
|
manager.push_ontology("another ontology")
|
|
manager.put("cache_name", "key2", "new value2")
|
|
assert manager.get("cache_name", "key1") == "new value1"
|
|
assert manager.get("cache_name", "key2") == "new value2"
|
|
assert manager.ontologies[0].cache_manager.get_inner_cache("cache_name").copy() == {'key1': "new value1",
|
|
'key2': "new value2"}
|
|
|
|
manager.clear("cache_name")
|
|
assert manager.get("cache_name", "key1") is NotFound
|
|
assert manager.get("cache_name", "key2") is NotFound
|
|
assert manager.ontologies[0].cache_manager.get_inner_cache("cache_name").copy() == {}
|
|
assert manager.ontologies[1].cache_manager.get_inner_cache("cache_name").copy() == {'key1': "new value1"}
|
|
assert manager.ontologies[2].cache_manager.get_inner_cache("cache_name").copy() == {'key1': 'value1',
|
|
'key2': 'value2'}
|
|
|
|
manager.pop_ontology(context)
|
|
assert manager.get("cache_name", "key1") == "new value1"
|
|
assert manager.get("cache_name", "key2") is NotFound
|
|
|
|
manager.pop_ontology(context)
|
|
assert manager.get("cache_name", "key1") == "value1"
|
|
assert manager.get("cache_name", "key2") == "value2"
|
|
|
|
def test_already_on_the_top(self, manager):
|
|
manager.freeze()
|
|
|
|
manager.push_ontology("new ontology")
|
|
|
|
assert manager.already_on_top("new ontology")
|
|
assert not manager.already_on_top("another ontology")
|
|
|
|
def test_already_on_the_top_when_the_ontology_already_exists(self, manager):
|
|
manager.freeze()
|
|
|
|
manager.push_ontology("new ontology")
|
|
manager.push_ontology("another ontology")
|
|
|
|
with pytest.raises(OntologyAlreadyExists):
|
|
assert manager.already_on_top("new ontology")
|
|
|
|
# class TestSheerkaOntologyWithFileBasedSheerka(UsingFileBasedSheerka):
|
|
# def test_i_can_put_back_ontology(self, manager):
|
|
# sheerka = self.get_sheerka()
|
|
#
|
|
#
|
|
#
|
|
# manager.register_cache("cache_name", Cache().auto_configure("cache_name"))
|
|
# manager.freeze()
|
|
#
|
|
# # default layer
|
|
# with manager.current_sdp().get_transaction(context.event) as transaction:
|
|
# transaction.add("cache_name", "key", "value1")
|
|
#
|
|
# # add a layer
|
|
# manager.push_ontology("new ontology")
|
|
# with manager.current_sdp().get_transaction(context.event) as transaction:
|
|
# transaction.add("cache_name", "key", "value2")
|
|
#
|
|
# assert manager.get("cache_name", "key") == "value2"
|
|
#
|
|
# manager.pop_ontology(context)
|
|
# assert manager.get("cache_name", "key") == "value1"
|
|
#
|
|
# # put back the previous ontology
|
|
# manager.push_ontology("new ontology")
|
|
# assert manager.get("cache_name", "key") == "value2"
|
|
#
|
|
# def test_i_can_remember_concept_and_rules_by_ontology(self, manager):
|
|
# sheerka, context, foo, r1 = self.init_test().with_concepts(
|
|
# "foo",
|
|
# create_new=True
|
|
# ).with_format_rules(
|
|
# ("rule1", "__ret", "True"),
|
|
# ).unpack()
|
|
# sheerka.om.commit(context)
|
|
#
|
|
# sheerka = self.new_sheerka_instance(False)
|
|
#
|
|
#
|
|
# sheerka.create_new_concept(context, Concept("bar"))
|
|
# r2 = sheerka.create_new_rule(context, Rule(ACTION_TYPE_EXEC, "rule2", "__ret.status", "True")).body.body
|
|
# sheerka.om.commit(context)
|
|
#
|
|
# sheerka.push_ontology(context, "new ontology")
|
|
# sheerka.create_new_concept(context, Concept("baz"))
|
|
# sheerka.create_new_rule(context, Rule(ACTION_TYPE_EXEC, "rule3", "id3.attr3 == 'value'", "True"))
|
|
# sheerka.om.commit(context)
|
|
#
|
|
# sheerka = self.new_sheerka_instance(False)
|
|
#
|
|
#
|
|
# sheerka.push_ontology(context, "another ontology")
|
|
# sheerka.create_new_concept(context, Concept("qux"))
|
|
# r4 = sheerka.create_new_rule(context, Rule(ACTION_TYPE_EXEC, "rule4", "id4.attr4", "True")).body.body
|
|
# sheerka.remove_concept(context, foo)
|
|
# sheerka.remove_rule(context, r2)
|
|
# sheerka.om.commit(context)
|
|
#
|
|
# assert sheerka.om.self_cache_manager.copy(SheerkaOntologyManager.CONCEPTS_BY_ONTOLOGY_ENTRY) == {
|
|
# '#unit_test#': {'1002'},
|
|
# 'another ontology': {'1004'},
|
|
# }
|
|
#
|
|
# assert sheerka.om.self_cache_manager.copy(SheerkaOntologyManager.RULES_BY_ONTOLOGY_ENTRY) == {
|
|
# '#unit_test#': {r1.id},
|
|
# 'another ontology': {r4.id},
|
|
# }
|
|
#
|
|
# # in db
|
|
# assert sheerka.om.self_cache_manager.sdp.get(SheerkaOntologyManager.CONCEPTS_BY_ONTOLOGY_ENTRY) == {
|
|
# '#unit_test#': {'1002'},
|
|
# 'another ontology': {'1004'},
|
|
# 'new ontology': {'1003'}}
|
|
#
|
|
# rules_from_db = sheerka.om.self_cache_manager.sdp.get(SheerkaOntologyManager.RULES_BY_ONTOLOGY_ENTRY)
|
|
# del rules_from_db["__default__"]
|
|
# assert rules_from_db == {
|
|
# '#unit_test#': {'11'},
|
|
# 'another ontology': {'14'},
|
|
# 'new ontology': {'13'}}
|