Files
MyFastHtml/src/myfasthtml/core/utils.py

324 lines
8.6 KiB
Python

import logging
import re
from bs4 import Tag
from fastcore.xml import FT
from fasthtml.fastapp import fast_app
from rich.console import Console
from rich.table import Table
from starlette.routing import Mount
from myfasthtml.core.constants import Routes, ROUTE_ROOT
from myfasthtml.test.MyFT import MyFT
utils_app, utils_rt = fast_app()
logger = logging.getLogger("Routing")
def mount_if_not_exists(app, path: str, sub_app):
"""
Mounts a sub-application only if no Mount object already exists
at the specified path in the main application's router.
"""
is_mounted = False
for route in app.router.routes:
if isinstance(route, Mount):
if route.path == path:
is_mounted = True
break
if not is_mounted:
app.mount(path, app=sub_app)
def merge_classes(*args):
all_elements = []
for element in args:
if element is None or element == '':
continue
if isinstance(element, (tuple, list, set)):
all_elements.extend(element)
elif isinstance(element, dict):
if "cls" in element:
all_elements.append(element.pop("cls"))
elif "class" in element:
all_elements.append(element.pop("class"))
elif isinstance(element, str):
all_elements.append(element)
else:
raise ValueError(f"Cannot merge {element} of type {type(element)}")
if all_elements:
# Remove duplicates while preserving order
unique_elements = list(dict.fromkeys(all_elements))
return " ".join(unique_elements)
else:
return None
def debug_routes(app):
routes = []
def _clean_endpoint(endpoint):
res = str(endpoint).replace("<function ", "").replace(".<locals>", "")
return res.split(" at ")[0]
def _debug_routes(_app, _route, prefix=""):
if isinstance(_route, Mount):
for sub_route in _route.app.router.routes:
_debug_routes(_app, sub_route, prefix=_route.path)
else:
routes.append({
"number": len(routes),
"app": str(_app),
"name": _route.name,
"path": _route.path,
"full_path": prefix + _route.path,
"endpoint": _clean_endpoint(_route.endpoint),
"methods": _route.methods if hasattr(_route, "methods") else [],
"path_format": _route.path_format,
"path_regex": str(_route.path_regex),
})
for route in app.router.routes:
_debug_routes(app, route)
if not routes:
print("No routes found.")
return
table = Table(show_header=True, expand=True, header_style="bold")
columns = ["number", "name", "full_path", "endpoint", "methods"] # routes[0].keys()
for column in columns:
table.add_column(column)
for route in routes:
table.add_row(*[str(route[column]) for column in columns])
console = Console()
console.print(table)
def mount_utils(app):
"""
Mounts the commands_app to the given application instance if the route does not already exist.
:param app: The application instance to which the commands_app will be mounted.
:type app: Any
:return: Returns the result of the mount operation performed by mount_if_not_exists.
:rtype: Any
"""
return mount_if_not_exists(app, ROUTE_ROOT, utils_app)
def get_default_ft_attr(ft):
"""
for every type of HTML element (ft) gives the default attribute to use for binding
:param ft:
:return:
"""
if ft.tag == "input":
if ft.attrs.get("type") == "checkbox":
return "checked"
elif ft.attrs.get("type") == "radio":
return "checked"
elif ft.attrs.get("type") == "file":
return "files"
else:
return "value"
else:
return None # indicate that the content of the FT should be updated
def get_default_attr(data):
all_attrs = data.__dict__.keys()
return next(iter(all_attrs))
def is_checkbox(elt):
if isinstance(elt, (FT, MyFT)):
return elt.tag == "input" and elt.attrs.get("type", None) == "checkbox"
elif isinstance(elt, Tag):
return elt.name == "input" and elt.attrs.get("type", None) == "checkbox"
else:
return False
def is_radio(elt):
if isinstance(elt, (FT, MyFT)):
return elt.tag == "input" and elt.attrs.get("type", None) == "radio"
elif isinstance(elt, Tag):
return elt.name == "input" and elt.attrs.get("type", None) == "radio"
else:
return False
def is_select(elt):
if isinstance(elt, (FT, MyFT)):
return elt.tag == "select"
elif isinstance(elt, Tag):
return elt.name == "select"
else:
return False
def is_datalist(elt):
if isinstance(elt, (FT, MyFT)):
return elt.tag == "datalist"
elif isinstance(elt, Tag):
return elt.name == "datalist"
else:
return False
def quoted_str(s):
if s is None:
return "None"
if isinstance(s, str):
if "'" in s and '"' in s:
return f'"{s.replace('"', '\\"')}"'
elif '"' in s:
return f"'{s}'"
else:
return f'"{s}"'
return str(s)
def retrieve_user_info(session: dict):
if not session:
return {
"id": "** NOT LOGGED IN **",
"email": "** NOT LOGGED IN **",
"username": "** NOT LOGGED IN **",
"role": [],
"user_settings": {}
}
if "user_info" not in session:
return {
"id": "** UNKNOWN USER **",
"email": "** UNKNOWN USER **",
"username": "** UNKNOWN USER **",
"role": [],
"user_settings": {}
}
return session["user_info"]
def debug_session(session):
if session is None:
return "None"
if not isinstance(session, dict):
return str(session)
return session.get("user_info", {}).get("email", "** UNKNOWN USER **")
def get_id(obj):
if isinstance(obj, str):
return obj
elif hasattr(obj, "id"):
return obj.id
elif hasattr(obj, "get_id"):
return obj.get_id()
else:
return str(obj)
def pascal_to_snake(name: str) -> str:
"""Convert a PascalCase or CamelCase string to snake_case."""
if name is None:
return None
name = name.strip()
# Insert underscore before capital letters (except the first one)
s1 = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', name)
# Handle consecutive capital letters (like 'HTTPServer' -> 'http_server')
s2 = re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', s1)
return s2.lower()
def snake_to_pascal(name: str) -> str:
"""Convert a snake_case string to PascalCase."""
if name is None:
return None
name = name.strip()
if not name:
return ""
# Split on underscores and capitalize each part
parts = name.split('_')
return ''.join(word.capitalize() for word in parts if word)
def flatten(*args):
"""
Flattens nested lists or tuples into a single list. This utility function takes
any number of arguments, iterating recursively through any nested lists or
tuples, and returns a flat list containing all the elements.
:param args: Arbitrary number of arguments, which can include nested lists or
tuples to be flattened.
:type args: Any
:return: A flat list containing all the elements from the input, preserving the
order of elements as they are recursively extracted from nested
structures.
:rtype: list
"""
res = []
for arg in args:
if isinstance(arg, (list, tuple)):
res.extend(flatten(*arg))
else:
res.append(arg)
return res
@utils_rt(Routes.Commands)
def post(session, c_id: str, client_response: dict = None):
"""
Default routes for all commands.
:param session:
:param c_id: id of the command set
:param client_response: extra data received from the client (from the browser)
:return:
"""
client_response.pop("c_id", None)
logger.debug(f"Entering {Routes.Commands} with session='{debug_session(session)}', {c_id=}, {client_response=}")
from myfasthtml.core.commands import CommandsManager
command = CommandsManager.get_command(c_id)
if command:
logger.debug(f"Executing command {command.name}.")
return command.execute(client_response)
raise ValueError(f"Command with ID '{c_id}' not found.")
@utils_rt(Routes.Bindings)
def post(session, b_id: str, values: dict):
"""
Default routes for all bindings.
:param session:
:param b_id:
:param values:
:return:
"""
logger.debug(f"Entering {Routes.Bindings} with {session=}, {b_id=}, {values=}")
from myfasthtml.core.bindings import BindingsManager
binding = BindingsManager.get_binding(b_id)
if binding:
res = binding.update(values)
return res
raise ValueError(f"Binding with ID '{b_id}' not found.")