import pytest

from base import BaseTest
from caching.DictionaryCache import DictionaryCache
from common.global_symbols import NotFound
from tests.caching import FakeSdp


class TestDictionaryCache(BaseTest):
    """Behavioural tests for ``DictionaryCache``.

    Throughout these tests ``put(flag, entries)`` is called with a boolean
    flag and a dictionary: ``False`` is used where the cache content is
    expected to be replaced, ``True`` where entries are expected to be added.
    """

    @pytest.mark.parametrize('key', [None, "str_value", 0, 1.0])
    def test_key_must_be_true_or_false(self, key):
        """``put`` raises ``KeyError`` for every key that is not a boolean."""
        cache = DictionaryCache()

        # The value is a valid dictionary, so only the key can be rejected.
        # NOTE(review): 0 and 1.0 compare equal to False/True; this assumes
        # put() validates the key by type, not by equality - confirm.
        with pytest.raises(KeyError):
            cache.put(key, {"key": "value"})

    def test_value_must_be_a_dictionary(self):
        """``put`` raises ``ValueError`` for a non-dict value, whatever the flag."""
        cache = DictionaryCache()
        with pytest.raises(ValueError):
            cache.put(True, "value")

        with pytest.raises(ValueError):
            cache.put(False, "value")

    def test_i_can_put_and_retrieve_value_from_dictionary_cache(self):
        """Entries can be set, appended to, and then reset."""
        cache = DictionaryCache()
        entry = {"key": "value", "key2": ["value21", "value22"]}
        cache.put(False, entry)

        assert len(cache) == 3
        # put(False, ...) keeps the very same dictionary object
        assert cache._cache is entry
        assert cache.get("key") == "value"
        assert cache.get("key2") == ["value21", "value22"]

        # I can append values
        cache.put(True, {"key": "another_value", "key3": "value3"})
        assert len(cache) == 4
        assert cache.get("key") == "another_value"
        assert cache.get("key2") == ["value21", "value22"]
        assert cache.get("key3") == "value3"

        # I can reset
        entry = {"key": "value", "key2": ["value21", "value22"]}
        cache.put(False, entry)
        assert len(cache) == 3
        assert cache._cache is entry
        assert cache.get("key") == "value"
        assert cache.get("key2") == ["value21", "value22"]
        assert cache.get("key3") is NotFound

        assert cache.copy() == {'key': 'value', 'key2': ['value21', 'value22']}

    def test_i_can_get_a_value_that_does_not_exist_without_compromising_the_cache(self):
        """A miss returns ``NotFound`` and leaves the stored entries untouched."""
        cache = DictionaryCache()
        cache.put(False, {"key": "value"})

        assert cache.get("key2") is NotFound
        assert cache.copy() == {"key": "value"}

    def test_i_can_append_to_a_dictionary_cache_even_if_it_is_new(self):
        """Appending to a brand new cache works and does not reuse the given dict."""
        cache = DictionaryCache()
        entry = {"key": "value", "key2": ["value21", "value22"]}
        cache.put(True, entry)

        assert len(cache) == 3
        # put(True, ...) must not store the caller's dictionary object itself
        assert cache._cache is not entry
        assert cache.get("key") == "value"
        assert cache.get("key2") == ["value21", "value22"]

    def test_exists_in_dictionary_cache(self):
        """``exists`` is falsy before the key is put and truthy afterwards."""
        cache = DictionaryCache()
        assert not cache.exists("key")

        cache.put(True, {"key": "value"})
        assert cache.exists("key")

    def test_default_for_dictionary_cache(self):
        """A dictionary ``default`` populates the cache on hits and on misses."""
        cache = DictionaryCache(default={"key": "value", "key2": "value2"})

        # cache is fully set when the value is found
        assert cache.get("key") == "value"
        assert cache.copy() == {"key": "value", "key2": "value2"}

        # cache is fully set when the value is not found
        cache.test_only_reset()
        assert cache.get("key3") is NotFound
        assert cache.copy() == {"key": "value", "key2": "value2"}

        # cache is not corrupted when value is found
        cache.put(True, {"key3": "value3", "key4": "value4"})
        assert cache.get("key3") == "value3"
        assert cache.copy() == {"key": "value", "key2": "value2", "key3": "value3", "key4": "value4"}

        # cache is not corrupted when value is not found
        cache._cache["key"] = "another value"  # operation that is normally not possible
        assert cache.get("key5") is NotFound
        assert cache.copy() == {"key": "value", "key2": "value2", "key3": "value3", "key4": "value4"}

    def test_default_callable_for_dictionary_cache(self):
        """A callable ``default`` taking the key populates the whole cache."""
        cache = DictionaryCache(default=lambda k: {"key": "value", "key2": "value2"})

        assert cache.get("key") == "value"
        assert "key2" in cache
        assert len(cache) == 2

        # after a clear, a miss still reloads every default entry
        cache.clear()
        assert cache.get("key3") is NotFound
        assert len(cache) == 2
        assert "key" in cache
        assert "key2" in cache

    def test_default_callable_with_internal_sdp_for_dictionary_cache(self):
        """A callable ``default`` taking ``(sdp, key)`` is fed the cache's own sdp."""
        cache = DictionaryCache(
            default=lambda sdp, key: sdp.get("cache_name", key),
            sdp=FakeSdp(lambda entry, k: {"key": "value", "key2": "value2"}),
        )

        assert cache.get("key") == "value"
        assert "key2" in cache
        assert len(cache) == 2

        # after a clear, a miss still reloads every default entry
        cache.clear()
        assert cache.get("key3") is NotFound
        assert len(cache) == 2
        assert "key" in cache
        assert "key2" in cache

    def test_dictionary_cache_cannot_be_null(self):
        """Defaults yielding ``NotFound`` or ``None`` leave an empty dict as cache."""
        cache = DictionaryCache(default=lambda k: NotFound)
        assert cache.get("key") is NotFound
        assert cache._cache == {}

        cache = DictionaryCache(default=NotFound)
        assert cache.get("key") is NotFound
        assert cache._cache == {}

        cache = DictionaryCache(default=lambda k: None)
        assert cache.get("key") is NotFound
        assert cache._cache == {}

        cache = DictionaryCache(default=None)
        assert cache.get("key") is NotFound
        assert cache._cache == {}

    def test_auto_configure_retrieves_the_whole_remote_repository(self, sdp, context):
        """After ``auto_configure``, a miss loads every entry of the remote repo."""
        cache = DictionaryCache(sdp=sdp).auto_configure("test")
        with sdp.get_transaction(context.event) as transaction:
            transaction.add("test", "key1", "value1")
            transaction.add("test", "key2", "value2")

        # when call for a value that is not in the cache, Dictionary cache is configured to retrieve the repo
        cache.get("value")
        assert cache.copy() == {'key1': 'value1', 'key2': 'value2'}

    def test_we_do_no_go_twice_in_repo_when_not_found(self, sdp, context):
        """A key already reported ``NotFound`` is not looked up again in the repo."""
        cache = DictionaryCache(sdp=sdp).auto_configure("test")

        assert cache.get("key") is NotFound

        # now add a value in remote repo
        with sdp.get_transaction(context.event) as transaction:
            transaction.add("test", "key", "value")

        assert cache.get("key") is NotFound  # the key was previously requested