I can bind elements

This commit is contained in:
2025-11-01 23:44:18 +01:00
parent 991a6f07ff
commit aaba6a5468
14 changed files with 463 additions and 109 deletions

View File

@@ -1,14 +1,15 @@
from fasthtml.components import *
from myfasthtml.core.bindings import Binding
from myfasthtml.core.commands import Command
from myfasthtml.core.utils import merge_classes
from myfasthtml.core.utils import merge_classes, get_default_ft_attr
class mk:
@staticmethod
def button(element, command: Command = None, **kwargs):
return mk.manage_command(Button(element, **kwargs), command)
def button(element, command: Command = None, binding: Binding = None, **kwargs):
return mk.mk(Button(element, **kwargs), command=command, binding=binding)
@staticmethod
def icon(icon, size=20,
@@ -16,6 +17,7 @@ class mk:
can_hover=False,
cls='',
command: Command = None,
binding: Binding = None,
**kwargs):
merged_cls = merge_classes(f"mf-icon-{size}",
'icon-btn' if can_select else '',
@@ -23,11 +25,32 @@ class mk:
cls,
kwargs)
return mk.manage_command(Div(icon, cls=merged_cls, **kwargs), command)
return mk.mk(Div(icon, cls=merged_cls, **kwargs), command=command, binding=binding)
@staticmethod
def manage_command(ft, command: Command):
htmx = command.get_htmx_params() if command else {}
ft.attrs |= htmx
if command:
htmx = command.get_htmx_params()
ft.attrs |= htmx
return ft
@staticmethod
def manage_binding(ft, binding: Binding):
if binding:
# update the component to post on the correct route
htmx = binding.get_htmx_params()
ft.attrs |= htmx
# update the binding with the ft
ft_attr = binding.ft_attr or get_default_ft_attr(ft)
ft_name = ft.attrs.get("name")
binding.bind_ft(ft, ft_name, ft_attr) # force the ft
return ft
@staticmethod
def mk(ft, command: Command = None, binding: Binding = None):
ft = mk.manage_command(ft, command)
ft = mk.manage_binding(ft, binding)
return ft

View File

@@ -0,0 +1,114 @@
import logging
import uuid
from typing import Optional
from fasthtml.fastapp import fast_app
from myutils.observable import make_observable, bind, collect_return_values
from myfasthtml.core.constants import Routes, ROUTE_ROOT
from myfasthtml.core.utils import get_default_attr
bindings_app, bindings_rt = fast_app()
logger = logging.getLogger("Bindings")
class Binding:
def __init__(self, data, attr=None, ft=None, ft_name=None, ft_attr=None):
"""
Creates a new binding object between a data object used as a pivot and an HTML element.
The same pivot object must be used for different bindings.
This will allow the binding between the HTML elements
:param data: object used as a pivot
:param attr: attribute of the data object
:param ft: HTML element to bind to
:param ft_name: name of the HTML element to bind to (send by the form)
:param ft_attr: value of the attribute to bind to (send by the form)
"""
self.id = uuid.uuid4()
self.htmx_extra = {}
self.data = data
self.data_attr = attr or get_default_attr(data)
self.ft = self._safe_ft(ft)
self.ft_name = ft_name
self.ft_attr = ft_attr
make_observable(self.data)
bind(self.data, self.data_attr, self.notify)
# register the command
BindingsManager.register(self)
def bind_ft(self, ft, name, attr=None):
"""
Update the elements to bind to
:param ft:
:param name:
:param attr:
:return:
"""
self.ft = self._safe_ft(ft)
self.ft_name = name
self.ft_attr = attr
def get_htmx_params(self):
return self.htmx_extra | {
"hx-post": f"{ROUTE_ROOT}{Routes.Bindings}",
"hx-vals": f'{{"b_id": "{self.id}"}}',
}
def notify(self, old, new):
logger.debug(f"Binding '{self.id}': Changing from '{old}' to '{new}'")
if self.ft_attr is None:
self.ft.children = (new,)
else:
self.ft.attrs[self.ft_attr] = new
self.ft.attrs["hx-swap-oob"] = "true"
return self.ft
def update(self, values: dict):
logger.debug(f"Binding '{self.id}': Updating with {values=}.")
for key, value in values.items():
if key == self.ft_name:
setattr(self.data, self.data_attr, value)
res = collect_return_values(self.data)
return res
else:
logger.debug(f"Nothing to trigger in {values}.")
return None
@staticmethod
def _safe_ft(ft):
"""
Make sure the ft has an id.
:param ft:
:return:
"""
if ft is None:
return None
if ft.attrs.get("id", None) is None:
ft.attrs["id"] = str(uuid.uuid4())
return ft
def htmx(self, trigger=None):
if trigger:
self.htmx_extra["hx-trigger"] = trigger
return self
class BindingsManager:
bindings = {}
@staticmethod
def register(binding: Binding):
BindingsManager.bindings[str(binding.id)] = binding
@staticmethod
def get_binding(binding_id: str) -> Optional[Binding]:
return BindingsManager.bindings.get(str(binding_id))
@staticmethod
def reset():
return BindingsManager.bindings.clear()

