6 Commits

37 changed files with 29784 additions and 86972 deletions

1
.gitignore vendored
View File

@@ -3,6 +3,7 @@ app.egg-info
*.pyc
.mypy_cache
.coverage
.myFastHtmlDb
htmlcov
.cache
.venv

View File

@@ -63,7 +63,7 @@ if __name__ == "__main__":
```python
from fasthtml import serve
from myfasthtml.controls.helpers import mk_button
from myfasthtml.controls.helpers import mk
from myfasthtml.core.commands import Command
from myfasthtml.myfastapp import create_app
@@ -82,7 +82,7 @@ app, rt = create_app(protect_routes=False)
@rt("/")
def get_homepage():
return mk_button("Click Me!", command=hello_command)
return mk.button("Click Me!", command=hello_command)
if __name__ == "__main__":
@@ -97,11 +97,17 @@ if __name__ == "__main__":
### Bind components
```python
from dataclasses import dataclass
from myfasthtml.controls.helpers import mk
@dataclass
class Data:
value: str = "Hello World"
checked: bool = False
# Binds an Input with a label
mk.mk(Input(name="input_name"), binding=Binding(data, attr="value").htmx(trigger="input changed")),
mk.mk(Label("Text"), binding=Binding(data, attr="value")),
@@ -815,6 +821,24 @@ mk.manage_binding(label_elt, Binding(data))
# Input won't trigger updates, but label will still display data
```
## Authentication
session
```
{'access_token': 'xxx',
'refresh_token': 'yyy',
'user_info': {
'email': 'admin@myauth.com',
'username': 'admin',
'roles': ['admin'],
'user_settings': {},
'id': 'uuid',
'created_at': '2025-11-10T15:52:59.006213',
'updated_at': '2025-11-10T15:52:59.006213'
}
}
```
## Contributing
We welcome contributions! To get started:

View File

@@ -35,9 +35,11 @@ dependencies = [
"email-validator",
"httptools",
"myauth",
"mydbengine",
"myutils",
"python-fasthtml",
"PyYAML",
"typer",
"uvloop",
"watchfiles",
"websockets",

View File

@@ -36,6 +36,7 @@ markdown-it-py==4.0.0
mdurl==0.1.2
more-itertools==10.8.0
myauth==0.2.0
mydbengine==0.1.0
myutils==0.4.0
nh3==0.3.1
oauthlib==3.3.1
@@ -64,11 +65,13 @@ rfc3986==2.0.0
rich==14.2.0
rsa==4.9.1
SecretStorage==3.4.0
shellingham==1.5.4
six==1.17.0
sniffio==1.3.1
soupsieve==2.8
starlette==0.48.0
twine==6.2.0
typer==0.20.0
typing-inspection==0.4.2
typing_extensions==4.15.0
urllib3==2.5.0

39
src/app.py Normal file
View File

@@ -0,0 +1,39 @@
import logging
from fasthtml import serve
from fasthtml.components import *
from myfasthtml.controls.Layout import Layout
from myfasthtml.controls.helpers import Ids
from myfasthtml.core.instances import InstancesManager
from myfasthtml.myfastapp import create_app
logging.basicConfig(
level=logging.DEBUG, # Set logging level to DEBUG
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', # Log format
datefmt='%Y-%m-%d %H:%M:%S', # Timestamp format
)
app, rt = create_app(protect_routes=True,
mount_auth_app=True,
pico=False,
title="MyFastHtml",
live=True,
base_url="http://localhost:5003")
@rt("/")
def index(session):
layout = InstancesManager.get(session, Ids.Layout, Layout, "Testing Layout")
layout.set_footer("Goodbye World")
for i in range(1000):
layout.left_drawer.append(Div(f"Left Drawer Item {i}"))
content = tuple([Div(f"Content {i}") for i in range(1000)])
layout.set_main(content)
return layout
if __name__ == "__main__":
# debug_routes(app)
serve(port=5003)

View File

@@ -12,4 +12,161 @@
height: 16px;
margin-top: auto;
margin-bottom: 4px;
}
/*
* MF Layout Component - CSS Grid Layout
* Provides fixed header/footer, collapsible drawers, and scrollable main content
* Compatible with DaisyUI 5
*/
/* Main layout container using CSS Grid */
.mf-layout {
display: grid;
grid-template-areas:
"header header header"
"left-drawer main right-drawer"
"footer footer footer";
grid-template-rows: 32px 1fr 32px;
grid-template-columns: auto 1fr auto;
height: 100vh;
width: 100vw;
overflow: hidden;
}
/* Header - fixed at top */
.mf-layout-header {
grid-area: header;
display: flex;
align-items: center;
justify-content: space-between; /* put one item on each side */
gap: 1rem;
padding: 0 1rem;
background-color: var(--color-base-300);
border-bottom: 1px solid color-mix(in oklab, var(--color-base-content) 10%, #0000);
z-index: 30;
}
/* Footer - fixed at bottom */
.mf-layout-footer {
grid-area: footer;
display: flex;
align-items: center;
padding: 0 1rem;
background-color: var(--color-neutral);
color: var(--color-neutral-content);
border-top: 1px solid color-mix(in oklab, var(--color-base-content) 10%, #0000);
z-index: 30;
}
/* Main content area - scrollable */
.mf-layout-main {
grid-area: main;
overflow-y: auto;
overflow-x: auto;
padding: 1rem;
background-color: var(--color-base-100);
}
/* Drawer base styles */
.mf-layout-drawer {
overflow-y: auto;
overflow-x: hidden;
background-color: var(--color-base-100);
transition: width 0.3s ease-in-out, margin 0.3s ease-in-out;
width: 250px;
padding: 1rem;
}
/* Left drawer */
.mf-layout-left-drawer {
grid-area: left-drawer;
border-right: 1px solid color-mix(in oklab, var(--color-base-content) 10%, #0000);
}
/* Right drawer */
.mf-layout-right-drawer {
grid-area: right-drawer;
border-left: 1px solid color-mix(in oklab, var(--color-base-content) 10%, #0000);
}
/* Collapsed drawer states */
.mf-layout-drawer.collapsed {
width: 0;
padding: 0;
border: none;
overflow: hidden;
}
/* Toggle buttons positioning */
.mf-layout-toggle-left {
margin-right: auto;
}
.mf-layout-toggle-right {
margin-left: auto;
}
/* Smooth scrollbar styling for webkit browsers */
.mf-layout-main::-webkit-scrollbar,
.mf-layout-drawer::-webkit-scrollbar {
width: 8px;
height: 8px;
}
.mf-layout-main::-webkit-scrollbar-track,
.mf-layout-drawer::-webkit-scrollbar-track {
background: var(--color-base-200);
}
.mf-layout-main::-webkit-scrollbar-thumb,
.mf-layout-drawer::-webkit-scrollbar-thumb {
background: color-mix(in oklab, var(--color-base-content) 20%, #0000);
border-radius: 4px;
}
.mf-layout-main::-webkit-scrollbar-thumb:hover,
.mf-layout-drawer::-webkit-scrollbar-thumb:hover {
background: color-mix(in oklab, var(--color-base-content) 30%, #0000);
}
/* Responsive adjustments for smaller screens */
@media (max-width: 768px) {
.mf-layout-drawer {
width: 200px;
}
.mf-layout-header,
.mf-layout-footer {
padding: 0 0.5rem;
}
.mf-layout-main {
padding: 0.5rem;
}
}
/* Handle layouts with no drawers */
.mf-layout[data-left-drawer="false"] {
grid-template-areas:
"header header"
"main right-drawer"
"footer footer";
grid-template-columns: 1fr auto;
}
.mf-layout[data-right-drawer="false"] {
grid-template-areas:
"header header"
"left-drawer main"
"footer footer";
grid-template-columns: auto 1fr;
}
.mf-layout[data-left-drawer="false"][data-right-drawer="false"] {
grid-template-areas:
"header"
"main"
"footer";
grid-template-columns: 1fr;
}

View File

@@ -0,0 +1,119 @@
/**
* MF Layout Component - JavaScript Controller
* Manages drawer state and provides programmatic control
*/
// Global registry for layout instances
if (typeof window.mfLayoutInstances === 'undefined') {
window.mfLayoutInstances = {};
}
/**
* Initialize a layout instance with drawer controls
* @param {string} layoutId - The unique ID of the layout (mf-layout-xxx)
*/
function initLayout(layoutId) {
const layoutElement = document.getElementById(layoutId);
if (!layoutElement) {
console.error(`Layout with id "${layoutId}" not found`);
return;
}
// Create layout controller object
const layoutController = {
layoutId: layoutId,
element: layoutElement,
/**
* Get drawer element by side
* @param {string} side - 'left' or 'right'
* @returns {HTMLElement|null} The drawer element
*/
getDrawer: function (side) {
if (side !== 'left' && side !== 'right') {
console.error(`Invalid drawer side: "${side}". Must be "left" or "right".`);
return null;
}
const drawerClass = side === 'left' ? '.mf-layout-left-drawer' : '.mf-layout-right-drawer';
return this.element.querySelector(drawerClass);
},
/**
* Check if a drawer is currently open
* @param {string} side - 'left' or 'right'
* @returns {boolean} True if drawer is open
*/
isDrawerOpen: function (side) {
const drawer = this.getDrawer(side);
return drawer ? !drawer.classList.contains('collapsed') : false;
},
/**
* Open a drawer
* @param {string} side - 'left' or 'right'
*/
openDrawer: function (side) {
const drawer = this.getDrawer(side);
if (drawer) {
drawer.classList.remove('collapsed');
}
},
/**
* Close a drawer
* @param {string} side - 'left' or 'right'
*/
closeDrawer: function (side) {
const drawer = this.getDrawer(side);
if (drawer) {
drawer.classList.add('collapsed');
}
},
/**
* Toggle a drawer between open and closed
* @param {string} side - 'left' or 'right'
*/
toggleDrawer: function (side) {
if (this.isDrawerOpen(side)) {
this.closeDrawer(side);
} else {
this.openDrawer(side);
}
},
/**
* Initialize event listeners for toggle buttons
*/
initEventListeners: function () {
// Get all toggle buttons within this layout
const toggleButtons = this.element.querySelectorAll('[class*="mf-layout-toggle"]');
toggleButtons.forEach(button => {
button.addEventListener('click', (event) => {
event.preventDefault();
const side = button.getAttribute('data-side');
if (side) {
this.toggleDrawer(side);
}
});
});
}
};
// Initialize event listeners
layoutController.initEventListeners();
// Store instance in global registry for programmatic access
window.mfLayoutInstances[layoutId] = layoutController;
// Log successful initialization
console.log(`Layout "${layoutId}" initialized successfully`);
}
// Export for module environments if needed
if (typeof module !== 'undefined' && module.exports) {
module.exports = {initLayout};
}

View File

@@ -18,7 +18,7 @@ from ..auth.utils import (
)
def setup_auth_routes(app, rt, mount_auth_app=True, sqlite_db_path="Users.db"):
def setup_auth_routes(app, rt, mount_auth_app=True, sqlite_db_path="Users.db", base_url=None):
"""
Setup all authentication and protected routes.
@@ -27,6 +27,7 @@ def setup_auth_routes(app, rt, mount_auth_app=True, sqlite_db_path="Users.db"):
rt: Route decorator from FastHTML
mount_auth_app: Whether to mount the auth FastApi API routes
sqlite_db_path: by default, create a new SQLite database at this path
base_url: Base URL for the application (default to localhost:5001 if not provided)
"""
# ============================================================================
@@ -61,7 +62,7 @@ def setup_auth_routes(app, rt, mount_auth_app=True, sqlite_db_path="Users.db"):
RedirectResponse on success, or LoginPage with error on failure
"""
# Attempt login
auth_data = login_user(email, password)
auth_data = login_user(email, password, base_url=base_url)
if auth_data:
# Login successful - store tokens in session
@@ -69,7 +70,7 @@ def setup_auth_routes(app, rt, mount_auth_app=True, sqlite_db_path="Users.db"):
session['refresh_token'] = auth_data['refresh_token']
# Get user info and store in session
user_info = get_user_info(auth_data['access_token'])
user_info = get_user_info(auth_data['access_token'], base_url=base_url)
if user_info:
session['user_info'] = user_info
@@ -116,7 +117,7 @@ def setup_auth_routes(app, rt, mount_auth_app=True, sqlite_db_path="Users.db"):
return RegisterPage(error_message="Password must be at least 8 characters long.")
# Attempt registration
result = register_user(email, username, password)
result = register_user(email, username, password, base_url=base_url)
if result:
# Registration successful - show success message and auto-login

View File

@@ -63,14 +63,12 @@ def auth_before(request, session):
Args:
request: Starlette request object
session: FastHTML session object
Returns:
RedirectResponse to login page if authentication fails, None otherwise
"""
# Get tokens from session
access_token = session.get('access_token')
refresh_token = session.get('refresh_token')
print(f"path={request.scope['path']}, {session=}, {access_token=}, {refresh_token=}")
# If no access token, redirect to login
if not access_token:
return RedirectResponse('/login', status_code=303)
@@ -163,13 +161,14 @@ def check_token_expiry(token: str) -> Optional[float]:
return None
def login_user(email: str, password: str) -> Optional[Dict[str, Any]]:
def login_user(email: str, password: str, base_url: str = None) -> Optional[Dict[str, Any]]:
"""
Authenticate user with email and password.
Args:
email: User email address
password: User password
base_url:
Returns:
Dictionary containing access_token, refresh_token, and user_info if successful,
@@ -177,7 +176,7 @@ def login_user(email: str, password: str) -> Optional[Dict[str, Any]]:
"""
try:
response = http_client.post(
f"{API_BASE_URL}/auth/login",
f"{base_url or API_BASE_URL}/auth/login",
data={"username": email, "password": password},
headers={"Content-Type": "application/x-www-form-urlencoded"},
timeout=10.0
@@ -196,7 +195,7 @@ def login_user(email: str, password: str) -> Optional[Dict[str, Any]]:
return None
def register_user(email: str, username: str, password: str) -> Optional[Dict[str, Any]]:
def register_user(email: str, username: str, password: str, base_url: str = None) -> Optional[Dict[str, Any]]:
"""
Register a new user.
@@ -204,14 +203,14 @@ def register_user(email: str, username: str, password: str) -> Optional[Dict[str
email: User email address
username: User name
password: User password
base_url:
Returns:
Dictionary containing success message if registration succeeds,
None if registration fails
"""
try:
response = http_client.post(
f"{API_BASE_URL}/auth/register",
f"{base_url or API_BASE_URL}/auth/register",
json={"email": email, "username": username, "password": password},
timeout=10.0
)
@@ -224,20 +223,20 @@ def register_user(email: str, username: str, password: str) -> Optional[Dict[str
return None
def refresh_access_token(refresh_token: str) -> Optional[Dict[str, Any]]:
def refresh_access_token(refresh_token: str, base_url: str = None) -> Optional[Dict[str, Any]]:
"""
Refresh the access token using a refresh token.
Args:
refresh_token: Valid refresh token
base_url:
Returns:
Dictionary containing new access_token and refresh_token if successful,
None if refresh fails
"""
try:
response = http_client.post(
f"{API_BASE_URL}/auth/refresh",
f"{base_url or API_BASE_URL}/auth/refresh",
json={"refresh_token": refresh_token},
timeout=10.0
)
@@ -254,20 +253,20 @@ def refresh_access_token(refresh_token: str) -> Optional[Dict[str, Any]]:
return None
def get_user_info(access_token: str) -> Optional[Dict[str, Any]]:
def get_user_info(access_token: str, base_url: str = None) -> Optional[Dict[str, Any]]:
"""
Get current user information using access token.
Args:
access_token: Valid access token
base_url:
Returns:
Dictionary containing user information if successful,
None if request fails
"""
try:
response = http_client.get(
f"{API_BASE_URL}/auth/me",
f"{base_url or API_BASE_URL}/auth/me",
headers={"Authorization": f"Bearer {access_token}"},
timeout=10.0
)
@@ -280,19 +279,36 @@ def get_user_info(access_token: str) -> Optional[Dict[str, Any]]:
return None
def logout_user(refresh_token: str) -> bool:
def save_user_info(access_token: str, user_profile: dict, base_url: str = None):
try:
response = http_client.patch(
f"{base_url or API_BASE_URL}/auth/me",
headers={"Authorization": f"Bearer {access_token}"},
timeout=10.0,
json=user_profile
)
if response.status_code == 200:
return response.json()
return None
except httpx.HTTPError:
return None
def logout_user(refresh_token: str, base_url: str = None) -> bool:
"""
Logout user by revoking the refresh token.
Args:
refresh_token: Refresh token to revoke
base_url:
Returns:
True if logout successful, False otherwise
"""
try:
response = http_client.post(
f"{API_BASE_URL}/auth/logout",
f"{base_url or API_BASE_URL}/auth/logout",
json={"refresh_token": refresh_token},
timeout=10.0
)

View File

@@ -0,0 +1,4 @@
class BaseCommands:
def __init__(self, owner):
self._owner = owner
self._id = owner.get_id()

View File

@@ -0,0 +1,207 @@
"""
Layout component for FastHTML applications.
This component provides a responsive layout with fixed header/footer,
optional collapsible left/right drawers, and a scrollable main content area.
"""
import logging
from typing import Literal
from fasthtml.common import *
from myfasthtml.controls.BaseCommands import BaseCommands
from myfasthtml.controls.UserProfile import UserProfile
from myfasthtml.controls.helpers import mk, Ids
from myfasthtml.core.commands import Command
from myfasthtml.core.dbmanager import DbObject
from myfasthtml.core.instances import InstancesManager, SingleInstance
from myfasthtml.icons.fluent import panel_left_expand20_regular as left_drawer_icon
from myfasthtml.icons.fluent_p2 import panel_right_expand20_regular as right_drawer_icon
logger = logging.getLogger("LayoutControl")
@dataclass
class LayoutState(DbObject):
def __init__(self, owner):
super().__init__(owner.get_session(), owner.get_id())
left_drawer_open: bool = True
right_drawer_open: bool = False
class Commands(BaseCommands):
def toggle_left_drawer(self):
return Command("ToggleDrawer", "Toggle main layout drawer", self._owner.toggle_drawer, "left")
class Layout(SingleInstance):
"""
A responsive layout component with header, footer, main content area,
and optional collapsible side drawers.
Attributes:
app_name (str): Name of the application
left_drawer (bool): Whether to include a left drawer
right_drawer (bool): Whether to include a right drawer
"""
class DrawerContent:
def __init__(self, owner, side: Literal["left", "right"]):
self._owner = owner
self.side = side
self._content = []
def append(self, content):
self._content.append(content)
def get_content(self):
return self._content
def __init__(self, session, app_name):
"""
Initialize the Layout component.
Args:
app_name (str): Name of the application
left_drawer (bool): Enable left drawer. Default is True.
right_drawer (bool): Enable right drawer. Default is True.
"""
super().__init__(session, Ids.Layout)
self.app_name = app_name
# Content storage
self._header_content = None
self._footer_content = None
self._main_content = None
self._state = LayoutState(self)
self.commands = Commands(self)
self.left_drawer = self.DrawerContent(self, "left")
self.right_drawer = self.DrawerContent(self, "right")
def set_footer(self, content):
"""
Set the footer content.
Args:
content: FastHTML component(s) or content for the footer
"""
self._footer_content = content
def set_main(self, content):
"""
Set the main content area.
Args:
content: FastHTML component(s) or content for the main area
"""
self._main_content = content
def toggle_drawer(self, side: Literal["left", "right"]):
logger.debug(f"Toggle drawer: {side=}, {self._state.left_drawer_open=}")
if side == "left":
self._state.left_drawer_open = not self._state.left_drawer_open
return self._mk_left_drawer_icon(), self._mk_left_drawer()
elif side == "right":
self._state.right_drawer_open = not self._state.right_drawer_open
return self._mk_left_drawer_icon(), self._mk_right_drawer()
else:
raise ValueError("Invalid drawer side")
def _mk_header(self):
"""
Render the header component.
Returns:
Header: FastHTML Header component
"""
return Header(
self._mk_left_drawer_icon(),
InstancesManager.get(self._session, Ids.UserProfile, UserProfile),
cls="mf-layout-header"
)
def _mk_footer(self):
"""
Render the footer component.
Returns:
Footer: FastHTML Footer component
"""
footer_content = self._footer_content if self._footer_content else ""
return Footer(
footer_content,
cls="mf-layout-footer footer sm:footer-horizontal"
)
def _mk_main(self):
"""
Render the main content area.
Returns:
Main: FastHTML Main component
"""
main_content = self._main_content if self._main_content else ""
return Main(
main_content,
cls="mf-layout-main"
)
def _mk_left_drawer(self):
"""
Render the left drawer if enabled.
Returns:
Div or None: FastHTML Div component for left drawer, or None if disabled
"""
return Div(
*self.left_drawer.get_content(),
id=f"{self._id}_ld",
cls=f"mf-layout-drawer mf-layout-left-drawer {'collapsed' if not self._state.left_drawer_open else ''}",
)
def _mk_right_drawer(self):
"""
Render the right drawer if enabled.
Returns:
Div or None: FastHTML Div component for right drawer, or None if disabled
"""
return Div(
*self.right_drawer.get_content(),
cls=f"mf-layout-drawer mf-layout-right-drawer {'collapsed' if not self._state.right_drawer_open else ''}",
id=f"{self._id}_rd",
)
def _mk_left_drawer_icon(self):
return mk.icon(right_drawer_icon if self._state.left_drawer_open else left_drawer_icon,
id=f"{self._id}_ldi",
command=self.commands.toggle_left_drawer())
def render(self):
"""
Render the complete layout.
Returns:
Div: Complete layout as FastHTML Div component
"""
# Wrap everything in a container div
return Div(
self._mk_header(),
self._mk_left_drawer(),
self._mk_main(),
self._mk_right_drawer(),
self._mk_footer(),
id=self._id,
cls="mf-layout",
)
def __ft__(self):
"""
FastHTML magic method for rendering.
Returns:
Div: The rendered layout
"""
return self.render()

View File

@@ -0,0 +1,80 @@
from fasthtml.components import *
from myfasthtml.controls.BaseCommands import BaseCommands
from myfasthtml.controls.helpers import Ids, mk
from myfasthtml.core.commands import Command
from myfasthtml.core.instances import SingleInstance, InstancesManager
from myfasthtml.core.utils import retrieve_user_info
from myfasthtml.icons.material import dark_mode_filled, person_outline_sharp
from myfasthtml.icons.material_p1 import light_mode_filled, alternate_email_filled
class UserProfileState:
def __init__(self, owner):
self._owner = owner
self._session = owner.get_session()
self.theme = "light"
def load(self):
user_info = retrieve_user_info(self._session)
user_settings = user_info.get("user_settings", {})
for k, v in user_settings.items():
if hasattr(self, k):
setattr(self, k, v)
def save(self):
user_settings = {k: v for k, v in self.__dict__.items() if not k.startswith("_")}
auth_proxy = InstancesManager.get_auth_proxy()
auth_proxy.save_user_info(self._session["access_token"], {"user_settings": user_settings})
class Commands(BaseCommands):
def update_dark_mode(self):
return Command("UpdateDarkMode", "Set the dark mode", self._owner.update_dark_mode).htmx(target=None)
class UserProfile(SingleInstance):
def __init__(self, session):
super().__init__(session, Ids.UserProfile)
self._state = UserProfileState(self)
self._commands = Commands(self)
def update_dark_mode(self, client_response):
self._state.theme = client_response.get("theme", "light")
self._state.save()
def render(self):
user_info = retrieve_user_info(self._session)
return Div(
Div(user_info['username'],
tabindex="0",
role="button",
cls="btn btn-xs"),
Div(
Div(mk.icon(person_outline_sharp, cls="mr-1"), user_info['username'], cls="flex m-1"),
Div(mk.icon(alternate_email_filled, cls="mr-1"), user_info['email'], cls="flex m-1"),
Div(mk.icon(dark_mode_filled, cls="mr-1"), self.mk_dark_mode(), cls="flex m-1"),
Div(A("Logout", cls="btn btn-xs mr-1", href="/logout"), cls="flex justify-center items-center"),
tabindex="-1",
cls="dropdown-content menu w-52 rounded-box bg-base-300 shadow-xl"
),
cls="dropdown dropdown-end"
)
def mk_dark_mode(self):
return Label(
mk.mk(Input(type="checkbox",
name='theme',
aria_label='Dark',
value="dark",
checked='true' if self._state.theme == 'dark' else None,
cls='theme-controller'),
command=self._commands.update_dark_mode()),
light_mode_filled,
dark_mode_filled,
cls="toggle text-base-content"
)
def __ft__(self):
return self.render()

View File

@@ -5,6 +5,13 @@ from myfasthtml.core.commands import Command
from myfasthtml.core.utils import merge_classes
class Ids:
AuthProxy = "mf-auth-proxy"
DbManager = "mf-dbmanager"
Layout = "mf-layout"
UserProfile = "mf-user-profile"
class mk:
@staticmethod

View File

@@ -0,0 +1,18 @@
from myfasthtml.auth.utils import login_user, save_user_info, register_user
from myfasthtml.controls.helpers import Ids
from myfasthtml.core.instances import special_session, UniqueInstance
class AuthProxy(UniqueInstance):
def __init__(self, base_url: str = None):
super().__init__(special_session, Ids.AuthProxy)
self._base_url = base_url
def login_user(self, email: str, password: str):
return login_user(email, password, self._base_url)
def register_user(self, email: str, username: str, password: str):
return register_user(email, username, password, self._base_url)
def save_user_info(self, access_token: str, user_profile: dict):
return save_user_info(access_token, user_profile, self._base_url)

View File

@@ -270,10 +270,10 @@ class Binding:
return self
def get_htmx_params(self):
return self.htmx_extra | {
return {
"hx-post": f"{ROUTE_ROOT}{Routes.Bindings}",
"hx-vals": f'{{"b_id": "{self.id}"}}',
}
} | self.htmx_extra
def init(self):
"""

View File

@@ -1,3 +1,4 @@
import inspect
import uuid
from typing import Optional
@@ -34,12 +35,13 @@ class BaseCommand:
CommandsManager.register(self)
def get_htmx_params(self):
return self._htmx_extra | {
return {
"hx-post": f"{ROUTE_ROOT}{Routes.Commands}",
"hx-swap": "outerHTML",
"hx-vals": f'{{"c_id": "{self.id}"}}',
}
} | self._htmx_extra
def execute(self):
def execute(self, client_response: dict = None):
raise NotImplementedError
def htmx(self, target="this", swap="innerHTML"):
@@ -83,6 +85,9 @@ class BaseCommand:
self._htmx_extra["hx-swap"] = "none"
return self
def __str__(self):
return f"Command({self.name})"
class Command(BaseCommand):
@@ -109,8 +114,9 @@ class Command(BaseCommand):
self.callback = callback
self.args = args
self.kwargs = kwargs
self.requires_client_response = 'client_response' in inspect.signature(callback).parameters
def execute(self):
def execute(self, client_response: dict = None):
ret_from_bindings = []
def binding_result_callback(attr, old, new, results):
@@ -119,21 +125,27 @@ class Command(BaseCommand):
for data in self._bindings:
add_event_listener(ObservableEvent.AFTER_PROPERTY_CHANGE, data, "", binding_result_callback)
ret = self.callback(*self.args, **self.kwargs)
if self.requires_client_response:
ret = self.callback(client_response=client_response, *self.args, **self.kwargs)
else:
ret = self.callback(*self.args, **self.kwargs)
for data in self._bindings:
remove_event_listener(ObservableEvent.AFTER_PROPERTY_CHANGE, data, "", binding_result_callback)
# Set the hx-swap-oob attribute on all elements returned by the callback
if isinstance(ret, (list, tuple)):
for r in ret[1:]:
if hasattr(r, 'attrs'):
r.attrs["hx-swap-oob"] = "true"
if not ret_from_bindings:
return ret
if isinstance(ret, list):
return ret + ret_from_bindings
if isinstance(ret, (list, tuple)):
return list(ret) + ret_from_bindings
else:
return [ret] + ret_from_bindings
def __str__(self):
return f"Command({self.name})"
class CommandsManager:

View File

@@ -0,0 +1,62 @@
from dbengine.dbengine import DbEngine
from myfasthtml.controls.helpers import Ids
from myfasthtml.core.instances import SingleInstance, InstancesManager
from myfasthtml.core.utils import retrieve_user_info
class DbManager(SingleInstance):
def __init__(self, session, root=".myFastHtmlDb", auto_register: bool = True):
super().__init__(session, Ids.DbManager, auto_register=auto_register)
self.db = DbEngine(root=root)
def save(self, entry, obj):
self.db.save(self.get_tenant(), self.get_user(), entry, obj)
def load(self, entry):
return self.db.load(self.get_tenant(), entry)
def exists_entry(self, entry):
return self.db.exists(self.get_tenant(), entry)
def get_tenant(self):
return retrieve_user_info(self._session)["id"]
def get_user(self):
return retrieve_user_info(self._session)["email"]
class DbObject:
"""
When you set the attribute, it persists in DB
It loads from DB at startup
"""
def __init__(self, session, name=None, db_manager=None):
self._session = session
self._name = name or self.__class__.__name__
self._db_manager = db_manager or InstancesManager.get(self._session, Ids.DbManager, DbManager)
# init is possible
if self._db_manager.exists_entry(self._name):
props = self._db_manager.load(self._name)
for k, v in props.items():
setattr(self, k, v)
else:
self._save_self()
def __setattr__(self, name: str, value: str):
if name.startswith("_"):
super().__setattr__(name, value)
return
old_value = getattr(self, name, None)
if old_value == value:
return
super().__setattr__(name, value)
self._save_self()
def _save_self(self):
props = {k: getattr(self, k) for k, v in self.__class__.__dict__.items() if not k.startswith("_")}
self._db_manager.save(self._name, props)

View File

@@ -0,0 +1,113 @@
import uuid
from myfasthtml.controls.helpers import Ids
special_session = {
"user_info": {"id": "** SPECIAL SESSION **"}
}
class DuplicateInstanceError(Exception):
def __init__(self, instance):
self.instance = instance
class BaseInstance:
"""
Base class for all instances (manageable by InstancesManager)
"""
def __init__(self, session: dict, _id: str, auto_register: bool = True):
self._session = session
self._id = _id
if auto_register:
InstancesManager.register(session, self)
def get_id(self):
return self._id
def get_session(self):
return self._session
class SingleInstance(BaseInstance):
"""
Base class for instances that can only have one instance at a time.
"""
def __init__(self, session: dict, prefix: str, auto_register: bool = True):
super().__init__(session, prefix, auto_register)
self._instance = None
class UniqueInstance(BaseInstance):
"""
Base class for instances that can only have one instance at a time.
Does not throw exception if the instance already exists, it simply overwrites it.
"""
def __init__(self, session: dict, prefix: str, auto_register: bool = True):
super().__init__(session, prefix, auto_register)
self._instance = None
class MultipleInstance(BaseInstance):
"""
Base class for instances that can have multiple instances at a time.
"""
def __init__(self, session: dict, prefix: str, auto_register: bool = True):
super().__init__(session, f"{prefix}-{str(uuid.uuid4())}", auto_register)
self._instance = None
class InstancesManager:
instances = {}
@staticmethod
def register(session: dict, instance: BaseInstance):
"""
Register an instance in the manager, so that it can be retrieved later.
:param session:
:param instance:
:return:
"""
key = (InstancesManager._get_session_id(session), instance.get_id())
if isinstance(instance, SingleInstance) and key in InstancesManager.instances:
raise DuplicateInstanceError(instance)
InstancesManager.instances[key] = instance
return instance
@staticmethod
def get(session: dict, instance_id: str, instance_type: type = None, *args, **kwargs):
"""
Get or create an instance of the given type (from its id)
:param session:
:param instance_id:
:param instance_type:
:param args:
:param kwargs:
:return:
"""
try:
key = (InstancesManager._get_session_id(session), instance_id)
return InstancesManager.instances[key]
except KeyError:
if instance_type:
return instance_type(session, *args, **kwargs) # it will be automatically registered
else:
raise
@staticmethod
def _get_session_id(session):
if not session:
return "** NOT LOGGED IN **"
if "user_info" not in session:
return "** UNKNOWN USER **"
return session["user_info"].get("id", "** INVALID SESSION **")
@staticmethod
def get_auth_proxy():
return InstancesManager.get(special_session, Ids.AuthProxy)

View File

@@ -3,6 +3,8 @@ import logging
from bs4 import Tag
from fastcore.xml import FT
from fasthtml.fastapp import fast_app
from rich.console import Console
from rich.table import Table
from starlette.routing import Mount
from myfasthtml.core.constants import Routes, ROUTE_ROOT
@@ -60,15 +62,46 @@ def merge_classes(*args):
def debug_routes(app):
routes = []
def _clean_endpoint(endpoint):
res = str(endpoint).replace("<function ", "").replace(".<locals>", "")
return res.split(" at ")[0]
def _debug_routes(_app, _route, prefix=""):
if isinstance(_route, Mount):
for sub_route in _route.app.router.routes:
_debug_routes(_app, sub_route, prefix=_route.path)
else:
print(f"path={prefix}{_route.path}, methods={_route.methods}, endpoint={_route.endpoint}")
routes.append({
"number": len(routes),
"app": str(_app),
"name": _route.name,
"path": _route.path,
"full_path": prefix + _route.path,
"endpoint": _clean_endpoint(_route.endpoint),
"methods": _route.methods if hasattr(_route, "methods") else [],
"path_format": _route.path_format,
"path_regex": str(_route.path_regex),
})
for route in app.router.routes:
_debug_routes(app, route)
if not routes:
print("No routes found.")
return
table = Table(show_header=True, expand=True, header_style="bold")
columns = ["number", "name", "full_path", "endpoint", "methods"] # routes[0].keys()
for column in columns:
table.add_column(column)
for route in routes:
table.add_row(*[str(route[column]) for column in columns])
console = Console()
console.print(table)
def mount_utils(app):
@@ -158,19 +191,53 @@ def quoted_str(s):
return str(s)
def retrieve_user_info(session: dict):
if not session:
return {
"id": "** NOT LOGGED IN **",
"email": "** NOT LOGGED IN **",
"username": "** NOT LOGGED IN **",
"role": [],
"user_settings": {}
}
if "user_info" not in session:
return {
"id": "** UNKNOWN USER **",
"email": "** UNKNOWN USER **",
"username": "** UNKNOWN USER **",
"role": [],
"user_settings": {}
}
return session["user_info"]
def debug_session(session):
if session is None:
return "None"
if not isinstance(session, dict):
return str(session)
return session.get("user_info", {}).get("email", "** UNKNOWN USER **")
@utils_rt(Routes.Commands)
def post(session, c_id: str):
def post(session, c_id: str, client_response: dict = None):
"""
Default routes for all commands.
:param session:
:param c_id:
:param c_id: id of the command set
:param client_response: extra data received from the client (from the browser)
:return:
"""
logger.debug(f"Entering {Routes.Commands} with {session=}, {c_id=}")
client_response.pop("c_id", None)
logger.debug(f"Entering {Routes.Commands} with session='{debug_session(session)}', {c_id=}, {client_response=}")
from myfasthtml.core.commands import CommandsManager
command = CommandsManager.get_command(c_id)
if command:
return command.execute()
return command.execute(client_response)
raise ValueError(f"Command with ID '{c_id}' not found.")

View File

@@ -17,5 +17,15 @@ Update the root folder in `update_icons.py` to point to the root folder of the i
##
```sh
python update_icons.py
```
python manage_icons.py --help
```
To list
```sh
python manage_icons.py list
```
To generate icons
```sh
python manage_icons.py generate --no-dry-run --suppress-suffix
```

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,150 @@
import os
from pathlib import Path
import typer
ROOT_FOLDER = "/home/kodjo/Dev/MyDocManager/src/frontend/node_modules/@sicons"
MAX_SIZE = 2000000
import re
def pascal_to_snake(name: str) -> str:
"""Convert a PascalCase or CamelCase string to snake_case."""
# Insert underscore before capital letters (except the first one)
s1 = re.sub(r'(.)([A-Z][a-z]+)', r'\1_\2', name)
# Handle consecutive capital letters (like 'HTTPServer' -> 'http_server')
s2 = re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', s1)
return s2.lower()
app = typer.Typer()
def list_sources(source_folder: str):
return os.listdir(source_folder)
def list_icons_from_source(source_folder: str, source: str):
res = []
for f in os.listdir(f"{source_folder}/{source}"):
if f.endswith(".svg"):
res.append(f)
return res
def read_content(source_folder: str, source: str, file_name: str):
with open(f"{source_folder}/{source}/{file_name}", "r") as f:
return f.read().strip()
def get_dir_size(path: str | Path) -> int:
p = Path(path)
if p.is_file():
return p.stat().st_size
elif p.is_dir():
return sum(f.stat().st_size for f in p.rglob('*') if f.is_file())
else:
raise FileNotFoundError(f"Path not found: {path}")
def sizeof_fmt(num, suffix="B"):
for unit in ["", "K", "M", "G", "T"]:
if abs(num) < 1024.0:
return f"{num:3.1f}{unit}{suffix}"
num /= 1024.0
return f"{num:.1f}P{suffix}"
def init_buffer(source_folder: str, source: str):
buffer = ""
readme_file_path = f"{source_folder}/{source}/README.md"
if os.path.exists(readme_file_path):
with open(readme_file_path, "r") as f_readme:
for line in f_readme:
if line.startswith("#"):
buffer += line
else:
buffer += f"# {line}"
buffer += "\n\n"
buffer += "from fastcore.basics import NotStr\n\n"
return buffer
def flush(dry_run, suppress_suffix, source_folder: str, target_folder: str, buffer: str, size: int, part: int, source: str):
suffix = '' if suppress_suffix else f"_test"
outfile = f"{source}{suffix}.py" if part == 0 else f"{source}_p{part}{suffix}.py"
if not dry_run:
output_path = f"{target_folder}/{outfile}" if part == 0 else f"{target_folder}/{outfile}"
with open(output_path, "w") as f:
f.write(buffer)
typer.echo(f" Generated {source} as {outfile} ({sizeof_fmt(size)}, max={sizeof_fmt(MAX_SIZE)})")
return init_buffer(source_folder, source), 0, part + 1
@app.command("list")
def list_icons(
source: str = typer.Argument(None, help="The source file to list icons from"),
source_folder: str = typer.Option(ROOT_FOLDER, help="The source folder containing icons"),
count: bool = typer.Option(False, help="Counts the number of items"),
size: bool = typer.Option(False, help="Gets the size of the items"),
):
res = []
if source:
res.extend(list_icons_from_source(source_folder, source))
else:
res.extend(list_sources(source_folder))
if count:
typer.echo(len(res))
return
if size:
path = f"{source_folder}/{source}" if source else f"{source_folder}"
size = get_dir_size(path)
typer.echo(sizeof_fmt(size))
return
for r in res:
typer.echo(r)
@app.command("generate")
def generate_icons(
source: str = typer.Argument(None, help="The source file to list icons from"),
source_folder: str = typer.Option(ROOT_FOLDER, help="The source folder containing icons"),
target_folder: str = typer.Option(".", help="The folder where to create the python files."),
top: int = typer.Option(0, help="The number of top items to generate"),
dry_run: bool = typer.Option(True, help="Does not generate the icons"),
suppress_suffix: bool = typer.Option(False, help="Does not add the suffix to the icon names"),
):
sources = [source] if source else list_sources(source_folder)
for current_source in sources:
typer.echo(f"Generating icons for {current_source}")
buffer = init_buffer(source_folder, current_source)
size = 0
part = 0
for index, svg_file in enumerate(list_icons_from_source(source_folder, current_source)):
if 0 < top <= index:
break
icon_name = os.path.splitext(os.path.basename(svg_file))[0]
svg_content = read_content(source_folder, current_source, svg_file)
svg_content = svg_content.replace("<svg ", f'<svg name="{current_source}-{icon_name}" ').replace("\n", "")
content = f"{pascal_to_snake(icon_name)} = NotStr('''{svg_content}''')"
buffer += f"{content}\n"
size += len(content)
if size > MAX_SIZE:
buffer, size, part = flush(dry_run, suppress_suffix, source_folder, target_folder, buffer, size, part,
current_source)
flush(dry_run, suppress_suffix, source_folder, target_folder, buffer, size, part, current_source)
if __name__ == "__main__":
app()

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -9,6 +9,7 @@ from starlette.responses import Response
from myfasthtml.auth.routes import setup_auth_routes
from myfasthtml.auth.utils import create_auth_beforeware
from myfasthtml.core.AuthProxy import AuthProxy
from myfasthtml.core.utils import utils_app
logger = logging.getLogger("MyFastHtml")
@@ -32,6 +33,7 @@ def get_asset_content(filename):
def create_app(daisyui: Optional[bool] = True,
protect_routes: Optional[bool] = True,
mount_auth_app: Optional[bool] = False,
base_url: Optional[str] = None,
**kwargs) -> Any:
"""
Creates and configures a FastHtml application with optional support for daisyUI themes and
@@ -39,22 +41,20 @@ def create_app(daisyui: Optional[bool] = True,
:param daisyui: Flag to enable or disable inclusion of daisyUI-related assets for styling.
Defaults to False.
:type daisyui: Optional[bool]
:param protect_routes: Flag to enable or disable routes protection based on authentication.
Defaults to True.
:type protect_routes: Optional[bool]
:param mount_auth_app: Flag to enable or disable mounting of authentication routes.
Defaults to False.
:type mount_auth_app: Optional[bool]
:param base_url: Url to use for the application (used by the auth APIs)
:param kwargs: Arbitrary keyword arguments forwarded to the application initialization logic.
:return: A tuple containing the FastHtml application instance and the associated router.
:rtype: Any
"""
hdrs = [Link(href="/myfasthtml/myfasthtml.css", rel="stylesheet", type="text/css")]
hdrs = [
Link(href="/myfasthtml/myfasthtml.css", rel="stylesheet", type="text/css"),
Script(src="/myfasthtml/myfasthtml.js"),
]
if daisyui:
hdrs += [
@@ -67,11 +67,12 @@ def create_app(daisyui: Optional[bool] = True,
app, rt = fasthtml.fastapp.fast_app(before=beforeware, hdrs=tuple(hdrs), **kwargs)
# remove the global static files routes
static_route_exts_get = app.routes.pop(0)
original_routes = app.routes[:]
app.routes.clear()
# Serve assets
@app.get("/myfasthtml/{filename:path}.{ext:static}")
def serve_asset(filename: str, ext: str):
def serve_assets(filename: str, ext: str):
path = filename + "." + ext
try:
content = get_asset_content(path)
@@ -86,13 +87,17 @@ def create_app(daisyui: Optional[bool] = True,
return Response(f"Asset not found: {path}", status_code=404)
# and put it back after the myfasthtml static files routes
app.routes.append(static_route_exts_get)
for r in original_routes:
app.routes.append(r)
# route the commands and the bindings
app.mount("/myfasthtml", utils_app)
if mount_auth_app:
# Setup authentication routes
setup_auth_routes(app, rt)
setup_auth_routes(app, rt, base_url=base_url)
# create the AuthProxy instance
AuthProxy(base_url) # using the auto register mechanism to expose it
return app, rt

View File

@@ -2,7 +2,7 @@ from dataclasses import dataclass
from typing import Any
import pytest
from fasthtml.components import Button
from fasthtml.components import Button, Div
from myutils.observable import make_observable, bind
from myfasthtml.core.commands import Command, CommandsManager
@@ -93,3 +93,32 @@ def test_i_can_bind_a_command_to_an_observable_2():
res = command.execute()
assert res == ["another 1", "another 2", ("hello", "new value")]
def test_by_default_swap_is_set_to_outer_html():
command = Command('test', 'Command description', callback)
elt = Button()
updated = command.bind_ft(elt)
expected = Button(hx_post=f"{ROUTE_ROOT}{Routes.Commands}", hx_swap="outerHTML")
assert matches(updated, expected)
@pytest.mark.parametrize("return_values", [
[Div(), Div(), "hello", Div()], # list
(Div(), Div(), "hello", Div()) # tuple
])
def test_swap_oob_is_automatically_set_when_multiple_elements_are_returned(return_values):
"""Test that hx-swap-oob is automatically set, but not for the first."""
def another_callback():
return return_values
command = Command('test', 'Command description', another_callback)
res = command.execute()
assert "hx_swap_oob" not in res[0].attrs
assert res[1].attrs["hx-swap-oob"] == "true"
assert res[3].attrs["hx-swap-oob"] == "true"

View File

@@ -0,0 +1,98 @@
import shutil
from dataclasses import dataclass
import pytest
from myfasthtml.core.dbmanager import DbManager, DbObject
@pytest.fixture(scope="session")
def session():
return {
"user_info": {
"id": "test_tenant_id",
"email": "test@email.com",
"username": "test user",
"role": [],
}
}
@pytest.fixture
def db_manager(session):
shutil.rmtree("TestDb", ignore_errors=True)
db_manager_instance = DbManager(session, root="TestDb", auto_register=False)
yield db_manager_instance
shutil.rmtree("TestDb", ignore_errors=True)
def simplify(res: dict) -> dict:
return {k: v for k, v in res.items() if not k.startswith("_")}
def test_i_can_init(session, db_manager):
@dataclass
class DummyObject(DbObject):
def __init__(self, sess: dict):
super().__init__(sess, "DummyObject", db_manager)
value: str = "hello"
number: int = 42
DummyObject(session)
assert simplify(db_manager.load("DummyObject")) == {"value": "hello", "number": 42}
def test_i_can_init_from_db(session, db_manager):
@dataclass
class DummyObject(DbObject):
def __init__(self, sess: dict):
super().__init__(sess, "DummyObject", db_manager)
value: str = "hello"
number: int = 42
# insert other values in db
db_manager.save("DummyObject", {"value": "other_value", "number": 34})
dummy = DummyObject(session)
assert dummy.value == "other_value"
assert dummy.number == 34
def test_db_is_updated_when_attribute_is_modified(session, db_manager):
@dataclass
class DummyObject(DbObject):
def __init__(self, sess: dict):
super().__init__(sess, "DummyObject", db_manager)
value: str = "hello"
number: int = 42
dummy = DummyObject(session)
dummy.value = "other_value"
assert simplify(db_manager.load("DummyObject")) == {"value": "other_value", "number": 42}
def test_i_do_not_save_in_db_when_value_is_the_same(session, db_manager):
@dataclass
class DummyObject(DbObject):
def __init__(self, sess: dict):
super().__init__(sess, "DummyObject", db_manager)
value: str = "hello"
number: int = 42
dummy = DummyObject(session)
dummy.value = "other_value"
in_db_1 = db_manager.load("DummyObject")
dummy.value = "other_value"
in_db_2 = db_manager.load("DummyObject")
assert in_db_1["__parent__"] == in_db_2["__parent__"]