Files
MyFastHtml/src/myfasthtml/core/instances.py

235 lines
7.0 KiB
Python

import logging
import uuid
from typing import Optional
from dbengine.utils import get_class
from myfasthtml.controls.helpers import Ids
from myfasthtml.core.utils import pascal_to_snake, snake_to_pascal
# Module logger, registered under the fixed name "InstancesManager".
logger = logging.getLogger("InstancesManager")

# Session dict that is not tied to a request; this module uses it to build
# RootInstance. Its "user_info" -> "id" entry is the value that
# InstancesManager.get_session_id() returns for it.
special_session = {"user_info": {"id": "** SPECIAL SESSION **"}}
class DuplicateInstanceError(Exception):
    """
    Error carrying the instance that could not be registered a second time.

    The offending object is available as ``instance`` and is also the single
    entry of ``args``.
    """

    def __init__(self, instance):
        super().__init__(instance)
        self.instance = instance
class BaseInstance:
    """
    Base class for all instances (manageable by InstancesManager).

    Construction is keyed on ``(session id, instance id)``: ``__new__`` returns
    the object already stored in ``InstancesManager.instances`` under that key
    when there is one, and ``__init__`` only initialises objects that
    ``__new__`` has just created. ``UniqueInstance`` subclasses are the
    exception: their ``__init__`` runs on every instantiation.
    """

    def __new__(cls, *args, **kwargs):
        """
        Return the instance registered for this session/id, or a fresh object.

        Arguments are read positionally or by keyword, following the
        ``__init__`` signature (parent, session, _id). A positional value of
        an unexpected type is ignored and the keyword of the same name is used
        instead.

        :raises TypeError: if neither session nor parent is provided, or if
            the key is already registered with an instance of another type
        """
        # Extract arguments from both positional and keyword arguments
        # Signature matches __init__: parent, session=None, _id=None, auto_register=True
        parent = args[0] if len(args) > 0 and isinstance(args[0], BaseInstance) else kwargs.get("parent", None)
        session = args[1] if len(args) > 1 and isinstance(args[1], dict) else kwargs.get("session", None)
        _id = args[2] if len(args) > 2 and isinstance(args[2], str) else kwargs.get("_id", None)

        # Compute _id
        _id = cls.compute_id(_id, parent)

        if session is None:
            if parent is None:
                raise TypeError("Either session or parent must be provided")
            # No explicit session: inherit the parent's one
            session = parent.get_session()

        session_id = InstancesManager.get_session_id(session)
        key = (session_id, _id)

        if key in InstancesManager.instances:
            res = InstancesManager.instances[key]
            # Exact type match required: a subclass or unrelated class under
            # the same key is reported as a conflict
            if type(res) is not cls:
                raise TypeError(f"Instance with id {_id} already exists, but is of type {type(res)}")
            return res

        # Otherwise create a new instance
        instance = super().__new__(cls)
        instance._is_new_instance = True  # mark as fresh
        return instance

    def __init__(self, parent: Optional['BaseInstance'],
                 session: Optional[dict] = None,
                 _id: Optional[str] = None,
                 auto_register: bool = True):
        """
        Initialise a freshly created instance and optionally register it.

        :param parent: owning instance, or None
        :param session: session dict; when None the parent's session is used
        :param _id: explicit id, or None to let compute_id() generate one
        :param auto_register: register the instance in InstancesManager
        """
        if not getattr(self, "_is_new_instance", False):
            # __new__ handed back an already initialised instance: keep its state
            return
        if not isinstance(self, UniqueInstance):
            # Clear the flag so that later instantiations resolving to this
            # object skip __init__. UniqueInstance keeps the flag set and is
            # therefore re-initialised every time.
            self._is_new_instance = False

        self._parent = parent

        # Use the same "is None" tests as __new__. A truthiness test would
        # replace a falsy session (e.g. an empty dict) by the parent's one, so
        # the instance would be registered under a different key than the one
        # __new__ looked up.
        if session is None and parent is not None:
            session = parent.get_session()
        self._session = session

        self._id = self.compute_id(_id, parent)
        # Single/Unique instances use their own id as prefix, the others use
        # the class-level prefix
        self._prefix = self._id if isinstance(self, (UniqueInstance, SingleInstance)) else self.compute_prefix()

        if auto_register:
            InstancesManager.register(self._session, self)

    def get_session(self) -> dict:
        """Return the session this instance is bound to."""
        return self._session

    def get_id(self) -> str:
        """Return the id of this instance."""
        return self._id

    def get_parent(self) -> Optional['BaseInstance']:
        """Return the parent instance, or None."""
        return self._parent

    def get_prefix(self) -> str:
        """Return the prefix of this instance."""
        return self._prefix

    def get_full_id(self) -> str:
        """Return ``<session id>-<instance id>``."""
        return f"{InstancesManager.get_session_id(self._session)}-{self._id}"

    def get_full_parent_id(self) -> Optional[str]:
        """Return the parent's full id, or None when there is no parent."""
        parent = self.get_parent()
        return parent.get_full_id() if parent else None

    @classmethod
    def compute_prefix(cls):
        """Return ``mf-`` followed by the snake_case form of the class name."""
        return f"mf-{pascal_to_snake(cls.__name__)}"

    @classmethod
    def compute_id(cls, _id: Optional[str], parent: Optional['BaseInstance']):
        """
        Resolve the id to use for an instance of this class.

        * ``_id`` is None: the id is generated. SingleInstance subclasses get
          the class prefix alone, the other classes get the prefix followed
          by a random UUID4.
        * ``_id`` starts with ``-`` and a parent is given: the parent's prefix
          is prepended.
        * otherwise ``_id`` is returned unchanged.

        :param _id: requested id, or None
        :param parent: parent instance, or None
        :return: the resolved id
        """
        if _id is None:
            prefix = cls.compute_prefix()
            if issubclass(cls, SingleInstance):
                return prefix
            return f"{prefix}-{str(uuid.uuid4())}"

        if _id.startswith("-") and parent is not None:
            return f"{parent.get_prefix()}{_id}"

        return _id