View File

@@ -1,14 +1,7 @@
import logging
import uuid
from typing import Optional
from fasthtml.fastapp import fast_app
from myfasthtml.core.constants import Routes, ROUTE_ROOT
from myfasthtml.core.utils import mount_if_not_exists
commands_app, commands_rt = fast_app()
logger = logging.getLogger("Commands")
class BaseCommand:
@@ -98,31 +91,3 @@ class CommandsManager:
@staticmethod
def reset():
return CommandsManager.commands.clear()
@commands_rt(Routes.Commands)
def post(session: str, c_id: str):
"""
Default routes for all commands.
:param session:
:param c_id:
:return:
"""
logger.debug(f"Entering {Routes.Commands} with {session=}, {c_id=}")
command = CommandsManager.get_command(c_id)
if command:
return command.execute()
raise ValueError(f"Command with ID '{c_id}' not found.")
def mount_commands(app):
"""
Mounts the commands_app to the given application instance if the route does not already exist.
:param app: The application instance to which the commands_app will be mounted.
:type app: Any
:return: Returns the result of the mount operation performed by mount_if_not_exists.
:rtype: Any
"""
return mount_if_not_exists(app, ROUTE_ROOT, commands_app)

View File

@@ -1,4 +1,5 @@
ROUTE_ROOT = "/myfasthtml"
class Routes:
Commands = "/commands"
Commands = "/commands"
Bindings = "/bindings"

View File

@@ -1,4 +1,12 @@
from starlette.routing import Mount
import logging
from fasthtml.fastapp import fast_app
from starlette.routing import Mount, Route
from myfasthtml.core.constants import Routes, ROUTE_ROOT
utils_app, utils_rt = fast_app()
logger = logging.getLogger("Commands")
def mount_if_not_exists(app, path: str, sub_app):
@@ -46,3 +54,83 @@ def merge_classes(*args):
return " ".join(unique_elements)
else:
return None
def debug_routes(app):
for route in app.router.routes:
if isinstance(route, Mount):
for sub_route in route.app.router.routes:
print(f"path={route.path}{sub_route.path}, method={sub_route.methods}, endpoint={sub_route.endpoint}")
elif isinstance(route, Route):
print(f"path={route.path}, methods={route.methods}, endpoint={route.endpoint}")
def mount_utils(app):
"""
Mounts the commands_app to the given application instance if the route does not already exist.
:param app: The application instance to which the commands_app will be mounted.
:type app: Any
:return: Returns the result of the mount operation performed by mount_if_not_exists.
:rtype: Any
"""
return mount_if_not_exists(app, ROUTE_ROOT, utils_app)
def get_default_ft_attr(ft):
"""
for every type of HTML element (ft) gives the default attribute to use for binding
:param ft:
:return:
"""
if ft.tag == "input":
if ft.attrs.get("type") == "checkbox":
return "checked"
elif ft.attrs.get("type") == "radio":
return "checked"
elif ft.attrs.get("type") == "file":
return "files"
else:
return "value"
else:
return None # indicate that the content of the FT should be updated
def get_default_attr(data):
all_attrs = data.__dict__.keys()
return next(iter(all_attrs))
@utils_rt(Routes.Commands)
def post(session: str, c_id: str):
"""
Default routes for all commands.
:param session:
:param c_id:
:return:
"""
logger.debug(f"Entering {Routes.Commands} with {session=}, {c_id=}")
from myfasthtml.core.commands import CommandsManager
command = CommandsManager.get_command(c_id)
if command:
return command.execute()
raise ValueError(f"Command with ID '{c_id}' not found.")
@utils_rt(Routes.Bindings)
def post(session: str, b_id: str, values: dict):
"""
Default routes for all bindings.
:param session:
:param b_id:
:param values:
:return:
"""
logger.debug(f"Entering {Routes.Bindings} with {session=}, {b_id=}, {values=}")
from myfasthtml.core.bindings import BindingsManager
binding = BindingsManager.get_binding(b_id)
if binding:
return binding.update(values)
raise ValueError(f"Binding with ID '{b_id}' not found.")

