429 lines
12 KiB
Python
429 lines
12 KiB
Python
import importlib
|
|
import logging
|
|
import re
|
|
|
|
from bs4 import Tag
|
|
from fastcore.xml import FT
|
|
from fasthtml.fastapp import fast_app
|
|
from rich.console import Console
|
|
from rich.table import Table
|
|
from starlette.routing import Mount
|
|
|
|
from myfasthtml.core.constants import Routes, ROUTE_ROOT
|
|
from myfasthtml.core.dsl.types import Position
|
|
from myfasthtml.core.dsls import DslsManager
|
|
from myfasthtml.core.formatting.dsl import DSLSyntaxError
|
|
from myfasthtml.test.MyFT import MyFT
|
|
|
|
# Sub-application, and its route decorator, hosting the default utility routes declared in this module.
utils_app, utils_rt = fast_app()
|
|
# Logger used by the route handlers of this module; registered under the fixed name "Routing".
logger = logging.getLogger("Routing")
|
|
|
|
|
|
def mount_if_not_exists(app, path: str, sub_app):
    """
    Mount ``sub_app`` on ``app`` unless a ``Mount`` already exists at ``path``.

    Only the ``Mount`` entries of ``app.router.routes`` are inspected; other
    kinds of routes registered at the same path do not prevent the mount.

    :param app: The main application; must expose ``router.routes`` and ``mount``.
    :param path: The path at which the sub-application is mounted.
    :param sub_app: The sub-application to mount.
    :return: None
    """
    # Starlette's Mount stores its path with trailing slashes stripped, so the
    # requested path is normalised the same way before comparing. Without this,
    # "/utils/" never matches an existing "/utils" mount and is mounted twice.
    normalized_path = path.rstrip("/")
    already_mounted = any(
        isinstance(route, Mount) and route.path in (path, normalized_path)
        for route in app.router.routes
    )

    if not already_mounted:
        app.mount(path, app=sub_app)
|
|
|
|
|
|
def merge_classes(*args):
    """
    Merge several class specifications into one space-separated string.

    Each argument may be:

    * ``None`` or ``''``: ignored;
    * a ``str``: used as is;
    * a ``tuple``, ``list`` or ``set``: every item is added;
    * a ``dict``: its ``"cls"`` entry (or, when absent, its ``"class"`` entry)
      is REMOVED from the dict and added. The dict is mutated on purpose, so
      the remaining entries can be forwarded without a duplicate class key.

    ``None`` and ``''`` values found inside a collection or a dict are skipped
    as well, so they can neither break the final join nor add stray spaces.

    :param args: The class specifications to merge.
    :return: The merged string with duplicates removed (first occurrence wins),
             or ``None`` when nothing remains to merge.
    :raises ValueError: If an argument has an unsupported type.
    """
    collected = []
    for element in args:
        if element is None or element == '':
            continue

        if isinstance(element, (tuple, list, set)):
            candidates = element
        elif isinstance(element, dict):
            # "cls" takes precedence; only one of the two keys is consumed.
            if "cls" in element:
                candidates = (element.pop("cls"),)
            elif "class" in element:
                candidates = (element.pop("class"),)
            else:
                candidates = ()
        elif isinstance(element, str):
            candidates = (element,)
        else:
            raise ValueError(f"Cannot merge {element} of type {type(element)}")

        # Blank entries nested in collections/dicts are dropped like top-level ones.
        collected.extend(item for item in candidates if item is not None and item != '')

    if not collected:
        return None

    # dict.fromkeys removes duplicates while preserving insertion order.
    return " ".join(dict.fromkeys(collected))
