import logging
from contextlib import contextmanager
from types import SimpleNamespace

from dbengine.dbengine import DbEngine

from myfasthtml.core.instances import SingleInstance, BaseInstance
from myfasthtml.core.utils import retrieve_user_info

logger = logging.getLogger("DbManager")

# Sentinel used to tell "attribute does not exist yet" apart from
# "attribute exists and is None" when comparing old and new values.
_MISSING = object()


class DbManager(SingleInstance):
    """
    Thin wrapper around a ``DbEngine`` that resolves tenant and user
    from the current session before delegating every call.
    """

    def __init__(self, parent, root=".myFastHtmlDb", auto_register: bool = True):
        """
        :param parent: forwarded to ``SingleInstance.__init__``.
        :param root: root passed to ``DbEngine`` when the engine is created.
        :param auto_register: forwarded to ``SingleInstance.__init__``.
        """
        super().__init__(parent, auto_register=auto_register)

        if not hasattr(self, "db"):  # hack to manage singleton inheritance
            self.db = DbEngine(root=root)

    def save(self, entry, obj):
        """Save ``obj`` under ``entry`` for the current tenant and user."""
        self.db.save(self.get_tenant(), self.get_user(), entry, obj)

    def load(self, entry):
        """Return what the engine holds under ``entry`` for the current tenant."""
        return self.db.load(self.get_tenant(), entry)

    def exists_entry(self, entry):
        """Return the engine's answer to whether ``entry`` exists for the current tenant."""
        return self.db.exists(self.get_tenant(), entry)

    def get_tenant(self):
        """Return the ``"id"`` field of the session's user info."""
        # NOTE(review): ``_session`` is presumably provided by the base class - confirm.
        return retrieve_user_info(self._session)["id"]

    def get_user(self):
        """Return the ``"email"`` field of the session's user info."""
        return retrieve_user_info(self._session)["email"]


class DbObject:
    """
    Object whose public attributes are persisted through a ``DbManager``.

    Assigning a public attribute saves the object; existing content is
    loaded when the initialization is finalized.

    Attribute name prefixes change how an assignment is handled:

    * ``_``   : set directly, never triggers a save and is never persisted.
    * ``ns_`` : set directly, never triggers a save and is never persisted.
    * ``ne_`` : always set and saved, without comparing with the previous value.
    """
    _initializing = False
    # NOTE(review): ``_save_state`` is not listed here, so it can be changed by
    # update() and is returned by copy() - confirm whether this is intended.
    _forbidden_attrs = {"_initializing", "_db_manager", "_name", "_owner", "_forbidden_attrs"}

    def __init__(self, owner: BaseInstance, name=None, db_manager=None, save_state=True):
        """
        :param owner: instance this object belongs to; its id is the default entry name.
        :param name: entry name. When it starts with ``#`` or ``-`` and the owner
                     has a parent, the parent id is prepended.
        :param db_manager: manager to use; a ``DbManager`` is built from the owner when omitted.
        :param save_state: when False, nothing is ever written to the DB.
        """
        self._owner = owner
        self._name = name or owner.get_id()
        if self._name.startswith(("#", "-")) and owner.get_parent() is not None:
            self._name = owner.get_parent().get_id() + self._name

        self._db_manager = db_manager or DbManager(self._owner)
        self._save_state = save_state

        self._finalize_initialization()

    @contextmanager
    def initializing(self):
        """
        Context manager during which attribute assignments are not persisted.

        On exit the previous state is restored and the initialization is
        finalized (existing content is reloaded, otherwise the object is saved).
        """
        old_state = getattr(self, "_initializing", False)
        self._initializing = True
        try:
            yield
        finally:
            self._initializing = old_state
            self._finalize_initialization()

    def __setattr__(self, name: str, value):
        """Set the attribute and persist the object when a public value changes."""
        if name.startswith(("_", "ns_")) or getattr(self, "_initializing", False):
            super().__setattr__(name, value)
            return

        if not name.startswith("ne_"):
            # A sentinel default makes sure that assigning None to an attribute
            # that does not exist yet is not mistaken for "value unchanged".
            old_value = getattr(self, name, _MISSING)
            if old_value is not _MISSING and old_value == value:
                return

        super().__setattr__(name, value)
        self._save_self()

    def _finalize_initialization(self):
        """Load the existing entry, or create it when there is none."""
        if getattr(self, "_initializing", False):
            return  # still under initialization

        if not self._reload_self():
            self._save_self()

    def _reload_self(self):
        """
        Update the object from the DB entry, when it exists.

        :return: True when an entry was found and applied, False otherwise.
        """
        if not self._db_manager.exists_entry(self._name):
            return False

        self.update(self._db_manager.load(self._name))
        return True

    def _save_self(self):
        """Persist the public properties, unless saving is disabled or there is nothing to save."""
        if not self._save_state:
            return

        # Same prefixes as in __setattr__: "_" and "ns_" are never persisted.
        props = {
            k: v
            for k, v in self._get_properties().items()
            if not k.startswith(("_", "ns_"))
        }
        if props:
            self._db_manager.save(self._name, props)

    def _get_properties(self):
        """
        Retrieves all the properties of the current object, combining the
        attributes defined on its class with the instance attributes.
        Names starting with a double underscore are left out.

        :return: A dictionary mapping property names to their current values.
        """
        # Class level attributes first (for dataclass), instance values then override them.
        # Dunder names are skipped before getattr(), as they are never returned.
        props = {
            k: getattr(self, k)
            for k in self.__class__.__dict__
            if not k.startswith("__")
        }
        props |= {
            k: getattr(self, k)
            for k in self.__dict__
            if not k.startswith("__")
        }
        return props

    def update(self, *args, **kwargs):
        """
        Update existing attributes from a dict or a SimpleNamespace and/or
        keyword arguments, then save once.

        Unknown attributes and internal ones are ignored.

        :param args: at most one dict or SimpleNamespace. None returns immediately.
        :param kwargs: extra values, taking precedence over the positional argument.
        :return: self
        :raises ValueError: on more than one positional argument, or on an unsupported type.
        """
        if len(args) > 1:
            raise ValueError("Only one argument is allowed")

        properties = {}
        if args:
            arg = args[0]
            if arg is None:
                return self

            if not isinstance(arg, (dict, SimpleNamespace)):
                raise ValueError("Only dict or Expando are allowed as argument")

            properties |= vars(arg) if isinstance(arg, SimpleNamespace) else arg

        properties |= kwargs

        # Assign everything without saving on each attribute, then save once.
        # try/finally guarantees that a failing assignment or save does not
        # leave the object stuck in initializing mode.
        old_state = getattr(self, "_initializing", False)
        self._initializing = True
        try:
            for k, v in properties.items():
                if hasattr(self, k) and k not in DbObject._forbidden_attrs:  # internal variables cannot be updated
                    setattr(self, k, v)
            self._save_self()
        finally:
            self._initializing = old_state

        return self

    def copy(self):
        """Return the properties, without the internal ones, as a SimpleNamespace."""
        as_dict = {
            k: v
            for k, v in self._get_properties().items()
            if k not in DbObject._forbidden_attrs
        }
        return SimpleNamespace(**as_dict)

    def save(self):
        """Persist the current state."""
        self._save_self()

    def reload(self):
        """Reload the state from the DB, when an entry exists."""
        self._reload_self()

    def exists(self):
        """Return the manager's answer to whether an entry exists for this object."""
        return self._db_manager.exists_entry(self._name)

    def get_id(self):
        """Return the entry name used in the DB."""
        return self._name