from threading import RLock


class BaseCache:
    """
    An in-memory FIFO cache.

    When ``max_size`` is reached, the oldest inserted entry is evicted
    (dict insertion order). Putting the same key twice overrides the
    previous entry.

    This is a base class: the storage hooks ``_put``, ``_update`` and
    ``_delete`` are no-ops / NotImplementedError and are meant to be
    implemented by subclasses. ``put`` only grows the size counter when
    ``_put`` returns a truthy value.
    """

    def __init__(self, max_size=None, default=None, extend_exists=None):
        self._cache = {}
        self._max_size = max_size
        # Default value returned on a miss; may be a callable taking the key.
        self._default = default
        # Optional callable used by exists() to check a remote store.
        self._extend_exists = extend_exists
        self._lock = RLock()
        # Logical size. NOTE: a list/set value fetched through a callable
        # default counts one unit per element (see _get), so this may be
        # larger than len(self._cache).
        self._current_size = 0
        # Keys that have been read at least once (see _sync).
        self._initialized_keys = set()
        # Pending change events, consumed externally via reset_events().
        self.to_add = set()
        self.to_remove = set()

    def __len__(self):
        """
        Return the number of items in the cache
        :return:
        """
        with self._lock:
            return self._current_size

    def __contains__(self, key):
        with self._lock:
            return key in self._cache

    def __iter__(self):
        with self._lock:
            # Iterate over a snapshot so concurrent mutation is safe.
            keys = self._cache.copy()
            yield from keys

    def __next__(self):
        # NOTE(review): always returns the first (oldest) key; this does
        # not advance any cursor.
        return next(iter(self._cache))

    def __repr__(self):
        return f"{self.__class__.__name__}(size={self._current_size}, #keys={len(self._cache)})"

    def configure(self, max_size=None, default=None, extend_exists=None):
        """Update settings; a None argument leaves the current value unchanged."""
        if max_size is not None:
            self._max_size = max_size
        if default is not None:
            self._default = default
        if extend_exists is not None:
            self._extend_exists = extend_exists

    def disable_default(self):
        """Remove the default value (misses will return None afterwards)."""
        self._default = None

    def put(self, key, value):
        """
        Add a new entry in cache
        :param key:
        :param value:
        :return:
        """
        with self._lock:
            if self._max_size and self._current_size >= self._max_size:
                # BUGFIX: evict enough entries to drop below max_size.
                # The previous formula (max_size - current_size + 1) went
                # negative when current_size exceeded max_size (possible
                # because _get counts list/set defaults per element), which
                # made evict() *increase* the size counter.
                self.evict(self._current_size - self._max_size + 1)
            if self._put(key, value):
                self._current_size += 1

    def get(self, key):
        """
        Retrieve an entry from the cache
        If the entry does not exist, will use the 'default' value or delegate
        :param key:
        :return:
        """
        with self._lock:
            self._initialized_keys.add(key)
            return self._get(key)

    def inner_get(self, key):
        """Direct dict access, no default handling. Raises KeyError on a miss."""
        return self._cache[key]

    def update(self, old_key, old_value, new_key, new_value):
        """
        Update an entry in the cache
        :param old_key: key of the previous version of the entry
        :param old_value: previous version of the entry
        :param new_key: key of the entry
        :param new_value: new value
        :return:
        """
        with self._lock:
            self._update(old_key, old_value, new_key, new_value)

    def delete(self, key, value=None):
        """Delete an entry via the subclass hook; a missing key is ignored."""
        with self._lock:
            try:
                self._delete(key, value)
            except KeyError:
                pass

    def has(self, key):
        """
        Return True if the key is in the cache
        Never use extend_exist
        :param key:
        :return:
        """
        with self._lock:
            return key in self._cache

    def exists(self, key):
        """
        Return True if the key is in the cache
        Can use extend_exist
        :param key:
        :return:
        """
        with self._lock:
            if key in self._cache:
                return True
            return self._extend_exists(key) if self._extend_exists else False

    def evict(self, nb_items):
        """
        Remove up to nb_items entries from the cache, oldest first (FIFO).

        :param nb_items: requested number of entries to remove; clamped
            to [0, current_size], so a negative request removes nothing.
        :return: the number of entries actually removed
        """
        with self._lock:
            # BUGFIX: clamp negative requests (the old code subtracted a
            # negative count from _current_size) and stop when the dict is
            # empty (the counter can exceed len(_cache), see __init__).
            target = max(0, min(nb_items, self._current_size))
            removed = 0
            while removed < target and self._cache:
                key = next(iter(self._cache))
                del self._cache[key]
                self._initialized_keys.discard(key)
                removed += 1
            self._current_size -= removed
            return removed

    def evict_by_key(self, predicate):
        """
        Remove entries that matches the predicate
        :param predicate: callable(key) -> bool
        :return: the number of entries removed
        """
        with self._lock:
            to_delete = [key for key in self._cache if predicate(key)]
            for key in to_delete:
                del self._cache[key]
                self._initialized_keys.discard(key)
            # BUGFIX: keep the size counter in sync, as evict() does; the
            # old code left _current_size unchanged after deletions.
            self._current_size = max(0, self._current_size - len(to_delete))
            return len(to_delete)

    def clear(self):
        """Drop every entry, counter and pending event."""
        with self._lock:
            self._cache.clear()
            self._current_size = 0
            self._initialized_keys.clear()
            self.to_add.clear()
            self.to_remove.clear()

    def dump(self):
        """Return a serializable snapshot (size counter + shallow dict copy)."""
        with self._lock:
            return {
                "current_size": self._current_size,
                "cache": self._cache.copy()
            }

    def copy(self):
        """Return a shallow copy of the underlying dict."""
        with self._lock:
            return self._cache.copy()

    def init_from(self, dump):
        """Restore state from a dump() snapshot. Returns self (fluent)."""
        with self._lock:
            self._current_size = dump["current_size"]
            self._cache = dump["cache"].copy()
            return self

    def reset_events(self):
        """Clear the pending to_add / to_remove event sets."""
        with self._lock:
            self.to_add.clear()
            self.to_remove.clear()

    def _sync(self, *keys):
        # Force a get() on keys never read before so the default/remote
        # lookup populates them — keeps the cache in sync with the remote.
        for key in keys:
            if key not in self._initialized_keys and self._default:
                self.get(key)

    def _add_to_add(self, key):
        # A key scheduled for addition can no longer be pending removal.
        self.to_add.add(key)
        self.to_remove.discard(key)

    def _add_to_remove(self, key):
        # A key scheduled for removal can no longer be pending addition.
        self.to_remove.add(key)
        self.to_add.discard(key)

    def _get(self, key):
        try:
            return self._cache[key]
        except KeyError:
            if not callable(self._default):
                # Plain default value: returned but never stored.
                return self._default
            value = self._default(key)
            if value is not None:
                self._cache[key] = value
                # Collections count one unit per element in _current_size.
                if isinstance(value, (list, set)):
                    self._current_size += len(value)
                else:
                    self._current_size += 1
            return value

    def _put(self, key, value):
        # Subclass hook: store the entry and return truthy if it is new.
        pass

    def _update(self, old_key, old_value, new_key, new_value):
        # Subclass hook: replace old entry with the new one.
        pass

    def _delete(self, key, value):
        # Subclass hook: remove the entry; may raise KeyError on a miss.
        raise NotImplementedError()