# Source file: MyFastHtml/src/myfasthtml/core/commands.py (Python, 320 lines, 10 KiB)

import html
import inspect
import json
import logging
import uuid
from dataclasses import dataclass
from typing import Optional, Literal
from myutils.observable import NotObservableError, ObservableResultCollector
from myfasthtml.core.constants import Routes, ROUTE_ROOT
from myfasthtml.core.utils import flatten
# Module-level logger shared by every command defined in this file.
logger = logging.getLogger("Commands")
# Key used inside Command._htmx_extra to store the "auto swap oob" flag.
# Command.execute() reads it to decide whether the extra returned elements
# get an `hx-swap-oob` attribute; Command.htmx() writes it.
AUTO_SWAP_OOB = "__auto_swap_oob__"
@dataclass
class BoundCommand:
    """
    Pairs a command with the moment at which it must run relative to another
    (main) command.

    Attributes:
        command: The command to run alongside the main command.
        when: Either "before" or "after" the main command. Defaults to "after".
    """

    command: 'Command'
    when: Literal["before", "after"] = "after"
class Command:
    """
    Represents the base command class for defining executable actions.

    A command wraps a callback together with default positional and keyword
    arguments, exposes the HTMX attributes needed to trigger it from the
    browser, and can be registered in ``CommandsManager`` under a key so the
    same logical command is shared between instances.

    :ivar id: A unique identifier for the command. When a command with the same
        key is already registered, the id of the registered command is reused.
    :type id: uuid.UUID
    :ivar name: The name of the command.
    :type name: str
    :ivar description: A brief description of the command's functionality.
    :type description: str
    """

    @staticmethod
    def process_key(key, name, owner, args, kwargs):
        """
        Compute the registration key of a command.

        :param key: Explicit key, or None to derive one. An explicit key may
            contain the placeholders ``#{args}``, ``#{id}`` and
            ``#{id-name-args}``; the last two are only expanded when an owner
            is given.
        :param name: Name of the command.
        :param owner: Optional object exposing ``get_full_id()``.
        :param args: Default positional arguments of the command.
        :param kwargs: Default keyword arguments (not used to build the key).
        :return: The resolved key.
        """

        def _compute_from_args():
            # Arguments that expose get_full_id() contribute that id,
            # everything else contributes its str() form.
            return "-".join(
                arg.get_full_id() if hasattr(arg, "get_full_id") else str(arg)
                for arg in args
            )

        # kwargs are deliberately left out of the key:
        # either there is no parameter (so one single instance of the command is enough)
        # or the parameters are kwargs (so they are provided when the command is called)
        if key is None:
            key_parts = []
            if owner is not None:
                key_parts.append(f"{owner.get_full_id()}")
            key_parts.append(name)
            if args:
                key_parts.append(_compute_from_args())
            return "-".join(key_parts)

        key = key.replace("#{args}", _compute_from_args())
        if owner is not None:
            key = key.replace("#{id}", owner.get_full_id())
            key = key.replace("#{id-name-args}", f"{owner.get_full_id()}-{name}-{_compute_from_args()}")
        return key

    def __init__(self, name,
                 description,
                 owner=None,
                 callback=None,
                 args: list = None,
                 kwargs: dict = None,
                 key=None,
                 auto_register=True):
        """
        :param name: Name of the command.
        :param description: Short description of the command.
        :param owner: Optional owner; used to build the key and to look up
            bound commands at execution time.
        :param callback: Callable invoked by execute().
        :param args: Default positional arguments passed to the callback.
        :param kwargs: Default keyword arguments passed to the callback.
        :param key: Optional explicit registration key (see process_key()).
        :param auto_register: When True, register the command in
            CommandsManager unless its key is already registered.
        """
        self.id = uuid.uuid4()
        self.name = name
        self.description = description
        self.owner = owner
        self.callback = callback
        self.default_args = args or []
        self.default_kwargs = kwargs or {}
        self._htmx_extra = {AUTO_SWAP_OOB: True}
        self._bindings = []
        self._ft = None
        # Signature of the callback, used to filter and cast the kwargs it receives.
        self._callback_parameters = dict(inspect.signature(callback).parameters) if callback else {}
        self._key = self.process_key(key, self.name, self.owner, self.default_args, self.default_kwargs)

        # register the command
        if auto_register:
            if self._key is not None:
                if self._key in CommandsManager.commands_by_key:
                    # Same key already registered: share its id so requests
                    # targeting this instance resolve to the registered command.
                    self.id = CommandsManager.commands_by_key[self._key].id
                else:
                    CommandsManager.register(self)
            else:
                logger.warning(f"Command {self.name} has no key, it will not be registered.")

    def get_key(self):
        """Return the registration key computed at construction time."""
        return self._key

    def get_htmx_params(self, escaped=False, values_encode=None):
        """
        Build the HTMX attributes that trigger this command.

        :param escaped: When True, ``hx-vals`` is JSON-serialized and then
            HTML-escaped.
        :param values_encode: When ``"json"``, ``hx-vals`` is JSON-serialized.
            This is applied after ``escaped``, so enabling both JSON-encodes
            the already escaped string.
        :return: dict containing ``hx-post``, ``hx-swap``, ``hx-vals`` and
            every entry set through htmx()/bind().
        """
        res = {
            "hx-post": f"{ROUTE_ROOT}{Routes.Commands}",
            "hx-swap": "outerHTML",
            "hx-vals": {"c_id": f"{self.id}"},
        }

        # NOTE(review): every key of _htmx_extra is copied, which includes the
        # internal AUTO_SWAP_OOB flag - confirm that this is intended.
        for k, v in self._htmx_extra.items():
            if k == "hx-post":
                continue  # cannot override this one
            elif k == "hx-vals":
                res["hx-vals"] |= v
            else:
                res[k] = v

        # kwarg are given to the callback as values
        # NOTE(review): a "c_id" entry in default_kwargs overrides the command
        # id here, whereas ajax_htmx_options() forbids it - confirm.
        res["hx-vals"] |= self.default_kwargs

        if escaped:
            res["hx-vals"] = html.escape(json.dumps(res["hx-vals"]))

        if values_encode == "json":
            res["hx-vals"] = json.dumps(res["hx-vals"])

        return res

    def execute(self, client_response: dict = None):
        """
        Run the command: "before" bound commands, the callback, then "after"
        bound commands, and gather everything that was produced.

        :param client_response: Values sent by the client; they take precedence
            over the default kwargs when building the callback arguments.
        :return: The single produced element when there is exactly one,
            otherwise the flattened list of all produced elements.
        """
        logger.debug(f"Executing command {self.name} with arguments {client_response=}")
        with ObservableResultCollector(self._bindings) as collector:
            # Execute "before" bound commands
            ret_from_before_commands = []
            if self.owner:
                before_commands = [bc for bc in self.owner.get_bound_commands(self.name) if bc.when == "before"]
                for bound_cmd in before_commands:
                    logger.debug(f"  will execute bound command {bound_cmd.command.name} BEFORE...")
                    r = bound_cmd.command.execute(client_response)
                    ret_from_before_commands.append(r)

            # Execute main callback
            kwargs = self._create_kwargs(self.default_kwargs,
                                         client_response,
                                         {"client_response": client_response or {}})
            ret = self.callback(*self.default_args, **kwargs)

            # Execute "after" bound commands
            ret_from_after_commands = []
            if self.owner:
                after_commands = [bc for bc in self.owner.get_bound_commands(self.name) if bc.when == "after"]
                for bound_cmd in after_commands:
                    logger.debug(f"  will execute bound command {bound_cmd.command.name} AFTER...")
                    r = bound_cmd.command.execute(client_response)
                    ret_from_after_commands.append(r)

        all_ret = flatten(ret, ret_from_before_commands, ret_from_after_commands, collector.results)

        # Set the hx-swap-oob attribute on the returned elements. The first one
        # is skipped (presumably it is swapped into the main target - confirm).
        if self._htmx_extra[AUTO_SWAP_OOB]:
            for r in all_ret[1:]:
                if (hasattr(r, 'attrs')
                        and "hx-swap-oob" not in r.attrs
                        and r.get("id", None) is not None):
                    r.attrs["hx-swap-oob"] = "true"

        return all_ret[0] if len(all_ret) == 1 else all_ret

    def htmx(self, target: Optional[str] = "this", swap="outerHTML", trigger=None, auto_swap_oob=True):
        """
        Configure the HTMX behaviour of the command (fluent: returns self).

        :param target: ``hx-target`` value. ``"this"`` keeps the default;
            None disables swapping (``hx-swap="none"``).
        :param swap: ``hx-swap`` value. ``"outerHTML"`` keeps the default;
            None disables swapping.
        :param trigger: Optional ``hx-trigger`` value.
        :param auto_swap_oob: Whether execute() marks the extra returned
            elements with ``hx-swap-oob``.
        :return: self
        """
        self._htmx_extra[AUTO_SWAP_OOB] = auto_swap_oob

        # Note that the default value is the same than in get_htmx_params()
        if target is None:
            self._htmx_extra["hx-swap"] = "none"
        elif target != "this":
            self._htmx_extra["hx-target"] = target

        if swap is None:
            self._htmx_extra["hx-swap"] = "none"
        elif swap != "outerHTML":
            self._htmx_extra["hx-swap"] = swap

        if trigger is not None:
            self._htmx_extra["hx-trigger"] = trigger

        return self

    def bind_ft(self, ft):
        """
        Update the FT with the command's HTMX parameters.

        :param ft: Element whose ``attrs`` receive the HTMX parameters; it is
            also remembered and returned by get_ft().
        :return: The same ``ft`` object.
        """
        self._ft = ft
        htmx = self.get_htmx_params()
        ft.attrs |= htmx
        return ft

    def bind(self, data):
        """
        Attach a binding to the command.

        The bound objects are handed to the ObservableResultCollector used by
        execute(); whatever it collects is added to the value returned by
        execute().

        :param data: Object that has a ``_listeners`` attribute.
        :return: self
        :raises NotObservableError: If ``data`` has no ``_listeners`` attribute.
        """
        if not hasattr(data, '_listeners'):
            raise NotObservableError(
                "Object must be made observable with make_observable() before binding"
            )
        self._bindings.append(data)

        # by default, remove the swap on the attached element when binding is used
        self._htmx_extra["hx-swap"] = "none"

        return self

    @property
    def url(self):
        """URL of the commands route with this command's id as ``c_id``."""
        return f"{ROUTE_ROOT}{Routes.Commands}?c_id={self.id}"

    def ajax_htmx_options(self):
        """
        Build the options (url, target, swap, values) used to trigger the
        command programmatically.

        :return: dict with the keys ``url``, ``target``, ``swap``, ``values``.
        """
        # Build a fresh dict: writing "c_id" straight into default_kwargs would
        # mutate the command (and the dict the caller gave to the constructor).
        # "c_id" is written last so that it cannot be overridden.
        values = {**self.default_kwargs, "c_id": f"{self.id}"}
        return {
            "url": self.url,
            "target": self._htmx_extra.get("hx-target", "this"),
            "swap": self._htmx_extra.get("hx-swap", "outerHTML"),
            "values": values,
        }

    def get_ft(self):
        """Return the element last passed to bind_ft(), or None."""
        return self._ft

    def _cast_parameter(self, key, value):
        """
        Convert ``value`` to the type annotated on the callback parameter
        ``key``.

        Values sent by the client are strings, but default kwargs and the
        injected ``client_response`` dict are native Python objects: a value
        that already has the annotated bool/list/dict type is returned as is.

        :param key: Name of the callback parameter.
        :param value: Raw value.
        :return: The converted value, or ``value`` unchanged when the parameter
            is unknown or its annotation is not handled.
        """
        param = self._callback_parameters.get(key)
        if param is None:
            return value

        annotation = param.annotation
        if annotation == bool:
            return value if isinstance(value, bool) else value == "true"
        if annotation == int:
            return int(value)
        if annotation == float:
            return float(value)
        if annotation == list:
            return value if isinstance(value, list) else value.split(",")
        if annotation == dict:
            return value if isinstance(value, dict) else json.loads(value)
        return value

    def _create_kwargs(self, *args):
        """
        Try to recreate the requested kwargs from the client response and the default kwargs.

        :param args: Dictionaries (or None) merged in order; later ones win.
        :return: dict restricted to the parameters declared by the callback,
            each value cast with _cast_parameter().
        """
        all_args = {}
        for arg in args:
            if arg is not None:
                all_args |= arg

        return {
            k: self._cast_parameter(k, all_args[k])
            for k in self._callback_parameters
            if k in all_args
        }

    def __str__(self):
        return f"Command({self.name})"
