241 lines
7.2 KiB
Python
241 lines
7.2 KiB
Python
import inspect
|
|
import json
|
|
import uuid
|
|
from typing import Optional
|
|
|
|
from myutils.observable import NotObservableError, ObservableResultCollector
|
|
|
|
from myfasthtml.core.constants import Routes, ROUTE_ROOT
|
|
from myfasthtml.core.utils import flatten
|
|
|
|
|
|
class Command:
    """
    Represents the base command class for defining executable actions.

    This class serves as a foundation for commands that can be registered,
    executed, and utilized within a system. Each command has a unique identifier,
    a name, and a description. Commands should override the `execute` method
    to provide specific functionality.

    :ivar id: A unique identifier for the command.
    :type id: uuid.UUID
    :ivar name: The name of the command.
    :type name: str
    :ivar description: A brief description of the command's functionality.
    :type description: str
    """

    def __init__(self, name,
                 description,
                 owner=None,
                 callback=None,
                 args: Optional[list] = None,
                 kwargs: Optional[dict] = None,
                 key=None,
                 auto_register=True):
        """
        Create the command and, unless disabled, register it in ``CommandsManager``.

        :param name: The name of the command.
        :param description: A brief description of the command.
        :param owner: Optional object queried in ``execute()`` through
            ``owner.get_bound_commands(name)``.
        :param callback: Callable invoked by ``execute()``. Its signature is
            inspected once here to know which keyword arguments it accepts.
        :param args: Positional arguments always passed to the callback.
        :param kwargs: Default keyword arguments, also sent to the client as values.
        :param key: Optional registration key used to share one id between
            several instances.
        :param auto_register: When true, register the command on creation.
        """
        self.id = uuid.uuid4()
        self.name = name
        self.description = description
        self.owner = owner
        self.callback = callback
        self.default_args = args or []
        self.default_kwargs = kwargs or {}
        self._htmx_extra = {}
        self._bindings = []
        self._ft = None
        self._callback_parameters = dict(inspect.signature(callback).parameters) if callback else {}
        self._key = key

        # register the command
        if auto_register:
            if key in CommandsManager.commands_by_key:
                # A command already exists for this key: reuse its id.
                # This instance is NOT registered; the first one stays in the registry.
                self.id = CommandsManager.commands_by_key[key].id
            else:
                CommandsManager.register(self)

    def get_key(self):
        """
        :return: The registration key given at construction (may be None).
        """
        return self._key

    def get_htmx_params(self):
        """
        Build the HTMX attributes that trigger this command.

        Starts from the defaults (``hx-post``, ``hx-swap``, ``hx-vals``), applies the
        extras configured through ``htmx()`` / ``bind()``, then adds the default kwargs
        to ``hx-vals``.

        :return: A new dict of HTMX attributes.
        """
        command_id = f"{self.id}"
        res = {
            "hx-post": f"{ROUTE_ROOT}{Routes.Commands}",
            "hx-swap": "outerHTML",
            "hx-vals": {"c_id": command_id},
        }

        for k, v in self._htmx_extra.items():
            if k == "hx-post":
                continue  # cannot override this one
            elif k == "hx-vals":
                res["hx-vals"] |= v
            else:
                res[k] = v

        # kwarg are given to the callback as values
        res["hx-vals"] |= self.default_kwargs

        # c_id identifies the command: it cannot be overridden by extras or kwargs
        # (same rule as in ajax_htmx_options())
        res["hx-vals"]["c_id"] = command_id

        return res

    def execute(self, client_response: dict = None):
        """
        Run the callback, then the commands bound to the owner, and gather all results.

        The callback receives ``default_args`` positionally and, as keyword arguments,
        the values found in the default kwargs / client response that match its
        signature (see ``_create_kwargs``).

        :param client_response: Values sent back by the client, if any.
        :return: The single result when there is exactly one, otherwise the list
            produced by ``flatten``.
        """
        with ObservableResultCollector(self._bindings) as collector:
            kwargs = self._create_kwargs(self.default_kwargs,
                                         client_response,
                                         {"client_response": client_response or {}})
            ret = self.callback(*self.default_args, **kwargs)

        ret_from_bound_commands = []
        if self.owner:
            for command in self.owner.get_bound_commands(self.name):
                r = command.execute(client_response)
                ret_from_bound_commands.append(r)  # it will be flatten if needed later

        all_ret = flatten(ret, ret_from_bound_commands, collector.results)

        # Set the hx-swap-oob attribute on every element after the first one,
        # provided it has an id and does not already define hx-swap-oob
        for r in all_ret[1:]:
            if (hasattr(r, 'attrs')
                    and "hx-swap-oob" not in r.attrs
                    and r.get("id", None) is not None):
                r.attrs["hx-swap-oob"] = "true"

        return all_ret[0] if len(all_ret) == 1 else all_ret

    def htmx(self, target: Optional[str] = "this", swap="outerHTML", trigger=None):
        """
        Customize the HTMX attributes generated for this command.

        Only non-default values are recorded; calling with the defaults does not
        clear a value recorded by a previous call.

        :param target: ``None`` disables the swap (``hx-swap="none"``); any value other
            than ``"this"`` is stored as ``hx-target``.
        :param swap: ``None`` disables the swap; any value other than ``"outerHTML"``
            is stored as ``hx-swap``.
        :param trigger: Stored as ``hx-trigger`` when not None.
        :return: self, to allow chaining.
        """
        # Note that the default value is the same than in get_htmx_params()
        if target is None:
            self._htmx_extra["hx-swap"] = "none"
        elif target != "this":
            self._htmx_extra["hx-target"] = target

        if swap is None:
            self._htmx_extra["hx-swap"] = "none"
        elif swap != "outerHTML":
            self._htmx_extra["hx-swap"] = swap

        if trigger is not None:
            self._htmx_extra["hx-trigger"] = trigger

        return self

    def bind_ft(self, ft):
        """
        Update the FT with the command's HTMX parameters.

        :param ft: Element whose ``attrs`` mapping is updated in place; it is also
            remembered and returned by ``get_ft()``.
        :return: The same ``ft``.
        """
        self._ft = ft
        htmx = self.get_htmx_params()
        ft.attrs |= htmx
        return ft

    def bind(self, data):
        """
        Attach a binding to the command.
        When done, if a binding is triggered during the execution of the command,
        the results of the binding will be passed to the command's execute() method.

        :param data: Observable object (it must expose a ``_listeners`` attribute).
        :return: self, to allow chaining.
        :raises NotObservableError: If ``data`` has not been made observable.
        """
        if not hasattr(data, '_listeners'):
            raise NotObservableError(
                "Object must be made observable with make_observable() before binding"
            )
        self._bindings.append(data)

        # by default, remove the swap on the attached element when binding is used
        self._htmx_extra["hx-swap"] = "none"

        return self

    @property
    def url(self):
        """
        :return: The commands route with this command's id as ``c_id`` query parameter.
        """
        return f"{ROUTE_ROOT}{Routes.Commands}?c_id={self.id}"

    def ajax_htmx_options(self):
        """
        Build the options (url, target, swap, values) describing this command.

        :return: A new dict. ``values`` is a copy of the default kwargs plus ``c_id``,
            so the command's own default kwargs are never modified.
        """
        res = {
            "url": self.url,
            "target": self._htmx_extra.get("hx-target", "this"),
            "swap": self._htmx_extra.get("hx-swap", "outerHTML"),
            # copy: writing c_id into self.default_kwargs would leak it into
            # get_htmx_params() and into the kwargs used by execute()
            "values": dict(self.default_kwargs),
        }
        res["values"]["c_id"] = f"{self.id}"  # cannot be overridden

        return res

    def get_ft(self):
        """
        :return: The element given to the last ``bind_ft()`` call, or None.
        """
        return self._ft

    def _cast_parameter(self, key, value):
        """
        Convert a value to the type annotated on the callback parameter ``key``.

        Values sent by the client arrive as strings and are converted; values that
        already have the expected Python type (default kwargs, the injected
        ``client_response`` dict) are returned untouched.

        :param key: Name of the callback parameter.
        :param value: Raw value.
        :return: The converted value, or ``value`` itself when the parameter is
            unknown or its annotation is not bool, int, float, list or dict.
        """
        param = self._callback_parameters.get(key)
        if param is None:
            return value

        annotation = param.annotation
        if annotation == bool:
            # a real bool must be kept as is: True == "true" is False
            return value if isinstance(value, bool) else value == "true"
        if annotation == int:
            return int(value)
        if annotation == float:
            return float(value)
        if annotation == list:
            return value.split(",") if isinstance(value, str) else value
        if annotation == dict:
            # json.loads() only accepts text/bytes; an existing dict is already decoded
            return json.loads(value) if isinstance(value, (str, bytes, bytearray)) else value
        return value

    def _create_kwargs(self, *args):
        """
        Try to recreate the requested kwargs from the client response and the default kwargs.

        :param args: Dicts merged in order (later ones win); ``None`` entries are skipped.
        :return: The merged values restricted to the names accepted by the callback,
            each one cast with ``_cast_parameter``.
        """
        all_args = {}
        for arg in args:
            if arg is not None:
                all_args |= arg

        return {
            k: self._cast_parameter(k, all_args[k])
            for k in self._callback_parameters
            if k in all_args
        }

    def __str__(self):
        return f"Command({self.name})"
