Added first controls

This commit is contained in:
2025-11-26 20:53:12 +01:00
parent 459c89bae2
commit ce5328fe34
68 changed files with 37849 additions and 87048 deletions

View File

@@ -0,0 +1,17 @@
from myfasthtml.auth.utils import login_user, save_user_info, register_user
from myfasthtml.core.instances import SingleInstance
class AuthProxy(SingleInstance):
def __init__(self, parent, base_url: str = None):
super().__init__(parent)
self._base_url = base_url
def login_user(self, email: str, password: str):
return login_user(email, password, self._base_url)
def register_user(self, email: str, username: str, password: str):
return register_user(email, username, password, self._base_url)
def save_user_info(self, access_token: str, user_profile: dict):
return save_user_info(access_token, user_profile, self._base_url)

View File

@@ -129,6 +129,14 @@ class DataConverter:
pass
class LambdaConverter(DataConverter):
def __init__(self, func):
self.func = func
def convert(self, data):
return self.func(data)
class BooleanConverter(DataConverter):
def convert(self, data):
if data is None:
@@ -270,10 +278,10 @@ class Binding:
return self
def get_htmx_params(self):
return self.htmx_extra | {
return {
"hx-post": f"{ROUTE_ROOT}{Routes.Bindings}",
"hx-vals": f'{{"b_id": "{self.id}"}}',
}
} | self.htmx_extra
def init(self):
"""

View File

@@ -1,3 +1,4 @@
import inspect
import uuid
from typing import Optional
@@ -29,20 +30,23 @@ class BaseCommand:
self.description = description
self._htmx_extra = {}
self._bindings = []
self._ft = None
# register the command
CommandsManager.register(self)
def get_htmx_params(self):
return self._htmx_extra | {
return {
"hx-post": f"{ROUTE_ROOT}{Routes.Commands}",
"hx-vals": f'{{"c_id": "{self.id}"}}',
}
"hx-swap": "outerHTML",
"hx-vals": {"c_id": f"{self.id}"},
} | self._htmx_extra
def execute(self):
def execute(self, client_response: dict = None):
raise NotImplementedError
def htmx(self, target="this", swap="innerHTML"):
def htmx(self, target: Optional[str] = "this", swap="outerHTML", trigger=None):
# Note that the default value is the same than in get_htmx_params()
if target is None:
self._htmx_extra["hx-swap"] = "none"
elif target != "this":
@@ -50,8 +54,12 @@ class BaseCommand:
if swap is None:
self._htmx_extra["hx-swap"] = "none"
elif swap != "innerHTML":
elif swap != "outerHTML":
self._htmx_extra["hx-swap"] = swap
if trigger is not None:
self._htmx_extra["hx-trigger"] = trigger
return self
def bind_ft(self, ft):
@@ -61,6 +69,7 @@ class BaseCommand:
:param ft:
:return:
"""
self._ft = ft
htmx = self.get_htmx_params()
ft.attrs |= htmx
return ft
@@ -83,6 +92,16 @@ class BaseCommand:
self._htmx_extra["hx-swap"] = "none"
return self
@property
def url(self):
return f"{ROUTE_ROOT}{Routes.Commands}?c_id={self.id}"
def get_ft(self):
return self._ft
def __str__(self):
return f"Command({self.name})"
class Command(BaseCommand):
@@ -107,10 +126,24 @@ class Command(BaseCommand):
def __init__(self, name, description, callback, *args, **kwargs):
super().__init__(name, description)
self.callback = callback
self.callback_parameters = dict(inspect.signature(callback).parameters)
self.args = args
self.kwargs = kwargs
def execute(self):
def _convert(self, key, value):
if key in self.callback_parameters:
param = self.callback_parameters[key]
if param.annotation == bool:
return value == "true"
elif param.annotation == int:
return int(value)
elif param.annotation == float:
return float(value)
elif param.annotation == list:
return value.split(",")
return value
def execute(self, client_response: dict = None):
ret_from_bindings = []
def binding_result_callback(attr, old, new, results):
@@ -119,21 +152,41 @@ class Command(BaseCommand):
for data in self._bindings:
add_event_listener(ObservableEvent.AFTER_PROPERTY_CHANGE, data, "", binding_result_callback)
ret = self.callback(*self.args, **self.kwargs)
new_kwargs = self.kwargs.copy()
if client_response:
for k, v in client_response.items():
if k in self.callback_parameters:
new_kwargs[k] = self._convert(k, v)
if 'client_response' in self.callback_parameters:
new_kwargs['client_response'] = client_response
ret = self.callback(*self.args, **new_kwargs)
for data in self._bindings:
remove_event_listener(ObservableEvent.AFTER_PROPERTY_CHANGE, data, "", binding_result_callback)
# Set the hx-swap-oob attribute on all elements returned by the callback
if isinstance(ret, (list, tuple)):
for r in ret[1:]:
if hasattr(r, 'attrs') and r.get("id", None) is not None:
r.attrs["hx-swap-oob"] = r.attrs.get("hx-swap-oob", "true")
if not ret_from_bindings:
return ret
if isinstance(ret, list):
return ret + ret_from_bindings
if isinstance(ret, (list, tuple)):
return list(ret) + ret_from_bindings
else:
return [ret] + ret_from_bindings
class LambdaCommand(Command):
def __init__(self, delegate, name="LambdaCommand", description="Lambda Command"):
super().__init__(name, description, delegate)
self.htmx(target=None)
def __str__(self):
return f"Command({self.name})"
def execute(self, client_response: dict = None):
return self.callback(client_response)
class CommandsManager:

View File

@@ -0,0 +1,118 @@
from contextlib import contextmanager
from types import SimpleNamespace
from dbengine.dbengine import DbEngine
from myfasthtml.core.instances import SingleInstance, BaseInstance
from myfasthtml.core.utils import retrieve_user_info
class DbManager(SingleInstance):
def __init__(self, parent, root=".myFastHtmlDb", auto_register: bool = True):
super().__init__(parent, auto_register=auto_register)
self.db = DbEngine(root=root)
def save(self, entry, obj):
self.db.save(self.get_tenant(), self.get_user(), entry, obj)
def load(self, entry):
return self.db.load(self.get_tenant(), entry)
def exists_entry(self, entry):
return self.db.exists(self.get_tenant(), entry)
def get_tenant(self):
return retrieve_user_info(self._session)["id"]
def get_user(self):
return retrieve_user_info(self._session)["email"]
class DbObject:
"""
When you set the attribute, it persists in DB
It loads from DB at startup
"""
_initializing = False
_forbidden_attrs = {"_initializing", "_db_manager", "_name", "_owner", "_forbidden_attrs"}
def __init__(self, owner: BaseInstance, name=None, db_manager=None):
self._owner = owner
self._name = name or self.__class__.__name__
self._db_manager = db_manager or DbManager(self._owner)
self._finalize_initialization()
@contextmanager
def initializing(self):
old_state = getattr(self, "_initializing", False)
self._initializing = True
try:
yield
finally:
self._finalize_initialization()
self._initializing = old_state
def __setattr__(self, name: str, value: str):
if name.startswith("_") or name.startswith("ns") or getattr(self, "_initializing", False):
super().__setattr__(name, value)
return
old_value = getattr(self, name, None)
if old_value == value:
return
super().__setattr__(name, value)
self._save_self()
def _finalize_initialization(self):
if self._db_manager.exists_entry(self._name):
props = self._db_manager.load(self._name)
self.update(props)
else:
self._save_self()
def _save_self(self):
props = {k: getattr(self, k) for k, v in self._get_properties().items() if
not k.startswith("_") and not k.startswith("ns")}
if props:
self._db_manager.save(self._name, props)
def _get_properties(self):
"""
Retrieves all the properties of the current object, combining both the properties defined in
the class and the instance attributes.
:return: A dictionary containing the properties of the object, where keys are property names
and values are their corresponding values.
"""
props = {k: getattr(self, k) for k, v in self.__class__.__dict__.items()} # for dataclass
props |= {k: getattr(self, k) for k, v in self.__dict__.items()} # for dataclass
props = {k: v for k, v in props.items() if not k.startswith("__")}
return props
def update(self, *args, **kwargs):
if len(args) > 1:
raise ValueError("Only one argument is allowed")
properties = {}
if args:
arg = args[0]
if not isinstance(arg, (dict, SimpleNamespace)):
raise ValueError("Only dict or Expando are allowed as argument")
properties |= vars(arg) if isinstance(arg, SimpleNamespace) else arg
properties |= kwargs
# save the new state
old_state = getattr(self, "_initializing", False)
self._initializing = True
for k, v in properties.items():
if hasattr(self, k) and k not in DbObject._forbidden_attrs: # internal variables cannot be updated
setattr(self, k, v)
self._save_self()
self._initializing = old_state
def copy(self):
as_dict = self._get_properties().copy()
as_dict = {k: v for k, v in as_dict.items() if k not in DbObject._forbidden_attrs}
return SimpleNamespace(**as_dict)