View File

@@ -1,3 +1,4 @@
import logging
from importlib.resources import files
from pathlib import Path
from typing import Optional, Any
@@ -8,7 +9,9 @@ from starlette.responses import Response
from myfasthtml.auth.routes import setup_auth_routes
from myfasthtml.auth.utils import create_auth_beforeware
from myfasthtml.core.commands import commands_app
from myfasthtml.core.utils import utils_app
logger = logging.getLogger("MyFastHtml")
def get_asset_path(filename):
@@ -85,8 +88,8 @@ def create_app(daisyui: Optional[bool] = True,
# and put it back after the myfasthtml static files routes
app.routes.append(static_route_exts_get)
# route the commands
app.mount("/myfasthtml", commands_app)
# route the commands and the bindings
app.mount("/myfasthtml", utils_app)
if mount_auth_app:
# Setup authentication routes

View File

@@ -10,7 +10,7 @@ from fasthtml.common import FastHTML
from starlette.responses import Response
from starlette.testclient import TestClient
from myfasthtml.core.commands import mount_commands
from myfasthtml.core.utils import mount_utils
verbs = {
'hx_get': 'GET',
@@ -130,8 +130,15 @@ class TestableElement:
if data is not None:
headers['Content-Type'] = 'application/x-www-form-urlencoded'
bag_to_use = data
elif json_data is not None:
headers['Content-Type'] = 'application/json'
bag_to_use = json_data
else:
# default to json_data
headers['Content-Type'] = 'application/json'
json_data = {}
bag_to_use = json_data
# .props contains the kwargs passed to the object (e.g., hx_post="/url")
element_attrs = self.my_ft.attrs or {}
@@ -151,11 +158,10 @@ class TestableElement:
elif key == 'hx_vals':
# hx_vals defines the JSON body, if not already provided by the test
if json_data is None:
if isinstance(value, str):
json_data = json.loads(value)
elif isinstance(value, dict):
json_data = value
if isinstance(value, str):
bag_to_use |= json.loads(value)
elif isinstance(value, dict):
bag_to_use |= value
elif key.startswith('hx_'):
# Any other hx_* attribute is converted to an HTTP header
@@ -924,7 +930,7 @@ class MyTestClient:
self.parent_levels = parent_levels
# make sure that the commands are mounted
mount_commands(self.app)
mount_utils(self.app)
def open(self, path: str) -> Self:
"""
@@ -952,6 +958,8 @@ class MyTestClient:
def send_request(self, method: str, url: str, headers: dict = None, data=None, json_data=None):
if json_data is not None:
json_data['session'] = self._session
if data is not None:
data['session'] = self._session
res = self.client.request(
method,
@@ -1088,7 +1096,7 @@ class MyTestClient:
f"No element found matching selector '{selector}'."
)
elif len(results) == 1:
return TestableElement(self, results[0], results[0].name)
return self._testable_element_factory(results[0])
else:
raise AssertionError(
f"Found {len(results)} elements matching selector '{selector}'. Expected exactly 1."
@@ -1148,6 +1156,12 @@ class MyTestClient:
self._soup = BeautifulSoup(content, 'html.parser')
return self
def _testable_element_factory(self, elt):
if elt.name == "input":
return TestableInput(self, elt)
else:
return TestableElement(self, elt, elt.name)
@staticmethod
def _find_visible_text_element(soup, text: str):
"""