import logging
import uuid
from enum import Enum
from typing import Optional

from fasthtml.fastapp import fast_app
from myutils.observable import make_observable, bind, collect_return_values

from myfasthtml.core.constants import Routes, ROUTE_ROOT
from myfasthtml.core.utils import get_default_attr

bindings_app, bindings_rt = fast_app()
logger = logging.getLogger("Bindings")


class UpdateMode(Enum):
    """How a bound HTML element is refreshed when the data changes."""

    ValueChange = "ValueChange"
    AttributePresence = "AttributePresence"


class DetectionMode(Enum):
    """How a change is detected in the values received for a binding."""

    ValueChange = "ValueChange"
    AttributePresence = "AttributePresence"


class AttrChangedDetection:
    """
    Base class for detecting changes in an attribute of a data object.

    When a modification is triggered we can either:

    * search for the attribute that is modified (usual case), or
    * look whether the attribute is present in the received values
      (for example when a checkbox is toggled).
    """

    def __init__(self, attr):
        # Key looked up in the values passed to ``matches``.
        self.attr = attr

    def matches(self, values):
        """
        Placeholder overridden by subclasses.

        Subclasses return a ``(matched, value)`` tuple; this base
        implementation does nothing and returns None.
        """
        pass


class ValueChangedDetection(AttrChangedDetection):
    """
    Search for the attribute that is modified.
    """

    def matches(self, values):
        """
        Return ``(True, value)`` when ``self.attr`` is a key of *values*,
        otherwise ``(False, None)``.
        """
        # Direct membership test instead of scanning every (key, value) pair.
        if self.attr in values:
            return True, values[self.attr]
        return False, None


class AttrPresentDetection(AttrChangedDetection):
    """
    Search if the attribute is present in the data object.
    """

    def matches(self, values):
        """
        Always match; the value is ``values.get(self.attr)``, i.e. None when
        the key is absent.
        """
        return True, values.get(self.attr, None)


class FtUpdate:
    """Base class for the strategies that refresh a bound element."""

    def update(self, ft, ft_name, ft_attr, old, new):
        """
        Placeholder overridden by subclasses.

        Subclasses return the updated element; this base implementation does
        nothing and returns None.
        """
        pass


class ValueChangeFtUpdate(FtUpdate):
    """Write the new value into the element's children or into one attribute."""

    def update(self, ft, ft_name, ft_attr, old, new):
        """
        Store *new* on *ft* and return *ft*.

        :param ft: element to update (mutated in place)
        :param ft_name: unused here
        :param ft_attr: attribute to set; when None the children are replaced
        :param old: previous value (unused here)
        :param new: value to write
        :return: the same *ft* object
        """
        # simple mode, just update the text or the attribute
        if ft_attr is None:
            ft.children = (new,)
        else:
            ft.attrs[ft_attr] = new
        return ft


class AttributePresenceFtUpdate(FtUpdate):
    """Toggle an attribute on the element according to the truthiness of the value."""

    def update(self, ft, ft_name, ft_attr, old, new):
        """
        Toggle *ft_attr* on *ft* and return *ft*.

        :param ft: element to update (mutated in place)
        :param ft_name: unused here
        :param ft_attr: attribute to toggle; when None the children are
            replaced by ``bool(new)``
        :param old: previous value (unused here)
        :param new: value whose truthiness decides the toggle
        :return: the same *ft* object
        """
        # attribute presence mode, toggle the attribute (add or remove it)
        if ft_attr is None:
            ft.children = (bool(new),)
        else:
            # FastHtml auto remove None attributes
            ft.attrs[ft_attr] = "true" if new else None
        return ft


class DataConverter:
    """Base class for converters applied to a value before it is stored."""

    def convert(self, data):
        """
        Placeholder overridden by subclasses; does nothing and returns None.
        """
        pass


class BooleanConverter(DataConverter):
    """Convert a received value to a bool."""

    def convert(self, data):
        """
        Return a bool for *data*.

        * None gives False.
        * An int (bool included) gives ``data != 0``.
        * Anything else is True only when its lower-cased string form is
          "true", "yes" or "on".
        """
        if data is None:
            return False

        if isinstance(data, int):
            return data != 0

        return str(data).lower() in ("true", "yes", "on")