|
|
|
|
|
|
def debug_routes(app):
    """
    Print a table listing every route registered on ``app``.

    ``Mount`` entries are expanded recursively, so the routes of mounted
    sub-applications are listed with their full path. The table shows the
    ``number``, ``name``, ``full_path``, ``endpoint`` and ``methods`` columns.
    When no route is found, "No routes found." is printed instead.

    :param app: The application whose ``router.routes`` are listed.
    :return: None
    """
    routes = []

    def _clean_endpoint(endpoint):
        # Turn "<function outer.<locals>.inner at 0x...>" into "outer.inner".
        res = str(endpoint).replace("<function ", "").replace(".<locals>", "")
        return res.split(" at ")[0]

    def _debug_routes(_app, _route, prefix=""):
        if isinstance(_route, Mount):
            # Accumulate the prefix: a mount nested inside another mount must
            # keep the path of its parent(s), not only its own.
            nested_prefix = prefix + _route.path
            # Mount.routes yields an empty list for mounted apps that expose no
            # routes (e.g. static file apps) instead of failing on ".router".
            for sub_route in _route.routes or []:
                _debug_routes(_app, sub_route, prefix=nested_prefix)
        else:
            routes.append({
                "number": len(routes),
                "app": str(_app),
                "name": _route.name,
                "path": _route.path,
                "full_path": prefix + _route.path,
                "endpoint": _clean_endpoint(_route.endpoint),
                "methods": getattr(_route, "methods", []),
                "path_format": _route.path_format,
                "path_regex": str(_route.path_regex),
            })

    for route in app.router.routes:
        _debug_routes(app, route)

    if not routes:
        print("No routes found.")
        return

    table = Table(show_header=True, expand=True, header_style="bold")
    # Only a subset of the collected keys is displayed.
    columns = ["number", "name", "full_path", "endpoint", "methods"]
    for column in columns:
        table.add_column(column)

    for route in routes:
        table.add_row(*[str(route[column]) for column in columns])

    console = Console()
    console.print(table)
|
|
|
|
|
|
def mount_utils(app):
    """
    Mount this module's utility sub-application (``utils_app``) on ``app``.

    The mount is done at ``ROUTE_ROOT`` through ``mount_if_not_exists``, so
    nothing is mounted again when a mount already exists at that path.

    :param app: The application instance that receives the mount.
    :return: Whatever ``mount_if_not_exists`` returns.
    """
    return mount_if_not_exists(app, path=ROUTE_ROOT, sub_app=utils_app)
|
|
|
|
|
|
def get_default_ft_attr(ft):
    """
    Give the attribute to use by default when binding the HTML element ``ft``.

    :param ft: An element exposing ``tag`` and ``attrs``.
    :return: ``"checked"`` for checkbox and radio inputs, ``"files"`` for file
             inputs, ``"value"`` for any other input, and ``None`` for every
             other tag (meaning the content of the element should be updated).
    """
    if ft.tag != "input":
        return None  # not an input: the content of the FT should be updated

    input_type = ft.attrs.get("type")
    if input_type in ("checkbox", "radio"):
        return "checked"
    if input_type == "file":
        return "files"
    return "value"
|
|
|
|
|
|
def get_default_attr(data):
    """
    Give the name of the first attribute stored in ``data.__dict__``.

    :param data: Any object exposing a ``__dict__``.
    :return: The first attribute name, in ``__dict__`` order.
    :raises StopIteration: If ``data.__dict__`` is empty.
    """
    attribute_names = iter(data.__dict__)
    return next(attribute_names)
|
|
|
|
|
|
def is_checkbox(elt):
    """
    Tell whether ``elt`` is an ``<input type="checkbox">`` element.

    :param elt: An ``FT`` / ``MyFT`` element (tag read from ``tag``) or a
                BeautifulSoup ``Tag`` (tag read from ``name``).
    :return: ``True`` for a checkbox input; ``False`` otherwise, including for
             any other kind of object.
    """
    if isinstance(elt, (FT, MyFT)):
        tag_name = elt.tag
    elif isinstance(elt, Tag):
        tag_name = elt.name
    else:
        return False

    return tag_name == "input" and elt.attrs.get("type", None) == "checkbox"
|
|
|
|
|
|
def is_radio(elt):
    """
    Tell whether ``elt`` is an ``<input type="radio">`` element.

    :param elt: An ``FT`` / ``MyFT`` element (tag read from ``tag``) or a
                BeautifulSoup ``Tag`` (tag read from ``name``).
    :return: ``True`` for a radio input; ``False`` otherwise, including for
             any other kind of object.
    """
    if isinstance(elt, (FT, MyFT)):
        tag_name = elt.tag
    elif isinstance(elt, Tag):
        tag_name = elt.name
    else:
        return False

    return tag_name == "input" and elt.attrs.get("type", None) == "radio"