class SingleInstance(BaseInstance):
    """
    Base class for instances that can only have one instance at a time.

    Every constructor argument is optional and is handed over unchanged to
    BaseInstance.
    """

    def __init__(self,
                 parent: Optional[BaseInstance] = None,
                 session: Optional[dict] = None,
                 _id: Optional[str] = None,
                 auto_register: bool = True):
        super().__init__(parent,
                         session=session,
                         _id=_id,
                         auto_register=auto_register)
class UniqueInstance(BaseInstance):
    """
    Base class for instances that can only have one instance at a time.

    In contrast to SingleInstance, __init__ runs again each time the class is
    instantiated. Every constructor argument is optional and is handed over
    unchanged to BaseInstance.
    """

    def __init__(self,
                 parent: Optional[BaseInstance] = None,
                 session: Optional[dict] = None,
                 _id: Optional[str] = None,
                 auto_register: bool = True):
        super().__init__(parent,
                         session=session,
                         _id=_id,
                         auto_register=auto_register)
class MultipleInstance(BaseInstance):
    """
    Base class for instances that can have multiple instances at a time.

    ``parent`` has no default here; the remaining arguments are optional and
    everything is handed over unchanged to BaseInstance.
    """

    def __init__(self, parent: BaseInstance,
                 session: Optional[dict] = None,
                 _id: Optional[str] = None,
                 auto_register: bool = True):
        super().__init__(parent,
                         session=session,
                         _id=_id,
                         auto_register=auto_register)
