import logging import uuid from typing import Optional from myfasthtml.controls.helpers import Ids from myfasthtml.core.utils import pascal_to_snake logger = logging.getLogger("InstancesManager") special_session = { "user_info": {"id": "** SPECIAL SESSION **"} } class DuplicateInstanceError(Exception): def __init__(self, instance): self.instance = instance class BaseInstance: """ Base class for all instances (manageable by InstancesManager) """ def __new__(cls, *args, **kwargs): # Extract arguments from both positional and keyword arguments # Signature matches __init__: parent, session=None, _id=None, auto_register=True parent = args[0] if len(args) > 0 and isinstance(args[0], BaseInstance) else kwargs.get("parent", None) session = args[1] if len(args) > 1 and isinstance(args[1], dict) else kwargs.get("session", None) _id = args[2] if len(args) > 2 and isinstance(args[2], str) else kwargs.get("_id", None) # Compute _id _id = cls.compute_id(_id, parent) if session is None: if parent is not None: session = parent.get_session() else: raise TypeError("Either session or parent must be provided") session_id = InstancesManager.get_session_id(session) key = (session_id, _id) if key in InstancesManager.instances: res = InstancesManager.instances[key] if type(res) is not cls: raise TypeError(f"Instance with id {_id} already exists, but is of type {type(res)}") return res # Otherwise create a new instance instance = super().__new__(cls) instance._is_new_instance = True # mark as fresh return instance def __init__(self, parent: Optional['BaseInstance'], session: Optional[dict] = None, _id: Optional[str] = None, auto_register: bool = True): if not getattr(self, "_is_new_instance", False): # Skip __init__ if instance already existed return elif not isinstance(self, UniqueInstance): # No more __init__ unless it's UniqueInstance self._is_new_instance = False self._parent = parent self._session = session or (parent.get_session() if parent else None) self._id = self.compute_id(_id, parent) self._prefix = self._id if isinstance(self, (UniqueInstance, SingleInstance)) else self.compute_prefix() if auto_register: InstancesManager.register(self._session, self) def get_session(self) -> dict: return self._session def get_id(self) -> str: return self._id def get_parent(self) -> Optional['BaseInstance']: return self._parent def get_prefix(self) -> str: return self._prefix def get_full_id(self) -> str: return f"{InstancesManager.get_session_id(self._session)}#{self._id}" def get_full_parent_id(self) -> Optional[str]: parent = self.get_parent() return parent.get_full_id() if parent else None @classmethod def compute_prefix(cls): return f"mf-{pascal_to_snake(cls.__name__)}" @classmethod def compute_id(cls, _id: Optional[str], parent: Optional['BaseInstance']): if _id is None: prefix = cls.compute_prefix() if issubclass(cls, SingleInstance): _id = prefix else: _id = f"{prefix}-{str(uuid.uuid4())}" return _id if _id.startswith("-") and parent is not None: return f"{parent.get_prefix()}{_id}" return _id class SingleInstance(BaseInstance): """ Base class for instances that can only have one instance at a time. """ def __init__(self, parent: Optional[BaseInstance] = None, session: Optional[dict] = None, _id: Optional[str] = None, auto_register: bool = True): super().__init__(parent, session, _id, auto_register) class UniqueInstance(BaseInstance): """ Base class for instances that can only have one instance at a time. But unlike SingleInstance, the __init__ is called every time it's instantiated. """ def __init__(self, parent: Optional[BaseInstance] = None, session: Optional[dict] = None, _id: Optional[str] = None, auto_register: bool = True): super().__init__(parent, session, _id, auto_register) class MultipleInstance(BaseInstance): """ Base class for instances that can have multiple instances at a time. """ def __init__(self, parent: BaseInstance, session: Optional[dict] = None, _id: Optional[str] = None, auto_register: bool = True): super().__init__(parent, session, _id, auto_register) class InstancesManager: instances = {} @staticmethod def register(session: dict, instance: BaseInstance): """ Register an instance in the manager, so that it can be retrieved later. :param session: :param instance: :return: """ key = (InstancesManager.get_session_id(session), instance.get_id()) if isinstance(instance, SingleInstance) and key in InstancesManager.instances: raise DuplicateInstanceError(instance) InstancesManager.instances[key] = instance return instance @staticmethod def get(session: dict, instance_id: str): """ Get or create an instance of the given type (from its id) :param session: :param instance_id: :return: """ session_id = InstancesManager.get_session_id(session) key = (session_id, instance_id) return InstancesManager.instances[key] @staticmethod def get_by_type(session: dict, cls: type): session_id = InstancesManager.get_session_id(session) res = [i for s, i in InstancesManager.instances.items() if s[0] == session_id and isinstance(i, cls)] assert len(res) <= 1, f"Multiple instances of type {cls.__name__} found" assert len(res) > 0, f"No instance of type {cls.__name__} found" return res[0] @staticmethod def get_session_id(session): if isinstance(session, str): return session if session is None: return "** NOT LOGGED IN **" if "user_info" not in session: return "** UNKNOWN USER **" return session["user_info"].get("id", "** INVALID SESSION **") @staticmethod def get_session_user_name(session): if session is None: return "** NOT LOGGED IN **" if "user_info" not in session: return "** UNKNOWN USER **" return session["user_info"].get("username", "** INVALID SESSION **") @staticmethod def reset(): InstancesManager.instances.clear() @staticmethod def clear_session(session): """ Remove all instances belonging to the given session. :param session: :return: """ session_id = InstancesManager.get_session_id(session) InstancesManager.instances = { key: instance for key, instance in InstancesManager.instances.items() if key[0] != session_id } RootInstance = SingleInstance(None, special_session, Ids.Root)