Files
Sheerka/tests/ontologies/test_SheerkaOntoloyManager.py
T
kodjo e41094f908 Fixed #8
Fixed #12
Fixed #13
Fixed #14
2023-05-08 17:50:28 +02:00

1523 lines
65 KiB
Python

from dataclasses import dataclass
import pytest
from base import BaseTest
from caching.Cache import Cache
from caching.DictionaryCache import DictionaryCache
from caching.IncCache import IncCache
from caching.ListCache import ListCache
from caching.ListIfNeededCache import ListIfNeededCache
from common.global_symbols import NotFound, Removed
from helpers import get_metadata, get_metadatas
from ontologies.Exceptions import OntologyAlreadyExists, OntologyManagerCannotPopLatest, OntologyManagerFrozen, \
OntologyManagerNotFrozen, OntologyNotFound
from ontologies.SheerkaOntologyManager import Ontology, SheerkaOntologyManager
@dataclass
class DummyObj:
    """Minimal key/value record used as a test stand-in for concept objects."""
    key: str
    value: object
class TestSheerkaOntology(BaseTest):
    @pytest.fixture()
    def manager(self, sheerka):
        """Fresh SheerkaOntologyManager backed by an in-memory store."""
        return SheerkaOntologyManager(sheerka, "mem://")
    @staticmethod
    def init_by_id_and_by_name(manager):
        """
        Register the two standard concept caches on *manager*, freeze it and
        return it.

        Note: despite the method name, the second cache is registered under
        "by_key" and keyed by ``obj.key``, not by a "name" attribute.

        :param manager: the ontology manager to configure
        :return: the same manager, frozen and ready for use
        """
        cache = Cache().auto_configure("by_id")
        manager.register_concept_cache("by_id", cache, lambda obj: obj.id, use_ref=True)
        cache = ListIfNeededCache().auto_configure("by_key")
        manager.register_concept_cache("by_key", cache, lambda obj: obj.key, use_ref=True)
        manager.freeze()
        return manager
    def test_i_can_create_ontology_manager(self, manager):
        """A new manager starts with a single root ontology and no caches."""
        assert len(manager.ontologies) == 1
        assert manager.ontologies_names == [SheerkaOntologyManager.ROOT_ONTOLOGY_NAME]
        # current cache manager and current sdp point to the top of the list
        assert id(manager.current_cache_manager()) == id(manager.ontologies[0].cache_manager)
        assert id(manager.current_sdp()) == id(manager.ontologies[0].cache_manager.sdp)
        # No cache defined by default
        assert manager.current_cache_manager().caches == {}
    def test_i_can_register_a_cache_and_get_data(self, context, manager):
        """put() is visible through the cache at once, in the sdp only after commit()."""
        cache = Cache()
        manager.register_cache("test", cache)
        assert id(cache._sdp) == id(manager.current_sdp())  # sdp is automatically added to the new Cache
        manager.put("test", "key", "value")
        assert manager.get("test", "key") == "value"
        assert manager.current_sdp().get("test", "key") == NotFound  # not yet committed
        manager.commit(context)
        assert manager.get("test", "key") == "value"
        assert manager.current_sdp().get("test", "key") == "value"
    def test_i_can_no_longer_register_cache_once_ontology_is_frozen(self, manager):
        """Both plain and concept cache registration fail after freeze()."""
        manager.freeze()
        with pytest.raises(OntologyManagerFrozen):
            manager.register_cache("test", Cache())
        with pytest.raises(OntologyManagerFrozen):
            manager.register_concept_cache("test", Cache(), lambda obj: obj.key, True)
    def test_i_cannot_push_ontology_if_not_frozen(self, manager):
        """push_ontology() requires the manager to be frozen first."""
        with pytest.raises(OntologyManagerNotFrozen):
            manager.push_ontology("new_ontology")
