First implementation of bindings
This commit is contained in:
462
src/myfasthtml/core/bindings.py
Normal file
462
src/myfasthtml/core/bindings.py
Normal file
@@ -0,0 +1,462 @@
|
||||
import logging
|
||||
import uuid
|
||||
from enum import Enum
|
||||
from typing import Optional, Any
|
||||
|
||||
from fasthtml.components import Option
|
||||
from fasthtml.fastapp import fast_app
|
||||
from myutils.observable import make_observable, bind, collect_return_values, unbind
|
||||
|
||||
from myfasthtml.core.constants import Routes, ROUTE_ROOT
|
||||
from myfasthtml.core.utils import get_default_attr, get_default_ft_attr, is_checkbox, is_radio, is_select, is_datalist
|
||||
|
||||
bindings_app, bindings_rt = fast_app()
|
||||
logger = logging.getLogger("Bindings")
|
||||
|
||||
|
||||
class UpdateMode(Enum):
|
||||
ValueChange = "ValueChange"
|
||||
AttributePresence = "AttributePresence"
|
||||
SelectValueChange = "SelectValueChange"
|
||||
DatalistListChange = "DatalistListChange"
|
||||
|
||||
|
||||
class DetectionMode(Enum):
|
||||
ValueChange = "ValueChange"
|
||||
AttributePresence = "AttributePresence"
|
||||
SelectValueChange = "SelectValueChange"
|
||||
|
||||
|
||||
class AttrChangedDetection:
|
||||
"""
|
||||
Base class for detecting changes in an attribute of a data object.
|
||||
The when a modification is triggered we can
|
||||
* Search for the attribute that is modified (usual case)
|
||||
* Look if the attribute is present in the data object (for example when a checkbox is toggled)
|
||||
"""
|
||||
|
||||
def __init__(self, attr):
|
||||
self.attr = attr
|
||||
|
||||
def matches(self, values):
|
||||
pass
|
||||
|
||||
|
||||
class ValueChangedDetection(AttrChangedDetection):
|
||||
"""
|
||||
Search for the attribute that is modified.
|
||||
"""
|
||||
|
||||
def matches(self, values):
|
||||
for key, value in values.items():
|
||||
if key == self.attr:
|
||||
return True, value
|
||||
|
||||
return False, None
|
||||
|
||||
|
||||
class SelectValueChangedDetection(AttrChangedDetection):
|
||||
"""
|
||||
Search for the attribute that is modified.
|
||||
"""
|
||||
|
||||
def matches(self, values):
|
||||
for key, value in values.items():
|
||||
if key == self.attr:
|
||||
return True, value
|
||||
|
||||
return True, []
|
||||
|
||||
|
||||
class AttrPresentDetection(AttrChangedDetection):
|
||||
"""
|
||||
Search if the attribute is present in the data object.
|
||||
"""
|
||||
|
||||
def matches(self, values):
|
||||
return True, values.get(self.attr, None)
|
||||
|
||||
|
||||
class FtUpdate:
|
||||
def update(self, ft, ft_name, ft_attr, old, new, converter):
|
||||
pass
|
||||
|
||||
|
||||
class ValueChangeFtUpdate(FtUpdate):
|
||||
def update(self, ft, ft_name, ft_attr, old, new, converter):
|
||||
# simple mode, just update the text or the attribute
|
||||
new_to_use = converter.convert(new) if converter else new
|
||||
if ft_attr is None:
|
||||
ft.children = (new_to_use,)
|
||||
else:
|
||||
ft.attrs[ft_attr] = new_to_use
|
||||
return ft
|
||||
|
||||
|
||||
class SelectValueChangeFtUpdate(FtUpdate):
|
||||
def update(self, ft, ft_name, ft_attr, old, new, converter):
|
||||
# simple mode, just update the text or the attribute
|
||||
new_to_use = converter.convert(new) if converter else new
|
||||
new_to_use = [new_to_use] if not isinstance(new_to_use, list) else new_to_use
|
||||
for child in [c for c in ft.children if c.tag == "option"]:
|
||||
if child.attrs.get("value", None) in new_to_use:
|
||||
child.attrs["selected"] = "true"
|
||||
else:
|
||||
child.attrs.pop("selected", None)
|
||||
return ft
|
||||
|
||||
|
||||
class DatalistListChangeFtUpdate(FtUpdate):
|
||||
def update(self, ft, ft_name, ft_attr, old, new, converter):
|
||||
new_to_use = converter.convert(new) if converter else new
|
||||
ft.children = tuple([Option(value=v) for v in new_to_use])
|
||||
return ft
|
||||
|
||||
|
||||
class AttributePresenceFtUpdate(FtUpdate):
|
||||
def update(self, ft, ft_name, ft_attr, old, new, converter):
|
||||
# attribute presence mode, toggle the attribute (add or remove it)
|
||||
new_to_use = converter.convert(new) if converter else new
|
||||
if ft_attr is None:
|
||||
ft.children = (bool(new_to_use),)
|
||||
else:
|
||||
ft.attrs[ft_attr] = "true" if new_to_use else None # FastHtml auto remove None attributes
|
||||
return ft
|
||||
|
||||
|
||||
class DataConverter:
|
||||
def convert(self, data):
|
||||
pass
|
||||
|
||||
|
||||
class BooleanConverter(DataConverter):
|
||||
def convert(self, data):
|
||||
if data is None:
|
||||
return False
|
||||
|
||||
if isinstance(data, int):
|
||||
return data != 0
|
||||
|
||||
if str(data).lower() in ("true", "yes", "on"):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
class ListConverter(DataConverter):
|
||||
def convert(self, data):
|
||||
if data is None:
|
||||
return []
|
||||
|
||||
if isinstance(data, str):
|
||||
return data.split("\n")
|
||||
|
||||
if isinstance(data, (list, set, tuple)):
|
||||
return data
|
||||
|
||||
return [data]
|
||||
|
||||
|
||||
class RadioConverter(DataConverter):
|
||||
def __init__(self, radio_value):
|
||||
self.radio_value = radio_value
|
||||
|
||||
def convert(self, data):
|
||||
return data == self.radio_value
|
||||
|
||||
|
||||
class Binding:
|
||||
def __init__(self, data: Any, attr: str = None, converter: DataConverter = None):
|
||||
"""
|
||||
Creates a new binding object between a data object and an HTML element.
|
||||
The binding is not active until bind_ft() is called.
|
||||
|
||||
Args:
|
||||
data: Object used as a pivot
|
||||
attr: Attribute of the data object to bind
|
||||
"""
|
||||
self.id = uuid.uuid4()
|
||||
self.htmx_extra = {}
|
||||
self.data = data
|
||||
self.data_attr = attr or get_default_attr(data)
|
||||
self.data_converter = converter
|
||||
|
||||
# UI-related attributes (configured later via bind_ft)
|
||||
self.ft = None
|
||||
self.ft_name = None
|
||||
self.ft_attr = None
|
||||
self.detection_mode = DetectionMode.ValueChange
|
||||
self.update_mode = UpdateMode.ValueChange
|
||||
|
||||
# Strategy objects (configured later)
|
||||
self._detection = None
|
||||
self._update = None
|
||||
|
||||
# Activation state
|
||||
self._is_active = False
|
||||
|
||||
def bind_ft(self,
|
||||
ft,
|
||||
attr=None,
|
||||
name=None,
|
||||
data_converter: DataConverter = None,
|
||||
detection_mode: DetectionMode = None,
|
||||
update_mode: UpdateMode = None):
|
||||
"""
|
||||
Configure the UI element and activate the binding.
|
||||
|
||||
Args:
|
||||
ft: HTML element to bind to
|
||||
name: Name of the HTML element (sent by the form)
|
||||
attr: Attribute of the HTML element to bind to
|
||||
data_converter: Optional converter for data transformation
|
||||
detection_mode: How to detect changes from UI
|
||||
update_mode: How to update the UI element
|
||||
|
||||
Returns:
|
||||
self for method chaining
|
||||
"""
|
||||
# Deactivate if already active
|
||||
if self._is_active:
|
||||
self.deactivate()
|
||||
|
||||
if ft.tag in ["input", "textarea", "select"]:
|
||||
# I must not force the htmx
|
||||
if {"hx-post", "hx_post"} & set(ft.attrs.keys()):
|
||||
raise ValueError(f"Binding '{self.id}': htmx post already set on input.")
|
||||
|
||||
# update the component to post on the correct route input and forms only
|
||||
htmx = self.get_htmx_params()
|
||||
ft.attrs |= htmx
|
||||
|
||||
# Configure UI elements
|
||||
self.ft = self._safe_ft(ft)
|
||||
self.ft_name = name or ft.attrs.get("name")
|
||||
self.ft_attr = attr or get_default_ft_attr(ft)
|
||||
|
||||
if is_checkbox(ft):
|
||||
default_data_converter = self.data_converter or BooleanConverter()
|
||||
default_detection_mode = DetectionMode.AttributePresence
|
||||
default_update_mode = UpdateMode.AttributePresence
|
||||
elif is_radio(ft):
|
||||
default_data_converter = self.data_converter or RadioConverter(ft.attrs["value"])
|
||||
default_detection_mode = DetectionMode.ValueChange
|
||||
default_update_mode = UpdateMode.AttributePresence
|
||||
elif is_select(ft):
|
||||
default_data_converter = self.data_converter
|
||||
default_detection_mode = DetectionMode.SelectValueChange
|
||||
default_update_mode = UpdateMode.SelectValueChange
|
||||
elif is_datalist(ft):
|
||||
default_data_converter = self.data_converter or ListConverter()
|
||||
default_detection_mode = DetectionMode.SelectValueChange
|
||||
default_update_mode = UpdateMode.DatalistListChange
|
||||
else:
|
||||
default_data_converter = self.data_converter
|
||||
default_detection_mode = DetectionMode.ValueChange
|
||||
default_update_mode = UpdateMode.ValueChange
|
||||
|
||||
# Update optional parameters if provided
|
||||
self.data_converter = data_converter or default_data_converter
|
||||
self.detection_mode = detection_mode or default_detection_mode
|
||||
self.update_mode = update_mode or default_update_mode
|
||||
|
||||
# Create strategy objects
|
||||
self._detection = self._factory(self.detection_mode)
|
||||
self._update = self._factory(self.update_mode)
|
||||
|
||||
# Activate the binding
|
||||
self.activate()
|
||||
|
||||
return self
|
||||
|
||||
def get_htmx_params(self):
|
||||
return self.htmx_extra | {
|
||||
"hx-post": f"{ROUTE_ROOT}{Routes.Bindings}",
|
||||
"hx-vals": f'{{"b_id": "{self.id}"}}',
|
||||
}
|
||||
|
||||
def init(self):
|
||||
"""
|
||||
Initialise the UI element with the value of the data
|
||||
:return:
|
||||
"""
|
||||
old_value = None # to complicated to retrieve as it depends on the nature of self.ft
|
||||
new_value = getattr(self.data, self.data_attr)
|
||||
self.notify(old_value, new_value)
|
||||
return self
|
||||
|
||||
def notify(self, old, new):
|
||||
"""
|
||||
Callback when the data attribute changes.
|
||||
Updates the UI element accordingly.
|
||||
|
||||
Args:
|
||||
old: Previous value
|
||||
new: New value
|
||||
|
||||
Returns:
|
||||
Updated ft element
|
||||
"""
|
||||
if not self._is_active:
|
||||
logger.warning(f"Binding '{self.id}' received notification but is not active")
|
||||
return None
|
||||
|
||||
logger.debug(f"Binding '{self.id}': Changing from '{old}' to '{new}'")
|
||||
self.ft = self._update.update(self.ft, self.ft_name, self.ft_attr, old, new, self.data_converter)
|
||||
|
||||
self.ft.attrs["hx-swap-oob"] = "true"
|
||||
return self.ft
|
||||
|
||||
def update(self, values: dict):
|
||||
"""
|
||||
Called by the FastHTML router when a request is received.
|
||||
:param values:
|
||||
:return: the list of updated elements (all elements that are bound to this binding)
|
||||
"""
|
||||
logger.debug(f"Binding '{self.id}': Updating with {values=}.")
|
||||
matches, value = self._detection.matches(values)
|
||||
if matches:
|
||||
setattr(self.data, self.data_attr, value)
|
||||
res = collect_return_values(self.data)
|
||||
return res
|
||||
|
||||
else:
|
||||
logger.debug(f"Nothing to trigger in {values}.")
|
||||
return None
|
||||
|
||||
def activate(self):
|
||||
"""
|
||||
Activate the binding by setting up observers and registering it.
|
||||
Should only be called after the binding is fully configured.
|
||||
|
||||
Raises:
|
||||
ValueError: If the binding is not fully configured
|
||||
"""
|
||||
if self._is_active:
|
||||
logger.warning(f"Binding '{self.id}' is already active")
|
||||
return
|
||||
|
||||
# Validate configuration
|
||||
self._validate_configuration()
|
||||
|
||||
# Setup observable
|
||||
make_observable(self.data)
|
||||
bind(self.data, self.data_attr, self.notify)
|
||||
|
||||
# Register in manager
|
||||
BindingsManager.register(self)
|
||||
|
||||
# Mark as active
|
||||
self._is_active = True
|
||||
|
||||
logger.debug(f"Binding '{self.id}' activated for {self.data_attr}")
|
||||
|
||||
def deactivate(self):
|
||||
"""
|
||||
Deactivate the binding by removing observers and unregistering it.
|
||||
Can be called multiple times safely.
|
||||
"""
|
||||
if not self._is_active:
|
||||
logger.debug(f"Binding '{self.id}' is not active, nothing to deactivate")
|
||||
return
|
||||
|
||||
# Remove observer
|
||||
unbind(self.data, self.data_attr, self.notify)
|
||||
|
||||
# Unregister from manager
|
||||
BindingsManager.unregister(self.id)
|
||||
|
||||
# Mark as inactive
|
||||
self._is_active = False
|
||||
|
||||
logger.debug(f"Binding '{self.id}' deactivated")
|
||||
|
||||
@staticmethod
|
||||
def _safe_ft(ft):
|
||||
"""
|
||||
Make sure the ft has an id.
|
||||
:param ft:
|
||||
:return:
|
||||
"""
|
||||
if ft is None:
|
||||
return None
|
||||
|
||||
if ft.attrs.get("id", None) is None:
|
||||
ft.attrs["id"] = str(uuid.uuid4())
|
||||
return ft
|
||||
|
||||
def _factory(self, mode):
|
||||
if mode == DetectionMode.ValueChange:
|
||||
return ValueChangedDetection(self.ft_name)
|
||||
|
||||
elif mode == DetectionMode.AttributePresence:
|
||||
return AttrPresentDetection(self.ft_name)
|
||||
|
||||
elif mode == DetectionMode.SelectValueChange:
|
||||
return SelectValueChangedDetection(self.ft_name)
|
||||
|
||||
elif mode == UpdateMode.ValueChange:
|
||||
return ValueChangeFtUpdate()
|
||||
|
||||
elif mode == UpdateMode.AttributePresence:
|
||||
return AttributePresenceFtUpdate()
|
||||
|
||||
elif mode == UpdateMode.SelectValueChange:
|
||||
return SelectValueChangeFtUpdate()
|
||||
|
||||
elif mode == UpdateMode.DatalistListChange:
|
||||
return DatalistListChangeFtUpdate()
|
||||
|
||||
else:
|
||||
raise ValueError(f"Invalid detection mode: {mode}")
|
||||
|
||||
def _validate_configuration(self):
|
||||
"""
|
||||
Validate that the binding is fully configured before activation.
|
||||
|
||||
Raises:
|
||||
ValueError: If required configuration is missing
|
||||
"""
|
||||
if self.ft is None:
|
||||
raise ValueError(f"Binding '{self.id}': ft element is required")
|
||||
|
||||
# if self.ft_name is None:
|
||||
# raise ValueError(f"Binding '{self.id}': ft_name is required")
|
||||
|
||||
if self._detection is None:
|
||||
raise ValueError(f"Binding '{self.id}': detection strategy not initialized")
|
||||
|
||||
if self._update is None:
|
||||
raise ValueError(f"Binding '{self.id}': update strategy not initialized")
|
||||
|
||||
def htmx(self, trigger=None):
|
||||
if trigger:
|
||||
self.htmx_extra["hx-trigger"] = trigger
|
||||
return self
|
||||
|
||||
|
||||
class BindingsManager:
|
||||
bindings = {}
|
||||
|
||||
@staticmethod
|
||||
def register(binding: Binding):
|
||||
BindingsManager.bindings[str(binding.id)] = binding
|
||||
|
||||
@staticmethod
|
||||
def unregister(binding_id: str):
|
||||
"""
|
||||
Unregister a binding from the manager.
|
||||
|
||||
Args:
|
||||
binding_id: ID of the binding to unregister
|
||||
"""
|
||||
if str(binding_id) in BindingsManager.bindings:
|
||||
del BindingsManager.bindings[str(binding_id)]
|
||||
|
||||
@staticmethod
|
||||
def get_binding(binding_id: str) -> Optional[Binding]:
|
||||
return BindingsManager.bindings.get(str(binding_id))
|
||||
|
||||
@staticmethod
|
||||
def reset():
|
||||
return BindingsManager.bindings.clear()
|
||||
Reference in New Issue
Block a user