View File

@@ -0,0 +1,218 @@
import logging
import uuid
from typing import Optional
from myfasthtml.controls.helpers import Ids
from myfasthtml.core.utils import pascal_to_snake
logger = logging.getLogger("InstancesManager")
special_session = {
"user_info": {"id": "** SPECIAL SESSION **"}
}
class DuplicateInstanceError(Exception):
def __init__(self, instance):
self.instance = instance
class BaseInstance:
"""
Base class for all instances (manageable by InstancesManager)
"""
def __new__(cls, *args, **kwargs):
# Extract arguments from both positional and keyword arguments
# Signature matches __init__: parent, session=None, _id=None, auto_register=True
parent = args[0] if len(args) > 0 and isinstance(args[0], BaseInstance) else kwargs.get("parent", None)
session = args[1] if len(args) > 1 and isinstance(args[1], dict) else kwargs.get("session", None)
_id = args[2] if len(args) > 2 and isinstance(args[2], str) else kwargs.get("_id", None)
# Compute _id
_id = cls.compute_id(_id, parent)
if session is None:
if parent is not None:
session = parent.get_session()
else:
raise TypeError("Either session or parent must be provided")
session_id = InstancesManager.get_session_id(session)
key = (session_id, _id)
if key in InstancesManager.instances:
res = InstancesManager.instances[key]
if type(res) is not cls:
raise TypeError(f"Instance with id {_id} already exists, but is of type {type(res)}")
return res
# Otherwise create a new instance
instance = super().__new__(cls)
instance._is_new_instance = True # mark as fresh
return instance
def __init__(self, parent: Optional['BaseInstance'],
session: Optional[dict] = None,
_id: Optional[str] = None,
auto_register: bool = True):
if not getattr(self, "_is_new_instance", False):
# Skip __init__ if instance already existed
return
elif not isinstance(self, UniqueInstance):
# No more __init__ unless it's UniqueInstance
self._is_new_instance = False
self._parent = parent
self._session = session or (parent.get_session() if parent else None)
self._id = self.compute_id(_id, parent)
self._prefix = self._id if isinstance(self, (UniqueInstance, SingleInstance)) else self.compute_prefix()
if auto_register:
InstancesManager.register(self._session, self)
def get_session(self) -> dict:
return self._session
def get_id(self) -> str:
return self._id
def get_parent(self) -> Optional['BaseInstance']:
return self._parent
def get_prefix(self) -> str:
return self._prefix
def get_full_id(self) -> str:
return f"{InstancesManager.get_session_id(self._session)}-{self._id}"
def get_full_parent_id(self) -> Optional[str]:
parent = self.get_parent()
return parent.get_full_id() if parent else None
@classmethod
def compute_prefix(cls):
return f"mf-{pascal_to_snake(cls.__name__)}"
@classmethod
def compute_id(cls, _id: Optional[str], parent: Optional['BaseInstance']):
if _id is None:
prefix = cls.compute_prefix()
if issubclass(cls, SingleInstance):
_id = prefix
else:
_id = f"{prefix}-{str(uuid.uuid4())}"
return _id
if _id.startswith("-") and parent is not None:
return f"{parent.get_prefix()}{_id}"
return _id
class SingleInstance(BaseInstance):
"""
Base class for instances that can only have one instance at a time.
"""
def __init__(self,
parent: Optional[BaseInstance] = None,
session: Optional[dict] = None,
_id: Optional[str] = None,
auto_register: bool = True):
super().__init__(parent, session, _id, auto_register)
class UniqueInstance(BaseInstance):
"""
Base class for instances that can only have one instance at a time.
But unlike SingleInstance, the __init__ is called every time it's instantiated.
"""
def __init__(self,
parent: Optional[BaseInstance] = None,
session: Optional[dict] = None,
_id: Optional[str] = None,
auto_register: bool = True):
super().__init__(parent, session, _id, auto_register)
class MultipleInstance(BaseInstance):
"""
Base class for instances that can have multiple instances at a time.
"""
def __init__(self, parent: BaseInstance,
session: Optional[dict] = None,
_id: Optional[str] = None,
auto_register: bool = True):
super().__init__(parent, session, _id, auto_register)
class InstancesManager:
instances = {}
@staticmethod
def register(session: dict, instance: BaseInstance):
"""
Register an instance in the manager, so that it can be retrieved later.
:param session:
:param instance:
:return:
"""
key = (InstancesManager.get_session_id(session), instance.get_id())
if isinstance(instance, SingleInstance) and key in InstancesManager.instances:
raise DuplicateInstanceError(instance)
InstancesManager.instances[key] = instance
return instance
@staticmethod
def get(session: dict, instance_id: str):
"""
Get or create an instance of the given type (from its id)
:param session:
:param instance_id:
:return:
"""
key = (InstancesManager.get_session_id(session), instance_id)
return InstancesManager.instances[key]
@staticmethod
def get_session_id(session):
if session is None:
return "** NOT LOGGED IN **"
if "user_info" not in session:
return "** UNKNOWN USER **"
return session["user_info"].get("id", "** INVALID SESSION **")
@staticmethod
def get_session_user_name(session):
if session is None:
return "** NOT LOGGED IN **"
if "user_info" not in session:
return "** UNKNOWN USER **"
return session["user_info"].get("username", "** INVALID SESSION **")
@staticmethod
def reset():
InstancesManager.instances.clear()
@staticmethod
def clear_session(session):
"""
Remove all instances belonging to the given session.
:param session:
:return:
"""
session_id = InstancesManager.get_session_id(session)
InstancesManager.instances = {
key: instance
for key, instance in InstancesManager.instances.items()
if key[0] != session_id
}
RootInstance = SingleInstance(None, special_session, Ids.Root)