|
|
|
|
|
|
class LambdaCommand(Command):
    """
    Command built directly from a callable.

    The ``delegate`` is used as the command callback, and the command is
    configured with no swap target.
    """

    def __init__(self, owner, delegate, name="LambdaCommand", description="Lambda Command"):
        """
        :param owner: Owner passed to the base command.
        :param delegate: Callable used as the command callback.
        :param name: The name of the command.
        :param description: A brief description of the command.
        """
        super().__init__(name=name,
                         description=description,
                         owner=owner,
                         callback=delegate)
        # target=None makes htmx() record hx-swap="none"
        self.htmx(target=None)
|
|
|
|
|
|
class CommandTemplate:
    """
    Holds the command associated with a key.

    The command already registered under ``key`` is reused when there is one;
    otherwise a new command is built from ``command_type``.
    """

    def __init__(self, key, command_type, args: list = None, kwargs: dict = None):
        """
        :param key: Key looked up in ``CommandsManager``.
        :param command_type: Callable used to build the command when none is registered.
        :param args: Positional arguments for ``command_type`` (defaults to none).
        :param kwargs: Keyword arguments for ``command_type`` (defaults to none).
        """
        self.key = key

        registered = CommandsManager.get_command_by_key(key)
        if registered:
            self.command = registered
        else:
            positional = args if args else []
            named = kwargs if kwargs else {}
            self.command = command_type(*positional, **named)