class Binding:
    def __init__(self,
                 data,
                 attr=None,
                 data_converter: Optional[DataConverter] = None,
                 ft=None,
                 ft_name=None,
                 ft_attr=None,
                 detection_mode: DetectionMode = DetectionMode.ValueChange,
                 update_mode: UpdateMode = UpdateMode.ValueChange):
        """
        Creates a new binding object between a data object used as a pivot
        and an HTML element.

        The same pivot object must be used for different bindings.
        This will allow the binding between the HTML elements.

        :param data: object used as a pivot
        :param attr: attribute of the data object; when falsy,
            ``get_default_attr(data)`` is used
        :param data_converter: optional converter applied to a received value
            before it is stored on the data object
        :param ft: HTML element to bind to
        :param ft_name: name of the HTML element to bind to (sent by the form)
        :param ft_attr: value of the attribute to bind to (sent by the form)
        :param detection_mode: strategy used by ``update`` to find the value
        :param update_mode: strategy used by ``notify`` to refresh the element
        """
        self.id = uuid.uuid4()
        self.htmx_extra = {}
        self.data = data
        self.data_attr = attr or get_default_attr(data)
        self.data_converter = data_converter
        self.ft = self._safe_ft(ft)
        self.ft_name = ft_name
        self.ft_attr = ft_attr
        self.detection_mode = detection_mode
        self.update_mode = update_mode

        # ft_name must be assigned before this point: the detection objects
        # built by _factory capture it.
        self._detection = self._factory(detection_mode)
        self._update = self._factory(update_mode)

        make_observable(self.data)
        bind(self.data, self.data_attr, self.notify)

        # register the command
        BindingsManager.register(self)

    def bind_ft(self,
                ft,
                name,
                attr=None,
                data_converter: Optional[DataConverter] = None,
                detection_mode: Optional[DetectionMode] = None,
                update_mode: Optional[UpdateMode] = None):
        """
        Update the elements to bind to.

        Arguments left falsy keep the value currently stored on the binding
        (except *ft* and *name*, which are always replaced).

        :param ft: HTML element to bind to (an id is added if it has none)
        :param name: name of the HTML element (key looked up in the values)
        :param attr: attribute of the element to update
        :param data_converter: converter applied to received values
        :param detection_mode: strategy used by ``update``
        :param update_mode: strategy used by ``notify``
        :return: this binding, so calls can be chained
        """
        self.ft = self._safe_ft(ft)
        self.ft_name = name
        self.ft_attr = attr or self.ft_attr
        self.data_converter = data_converter or self.data_converter
        self.detection_mode = detection_mode or self.detection_mode
        self.update_mode = update_mode or self.update_mode

        # Rebuild the strategies: they depend on the (possibly new) name and modes.
        self._detection = self._factory(self.detection_mode)
        self._update = self._factory(self.update_mode)

        return self

    def get_htmx_params(self):
        """
        Return the htmx attributes for this binding: the extras set through
        ``htmx`` merged with ``hx-post`` and ``hx-vals`` (which carries the
        binding id as ``b_id``).
        """
        return self.htmx_extra | {
            "hx-post": f"{ROUTE_ROOT}{Routes.Bindings}",
            "hx-vals": f'{{"b_id": "{self.id}"}}',
        }

    def notify(self, old, new):
        """
        Observer callback registered on the data attribute in ``__init__``.

        Applies the update strategy to the bound element and marks it with
        ``hx-swap-oob="true"``.

        :param old: previous value of the data attribute
        :param new: new value of the data attribute
        :return: the updated element, or None when no element is bound yet
        """
        logger.debug(f"Binding '{self.id}': Changing from '{old}' to '{new}'")

        # The observer is registered in __init__, where ft may still be None
        # (it can be attached later through bind_ft). Without an element there
        # is nothing to refresh, and touching ``ft.children`` / ``ft.attrs``
        # would raise AttributeError.
        if self.ft is None:
            logger.debug(f"Binding '{self.id}': No element bound, nothing to update.")
            return None

        self.ft = self._update.update(self.ft, self.ft_name, self.ft_attr, old, new)
        self.ft.attrs["hx-swap-oob"] = "true"
        return self.ft

    def update(self, values: dict):
        """
        Push a value received from the client into the data object.

        :param values: mapping of received names to values
        :return: the result of ``collect_return_values(self.data)`` when the
            detection strategy matched (presumably what the observers
            returned; confirm against ``myutils.observable``), else None
        """
        logger.debug(f"Binding '{self.id}': Updating with {values=}.")

        matches, value = self._detection.matches(values)
        if not matches:
            logger.debug(f"Nothing to trigger in {values}.")
            return None

        converted = self.data_converter.convert(value) if self.data_converter else value
        setattr(self.data, self.data_attr, converted)
        return collect_return_values(self.data)

    @staticmethod
    def _safe_ft(ft):
        """
        Make sure the ft has an id.

        :param ft: element to check, may be None
        :return: None when *ft* is None, otherwise the same element, with a
            freshly generated uuid4 string as ``id`` if it had none
        """
        if ft is None:
            return None

        if ft.attrs.get("id", None) is None:
            ft.attrs["id"] = str(uuid.uuid4())
        return ft

    def _factory(self, mode):
        """
        Build the strategy object matching *mode*.

        :param mode: a ``DetectionMode`` or ``UpdateMode`` member
        :return: a detection object (built with the current ``ft_name``) for a
            ``DetectionMode``, an update object for an ``UpdateMode``
        :raises ValueError: when *mode* is none of the known members
        """
        # Members of two distinct Enum classes never compare equal, so the
        # identically named DetectionMode / UpdateMode members do not clash.
        if mode == DetectionMode.ValueChange:
            return ValueChangedDetection(self.ft_name)

        elif mode == DetectionMode.AttributePresence:
            return AttrPresentDetection(self.ft_name)

        elif mode == UpdateMode.ValueChange:
            return ValueChangeFtUpdate()

        elif mode == UpdateMode.AttributePresence:
            return AttributePresenceFtUpdate()

        else:
            raise ValueError(f"Invalid detection mode: {mode}")

    def htmx(self, trigger=None):
        """
        Record extra htmx attributes for this binding.

        :param trigger: when truthy, stored as ``hx-trigger``
        :return: this binding, so calls can be chained
        """
        if trigger:
            self.htmx_extra["hx-trigger"] = trigger
        return self


class BindingsManager:
    """Class-level registry of bindings, keyed by the string form of their id."""

    # Shared by design: one registry for the whole process.
    bindings = {}

    @staticmethod
    def register(binding: Binding):
        """Store *binding* under ``str(binding.id)``."""
        BindingsManager.bindings[str(binding.id)] = binding

    @staticmethod
    def get_binding(binding_id: str) -> Optional[Binding]:
        """Return the binding registered under *binding_id*, or None."""
        return BindingsManager.bindings.get(str(binding_id))

    @staticmethod
    def reset():
        """Remove every registered binding. Returns None."""
        return BindingsManager.bindings.clear()