View File

@@ -0,0 +1,85 @@
from difflib import SequenceMatcher
from typing import Any
def _is_subsequence(query: str, target: str) -> tuple[bool, float]:
"""
Determines if a query string is a subsequence of a target string and calculates
a score based on the compactness of the match. The match is case-insensitive.
The function iterates through each character of the query and checks if it
exists in the target string while maintaining the order. If all characters of
the query are found in order, it calculates a score based on the smallest
window in the target that contains all the matched characters.
:param query: The query string to check as a subsequence.
:param target: The target string in which to find the subsequence.
:return: A tuple where the first value is a boolean indicating if a valid
subsequence exists, and the second value is a float representing the
compactness score of the match.
:rtype: tuple[bool, float]
"""
query = query.lower()
target = target.lower()
positions = []
idx = 0
for char in query:
idx = target.find(char, idx)
if idx == -1:
return False, 0.0
positions.append(idx)
idx += 1
# Smallest window containing all matched chars
window_size = positions[-1] - positions[0] + 1
# Score: ratio of query length vs window size (compactness)
score = len(query) / window_size
return True, score
def fuzzy_matching(query: str, choices: list[Any], similarity_threshold: float = 0.7, get_attr=None):
"""
Perform fuzzy matching on a list of items to find the items that are similar
to the given query based on a similarity threshold.
:param query: The search query to be matched, provided as a string.
:param choices: A list of strings representing the items to be compared against the query.
:param similarity_threshold: A float value representing the minimum similarity score
(between 0 and 1) an item needs to achieve to be considered a match. Defaults to 0.7.
:param get_attr: When choice is a object, give the property to use
:return: A list of strings containing the items from the input list that meet or exceed
the similarity threshold, sorted in descending order of similarity.
"""
get_attr = get_attr or (lambda x: x)
matches = []
for file_doc in choices:
# Calculate similarity between search term and filename
similarity = SequenceMatcher(None, query.lower(), get_attr(file_doc).lower()).ratio()
if similarity >= similarity_threshold:
matches.append((file_doc, similarity))
# Sort by similarity score (highest first)
matches.sort(key=lambda x: x[1], reverse=True)
# Return only the FileDocument objects
return [match[0] for match in matches]
def subsequence_matching(query: str, choices: list[Any], get_attr=None):
get_attr = get_attr or (lambda x: x)
matches = []
for item in choices:
matched, score = _is_subsequence(query, get_attr(item))
if matched:
matches.append((item, score))
# Sort by score (highest first)
matches.sort(key=lambda x: x[1], reverse=True)
# Return only the FileDocument objects
return [match[0] for match in matches]