class InstancesManager:
    """
    Registry of instances, keyed by ``(session id, instance id)``.

    All methods are static and operate on the class-level ``instances`` dict.
    """
    # Shared registry: (session id, instance id) -> instance
    instances = {}

    @staticmethod
    def register(session: dict, instance: "BaseInstance"):
        """
        Register an instance in the manager, so that it can be retrieved later.

        An existing entry under the same key is overwritten, except for
        SingleInstance objects, which are rejected.

        :param session: session the instance belongs to
        :param instance: instance to store
        :return: the registered instance
        :raises DuplicateInstanceError: if ``instance`` is a SingleInstance and
            its key is already registered
        """
        key = (InstancesManager.get_session_id(session), instance.get_id())

        if isinstance(instance, SingleInstance) and key in InstancesManager.instances:
            raise DuplicateInstanceError(instance)

        InstancesManager.instances[key] = instance
        return instance

    @staticmethod
    def get(session: dict, instance_id: str):
        """
        Return the instance registered under ``instance_id`` for the session.

        Nothing is created by this method.

        :param session: session the instance belongs to
        :param instance_id: id of the instance
        :return: the registered instance
        :raises KeyError: if no instance is registered under that key
        """
        key = (InstancesManager.get_session_id(session), instance_id)
        return InstancesManager.instances[key]

    @staticmethod
    def _get_user_info_field(session, field: str):
        """
        Read ``session["user_info"][field]``, or return a placeholder string.

        :param session: session dict, or None
        :param field: key to read in the ``user_info`` mapping
        :return: the stored value, or one of the ``** ... **`` placeholders
        """
        if session is None:
            return "** NOT LOGGED IN **"
        if "user_info" not in session:
            return "** UNKNOWN USER **"
        return session["user_info"].get(field, "** INVALID SESSION **")

    @staticmethod
    def get_session_id(session):
        """
        Return the user id stored in the session.

        :param session: session dict, or None
        :return: ``user_info["id"]``, or a placeholder when the session is
            None, has no ``user_info`` or has no ``id``
        """
        return InstancesManager._get_user_info_field(session, "id")

    @staticmethod
    def get_session_user_name(session):
        """
        Return the user name stored in the session.

        :param session: session dict, or None
        :return: ``user_info["username"]``, or a placeholder when the session
            is None, has no ``user_info`` or has no ``username``
        """
        return InstancesManager._get_user_info_field(session, "username")

    @staticmethod
    def reset():
        """Remove every registered instance, for all sessions."""
        InstancesManager.instances.clear()

    @staticmethod
    def clear_session(session):
        """
        Remove all instances belonging to the given session.

        The registry dict is modified in place (as reset() does), so existing
        references to ``InstancesManager.instances`` stay valid.

        :param session: session whose instances are dropped
        :return: None
        """
        session_id = InstancesManager.get_session_id(session)
        registry = InstancesManager.instances
        # Collect the keys first: a dict cannot be resized while iterated
        stale_keys = [key for key in registry if key[0] == session_id]
        for key in stale_keys:
            del registry[key]

    @staticmethod
    def dynamic_get(parent: "BaseInstance", component_type: str, instance_id: str):
        """
        Load the control class matching ``component_type`` and instantiate it.

        The class name comes from _get_class_name() and the class is loaded
        from ``myfasthtml.controls.<Name>.<Name>`` through get_class().

        :param parent: parent given to the control's constructor
        :param component_type: component prefix, e.g. ``mf-<snake_case_name>``
        :param instance_id: value given as second constructor argument
        :return: whatever the control's constructor returns
        """
        logger.debug(f"Dynamic get: {component_type=} {instance_id=}")
        class_name = InstancesManager._get_class_name(component_type)
        fully_qualified_name = f"myfasthtml.controls.{class_name}.{class_name}"
        cls = get_class(fully_qualified_name)
        # NOTE(review): instance_id is passed as the second positional
        # argument. With the base-class signatures of this module that slot is
        # `session`, not `_id` - confirm that the control classes expect the id
        # there, or pass `_id=instance_id` instead.
        return cls(parent, instance_id)

    @staticmethod
    def _get_class_name(component_type: str) -> str:
        """
        Convert a component type into a class name.

        Every ``mf-`` occurrence is removed, then the remainder is converted
        with snake_to_pascal().

        :param component_type: component prefix
        :return: class name
        """
        stripped = component_type.replace("mf-", "")
        return snake_to_pascal(stripped)
# Module-level root instance, created at import time: no parent, bound to
# special_session, with Ids.Root given as id. auto_register keeps its default,
# so the instance is also stored in InstancesManager.instances.
# NOTE(review): BaseInstance.__new__ only honours a positional _id when it is
# a str - this assumes Ids.Root is one; confirm against Ids.
RootInstance = SingleInstance(None, special_session, Ids.Root)