|
|
|
|
|
|
def is_select(elt):
    """
    Tell whether ``elt`` is a ``<select>`` element.

    :param elt: An ``FT`` / ``MyFT`` element (tag read from ``tag``) or a
                BeautifulSoup ``Tag`` (tag read from ``name``).
    :return: ``True`` for a select element; ``False`` otherwise, including for
             any other kind of object.
    """
    if isinstance(elt, (FT, MyFT)):
        return elt.tag == "select"
    return isinstance(elt, Tag) and elt.name == "select"
|
|
|
|
|
|
def is_datalist(elt):
    """
    Tell whether ``elt`` is a ``<datalist>`` element.

    :param elt: An ``FT`` / ``MyFT`` element (tag read from ``tag``) or a
                BeautifulSoup ``Tag`` (tag read from ``name``).
    :return: ``True`` for a datalist element; ``False`` otherwise, including
             for any other kind of object.
    """
    if isinstance(elt, (FT, MyFT)):
        return elt.tag == "datalist"
    return isinstance(elt, Tag) and elt.name == "datalist"
|
|
|
|
|
|
def quoted_str(s):
    """
    Give a quoted textual representation of ``s``.

    * ``None`` gives ``"None"`` (unquoted).
    * A string without double quotes is wrapped in double quotes.
    * A string with double quotes but no single quote is wrapped in single quotes.
    * A string with both kinds of quotes is wrapped in double quotes, its
      double quotes being escaped with a backslash.
    * Any other value gives ``str(s)``.

    :param s: The value to represent.
    :return: The representation, as a string.
    """
    if s is None:
        return "None"

    if not isinstance(s, str):
        return str(s)

    if '"' not in s:
        return f'"{s}"'

    if "'" not in s:
        return f"'{s}'"

    # The escaping is done outside the f-string: reusing the enclosing quote or
    # a backslash inside an f-string expression is a SyntaxError before Python 3.12.
    escaped = s.replace('"', '\\"')
    return f'"{escaped}"'
|
|
|
|
|
|
def retrieve_user_info(session: dict):
    """
    Give the user information stored in ``session``.

    :param session: The session dict; may be ``None`` or empty.
    :return: ``session["user_info"]`` when present. Otherwise a placeholder
             dict whose ``id``, ``email`` and ``username`` are
             ``"** NOT LOGGED IN **"`` (empty or missing session) or
             ``"** UNKNOWN USER **"`` (session without ``"user_info"``), with
             an empty ``role`` list and empty ``user_settings``.
    """
    if not session:
        placeholder = "** NOT LOGGED IN **"
    elif "user_info" not in session:
        placeholder = "** UNKNOWN USER **"
    else:
        return session["user_info"]

    return {
        "id": placeholder,
        "email": placeholder,
        "username": placeholder,
        "role": [],
        "user_settings": {},
    }
|
|
|
|
|
|
def debug_session(session):
    """
    Give a short, log-friendly description of ``session``.

    :param session: The session to describe.
    :return: ``"None"`` for ``None``; ``str(session)`` when it is not a dict;
             otherwise the ``email`` of its ``"user_info"`` entry, or
             ``"** UNKNOWN USER **"`` when that entry or its email is missing.
    """
    if session is None:
        return "None"

    if not isinstance(session, dict):
        return str(session)

    user_info = session.get("user_info")
    # This helper only feeds log messages, so a malformed entry (e.g. None)
    # must not raise: anything without a .get() is reported as unknown.
    if not hasattr(user_info, "get"):
        return "** UNKNOWN USER **"

    return user_info.get("email", "** UNKNOWN USER **")
|
|
|
|
|
|
def get_id(obj):
    """
    Give the identifier of ``obj``.

    :param obj: A string or any object.
    :return: ``obj`` itself when it is a string; else ``obj.id`` when that
             attribute exists; else ``obj.get_id()`` when that method exists;
             else ``str(obj)``.
    """
    if isinstance(obj, str):
        return obj

    if hasattr(obj, "id"):
        return obj.id

    return obj.get_id() if hasattr(obj, "get_id") else str(obj)