View File

@@ -0,0 +1,238 @@
from collections.abc import Callable
ROOT_COLOR = "#ff9999"
GHOST_COLOR = "#cccccc"
def from_nested_dict(trees: list[dict]) -> tuple[list, list]:
"""
Convert a list of nested dictionaries to vis.js nodes and edges format.
Args:
trees: List of nested dictionaries where keys are node names and values are
dictionaries of children (e.g., [{"root1": {"child1": {}}}, {"root2": {}}])
Returns:
tuple: (nodes, edges) where:
- nodes: list of dicts with auto-incremented numeric IDs
- edges: list of dicts with 'from' and 'to' keys
Example:
>>> trees = [{"root1": {"child1": {}}}, {"root2": {"child2": {}}}]
>>> nodes, edges = from_nested_dict(trees)
>>> # nodes = [{"id": 1, "label": "root1"}, {"id": 2, "label": "child1"}, ...]
"""
nodes = []
edges = []
node_id_counter = 1
node_id_map = {} # maps node_label -> node_id
def traverse(subtree: dict, parent_id: int | None = None):
nonlocal node_id_counter
for node_label, children in subtree.items():
# Create node with auto-incremented ID
current_id = node_id_counter
node_id_map[node_label] = current_id
nodes.append({
"id": current_id,
"label": node_label
})
node_id_counter += 1
# Create edge from parent to current node
if parent_id is not None:
edges.append({
"from": parent_id,
"to": current_id
})
# Recursively process children
if children:
traverse(children, parent_id=current_id)
# Process each tree in the list
for tree in trees:
traverse(tree)
return nodes, edges
def from_tree_with_metadata(
trees: list[dict],
id_getter: Callable = None,
label_getter: Callable = None,
children_getter: Callable = None
) -> tuple[list, list]:
"""
Convert a list of trees with metadata to vis.js nodes and edges format.
Args:
trees: List of dictionaries with 'id', 'label', and 'children' keys
(e.g., [{"id": "root1", "label": "Root 1", "children": [...]}, ...])
id_getter: Optional callback to extract node ID from dict
Default: lambda n: n.get("id")
label_getter: Optional callback to extract node label from dict
Default: lambda n: n.get("label", "")
children_getter: Optional callback to extract children list from dict
Default: lambda n: n.get("children", [])
Returns:
tuple: (nodes, edges) where:
- nodes: list of dicts with IDs from tree or auto-incremented
- edges: list of dicts with 'from' and 'to' keys
Example:
>>> trees = [
... {
... "id": "root1",
... "label": "Root Node 1",
... "children": [
... {"id": "child1", "label": "Child 1", "children": []}
... ]
... },
... {"id": "root2", "label": "Root Node 2"}
... ]
>>> nodes, edges = from_tree_with_metadata(trees)
"""
# Default getters
if id_getter is None:
id_getter = lambda n: n.get("id")
if label_getter is None:
label_getter = lambda n: n.get("label", "")
if children_getter is None:
children_getter = lambda n: n.get("children", [])
nodes = []
edges = []
node_id_counter = 1
def traverse(node_dict: dict, parent_id: int | str | None = None):
nonlocal node_id_counter
# Extract ID (use provided or auto-increment)
node_id = id_getter(node_dict)
if node_id is None:
node_id = node_id_counter
node_id_counter += 1
# Extract label
node_label = label_getter(node_dict)
# Create node
nodes.append({
"id": node_id,
"label": node_label
})
# Create edge from parent to current node
if parent_id is not None:
edges.append({
"from": parent_id,
"to": node_id
})
# Recursively process children
children = children_getter(node_dict)
if children:
for child in children:
traverse(child, parent_id=node_id)
# Process each tree in the list
for tree in trees:
traverse(tree)
return nodes, edges
def from_parent_child_list(
items: list,
id_getter: Callable = None,
label_getter: Callable = None,
parent_getter: Callable = None,
ghost_color: str = GHOST_COLOR,
root_color: str | None = ROOT_COLOR
) -> tuple[list, list]:
"""
Convert a list of items with parent references to vis.js nodes and edges format.
Args:
items: List of items (dicts or objects) with parent references
id_getter: callback to extract node ID
label_getter: callback to extract node label
parent_getter: callback to extract parent ID
ghost_color: color for ghost nodes (referenced parents)
root_color: color for root nodes (nodes without parent)
Returns:
tuple: (nodes, edges)
"""
# Default getters
if id_getter is None:
id_getter = lambda item: item.get("id")
if label_getter is None:
label_getter = lambda item: item.get("label", "")
if parent_getter is None:
parent_getter = lambda item: item.get("parent")
nodes = []
edges = []
# Track all existing node IDs
existing_ids = set()
# First pass: create nodes for all items
for item in items:
node_id = id_getter(item)
node_label = label_getter(item)
existing_ids.add(node_id)
nodes.append({
"id": node_id,
"label": node_label,
# root color assigned later
})
# Track ghost nodes
ghost_nodes = set()
# Track which nodes have parents
nodes_with_parent = set()
# Second pass: create edges and detect ghost nodes
for item in items:
node_id = id_getter(item)
parent_id = parent_getter(item)
# Skip roots
if parent_id is None or parent_id == "":
continue
# Child has a parent
nodes_with_parent.add(node_id)
# Create edge parent → child
edges.append({
"from": parent_id,
"to": node_id
})
# Create ghost node if parent not found
if parent_id not in existing_ids and parent_id not in ghost_nodes:
ghost_nodes.add(parent_id)
nodes.append({
"id": parent_id,
"label": str(parent_id),
"color": ghost_color
})
# Final pass: assign color to root nodes
if root_color is not None:
for node in nodes:
if node["id"] not in nodes_with_parent and node["id"] not in ghost_nodes:
# Root node
node["color"] = root_color
return nodes, edges