def test_i_can_push_ontology_from_simple_caches(self, manager):
"""
Once registered, the same cache are created every time a new ontology is added or pushed
:param manager:
:type manager:
:return:
:rtype:
"""
manager.register_cache("Cache", Cache(), persist=True, use_ref=True)
manager.register_cache("DictionaryCache", DictionaryCache(), persist=True, use_ref=False)
manager.register_cache("ListIfNeededCache", ListIfNeededCache(), persist=False, use_ref=False)
manager.register_cache("ListCache", ListCache(), persist=False, use_ref=True)
manager.register_cache("IncCache", IncCache(), False)
manager.freeze()
manager.push_ontology("new_ontology")
assert len(manager.ontologies) == 2
ref_cache_manager = manager.ontologies[1].cache_manager
assert ref_cache_manager.sdp.name == "__default__"
cache_manager_0 = manager.ontologies[0].cache_manager
assert len(cache_manager_0.caches) == 5
assert cache_manager_0.concept_caches == []
# check that the definition of the newly created caches are the same thant the ref
for cache_name in ("Cache", "DictionaryCache", "ListIfNeededCache", "ListCache", "IncCache"):
cache_def = cache_manager_0.caches[cache_name]
assert cache_def.persist == ref_cache_manager.caches[cache_name].persist
assert cache_def.use_ref == ref_cache_manager.caches[cache_name].use_ref
assert type(cache_def.cache) == type(ref_cache_manager.caches[cache_name].cache)
assert cache_def.cache._sdp.name == "new_ontology"
def test_i_can_push_multiple_ontologies(self, manager):
manager.freeze()
manager.push_ontology("ontology 1")
manager.push_ontology("ontology 2")
assert len(manager.ontologies) == 3
assert manager.ontologies[0].name == "ontology 2"
assert manager.ontologies[0].depth == 2
assert manager.ontologies[1].name == "ontology 1"
assert manager.ontologies[1].depth == 1
assert manager.ontologies[2].name == SheerkaOntologyManager.ROOT_ONTOLOGY_NAME
assert manager.ontologies[2].depth == 0
def test_i_can_push_ontology_from_concept_caches(self, manager):
"""
Same test than before, but for cache definitions
:param manager:
:type manager:
:return:
:rtype:
"""
self.init_by_id_and_by_name(manager)
manager.push_ontology("new_ontology")
assert len(manager.ontologies) == 2
ref_cache_manager = manager.ontologies[1].cache_manager
cache_manager_0 = manager.ontologies[0].cache_manager
assert len(cache_manager_0.caches) == 2
assert cache_manager_0.concept_caches == ref_cache_manager.concept_caches
assert cache_manager_0.sdp.name == "new_ontology"
for cache_name in ("by_key", "by_id"):
cache_def = cache_manager_0.caches[cache_name]
assert cache_def.persist == ref_cache_manager.caches[cache_name].persist
assert cache_def.use_ref == ref_cache_manager.caches[cache_name].use_ref
assert type(cache_def.cache) == type(ref_cache_manager.caches[cache_name].cache)
assert cache_def.cache._sdp.name == "new_ontology"
    def test_i_can_get_database_value(self, context, manager):
        """A value written directly to the sdp is pulled into the cache on first get()."""
        manager.register_cache("cache_name", Cache().auto_configure("cache_name"))
        with manager.current_sdp().get_transaction(context.event) as transaction:
            transaction.add("cache_name", "key", "value")
        assert not manager.current_cache_manager().has("cache_name", "key")
        assert manager.get("cache_name", "key") == "value"
        # the get() above lazily populated the cache
        assert manager.current_cache_manager().has("cache_name", "key")
    def test_i_cannot_pop_ontology_when_not_frozen(self, context, manager):
        """pop_ontology() requires the manager to be frozen first."""
        with pytest.raises(OntologyManagerNotFrozen):
            manager.pop_ontology(context)
    def test_i_cannot_pop_the_latest_cache_manager(self, context, manager):
        """
        Popping is refused when only the root ontology is left.
        """
        manager.freeze()
        with pytest.raises(OntologyManagerCannotPopLatest):
            manager.pop_ontology(context)
    def test_i_can_pop_ontology(self, context, manager):
        """Pushed ontologies can be popped one by one, down to the root (which stays)."""
        manager.freeze()
        manager.push_ontology("ontology1")
        manager.push_ontology("ontology2")
        manager.push_ontology("ontology3")
        assert len(manager.ontologies) == 4
        manager.pop_ontology(context)
        assert len(manager.ontologies) == 3
        manager.pop_ontology(context)
        manager.pop_ontology(context)
        # only the root is left now, so the next pop must fail
        with pytest.raises(OntologyManagerCannotPopLatest):
            manager.pop_ontology(context)
    def test_i_can_revert_ontology(self, context, manager):
        """revert_ontology() pops every layer down to (and including) the given one."""
        manager.freeze()
        manager.push_ontology("ontology1")
        ontology = manager.push_ontology("ontology2")
        manager.push_ontology("ontology3")
        manager.push_ontology("ontology4")
        manager.revert_ontology(context, ontology)
        # root + "ontology1" remain; "ontology2" and everything above are gone
        assert len(manager.ontologies) == 2
        assert manager.current_ontology().name == "ontology1"
    def test_i_cannot_revert_when_ontology_is_already_popped(self, context, manager):
        """Reverting to an ontology that was already popped raises OntologyNotFound."""
        manager.freeze()
        manager.push_ontology("ontology1")
        ontology = manager.push_ontology("ontology2")
        manager.pop_ontology(context)
        with pytest.raises(OntologyNotFound) as err:
            manager.revert_ontology(context, ontology)
        # the exception carries the offending ontology
        assert err.value.ontology == ontology
    def test_i_cannot_revert_when_ontology_that_does_not_exists(self, context, manager):
        """Reverting to an ontology that was never pushed raises OntologyNotFound."""
        manager.freeze()
        manager.push_ontology("ontology1")
        # hand-built Ontology that is not part of the manager's stack
        ontology = Ontology("fake", 0, manager.current_cache_manager(), None)
        with pytest.raises(OntologyNotFound) as err:
            manager.revert_ontology(context, ontology)
        assert err.value.ontology == ontology
    def test_i_can_push_ontology_to_override_values(self, context, manager):
        """A pushed layer can shadow a value; popping the layer restores the original."""
        manager.register_cache("cache_name", Cache())
        manager.freeze()
        # sanity check
        manager.put("cache_name", "key", "value1")
        assert manager.get("cache_name", "key") == "value1"
        # push an ontology and override the value
        manager.push_ontology("new ontology")
        manager.put("cache_name", "key", "value2")
        assert manager.get("cache_name", "key") == "value2"
        # The new value is discarded when the ontology is removed
        manager.pop_ontology(context)
        assert manager.get("cache_name", "key") == "value1"
    def test_i_cannot_get_values_from_parent_ontologies_if_default_parameter_is_not_set(self, context, manager):
        """
        In this test, the property called `default` of the `Cache` is not defined.
        `default` must be defined and be a callable for lookups to fall back to
        parent ontologies; without it a child layer only sees NotFound.
        """
        manager.register_cache("cache_name", Cache())
        manager.freeze()
        manager.put("cache_name", "key", "value1")
        assert manager.get("cache_name", "key") == "value1"
        # push an ontology and try to get the value
        manager.push_ontology("new ontology")
        assert manager.get("cache_name", "key") is NotFound
    def test_i_can_get_values_from_parent_ontologies_if_default_parameter_is_set(self, context, manager):
        """
        `default` and `alt_sdp_get` must both be defined in order to allow
        parent lookups. Other combinations are deliberately not covered here:
        Sheerka always defines both, so testing every possible case is useless.
        """
        manager.register_cache("cache_name", Cache(
            default=lambda sdp, key: sdp.get("cache_name", key),
            alt_sdp_get=lambda sdp, key: sdp.get("cache_name", key)))
        manager.freeze()
        manager.put("cache_name", "key", "value1")
        assert manager.get("cache_name", "key") == "value1"
        # push an ontology and try to get the value
        manager.push_ontology("new ontology")
        assert manager.get("cache_name", "key") == "value1"
    def test_i_can_access_values_after_push_and_pop(self, context, manager):
        """Values survive a push/pop cycle; each layer's cache is refilled lazily from its sdp."""
        manager.register_cache("cache_name", Cache().auto_configure("cache_name"))
        manager.freeze()
        # put value in DB, but not in cache
        with manager.current_sdp().get_transaction(context.event) as transaction:
            transaction.add("cache_name", "key", "value1")
        assert not manager.current_cache_manager().has("cache_name", "key")  # value not in cache
        assert manager.current_sdp().exists("cache_name", "key")  # but value is in DB
        # add an ontology layer, and put in DB again, not in cache
        manager.push_ontology("new ontology")
        with manager.current_sdp().get_transaction(context.event) as transaction:
            transaction.add("cache_name", "key", "value2")
        # At this point, the value is in DB, but not in cache
        assert not manager.current_cache_manager().has("cache_name", "key")
        assert manager.get("cache_name", "key") == "value2"  # taken from the current ontology
        assert manager.current_cache_manager().has("cache_name", "key")
        # sanity check
        # Let's check sdp values
        assert manager.ontologies[0].cache_manager.sdp.state.data == {'cache_name': {'key': 'value2'}}
        assert manager.ontologies[1].cache_manager.sdp.state.data == {'cache_name': {'key': 'value1'}}
        # remove a layer
        manager.pop_ontology(context)
        assert not manager.current_cache_manager().has("cache_name", "key")  # value is no longer in cache
        assert manager.get("cache_name", "key") == "value1"
        assert manager.current_cache_manager().has("cache_name", "key")  # it's now in cache
        # sanity check
        # Let's check sdp values
        assert manager.ontologies[0].cache_manager.sdp.state.data == {'cache_name': {'key': 'value1'}}
    def test_i_can_add_ontology(self, context, manager):
        """
        Put back (using `SheerkaOntologyManager.add_ontology()`) a previously
        popped ontology: its own values (cache and sdp) take precedence again,
        while keys it never defined still resolve through the layers below.
        """
        manager.register_cache("cache_name", Cache().auto_configure("cache_name"))
        manager.freeze()
        # init the system
        # key1 is in the cache and in sdp
        # key2 is only in sdp
        manager.push_ontology("new ontology")
        manager.put("cache_name", "key1", "value1")
        manager.commit(context)
        with manager.current_sdp().get_transaction(context.event) as transaction:
            transaction.add("cache_name", "key2", "value2")
        # call key3 to check. This time there is no value, but there will be later
        assert manager.get("cache_name", "key3") is NotFound
        # now remove the ontology
        new_ontology = manager.pop_ontology(context)
        # add another ontology, with its own values
        manager.push_ontology("another ontology")
        manager.put("cache_name", "key1", "value1_from_another")
        manager.put("cache_name", "key2", "value2_from_another")
        manager.put("cache_name", "key3", "value3_from_another")
        manager.commit(context)
        # put back the ontology on top
        manager.add_ontology(new_ontology)
        assert manager.get("cache_name", "key1") == "value1"
        assert manager.get("cache_name", "key2") == "value2"
        # key3 was never set in the restored layer, so the lower layer answers
        assert manager.get("cache_name", "key3") == "value3_from_another"
def test_i_can_get_ontology(self, manager):
manager.freeze()
manager.push_ontology("name1")
manager.push_ontology("name2")
manager.push_ontology("name3")
assert manager.get_ontology("name2").name == "name2"
assert manager.get_ontology().name == "name3"
with pytest.raises(KeyError):
assert manager.get_ontology("name4")
def test_i_can_manage_multiple_ontology_layers(self, context, manager):
manager.register_cache("cache_name", Cache(default=lambda sdp, key: sdp.get("cache_name", key)))
manager.freeze()
# default layer
with manager.current_sdp().get_transaction(context.event) as transaction:
transaction.add("cache_name", "key", "value1")
# add an ontology layer
manager.push_ontology("new ontology")
with manager.current_sdp().get_transaction(context.event) as transaction:
transaction.add("cache_name", "key", "value2")
# add an ontology layer
manager.push_ontology("another ontology")
with manager.current_sdp().get_transaction(context.event) as transaction:
transaction.add("cache_name", "key", "value3")
# add an ontology layer
manager.push_ontology("fourth ontology")
with manager.current_sdp().get_transaction(context.event) as transaction:
transaction.add("cache_name", "key", "value4")
assert manager.get("cache_name", "key") == "value4"
manager.pop_ontology(context)
assert manager.get("cache_name", "key") == "value3"
manager.pop_ontology(context)
assert manager.get("cache_name", "key") == "value2"
manager.pop_ontology(context)
assert manager.get("cache_name", "key") == "value1"
    def test_i_have_access_to_sub_layers_values(self, context, manager):
        """A get() served by a lower layer is copied into the top-level cache only."""
        manager.register_cache("cache_name", Cache().auto_configure("cache_name"))
        manager.freeze()
        # default layer
        with manager.current_sdp().get_transaction(context.event) as transaction:
            transaction.add("cache_name", "key", "value")
        # add ontology layers
        manager.push_ontology("new ontology")
        manager.push_ontology("another ontology")
        # I can get the low level value
        assert manager.get("cache_name", "key") == "value"
        # check that the value is copied on the top level cache
        assert manager.current_cache_manager().has("cache_name", "key")
        assert not manager.ontologies[1].cache_manager.has("cache_name", "key")  # not the top level
        assert manager.ontologies[2].cache_manager.has("cache_name", "key")  # the data comes from it