|
|
|
|
|
|
class CommandsManager:
    """
    Class-level registry of commands.

    Both dictionaries are class attributes, so they are shared by every caller.
    """
    commands = {}  # by_id: str(command.id) -> command
    commands_by_key = {}  # command.get_key() -> command (only for commands with a key)

    @staticmethod
    def register(command: "Command"):
        """
        Register the command under its id and, when it has one, under its key.

        An existing entry with the same id or key is replaced.

        :param command: The command to register.
        """
        CommandsManager.commands[str(command.id)] = command
        if (key := command.get_key()) is not None:
            CommandsManager.commands_by_key[key] = command

    @staticmethod
    def get_command(command_id: str) -> Optional["Command"]:
        """
        :param command_id: The command id, as a string or as a ``uuid.UUID``.
        :return: The registered command, or None when the id is unknown.
        """
        # ids are stored as strings: normalise so a UUID object is found too
        return CommandsManager.commands.get(str(command_id))

    @staticmethod
    def get_command_by_key(key):
        """
        :param key: The registration key.
        :return: The command registered under ``key``, or None.
        """
        return CommandsManager.commands_by_key.get(key, None)

    @staticmethod
    def reset():
        """
        Remove every registered command (by id and by key).
        """
        CommandsManager.commands.clear()
        CommandsManager.commands_by_key.clear()
|