View File

@@ -1,8 +1,11 @@
import logging
import re
from bs4 import Tag
from fastcore.xml import FT
from fasthtml.fastapp import fast_app
from rich.console import Console
from rich.table import Table
from starlette.routing import Mount
from myfasthtml.core.constants import Routes, ROUTE_ROOT
@@ -60,15 +63,46 @@ def merge_classes(*args):
def debug_routes(app):
routes = []
def _clean_endpoint(endpoint):
res = str(endpoint).replace("<function ", "").replace(".<locals>", "")
return res.split(" at ")[0]
def _debug_routes(_app, _route, prefix=""):
if isinstance(_route, Mount):
for sub_route in _route.app.router.routes:
_debug_routes(_app, sub_route, prefix=_route.path)
else:
print(f"path={prefix}{_route.path}, methods={_route.methods}, endpoint={_route.endpoint}")
routes.append({
"number": len(routes),
"app": str(_app),
"name": _route.name,
"path": _route.path,
"full_path": prefix + _route.path,
"endpoint": _clean_endpoint(_route.endpoint),
"methods": _route.methods if hasattr(_route, "methods") else [],
"path_format": _route.path_format,
"path_regex": str(_route.path_regex),
})
for route in app.router.routes:
_debug_routes(app, route)
if not routes:
print("No routes found.")
return
table = Table(show_header=True, expand=True, header_style="bold")
columns = ["number", "name", "full_path", "endpoint", "methods"] # routes[0].keys()
for column in columns:
table.add_column(column)
for route in routes:
table.add_row(*[str(route[column]) for column in columns])
console = Console()
console.print(table)
def mount_utils(app):
@@ -158,19 +192,92 @@ def quoted_str(s):
return str(s)
def retrieve_user_info(session: dict):
if not session:
return {
"id": "** NOT LOGGED IN **",
"email": "** NOT LOGGED IN **",
"username": "** NOT LOGGED IN **",
"role": [],
"user_settings": {}
}
if "user_info" not in session:
return {
"id": "** UNKNOWN USER **",
"email": "** UNKNOWN USER **",
"username": "** UNKNOWN USER **",
"role": [],
"user_settings": {}
}
return session["user_info"]
def debug_session(session):
if session is None:
return "None"
if not isinstance(session, dict):
return str(session)
return session.get("user_info", {}).get("email", "** UNKNOWN USER **")
def get_id(obj):
if isinstance(obj, str):
return obj
elif hasattr(obj, "id"):
return obj.id
elif hasattr(obj, "get_id"):
return obj.get_id()
else:
return str(obj)
def pascal_to_snake(name: str) -> str:
"""Convert a PascalCase or CamelCase string to snake_case."""
if name is None:
return None
name = name.strip()
# Insert underscore before capital letters (except the first one)
s1 = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', name)
# Handle consecutive capital letters (like 'HTTPServer' -> 'http_server')
s2 = re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', s1)
return s2.lower()
def snake_to_pascal(name: str) -> str:
"""Convert a snake_case string to PascalCase."""
if name is None:
return None
name = name.strip()
if not name:
return ""
# Split on underscores and capitalize each part
parts = name.split('_')
return ''.join(word.capitalize() for word in parts if word)
@utils_rt(Routes.Commands)
def post(session, c_id: str):
def post(session, c_id: str, client_response: dict = None):
"""
Default routes for all commands.
:param session:
:param c_id:
:param c_id: id of the command set
:param client_response: extra data received from the client (from the browser)
:return:
"""
logger.debug(f"Entering {Routes.Commands} with {session=}, {c_id=}")
client_response.pop("c_id", None)
logger.debug(f"Entering {Routes.Commands} with session='{debug_session(session)}', {c_id=}, {client_response=}")
from myfasthtml.core.commands import CommandsManager
command = CommandsManager.get_command(c_id)
if command:
return command.execute()
logger.debug(f"Executing command {command.name}.")
return command.execute(client_response)
raise ValueError(f"Command with ID '{c_id}' not found.")
@@ -188,6 +295,7 @@ def post(session, b_id: str, values: dict):
from myfasthtml.core.bindings import BindingsManager
binding = BindingsManager.get_binding(b_id)
if binding:
return binding.update(values)
res = binding.update(values)
return res
raise ValueError(f"Binding with ID '{b_id}' not found.")