def test_i_can_get_value_from_all_layers(self, context, manager):
manager.register_cache("cache_name", Cache().auto_configure("cache_name"))
manager.freeze()
manager.put("cache_name", "key", "value")
# add ontology layers
manager.push_ontology("new ontology")
manager.push_ontology("another ontology")
assert manager.get("cache_name", "key") == "value"
manager.pop_ontology(context)
assert manager.get("cache_name", "key") == "value"
manager.pop_ontology(context)
assert manager.get("cache_name", "key") == "value"
    def test_i_can_only_get_top_layer_values_when_dictionary_cache(self, context, manager):
        """DictionaryCache get() only sees the current layer; get_all() merges all layers."""
        manager.register_cache("cache_name", DictionaryCache().auto_configure("cache_name"))
        manager.freeze()
        # NOTE(review): `False` as the key argument looks like a bulk-put marker
        # for DictionaryCache — confirm against DictionaryCache.put()
        manager.put("cache_name", False, {"key": "value"})  # add some values in default layer
        # add some other values in another layer
        manager.push_ontology("new ontology")
        manager.put("cache_name", False, {"key1": "value1"})
        assert manager.get("cache_name", "key") is NotFound  # other layer are not visible
        assert manager.get("cache_name", "key1") == "value1"  # I can only see the current layer
        # I still can use get all
        assert manager.get_all("cache_name") == {"key": "value", "key1": "value1"}
        # I can get back my values after pop
        manager.pop_ontology(context)
        assert manager.copy("cache_name") == {"key": "value"}
    def test_dictionary_caches_values_are_copied_when_a_new_ontology_is_pushed(self, manager):
        """Pushing copies DictionaryCache content into the new layer with clean pending sets."""
        manager.register_cache("cache_name", DictionaryCache().auto_configure("cache_name"))
        manager.freeze()
        manager.put("cache_name", False, {"key": "value"})  # add some values in default layer
        manager.push_ontology("new ontology")
        assert manager.copy("cache_name") == {"key": "value"}
        # the copy must not carry over pending add/remove bookkeeping
        assert manager.current_cache_manager().get_inner_cache("cache_name").to_add == set()
        assert manager.current_cache_manager().get_inner_cache("cache_name").to_remove == set()
    def test_initialized_key_are_correctly_managed_when_multiple_layers(self, context, manager):
        """
        Track which layers mark a key as initialized after cross-layer get()
        calls: the answering layer and the top layer record the key, the
        intermediate layers stay untouched.
        """
        manager.register_cache("c_name", Cache().auto_configure("c_name"))
        manager.freeze()
        manager.put("c_name", "key", "value")
        with manager.current_sdp().get_transaction(context.event) as transaction:
            transaction.add("c_name", "key2", "value2")  # not in cache
        # add ontology layers
        manager.push_ontology("new ontology")
        manager.push_ontology("another ontology")
        manager.push_ontology("last ontology")
        # key answered by the root cache: only the top layer records it
        manager.get("c_name", "key")  # == "value" but we don't care
        assert manager.ontologies[0].cache_manager.caches["c_name"].cache._initialized_keys == {"key"}
        assert manager.ontologies[1].cache_manager.caches["c_name"].cache._initialized_keys == set()
        assert manager.ontologies[2].cache_manager.caches["c_name"].cache._initialized_keys == set()
        assert manager.ontologies[3].cache_manager.caches["c_name"].cache._initialized_keys == set()
        # key answered by the root sdp: the root cache records it too
        manager.get("c_name", "key2")  # == "value2" but we don't care
        assert manager.ontologies[0].cache_manager.caches["c_name"].cache._initialized_keys == {"key", "key2"}
        assert manager.ontologies[1].cache_manager.caches["c_name"].cache._initialized_keys == set()
        assert manager.ontologies[2].cache_manager.caches["c_name"].cache._initialized_keys == set()
        assert manager.ontologies[3].cache_manager.caches["c_name"].cache._initialized_keys == {"key2"}
        # missing key: still recorded as initialized where the lookup bottomed out
        manager.get("c_name", "no_key")  # is NotFound but we don't care
        assert manager.ontologies[0].cache_manager.caches["c_name"].cache._initialized_keys == {"key", "key2", "no_key"}
        assert manager.ontologies[1].cache_manager.caches["c_name"].cache._initialized_keys == set()
        assert manager.ontologies[2].cache_manager.caches["c_name"].cache._initialized_keys == set()
        assert manager.ontologies[3].cache_manager.caches["c_name"].cache._initialized_keys == {"key2", "no_key"}
    def test_i_cannot_get_a_value_that_does_not_exists(self, manager):
        """An unknown key resolves to NotFound through every layer."""
        manager.register_cache("cache_name", Cache().auto_configure("cache_name"))
        manager.freeze()
        # add ontology layers
        manager.push_ontology("new ontology")
        manager.push_ontology("another ontology")
        assert manager.get("cache_name", "key") is NotFound
    def test_i_cannot_get_a_value_that_is_removed(self, manager):
        """Putting the Removed marker in a layer masks the lower-layer value."""
        manager.register_cache("cache_name", Cache())
        manager.freeze()
        manager.put("cache_name", "key", "value")
        # add ontology layers
        manager.push_ontology("new ontology")
        manager.put("cache_name", "key", Removed)
        assert manager.get("cache_name", "key") is NotFound
    def test_i_can_remove_an_entry(self, manager):
        """delete() in a layer hides the entry without touching the layer below."""
        manager.register_cache("cache_name", Cache())
        manager.freeze()
        manager.put("cache_name", "key", "value")
        # add ontology layers
        manager.push_ontology("new ontology")
        manager.delete("cache_name", "key")  # remove the entry in this ontology
        assert manager.get("cache_name", "key") is NotFound
        # sanity check, the value still exist in the lower layer
        assert manager.ontologies[0].cache_manager.caches["cache_name"].cache.copy() == {}
        assert manager.ontologies[1].cache_manager.caches["cache_name"].cache.copy() == {'key': 'value'}
    def test_i_cannot_get_value_that_is_removed_in_sub_level(self, manager):
        """A Removed marker in a middle layer is propagated up to the querying layer's cache."""
        manager.register_cache("cache_name", Cache().auto_configure("cache_name"))
        manager.freeze()
        manager.put("cache_name", "key", "value")  # value exists
        # add ontology layer
        manager.push_ontology("new ontology")
        manager.put("cache_name", "key", Removed)  # value is removed
        # add another layer
        manager.push_ontology("another ontology")  # no indication
        assert manager.get("cache_name", "key") is NotFound
        # check that the cache of the top level ontology is updated
        assert manager.current_cache_manager().caches["cache_name"].cache.copy() == {"key": Removed}
    def test_i_can_test_if_a_value_exists(self, context, manager):
        """exists() reaches down to the low-level database without populating any cache."""
        manager.register_cache("cache_name", Cache(extend_exists=lambda sdp, key: sdp.exists("cache_name", key)))
        manager.freeze()
        # default layer
        with manager.current_sdp().get_transaction(context.event) as transaction:
            transaction.add("cache_name", "key", "value")
        # add ontology layers
        manager.push_ontology("new ontology")
        manager.push_ontology("another ontology")
        # I can get the low level value
        assert manager.exists("cache_name", "key")
        # check that the value is not in cache (only in the low level database)
        assert not manager.current_cache_manager().has("cache_name", "key")
        assert not manager.ontologies[1].cache_manager.has("cache_name", "key")
        assert not manager.ontologies[2].cache_manager.has("cache_name", "key")
    def test_i_can_test_if_the_ontology_has_an_entry(self, context, manager):
        """exists() spans all layers; exists_in_current() is limited to the top layer."""
        manager.register_cache("cache_name", Cache(extend_exists=lambda sdp, key: sdp.exists("cache_name", key)))
        manager.freeze()
        # default layer
        with manager.current_sdp().get_transaction(context.event) as transaction:
            transaction.add("cache_name", "key", "value")
        # add ontology layers
        manager.push_ontology("new ontology")
        assert manager.exists("cache_name", "key")  # it can be seen by the exists()
        assert not manager.exists_in_current("cache_name", "key")  # but not by the has
        # add the entry in the current sdp
        with manager.current_sdp().get_transaction(context.event) as transaction:
            transaction.add("cache_name", "key", "value")
        assert manager.exists_in_current("cache_name", "key")
        # add the entry only in cache
        manager.put("cache_name", "key2", "value")
        # NOTE(review): this re-checks "key" (already asserted above); "key2"
        # was just added — the intent was probably exists_in_current("cache_name", "key2")
        assert manager.exists_in_current("cache_name", "key")  # it can be seen
    def test_i_can_check_that_a_value_does_not_exist(self, manager):
        """exists() is False for a key absent from every layer and from the database."""
        manager.register_cache("cache_name", Cache(extend_exists=lambda sdp, key: sdp.exists("cache_name", key)))
        manager.freeze()
        # add ontology layers
        manager.push_ontology("new ontology")
        manager.push_ontology("another ontology")
        assert not manager.exists("cache_name", "key")
    def test_i_can_list_from_multiple_ontologies(self, context, manager):
        """list() merges entries from every layer; upper layers override lower ones per key."""
        manager.register_cache("cache_name", Cache())
        manager.freeze()
        # default layer
        with manager.current_sdp().get_transaction(context.event) as transaction:
            transaction.add("cache_name", "key1", DummyObj("key1", "value1"))
        manager.push_ontology("new ontology")
        with manager.current_sdp().get_transaction(context.event) as transaction:
            transaction.add("cache_name", "key2", DummyObj("key2", "value2"))
            transaction.add("cache_name", "key1", DummyObj("key1", "value11"))  # key1 is modified
        manager.push_ontology("another ontology")
        with manager.current_sdp().get_transaction(context.event) as transaction:
            transaction.add("cache_name", "key3", DummyObj("key3", "value3"))
        # key1 comes back with the middle layer's override
        assert manager.list("cache_name") == [DummyObj("key1", "value11"),
                                              DummyObj("key2", "value2"),
                                              DummyObj("key3", "value3")]
    def test_i_can_list_from_multiple_ontologies_even_if_they_are_not_all_filled(self, context, manager):
        """list() skips layers that have no entries without failing."""
        manager.register_cache("cache_name", Cache())
        manager.freeze()
        # default layer
        with manager.current_sdp().get_transaction(context.event) as transaction:
            transaction.add("cache_name", "key1", DummyObj("key1", "value1"))
        manager.push_ontology("new ontology")
        # nothing in this ontology
        manager.push_ontology("another ontology")
        with manager.current_sdp().get_transaction(context.event) as transaction:
            transaction.add("cache_name", "key3", DummyObj("key3", "value3"))
        assert manager.list("cache_name") == [DummyObj("key1", "value1"),
                                              DummyObj("key3", "value3")]
    def test_i_can_list_when_no_items(self, manager):
        """list() on an empty cache returns an empty list, whatever the depth."""
        manager.register_cache("cache_name", Cache())
        manager.freeze()
        manager.push_ontology("new ontology")
        manager.push_ontology("another ontology")
        assert manager.list("cache_name") == []
    def test_i_can_put_entry(self, context, manager):
        """put()+commit() writes into the current layer's sdp only; lower sdps are untouched."""
        manager.register_cache("cache_name", Cache())
        manager.freeze()
        # default ontology
        manager.put("cache_name", "key", "value")
        manager.commit(context)
        assert manager.get("cache_name", "key") == "value"
        assert manager.ontologies[0].cache_manager.sdp.state.data == {'cache_name': {'key': 'value'}}
        # add an ontology layer
        manager.push_ontology("new ontology")
        manager.put("cache_name", "key", "value2")
        manager.commit(context)
        assert manager.get("cache_name", "key") == "value2"
        assert manager.ontologies[0].cache_manager.sdp.state.data == {'cache_name': {'key': 'value2'}}
        assert manager.ontologies[1].cache_manager.sdp.state.data == {'cache_name': {'key': 'value'}}
    def test_i_can_put_in_a_list_cache(self, context, manager):
        """
        Sub layers have list values: appending in an upper layer must keep the
        inherited elements — each layer's sdp stores the cumulative list as
        seen from that layer.
        """
        manager.register_cache("cache_name", ListCache().auto_configure("cache_name"))
        manager.freeze()
        # default ontology
        with manager.current_sdp().get_transaction(context.event) as transaction:
            transaction.add("cache_name", "key", ["val1"])
        # add an ontology layer
        manager.push_ontology("new ontology")
        manager.put("cache_name", "key", "val2")
        manager.commit(context)
        assert manager.get("cache_name", "key") == ["val1", "val2"]
        assert manager.ontologies[0].cache_manager.sdp.state.data == {'cache_name': {'key': ['val1', 'val2']}}
        assert manager.ontologies[1].cache_manager.sdp.state.data == {'cache_name': {'key': ['val1']}}
        # and I can keep adding in another layer
        manager.push_ontology("another ontology")
        manager.put("cache_name", "key", "val3")
        manager.commit(context)
        assert manager.get("cache_name", "key") == ["val1", "val2", "val3"]
        assert manager.ontologies[0].cache_manager.sdp.state.data == {'cache_name': {'key': ['val1', 'val2', 'val3']}}
        assert manager.ontologies[1].cache_manager.sdp.state.data == {'cache_name': {'key': ['val1', 'val2']}}
        assert manager.ontologies[2].cache_manager.sdp.state.data == {'cache_name': {'key': ['val1']}}
    def test_i_can_remove_an_entry_that_is_only_in_db(self, context, manager):
        """delete()+commit() erases an entry that was only in the sdp, never cached."""
        manager.register_cache("cache_name", Cache().auto_configure("cache_name"))
        manager.freeze()
        # default ontology
        # value in DB but not in cache
        with manager.current_sdp().get_transaction(context.event) as transaction:
            transaction.add("cache_name", "key", "value")
        manager.delete("cache_name", "key")
        manager.commit(context)
        assert manager.get("cache_name", "key") is NotFound
        # sanity check, the entry is removed from both the cache and the sdp
        assert manager.ontologies[0].cache_manager.caches["cache_name"].cache.copy() == {}
        assert manager.ontologies[0].cache_manager.sdp.state.data == {'cache_name': {}}
    def test_i_can_remove_when_value_is_in_low_level(self, context, manager):
        """
        The value lives in a lower-level ontology. After delete() the value is
        no longer accessible from the top layer (masked with Removed) but is
        not actually deleted below.
        """
        manager.register_cache("cache_name", Cache(default=lambda sdp, key: sdp.get("cache_name", key),
                                                   extend_exists=lambda sdp, key: sdp.exists("cache_name", key)))
        manager.freeze()
        # default ontology
        # value in DB but not in cache
        with manager.current_sdp().get_transaction(context.event) as transaction:
            transaction.add("cache_name", "key", "value")
        # add an ontology layer
        manager.push_ontology("new ontology")
        # third argument is presumably the value to remove — see delete() signature
        manager.delete("cache_name", "key", "value")
        manager.commit(context)
        assert manager.get("cache_name", "key") is NotFound
        # sanity check: the top layer holds a Removed marker, the lower layer is intact
        assert manager.ontologies[0].cache_manager.caches["cache_name"].cache.copy() == {"key": Removed}
        assert manager.ontologies[0].cache_manager.sdp.state.data == {'cache_name': {"key": Removed}}
        assert manager.ontologies[1].cache_manager.caches["cache_name"].cache.copy() == {}
        assert manager.ontologies[1].cache_manager.sdp.state.data == {'cache_name': {"key": "value"}}
        # The entry still exists in lower ontology
        manager.pop_ontology(context)
        assert manager.get("cache_name", "key") == "value"
    def test_i_can_remove_when_value_is_in_both_low_and_current_level(self, context, manager):
        """
        The value exists both in a lower-level ontology and in the current one.
        After delete() it is no longer accessible (Removed marker on top) but
        the lower-level copy is untouched.
        """
        manager.register_cache("cache_name", Cache(default=lambda sdp, key: sdp.get("cache_name", key),
                                                   extend_exists=lambda sdp, key: sdp.exists("cache_name", key)))
        manager.freeze()
        # default ontology
        # value in DB but not in cache
        with manager.current_sdp().get_transaction(context.event) as transaction:
            transaction.add("cache_name", "key", "value")
        # add an ontology layer
        manager.push_ontology("new ontology")
        with manager.current_sdp().get_transaction(context.event) as transaction:
            transaction.add("cache_name", "key", "value2")
        manager.delete("cache_name", "key", "value")
        manager.commit(context)
        assert manager.get("cache_name", "key") is NotFound
        # sanity check: top layer masked with Removed, lower layer intact
        assert manager.ontologies[0].cache_manager.caches["cache_name"].cache.copy() == {"key": Removed}
        assert manager.ontologies[0].cache_manager.sdp.state.data == {'cache_name': {"key": Removed}}
        assert manager.ontologies[1].cache_manager.caches["cache_name"].cache.copy() == {}
        assert manager.ontologies[1].cache_manager.sdp.state.data == {'cache_name': {"key": "value"}}
        # The entry still exists in lower ontology
        manager.pop_ontology(context)
        assert manager.get("cache_name", "key") == "value"
    def test_i_can_remove_when_value_is_not_low_level(self, context, manager):
        """
        The value exists only in the current level: delete() really removes it
        (no Removed marker is needed since nothing lives below).
        """
        manager.register_cache("cache_name", Cache().auto_configure("cache_name"))
        manager.freeze()
        # add an ontology layer
        # so that the value does not exist in the lower level ontology
        manager.push_ontology("new ontology")
        # value in DB but not in cache
        with manager.current_sdp().get_transaction(context.event) as transaction:
            transaction.add("cache_name", "key", "value")
        manager.delete("cache_name", "key")
        manager.commit(context)
        assert manager.get("cache_name", "key") is NotFound
        # sanity check, the entry is removed
        assert manager.ontologies[0].cache_manager.caches["cache_name"].cache.copy() == {}
        assert manager.ontologies[0].cache_manager.sdp.state.data == {'cache_name': {}}
    def test_i_can_remove_list_if_needed_when_value_is_in_low_level(self, context, manager):
        """
        Multiple values live in a low-level ontology and only one is removed:
        the top layer must hold a copy minus the removed value, while the
        low-level list stays complete.
        """
        cache = ListIfNeededCache().auto_configure("cache_name")
        manager.register_cache("cache_name", cache)
        manager.freeze()
        # default ontology
        # value in DB but not in cache
        with manager.current_sdp().get_transaction(context.event) as transaction:
            transaction.add("cache_name", "key", ["value", "value2"])
        # add ontology layers
        manager.push_ontology("new ontology")
        manager.push_ontology("another ontology")
        manager.delete("cache_name", "key", "value")
        manager.commit(context)
        # ListIfNeededCache collapses a single remaining element to a scalar
        assert manager.get("cache_name", "key") == "value2"
        # sanity check, the entry is removed only in the top layer's copy
        assert manager.ontologies[0].cache_manager.caches["cache_name"].cache.copy() == {"key": "value2"}
        assert manager.ontologies[0].cache_manager.sdp.state.data == {'cache_name': {"key": "value2"}}
        assert manager.ontologies[1].cache_manager.caches["cache_name"].cache.copy() == {}
        assert manager.ontologies[1].cache_manager.sdp.state.data == {}
        assert manager.ontologies[2].cache_manager.caches["cache_name"].cache.copy() == {"key": ["value", "value2"]}
        assert manager.ontologies[2].cache_manager.sdp.state.data == {'cache_name': {"key": ["value", "value2"]}}
        # The entry still exists in lower ontology
        manager.pop_ontology(context)
        manager.pop_ontology(context)
        assert manager.get("cache_name", "key") == ["value", "value2"]