class LambdaCommand(Command):
    """
    Command that simply runs the given delegate callable.

    The command is configured with ``htmx(target=None)``, i.e. no swap is
    performed on the triggering element.
    """

    def __init__(self, owner, delegate, name="LambdaCommand", description="Lambda Command"):
        super().__init__(name, description, owner=owner, callback=delegate)
        # target=None turns the swap off (hx-swap="none")
        self.htmx(target=None)
class CommandTemplate:
    """
    Resolves a command from its key, creating it on demand.

    The command registered under ``key`` is reused when there is one;
    otherwise ``command_type`` is instantiated with the given arguments.
    """

    def __init__(self, key, command_type, args: list = None, kwargs: dict = None):
        self.key = key
        registered = CommandsManager.get_command_by_key(key)
        if registered:
            self.command = registered
        else:
            # Nothing registered under this key: build a new command.
            positional = args or []
            named = kwargs or {}
            self.command = command_type(*positional, **named)
class CommandsManager:
    """
    Class-level registry of commands, indexed both by id and by key.
    """

    commands = {}  # by_id (the id is stored as a string)
    commands_by_key = {}

    @staticmethod
    def register(command: Command):
        """Store the command under its id and, when it has one, under its key."""
        CommandsManager.commands[str(command.id)] = command
        key = command.get_key()
        if key is not None:
            CommandsManager.commands_by_key[key] = command

    @staticmethod
    def get_command(command_id: str) -> Optional[Command]:
        """Return the command registered under ``command_id``, or None."""
        return CommandsManager.commands.get(command_id)

    @staticmethod
    def get_command_by_key(key):
        """Return the command registered under ``key``, or None."""
        return CommandsManager.commands_by_key.get(key)

    @staticmethod
    def reset():
        """Empty both indexes."""
        CommandsManager.commands.clear()
        CommandsManager.commands_by_key.clear()