|
|
|
|
|
|
def pascal_to_snake(name: str) -> str:
    """
    Convert a PascalCase or camelCase string to snake_case.

    :param name: The name to convert; surrounding whitespace is ignored.
    :return: The snake_case name, or ``None`` when ``name`` is ``None``.
    """
    if name is None:
        return None

    converted = name.strip()
    # 1st pass: underscore before a capitalised word ('HTTPServer' -> 'HTTP_Server').
    # 2nd pass: underscore between a lowercase letter or digit and a capital.
    for pattern in (r'(.)([A-Z][a-z]+)', r'([a-z0-9])([A-Z])'):
        converted = re.sub(pattern, r'\1_\2', converted)

    return converted.lower()
|
|
|
|
|
|
def snake_to_pascal(name: str) -> str:
    """
    Convert a snake_case string to PascalCase.

    :param name: The name to convert; surrounding whitespace is ignored.
    :return: The PascalCase name (``""`` for a blank name), or ``None`` when
             ``name`` is ``None``.
    """
    if name is None:
        return None

    # Empty parts (leading, trailing or doubled underscores) are discarded,
    # which also makes a blank name give "".
    words = filter(None, name.strip().split('_'))
    return ''.join(map(str.capitalize, words))
|
|
|
|
|
|
def flatten(*args):
    """
    Flatten nested lists and tuples into a single list.

    Only ``list`` and ``tuple`` values are expanded; every other value
    (strings included) is kept as a single element.

    :param args: Any number of values, possibly nested lists or tuples.
    :return: A flat list of all the elements, in their original order.
    :rtype: list
    """
    flat = []
    # Stack of iterators: the top one is the collection currently being walked.
    pending = [iter(args)]
    while pending:
        for item in pending[-1]:
            if isinstance(item, (list, tuple)):
                # Walk into the nested collection first; the parent iterator
                # resumes where it stopped once the nested one is exhausted.
                pending.append(iter(item))
                break
            flat.append(item)
        else:
            pending.pop()
    return flat
|
|
|
|
|
|
def make_html_id(s: str | None) -> str | None:
|
|
"""
|
|
Creates a valid html id
|
|
:param s:
|
|
:return:
|
|
"""
|
|
if s is None:
|
|
return None
|
|
|
|
s = str(s).strip()
|
|
# Replace spaces and special characters with hyphens or remove them
|
|
s = re.sub(r'[^a-zA-Z0-9_-]', '-', s)
|
|
|
|
# Ensure the ID starts with a letter or underscore
|
|
if not re.match(r'^[a-zA-Z_]', s):
|
|
s = 'id_' + s # Add a prefix if it doesn't
|
|
|
|
# Collapse multiple consecutive hyphens into one
|
|
s = re.sub(r'-+', '-', s)
|
|
|
|
# Replace trailing hyphens with underscores
|
|
s = re.sub(r'-+$', '_', s)
|
|
|
|
return s
|
|
|
|
|
|
def make_safe_id(s: str | None):
    """
    Build a lowercase id without hyphens from ``s``.

    The value is first turned into an HTML id with ``make_html_id``; its
    hyphens are then replaced by underscores and the result is lowercased.

    :param s: The source value.
    :return: The safe id, or ``None`` when ``s`` is ``None``.
    """
    if s is None:
        return None

    html_id = make_html_id(s)
    return html_id.replace('-', '_').lower()
|
|
|
|
|
|
def get_class(qualified_class_name: str):
    """
    Dynamically load and return a class from its fully qualified name.
    Note that the class is not instantiated.

    :param qualified_class_name: Fully qualified name of the class (e.g., 'some.module.ClassName').
    :return: The class object.
    :raises ValueError: If the name contains no module part (no dot).
    :raises ImportError: If the module cannot be imported.
    :raises AttributeError: If the class cannot be resolved in the module.
    """
    module_name, separator, class_name = qualified_class_name.rpartition(".")
    if not separator:
        # Same exception type as the former tuple-unpacking failure, with an actionable message.
        raise ValueError(
            f"'{qualified_class_name}' is not a fully qualified class name (expected 'some.module.ClassName').")

    try:
        module = importlib.import_module(module_name)
    except ModuleNotFoundError as e:
        # Chain explicitly so the original import failure stays visible as the cause.
        raise ImportError(f"Could not import module '{module_name}' for '{qualified_class_name}': {e}") from e

    if not hasattr(module, class_name):
        raise AttributeError(f"Component '{class_name}' not found in '{module.__name__}'.")

    return getattr(module, class_name)