def test_i_can_add_concept_default_layer(self, context, manager, next_id):
self.init_by_id_and_by_name(manager)
foo = get_metadata("foo").auto_init(next_id)
manager.add_concept(foo)
manager.commit(context)
assert manager.get("by_key", foo.key) == foo
assert manager.get("by_id", foo.id) == foo
assert manager.current_sdp().get("by_key", foo.key) == foo
assert manager.current_sdp().get("by_id", foo.id) == foo
def test_i_can_add_concept_in_top_layer(self, context, manager, next_id):
self.init_by_id_and_by_name(manager)
foo = get_metadata("foo").auto_init(next_id)
# add an ontology layer
manager.push_ontology("new ontology")
manager.add_concept(foo)
manager.commit(context)
assert manager.get("by_key", foo.key) == foo
assert manager.get("by_id", foo.id) == foo
assert manager.current_sdp().get("by_key", foo.key) == foo
assert manager.current_sdp().get("by_id", foo.id) == foo
# sanity check
assert list(manager.ontologies[0].cache_manager.sdp.state.data.keys()) == ['by_id', 'by_key']
assert manager.ontologies[1].cache_manager.sdp.state.data == {}
def test_i_can_add_the_concepts_with_the_same_key_from_different_layers(self, context, manager, next_id):
foo1 = get_metadata("foo x", body="x + 1", variables=["x"]).auto_init(next_id)
foo2 = get_metadata("foo x", body="x + 1", variables=["x"]).auto_init(next_id)
cache = ListIfNeededCache().auto_configure("by_key")
manager.register_concept_cache("by_key", cache, lambda obj: obj.key, use_ref=True)
manager.freeze()
manager.add_concept(foo1)
manager.commit(context)
manager.push_ontology("new ontology")
manager.add_concept(foo2)
manager.commit(context)
assert manager.current_sdp().get("by_key", foo1.key) == [foo1, foo2]
def test_i_can_update_concept_in_default_layer(self, context, manager, next_id):
self.init_by_id_and_by_name(manager)
foo = get_metadata("foo", body="body").auto_init(next_id)
manager.add_concept(foo)
modified = foo.clone(body="new body")
assert foo != modified
manager.update_concept(foo, modified)
manager.commit(context)
assert manager.get("by_key", foo.key) == modified
assert manager.get("by_id", foo.id) == modified
def test_i_can_update_concept_in_top_layer(self, context, manager, next_id):
self.init_by_id_and_by_name(manager)
foo = get_metadata("foo").auto_init(next_id)
manager.add_concept(foo)
manager.commit(context)
# add an ontology layer
manager.push_ontology("new ontology")
modified = foo.clone(body="new body")
assert foo != modified
manager.update_concept(foo, modified)
manager.commit(context)
assert manager.get("by_key", foo.key) == modified
assert manager.get("by_id", foo.id) == modified
# sanity check.
# make sure that the previous values are kept
assert manager.ontologies[0].cache_manager.sdp.get('by_key', foo.key) == modified
assert manager.ontologies[0].cache_manager.sdp.get('by_id', foo.id) == modified
assert manager.ontologies[1].cache_manager.sdp.get('by_key', foo.key) == foo
assert manager.ontologies[1].cache_manager.sdp.get('by_id', foo.id) == foo
# so I can get the old values when I pop ontology
manager.pop_ontology(context)
assert manager.get("by_key", foo.key) == foo
assert manager.get("by_id", foo.id) == foo
    def test_i_can_update_when_concept_in_both_top_and_bottom_layers(self, context, manager, next_id):
        """update_concept still works when the top layer's SDP already holds a
        modified copy written directly (bypassing the caches); the latest update
        wins, and popping the layer restores the bottom-layer original."""
        self.init_by_id_and_by_name(manager)
        foo = get_metadata("foo").auto_init(next_id)
        manager.add_concept(foo)
        manager.commit(context)
        # add an ontology layer
        # and modify the concept
        # The database is updated, but not the internal cache
        manager.push_ontology("new ontology")
        modified1 = foo.clone(body="new body")
        assert foo != modified1
        with manager.current_sdp().get_transaction(context.event) as transaction:
            transaction.add("by_key", foo.key, modified1)
            transaction.add("by_id", foo.id, modified1)
        # modify the top layer a second time, this time through the manager
        modified2 = foo.clone(body="body", pre="True")
        manager.update_concept(foo, modified2)
        manager.commit(context)
        assert manager.get("by_key", foo.key) == modified2
        assert manager.get("by_id", foo.id) == modified2
        # sanity check:
        # the top layer holds the latest update,
        # the bottom layer keeps the original values
        assert manager.ontologies[0].cache_manager.sdp.get('by_key', foo.key) == modified2
        assert manager.ontologies[0].cache_manager.sdp.get('by_id', foo.id) == modified2
        assert manager.ontologies[1].cache_manager.sdp.get('by_key', foo.key) == foo
        assert manager.ontologies[1].cache_manager.sdp.get('by_id', foo.id) == foo
        # so I can get the old values when I pop ontology
        manager.pop_ontology(context)
        assert manager.get("by_key", foo.key) == foo
        assert manager.get("by_id", foo.id) == foo
    def test_i_can_update_when_the_key_changes(self, context, manager, next_id):
        """Updating a concept whose key changed de-indexes the old key (a Removed
        marker in the top layer) and indexes the new key there.

        The next_id fixture is unused: this concept gets an explicit id and key.
        """
        self.init_by_id_and_by_name(manager)
        foo = get_metadata("foo", id="1", key="my_key")
        # create an entry
        manager.add_concept(foo)
        manager.commit(context)
        # add a new layer, and modify the concept
        manager.push_ontology("new ontology")
        modified = foo.clone(key="another key")
        manager.update_concept(foo, modified)
        manager.commit(context)
        assert manager.get("by_id", modified.id) == modified
        assert manager.get("by_key", modified.key) == modified
        assert manager.get("by_key", foo.key) == NotFound
        # sanity: top layer masks the old key with Removed; bottom layer is untouched
        assert manager.ontologies[0].cache_manager.sdp.get('by_key', foo.key) == Removed
        assert manager.ontologies[0].cache_manager.sdp.get('by_key', modified.key) == modified
        assert manager.ontologies[0].cache_manager.sdp.get('by_id', foo.id) == modified
        assert manager.ontologies[1].cache_manager.sdp.get('by_key', foo.key) == foo
        assert manager.ontologies[1].cache_manager.sdp.get('by_key', modified.key) == NotFound
        assert manager.ontologies[1].cache_manager.sdp.get('by_id', foo.id) == foo
    def test_i_can_update_when_key_changes_and_there_are_lists(self, context, manager, next_id):
        """When the 'by_key' cache holds lists (ListIfNeededCache), a key change
        removes the concept from the old key's list and appends it to the new
        key's list, in the top layer only."""
        self.init_by_id_and_by_name(manager)
        foo, foo2, bar = get_metadatas("foo", get_metadata("foo", body="x"), "bar", next_id=next_id)
        # create entries
        manager.add_concept(foo)
        manager.add_concept(foo2)
        manager.add_concept(bar)
        manager.commit(context)
        # add a new layer, and modify the concept
        manager.push_ontology("new ontology")
        modified = foo.clone(key="bar")
        manager.update_concept(foo, modified)
        manager.commit(context)
        assert manager.get("by_id", modified.id) == modified
        assert manager.get("by_key", modified.key) == [bar, modified]
        assert manager.get("by_key", foo.key) == foo2
        # sanity check: the top layer holds the rewritten lists,
        # the bottom layer keeps the original ones
        assert manager.ontologies[0].cache_manager.sdp.get('by_key', foo.key) == foo2
        assert manager.ontologies[0].cache_manager.sdp.get('by_key', modified.key) == [bar, modified]
        assert manager.ontologies[0].cache_manager.sdp.get('by_id', foo.id) == modified
        assert manager.ontologies[1].cache_manager.sdp.get('by_key', foo.key) == [foo, foo2]
        assert manager.ontologies[1].cache_manager.sdp.get('by_key', modified.key) == bar
        assert manager.ontologies[1].cache_manager.sdp.get('by_id', foo.id) == foo
def test_i_can_remove_concept_from_default_layer(self, context, manager, next_id):
self.init_by_id_and_by_name(manager)
foo = get_metadata("foo").auto_init(next_id)
manager.add_concept(foo)
manager.commit(context)
manager.remove_concept(foo)
manager.commit(context)
assert manager.get("by_id", foo.id) == NotFound
assert manager.get("by_key", foo.key) == NotFound
# sanity check
assert manager.current_sdp().get("by_key") == {}
assert manager.current_sdp().get("by_id") == {}
    def test_i_can_remove_concept_from_top_layer(self, context, manager, next_id):
        """Removing a concept from a top layer writes Removed markers there; the
        bottom layer keeps the concept, so popping the layers makes it visible
        again."""
        self.init_by_id_and_by_name(manager)
        foo = get_metadata("foo").auto_init(next_id)
        manager.add_concept(foo)
        manager.commit(context)
        # add a new layer, and remove the concept
        manager.push_ontology("new ontology")
        manager.push_ontology("another ontology")
        manager.remove_concept(foo)
        manager.commit(context)
        assert manager.get("by_id", foo.id) == NotFound
        assert manager.get("by_key", foo.key) == NotFound
        # sanity check: Removed markers in the top layer, nothing in the middle
        # layer, and the original entries in the bottom layer
        assert manager.current_sdp().get("by_id") == {foo.id: Removed}
        assert manager.current_sdp().get("by_key") == {foo.key: Removed}
        assert manager.ontologies[1].cache_manager.sdp.get("by_id") == NotFound
        assert manager.ontologies[1].cache_manager.sdp.get("by_key") == NotFound
        assert manager.ontologies[2].cache_manager.sdp.get("by_id") == {foo.id: foo}
        assert manager.ontologies[2].cache_manager.sdp.get("by_key") == {foo.key: foo}
        # So I can pop
        manager.pop_ontology(context)
        assert manager.get("by_id", foo.id) == foo
        assert manager.get("by_key", foo.key) == foo
        # and pop again
        manager.pop_ontology(context)
        assert manager.get("by_id", foo.id) == foo
        assert manager.get("by_key", foo.key) == foo
    def test_i_can_get_all(self, context, manager):
        """get_all merges entries across all ontology layers and across cache/SDP:
        higher layers shadow lower ones, the cache shadows the committed SDP value
        within a layer (key2, key4), and Removed entries are filtered out."""
        manager.register_cache("cache_name", Cache().auto_configure("cache_name"))
        manager.freeze()
        manager.put("cache_name", "key_to_remove1", "value1")
        manager.put("cache_name", "key_to_remove2", "value1")
        manager.put("cache_name", "key1", "value1")
        manager.put("cache_name", "key2", "value2_in_sdp")
        manager.put("cache_name", "key3", "value3")
        manager.commit(context)
        manager.put("cache_name", "key2", "value2_in_cache")  # in cache, but not in remote sdp
        # add ontology layers
        manager.push_ontology("new ontology")
        manager.put("cache_name", "key1", "value1_from_new_ontology")
        manager.put("cache_name", "key2", "value2_from_new_ontology")
        manager.put("cache_name", "key4", "value4_in_sdp")
        manager.commit(context)
        manager.put("cache_name", "key4", "value4_in_cache")
        manager.put("cache_name", "key_to_remove1", Removed)
        manager.push_ontology("another ontology")
        with manager.current_sdp().get_transaction(context.event) as transaction:
            # so that value is only in sdp, not in cache
            transaction.add("cache_name", "key5", "value5")
            transaction.add("cache_name", "key_to_remove2", Removed)
        assert manager.get_all("cache_name") == {
            "key1": "value1_from_new_ontology",
            "key2": "value2_from_new_ontology",
            "key3": "value3",
            "key4": "value4_in_cache",
            "key5": "value5"
        }
    def test_i_can_keep_track_of_created_concepts_by_ontologies(self, context, manager, next_id):
        """The manager maintains the CONCEPTS_BY_ONTOLOGY / ONTOLOGY_BY_CONCEPT
        tracking maps in its internal cache, persists them on commit, and prunes
        them on remove_concept.

        The literal ids ('1001', '1002') assume the next_id fixture starts at
        1001 — presumably guaranteed by conftest.
        """
        self.init_by_id_and_by_name(manager)
        foo = get_metadata("foo").auto_init(next_id)
        manager.add_concept(foo)
        def from_cache(entry):
            # snapshot of a tracking entry from the internal cache
            return manager.internal_cache_manager.copy(entry)
        def from_db(entry):
            # same tracking entry, read back from the backing SDP
            return manager.internal_cache_manager.sdp.get(entry)
        # check that the new concept is tracked
        assert from_cache(SheerkaOntologyManager.CONCEPTS_BY_ONTOLOGY_ENTRY) == {'__default__': {'1001'}}
        assert from_cache(SheerkaOntologyManager.ONTOLOGY_BY_CONCEPT_ENTRY) == {'1001': '__default__'}
        # add a new ontology and make sure the new concepts are tracked
        manager.push_ontology("new ontology")
        bar = get_metadata("bar").auto_init(next_id)
        manager.add_concept(bar)
        assert from_cache(SheerkaOntologyManager.CONCEPTS_BY_ONTOLOGY_ENTRY) == {'__default__': {'1001'},
                                                                                 'new ontology': {'1002'}}
        assert from_cache(SheerkaOntologyManager.ONTOLOGY_BY_CONCEPT_ENTRY) == {'1001': '__default__',
                                                                                '1002': 'new ontology'}
        # commit the info and check the DB
        manager.commit(context)
        assert from_db(SheerkaOntologyManager.CONCEPTS_BY_ONTOLOGY_ENTRY) == {'__default__': {'1001'},
                                                                              'new ontology': {'1002'}}
        assert from_db(SheerkaOntologyManager.ONTOLOGY_BY_CONCEPT_ENTRY) == {'1001': '__default__',
                                                                             '1002': 'new ontology'}
        # remove a concept a check
        manager.remove_concept(foo)
        assert from_cache(SheerkaOntologyManager.CONCEPTS_BY_ONTOLOGY_ENTRY) == {'new ontology': {'1002'}, }
        assert from_cache(SheerkaOntologyManager.ONTOLOGY_BY_CONCEPT_ENTRY) == {'1002': 'new ontology', }
        manager.remove_concept(bar)
        assert from_cache(SheerkaOntologyManager.CONCEPTS_BY_ONTOLOGY_ENTRY) == {}
        assert from_cache(SheerkaOntologyManager.ONTOLOGY_BY_CONCEPT_ENTRY) == {}
        # commit again and check
        manager.commit(context)
        assert from_db(SheerkaOntologyManager.CONCEPTS_BY_ONTOLOGY_ENTRY) == {}
        assert from_db(SheerkaOntologyManager.ONTOLOGY_BY_CONCEPT_ENTRY) == {}
    @pytest.mark.skip("No rule yet !")
    def test_i_can_keep_track_of_created_rules_by_ontologies(self, manager):
        """Rule counterpart of the concept-tracking test (skipped: rules are not
        implemented yet).

        NOTE(review): the body references ``Rule`` and ``ACTION_TYPE_EXEC``, which
        are not among this file's visible imports — confirm before un-skipping.
        """
        sheerka, context, rule1 = self.init_format_rules(("rule1", "id.attr == 'value'", "True"))
        def rules_by_ontology_from_cache():
            # tracking map from the internal cache, without the builtin rules
            res = sheerka.om.internal_cache_manager.copy(SheerkaOntologyManager.RULES_BY_ONTOLOGY_ENTRY)
            del res[SheerkaOntologyManager.ROOT_ONTOLOGY_NAME]  # discard builtin rules
            return res
        def ontologies_from_cache():
            # reverse tracking map, filtered of the root-ontology (builtin) rules
            res = sheerka.om.internal_cache_manager.copy(SheerkaOntologyManager.ONTOLOGY_BY_RULE_ENTRY)
            return {k: v for k, v in res.items() if v != SheerkaOntologyManager.ROOT_ONTOLOGY_NAME}
        def rules_by_ontology_from_db():
            # same tracking map, read back from the backing SDP
            res = sheerka.om.internal_cache_manager.sdp.get(SheerkaOntologyManager.RULES_BY_ONTOLOGY_ENTRY)
            del res[SheerkaOntologyManager.ROOT_ONTOLOGY_NAME]  # discard builtin rules
            return res
        def ontologies_from_db():
            res = sheerka.om.internal_cache_manager.sdp.get(SheerkaOntologyManager.ONTOLOGY_BY_RULE_ENTRY)
            return {k: v for k, v in res.items() if v != SheerkaOntologyManager.ROOT_ONTOLOGY_NAME}
        assert rules_by_ontology_from_cache() == {"#unit_test#": {rule1.id}}
        assert ontologies_from_cache() == {rule1.id: "#unit_test#"}
        # add a new rule from a new ontology and check
        sheerka.push_ontology(context, "new ontology")
        rule2 = Rule(ACTION_TYPE_EXEC, "rule2", "id2.attr2 == 'value'", "True")
        sheerka.create_new_rule(context, rule2)
        assert rules_by_ontology_from_cache() == {"#unit_test#": {rule1.id}, "new ontology": {rule2.id}}
        assert ontologies_from_cache() == {rule1.id: "#unit_test#", rule2.id: "new ontology"}
        # commit and check the result
        sheerka.om.commit(context)
        assert rules_by_ontology_from_db() == {"#unit_test#": {rule1.id}, "new ontology": {rule2.id}}
        assert ontologies_from_db() == {rule1.id: "#unit_test#", rule2.id: "new ontology"}
        sheerka.remove_rule(context, rule1)
        assert rules_by_ontology_from_cache() == {"new ontology": {rule2.id}}
        assert ontologies_from_cache() == {rule2.id: "new ontology"}
        # remove the last rule
        sheerka.remove_rule(context, rule2)
        assert rules_by_ontology_from_cache() == {}
        assert ontologies_from_cache() == {}
        # commit and check the db
        sheerka.om.commit(context)
        assert rules_by_ontology_from_db() == {}
        assert ontologies_from_db() == {}
    def test_i_can_keep_track_of_created_concept_on_ontology_pop(self, context, manager, next_id):
        """Popping an ontology drops that layer's concepts from the per-ontology
        tracking maps (ids '1001'..'1003' assume next_id starts at 1001)."""
        self.init_by_id_and_by_name(manager)
        foo = get_metadata("foo").auto_init(next_id)
        manager.add_concept(foo)
        def from_cache(entry):
            # snapshot of a tracking entry from the internal cache
            return manager.internal_cache_manager.copy(entry)
        manager.push_ontology("new ontology")
        bar, baz = get_metadatas("bar", "baz", next_id=next_id)
        manager.add_concept(bar)
        manager.add_concept(baz)
        assert from_cache(SheerkaOntologyManager.CONCEPTS_BY_ONTOLOGY_ENTRY) == {'__default__': {'1001'},
                                                                                 'new ontology': {'1002', '1003'}}
        assert from_cache(SheerkaOntologyManager.ONTOLOGY_BY_CONCEPT_ENTRY) == {'1001': '__default__',
                                                                                '1002': 'new ontology',
                                                                                '1003': 'new ontology'}
        # popping the layer removes its concepts from the tracking maps
        manager.pop_ontology(context)
        assert from_cache(SheerkaOntologyManager.CONCEPTS_BY_ONTOLOGY_ENTRY) == {'__default__': {'1001'}}
        assert from_cache(SheerkaOntologyManager.ONTOLOGY_BY_CONCEPT_ENTRY) == {'1001': '__default__'}
    @pytest.mark.skip("No rule yet !")
    def test_i_can_keep_track_of_created_rules_on_ontology_pop(self, manager):
        """Popping an ontology removes its rules from the tracking maps and raises
        a 'rule deleted' event per removed rule (skipped: rules not implemented).

        NOTE(review): references ``Rule``, ``ACTION_TYPE_EXEC`` and
        ``EVENT_RULE_ID_DELETED``, which are not among this file's visible
        imports — confirm before un-skipping.
        """
        sheerka, context, rule1 = self.init_format_rules(("rule1", "id.attr == 'value'", "True"))
        events_raised = set()
        sheerka.subscribe(EVENT_RULE_ID_DELETED, lambda ctx, r: events_raised.add(r))
        def rules_by_ontology_from_cache():
            # tracking map from the internal cache, without the builtin rules
            res = sheerka.om.internal_cache_manager.copy(SheerkaOntologyManager.RULES_BY_ONTOLOGY_ENTRY)
            del res[SheerkaOntologyManager.ROOT_ONTOLOGY_NAME]  # discard builtin rules
            return res
        def ontologies_from_cache():
            res = sheerka.om.internal_cache_manager.copy(SheerkaOntologyManager.ONTOLOGY_BY_RULE_ENTRY)
            return {k: v for k, v in res.items() if v != SheerkaOntologyManager.ROOT_ONTOLOGY_NAME}
        sheerka.push_ontology(context, "new ontology")
        sheerka.create_new_rule(context, Rule(ACTION_TYPE_EXEC, "rule2", "id2.attr2 == 'value'", "True"))
        sheerka.create_new_rule(context, Rule(ACTION_TYPE_EXEC, "rule3", "id3.attr3 == 'value'", "True"))
        sheerka.pop_ontology(context)
        assert rules_by_ontology_from_cache() == {'#unit_test#': {'11'}}
        assert ontologies_from_cache() == {'11': '#unit_test#'}
        # check that the 'rule is deleted' events are raised
        assert events_raised == {'12', '13'}
    def test_i_can_get_call_when_a_cache_is_cleared(self, manager):
        """get_all after a clear() in a middle layer only returns entries written
        after the clear (plus the higher layer's entries).

        NOTE(review): the name looks like a typo for
        ``test_i_can_get_all_when_a_cache_is_cleared`` — kept to avoid renaming
        the collected test.
        """
        manager.register_cache("cache_name", Cache().auto_configure("cache_name"))
        manager.freeze()
        manager.put("cache_name", "key1", "value1")
        manager.put("cache_name", "key2", "value2")
        manager.put("cache_name", "key3", "value3")
        # add ontology layers
        manager.push_ontology("new ontology")
        manager.clear("cache_name")
        manager.put("cache_name", "key1", "new value1")
        manager.put("cache_name", "key4", "value4")
        manager.push_ontology("another ontology")
        manager.put("cache_name", "key5", "value5")
        assert manager.get_all("cache_name") == {
            "key1": "new value1",
            "key4": "value4",
            "key5": "value5"
        }
    def test_i_can_get_all_when_inc_cache(self, manager):
        """get_all on an IncCache returns the current counter values without
        incrementing them (each plain get() bumps the counter, as the first
        asserts show, and counters survive an ontology push)."""
        manager.register_cache("cache_name", IncCache().auto_configure("cache_name"))
        manager.freeze()
        assert manager.get("cache_name", "key1") == 1
        assert manager.get("cache_name", "key1") == 2
        manager.push_ontology("new ontology")
        assert manager.get("cache_name", "key1") == 3
        assert manager.get("cache_name", "key2") == 1
        assert manager.get("cache_name", "key2") == 2
        assert manager.get_all("cache_name") == {
            "key1": 3,
            "key2": 2,
        }
        # a second time, to make sure that nothing was incremented
        assert manager.get_all("cache_name") == {
            "key1": 3,
            "key2": 2,
        }
    @pytest.mark.parametrize("all_ontologies, expected_in_layer_1", [
        (False, {}),
        (True, {'key1': DummyObj(key='key1', value='value1'),
                'key2': DummyObj(key='key2', value='value2')}),
    ])
    def test_i_can_populate(self, context, manager, all_ontologies, expected_in_layer_1):
        """populate() refills cleared caches from the committed SDP data; with
        all_ontologies=True the lower layers are refilled too, otherwise only the
        current (top) layer is."""
        manager.register_cache("cache_name", Cache().auto_configure("cache_name"))
        manager.freeze()
        manager.put("cache_name", "key1", DummyObj("key1", "value1"))
        manager.put("cache_name", "key2", DummyObj("key2", "value2"))
        manager.commit(context)
        manager.clear("cache_name")
        manager.push_ontology("new ontology")
        manager.put("cache_name", "key2", DummyObj("key2", "value22"))
        manager.put("cache_name", "key3", DummyObj("key3", "value3"))
        manager.commit(context)
        manager.clear("cache_name")
        # sanity check: both layers' inner caches are empty before populating
        assert manager.ontologies[0].cache_manager.get_inner_cache("cache_name").copy() == {}
        assert manager.ontologies[1].cache_manager.get_inner_cache("cache_name").copy() == {}
        manager.populate("cache_name",
                         lambda sdp: sdp.list("cache_name"),
                         lambda obj: obj.key,
                         all_ontologies=all_ontologies)
        assert manager.ontologies[0].cache_manager.get_inner_cache("cache_name").copy() == {
            'key2': DummyObj(key='key2', value='value22'),
            'key3': DummyObj(key='key3', value='value3')}
        assert manager.ontologies[1].cache_manager.get_inner_cache("cache_name").copy() == expected_in_layer_1
    def test_i_can_clear_when_multiple_ontology_layers(self, context, manager):
        """clear() from any layer empties the visible cache, but each layer keeps
        its own snapshot, so popping layers restores the earlier values."""
        manager.register_cache("cache_name", Cache().auto_configure("cache_name"))
        manager.freeze()
        manager.put("cache_name", "key1", "value1")
        manager.put("cache_name", "key2", "value2")
        assert manager.ontologies[0].cache_manager.get_inner_cache("cache_name").copy() == {'key1': 'value1',
                                                                                            'key2': 'value2'}
        # I can clear in another layer
        manager.push_ontology("new ontology")
        manager.clear("cache_name")
        assert manager.get("cache_name", "key1") is NotFound
        assert manager.get("cache_name", "key2") is NotFound
        assert manager.ontologies[0].cache_manager.get_inner_cache("cache_name").copy() == {}
        manager.put("cache_name", "key1", "new value1")
        assert manager.get("cache_name", "key1") == "new value1"
        assert manager.get("cache_name", "key2") is NotFound
        assert manager.ontologies[0].cache_manager.get_inner_cache("cache_name").copy() == {'key1': "new value1"}
        manager.push_ontology("another ontology")
        manager.put("cache_name", "key2", "new value2")
        assert manager.get("cache_name", "key1") == "new value1"
        assert manager.get("cache_name", "key2") == "new value2"
        assert manager.ontologies[0].cache_manager.get_inner_cache("cache_name").copy() == {'key1': "new value1",
                                                                                            'key2': "new value2"}
        # clearing again, from the top layer
        manager.clear("cache_name")
        assert manager.get("cache_name", "key1") is NotFound
        assert manager.get("cache_name", "key2") is NotFound
        # each layer still keeps its own pre-clear snapshot
        assert manager.ontologies[0].cache_manager.get_inner_cache("cache_name").copy() == {}
        assert manager.ontologies[1].cache_manager.get_inner_cache("cache_name").copy() == {'key1': "new value1"}
        assert manager.ontologies[2].cache_manager.get_inner_cache("cache_name").copy() == {'key1': 'value1',
                                                                                            'key2': 'value2'}
        manager.pop_ontology(context)
        assert manager.get("cache_name", "key1") == "new value1"
        assert manager.get("cache_name", "key2") is NotFound
        manager.pop_ontology(context)
        assert manager.get("cache_name", "key1") == "value1"
        assert manager.get("cache_name", "key2") == "value2"
    def test_already_on_the_top(self, manager):
        """already_on_top is True only for the ontology currently on top of the stack."""
        manager.freeze()
        manager.push_ontology("new ontology")
        assert manager.already_on_top("new ontology")
        assert not manager.already_on_top("another ontology")
    def test_already_on_the_top_when_the_ontology_already_exists(self, manager):
        """Asking already_on_top for an ontology that exists below the current top
        raises OntologyAlreadyExists."""
        manager.freeze()
        manager.push_ontology("new ontology")
        manager.push_ontology("another ontology")
        with pytest.raises(OntologyAlreadyExists):
            assert manager.already_on_top("new ontology")