|
|
|
|
|
|
@utils_rt(Routes.Commands)
def post(session, c_id: str, client_response: dict = None):
    """
    Default route for all commands.

    :param session: The current session; only used for logging here.
    :param c_id: id of the command to execute
    :param client_response: extra data received from the client (from the browser)
    :return: The result of the command's ``execute``.
    :raises ValueError: If no command is registered under ``c_id``.
    """
    # The parameter defaults to None; fall back to an empty dict so the
    # pop() below cannot fail when the client sends no extra data.
    if client_response is None:
        client_response = {}

    # c_id is already received as its own parameter: keep only the extra data.
    client_response.pop("c_id", None)
    logger.debug(f"Entering {Routes.Commands} with session='{debug_session(session)}', {c_id=}, {client_response=}")

    # Imported inside the handler rather than at module level.
    # NOTE(review): presumably to avoid a circular import; confirm.
    from myfasthtml.core.commands import CommandsManager

    command = CommandsManager.get_command(c_id)
    if command:
        logger.debug(f"Executing command {command.name}.")
        return command.execute(client_response)

    raise ValueError(f"Command with ID '{c_id}' not found.")
|
|
|
|
|
|
@utils_rt(Routes.Bindings)
def post(session, b_id: str, values: dict):
    """
    Default route for all bindings.

    :param session: The current session; only used for logging here.
    :param b_id: id of the binding to update
    :param values: The values handed to the binding's ``update``.
    :return: The result of the binding's ``update``.
    :raises ValueError: If no binding is registered under ``b_id``.
    """
    logger.debug(f"Entering {Routes.Bindings} with session='{debug_session(session)}', {b_id=}, {values=}")

    # Imported inside the handler rather than at module level.
    from myfasthtml.core.bindings import BindingsManager

    binding = BindingsManager.get_binding(b_id)
    if not binding:
        raise ValueError(f"Binding with ID '{b_id}' not found.")

    return binding.update(values)
|
|
|
|
|
|
@utils_rt(Routes.Completions)
def get(session, e_id: str, text: str, line: int, ch: int):
    """
    Default route for Domain Specific Language completion.

    :param session: The current session; only used for logging here.
    :param e_id: engine_id
    :param text: The text in which completions are requested.
    :param line: Line of the position at which completions are requested.
    :param ch: Character offset of that position.
    :return: The completion result, converted with ``to_dict()``.
    """
    logger.debug(
        f"Entering {Routes.Completions} with session='{debug_session(session)}', {e_id=}, text={len(text)} char(s), {line=}, {ch=}")

    engine = DslsManager.get_completion_engine(e_id)
    cursor = Position(line, ch)
    return engine.get_completions(text, cursor).to_dict()
|
|
|
|
|
|
@utils_rt(Routes.Validations)
def get(session, e_id: str, text: str, line: int, ch: int):
    """
    Default route for Domain Specific Language syntax validation.

    :param session: The current session; only used for logging here.
    :param e_id: id used to look up the validation parser
    :param text: The text to validate.
    :param line: Only logged by this handler.
    :param ch: Only logged by this handler.
    :return: ``{"errors": []}`` when the text parses; otherwise a dict whose
             ``"errors"`` list holds the line, column and message of the
             syntax error (line and column default to 1 when not set).
    """
    logger.debug(
        f"Entering {Routes.Validations} with session='{debug_session(session)}', {e_id=}, text={len(text)} char(s), {line=}, {ch=}")

    parser = DslsManager.get_validation_parser(e_id)
    try:
        parser.parse(text)
    except DSLSyntaxError as syntax_error:
        issue = {
            "line": syntax_error.line or 1,
            "column": syntax_error.column or 1,
            "message": syntax_error.message
        }
        return {"errors": [issue]}

    return {"errors": []}
|