# class TestSheerkaOntologyWithFileBasedSheerka(UsingFileBasedSheerka):
# def test_i_can_put_back_ontology(self, manager):
# sheerka = self.get_sheerka()
#
#
#
# manager.register_cache("cache_name", Cache().auto_configure("cache_name"))
# manager.freeze()
#
# # default layer
# with manager.current_sdp().get_transaction(context.event) as transaction:
# transaction.add("cache_name", "key", "value1")
#
# # add a layer
# manager.push_ontology("new ontology")
# with manager.current_sdp().get_transaction(context.event) as transaction:
# transaction.add("cache_name", "key", "value2")
#
# assert manager.get("cache_name", "key") == "value2"
#
# manager.pop_ontology(context)
# assert manager.get("cache_name", "key") == "value1"
#
# # put back the previous ontology
# manager.push_ontology("new ontology")
# assert manager.get("cache_name", "key") == "value2"
#
# def test_i_can_remember_concept_and_rules_by_ontology(self, manager):
# sheerka, context, foo, r1 = self.init_test().with_concepts(
# "foo",
# create_new=True
# ).with_format_rules(
# ("rule1", "__ret", "True"),
# ).unpack()
# sheerka.om.commit(context)
#
# sheerka = self.new_sheerka_instance(False)
#
#
# sheerka.create_new_concept(context, Concept("bar"))
# r2 = sheerka.create_new_rule(context, Rule(ACTION_TYPE_EXEC, "rule2", "__ret.status", "True")).body.body
# sheerka.om.commit(context)
#
# sheerka.push_ontology(context, "new ontology")
# sheerka.create_new_concept(context, Concept("baz"))
# sheerka.create_new_rule(context, Rule(ACTION_TYPE_EXEC, "rule3", "id3.attr3 == 'value'", "True"))
# sheerka.om.commit(context)
#
# sheerka = self.new_sheerka_instance(False)
#
#
# sheerka.push_ontology(context, "another ontology")
# sheerka.create_new_concept(context, Concept("qux"))
# r4 = sheerka.create_new_rule(context, Rule(ACTION_TYPE_EXEC, "rule4", "id4.attr4", "True")).body.body
# sheerka.remove_concept(context, foo)
# sheerka.remove_rule(context, r2)
# sheerka.om.commit(context)
#
# assert sheerka.om.self_cache_manager.copy(SheerkaOntologyManager.CONCEPTS_BY_ONTOLOGY_ENTRY) == {
# '#unit_test#': {'1002'},
# 'another ontology': {'1004'},
# }
#
# assert sheerka.om.self_cache_manager.copy(SheerkaOntologyManager.RULES_BY_ONTOLOGY_ENTRY) == {
# '#unit_test#': {r1.id},
# 'another ontology': {r4.id},
# }
#
# # in db
# assert sheerka.om.self_cache_manager.sdp.get(SheerkaOntologyManager.CONCEPTS_BY_ONTOLOGY_ENTRY) == {
# '#unit_test#': {'1002'},
# 'another ontology': {'1004'},
# 'new ontology': {'1003'}}
#
# rules_from_db = sheerka.om.self_cache_manager.sdp.get(SheerkaOntologyManager.RULES_BY_ONTOLOGY_ENTRY)
# del rules_from_db["__default__"]
# assert rules_from_db == {
# '#unit_test#': {'11'},
# 'another ontology': {'14'},
# 'new ontology': {'13'}}