11 Commits

30 changed files with 1977 additions and 269 deletions

View File

@@ -25,3 +25,4 @@ clean: clean-build clean-tests
clean-all : clean
rm -rf src/.sesskey
rm -rf src/Users.db
rm -rf src/.myFastHtmlDb

View File

@@ -1,24 +1,26 @@
import logging
import logging.config
import yaml
from fasthtml import serve
from fasthtml.components import *
from myfasthtml.controls.InstancesDebugger import InstancesDebugger
from myfasthtml.controls.Layout import Layout
from myfasthtml.controls.TabsManager import TabsManager
from myfasthtml.controls.helpers import Ids, mk
from myfasthtml.core.commands import Command
from myfasthtml.core.instances import InstancesManager
from myfasthtml.core.instances import InstancesManager, RootInstance
from myfasthtml.icons.carbon import volume_object_storage
from myfasthtml.myfastapp import create_app
logging.basicConfig(
level=logging.DEBUG, # Set logging level to DEBUG
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', # Log format
datefmt='%Y-%m-%d %H:%M:%S', # Timestamp format
)
with open('logging.yaml', 'r') as f:
config = yaml.safe_load(f)
# At the top of your script or module
logging.config.dictConfig(config)
app, rt = create_app(protect_routes=True,
mount_auth_app=True,
pico=False,
vis=True,
title="MyFastHtml",
live=True,
base_url="http://localhost:5003")
@@ -26,26 +28,21 @@ app, rt = create_app(protect_routes=True,
@rt("/")
def index(session):
layout = InstancesManager.get(session, Ids.Layout, Layout, "Testing Layout")
layout = InstancesManager.get(session, Ids.Layout, Layout, RootInstance, "Testing Layout")
layout.set_footer("Goodbye World")
for i in range(1000):
layout.left_drawer.append(Div(f"Left Drawer Item {i}"))
layout.right_drawer.append(Div(f"Left Drawer Item {i}"))
tabs_manager = TabsManager(session, _id="main")
btn = mk.button("Add Tab",
command=Command("AddTab",
"Add a new tab",
tabs_manager.on_new_tab, "Tabs", Div("Content")).
htmx(target=f"#{tabs_manager.get_id()}"))
tabs_manager = TabsManager(layout, _id=f"{Ids.TabsManager}-main")
instances_debugger = InstancesManager.get(session, Ids.InstancesDebugger, InstancesDebugger, layout)
btn_show_right_drawer = mk.button("show",
command=Command("ShowRightDrawer",
"Show Right Drawer",
layout.toggle_drawer, "right"),
command=layout.commands.toggle_drawer("right"),
id="btn_show_right_drawer_id")
layout.set_footer(btn_show_right_drawer)
btn_show_instances_debugger = mk.icon(volume_object_storage,
command=tabs_manager.commands.add_tab("Instances", instances_debugger),
id=instances_debugger.get_id())
layout.header_left.add(tabs_manager.add_tab_btn())
layout.header_right.add(btn_show_right_drawer)
layout.left_drawer.add(btn_show_instances_debugger)
layout.set_main(tabs_manager)
return layout

54
src/logging.yaml Normal file
View File

@@ -0,0 +1,54 @@
version: 1
disable_existing_loggers: False
formatters:
default:
format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
handlers:
console:
class: logging.StreamHandler
formatter: default
root:
level: DEBUG
handlers: [console]
loggers:
# Explicit logger configuration (example)
multipart.multipart:
level: INFO
handlers: [console]
propagate: False
python_multipart.multipart:
level: INFO
handlers: [ console ]
propagate: False
httpcore:
level: ERROR
handlers: [ console ]
propagate: False
httpx:
level: INFO
handlers: [ console ]
propagate: False
watchfiles.main:
level: ERROR
handlers: [console]
propagate: False
dbengine.dbengine:
level: ERROR
handlers: [console]
propagate: False
passlib.registry:
level: ERROR
handlers: [console]
propagate: False
socket.socket:
level: ERROR
handlers: [ console ]
propagate: False

View File

@@ -1,3 +1,7 @@
:root {
--color-border-primary: color-mix(in oklab, var(--color-primary) 40%, #0000);
}
.mf-icon-20 {
width: 20px;
min-width: 20px;
@@ -14,6 +18,22 @@
margin-bottom: 4px;
}
.mf-icon-24 {
width: 24px;
min-width: 24px;
height: 24px;
margin-top: auto;
margin-bottom: 4px;
}
.mf-icon-32 {
width: 32px;
min-width: 32px;
height: 32px;
margin-top: auto;
margin-bottom: 4px;
}
/*
* MF Layout Component - CSS Grid Layout
* Provides fixed header/footer, collapsible drawers, and scrollable main content
@@ -81,13 +101,14 @@
/* Left drawer */
.mf-layout-left-drawer {
grid-area: left-drawer;
border-right: 1px solid color-mix(in oklab, var(--color-base-content) 10%, #0000);
border-right: 1px solid var(--color-border-primary);
}
/* Right drawer */
.mf-layout-right-drawer {
grid-area: right-drawer;
border-left: 1px solid color-mix(in oklab, var(--color-base-content) 10%, #0000);
/*border-left: 1px solid color-mix(in oklab, var(--color-base-content) 10%, #0000);*/
border-left: 1px solid var(--color-border-primary);
}
/* Collapsed drawer states */
@@ -171,11 +192,7 @@
grid-template-columns: 1fr;
}
/**
* Layout Drawer Resizer Styles
*
* Styles for the resizable drawer borders with visual feedback
*/
/**
* Layout Drawer Resizer Styles
*
@@ -296,8 +313,21 @@
.mf-tabs-header {
display: flex;
gap: 0;
flex-shrink: 0;
flex-shrink: 1;
min-height: 25px;
overflow-x: hidden;
overflow-y: hidden;
white-space: nowrap;
}
.mf-tabs-header-wrapper {
display: flex;
justify-content: space-between;
align-items: center;
width: 100%;
/*overflow: hidden; important */
}
/* Individual Tab Button using DaisyUI tab classes */
@@ -319,6 +349,7 @@
background-color: var(--color-base-100);
color: var(--color-base-content);
border-radius: .25rem;
border-bottom: 4px solid var(--color-primary);
box-shadow: 0 1px oklch(100% 0 0/calc(var(--depth) * .1)) inset, 0 1px 1px -1px color-mix(in oklab, var(--color-neutral) calc(var(--depth) * 50%), #0000), 0 1px 6px -4px color-mix(in oklab, var(--color-neutral) calc(var(--depth) * 100%), #0000);
}
@@ -349,10 +380,17 @@
/* Tab Content Area */
.mf-tab-content {
flex: 1;
overflow: auto;
height: 100%;
}
.mf-tab-content-wrapper {
flex: 1;
overflow: auto;
background-color: var(--color-base-100);
padding: 1rem;
border-top: 1px solid var(--color-border-primary);
}
/* Empty Content State */
@@ -363,4 +401,15 @@
height: 100%;
@apply text-base-content/50;
font-style: italic;
}
.mf-vis {
width: 100%;
height: 100%;
}
.mf-search-results {
margin-top: 0.5rem;
max-height: 200px;
overflow: auto;
}

View File

@@ -151,7 +151,118 @@ function initLayoutResizer(layoutId) {
initResizers();
// Re-initialize after HTMX swaps within this layout
layoutElement.addEventListener('htmx:afterSwap', function(event) {
layoutElement.addEventListener('htmx:afterSwap', function (event) {
initResizers();
});
}
function initBoundaries(elementId, updateUrl) {
function updateBoundaries() {
const container = document.getElementById(elementId);
if (!container) {
console.warn("initBoundaries : element " + elementId + " is not found !");
return;
}
const rect = container.getBoundingClientRect();
const width = Math.floor(rect.width);
const height = Math.floor(rect.height);
console.log("boundaries: ", rect)
// Send boundaries to server
htmx.ajax('POST', updateUrl, {
target: '#' + elementId,
swap: 'outerHTML',
values: {width: width, height: height}
});
}
// Debounce function
let resizeTimeout;
function debouncedUpdate() {
clearTimeout(resizeTimeout);
resizeTimeout = setTimeout(updateBoundaries, 250);
}
// Update on load
setTimeout(updateBoundaries, 100);
// Update on window resize
const container = document.getElementById(elementId);
container.addEventListener('resize', debouncedUpdate);
// Cleanup on element removal
if (container) {
const observer = new MutationObserver(function (mutations) {
mutations.forEach(function (mutation) {
mutation.removedNodes.forEach(function (node) {
if (node.id === elementId) {
window.removeEventListener('resize', debouncedUpdate);
}
});
});
});
observer.observe(container.parentNode, {childList: true});
}
}
/**
* Updates the tabs display by showing the active tab content and scrolling to make it visible.
* This function is called when switching between tabs to update both the content visibility
* and the tab button states.
*
* @param {string} controllerId - The ID of the tabs controller element (format: "{managerId}-controller")
*/
function updateTabs(controllerId) {
const controller = document.getElementById(controllerId);
if (!controller) {
console.warn(`Controller ${controllerId} not found`);
return;
}
const activeTabId = controller.dataset.activeTab;
if (!activeTabId) {
console.warn('No active tab ID found');
return;
}
// Extract manager ID from controller ID (remove '-controller' suffix)
const managerId = controllerId.replace('-controller', '');
// Hide all tab contents for this manager
const contentWrapper = document.getElementById(`${managerId}-content-wrapper`);
if (contentWrapper) {
contentWrapper.querySelectorAll('.mf-tab-content').forEach(content => {
content.classList.add('hidden');
});
// Show the active tab content
const activeContent = document.getElementById(`${managerId}-${activeTabId}-content`);
if (activeContent) {
activeContent.classList.remove('hidden');
}
}
// Update active tab button styling
const header = document.getElementById(`${managerId}-header`);
if (header) {
// Remove active class from all tabs
header.querySelectorAll('.mf-tab-button').forEach(btn => {
btn.classList.remove('mf-tab-active');
});
// Add active class to current tab
const activeButton = header.querySelector(`[data-tab-id="${activeTabId}"]`);
if (activeButton) {
activeButton.classList.add('mf-tab-active');
// Scroll to make active tab visible if needed
activeButton.scrollIntoView({
behavior: 'smooth',
block: 'nearest',
inline: 'nearest'
});
}
}
}

File diff suppressed because one or more lines are too long

View File

@@ -41,7 +41,7 @@ Note: `python-jose` may already be installed if you have FastAPI.
### 1. Update API configuration in `auth/utils.py`
```python
API_BASE_URL = "http://localhost:5001" # Your FastAPI backend URL
API_BASE_URL = "http://localhost:5003" # Your FastAPI backend URL
JWT_SECRET = "jwt-secret-to-change" # Must match your FastAPI secret
```

View File

@@ -0,0 +1,62 @@
from fasthtml.xtend import Script
from myfasthtml.controls.BaseCommands import BaseCommands
from myfasthtml.controls.helpers import Ids
from myfasthtml.core.commands import Command
from myfasthtml.core.instances import SingleInstance
class BoundariesState:
def __init__(self):
# persisted in DB
self.width: int = 0
self.height: int = 0
class Commands(BaseCommands):
def update_boundaries(self):
return Command(f"{self._prefix}UpdateBoundaries",
"Update component boundaries",
self._owner.update_boundaries).htmx(target=f"{self._owner.get_id()}")
class Boundaries(SingleInstance):
"""
Ask the boundaries of the given control
Keep the boundaries updated
"""
def __init__(self, session, owner, container_id: str = None, on_resize=None):
super().__init__(session, Ids.Boundaries, owner)
self._owner = owner
self._container_id = container_id or owner.get_id()
self._on_resize = on_resize
self._commands = Commands(self)
self._state = BoundariesState()
self._get_boundaries_command = self._commands.update_boundaries()
@property
def width(self):
return self._state.width
@property
def height(self):
return self._state.height
def update_boundaries(self, width: int, height: int):
"""
Update the component boundaries.
Args:
width: Available width in pixels
height: Available height in pixels
"""
self._state.width = width
self._state.height = height
return self._on_resize() if self._on_resize else self._owner
def render(self):
return Script(f"initBoundaries('{self._container_id}', '{self._get_boundaries_command.url}');")
def __ft__(self):
return self.render()

View File

@@ -0,0 +1,26 @@
from myfasthtml.controls.VisNetwork import VisNetwork
from myfasthtml.controls.helpers import Ids
from myfasthtml.core.instances import SingleInstance, InstancesManager
from myfasthtml.core.network_utils import from_parent_child_list
class InstancesDebugger(SingleInstance):
def __init__(self, session, parent, _id=None):
super().__init__(session, Ids.InstancesDebugger, parent)
def render(self):
instances = self._get_instances()
nodes, edges = from_parent_child_list(instances,
id_getter=lambda x: x.get_id(),
label_getter=lambda x: x.get_prefix(),
parent_getter=lambda x: x.get_parent().get_id() if x.get_parent() else None
)
vis_network = VisNetwork(self, nodes=nodes, edges=edges)
return vis_network
def _get_instances(self):
return list(InstancesManager.instances.values())
def __ft__(self):
return self.render()

View File

@@ -10,11 +10,13 @@ from typing import Literal
from fasthtml.common import *
from myfasthtml.controls.BaseCommands import BaseCommands
from myfasthtml.controls.Boundaries import Boundaries
from myfasthtml.controls.UserProfile import UserProfile
from myfasthtml.controls.helpers import mk, Ids
from myfasthtml.core.commands import Command
from myfasthtml.core.dbmanager import DbObject
from myfasthtml.core.instances import InstancesManager, SingleInstance
from myfasthtml.core.utils import get_id
from myfasthtml.icons.fluent import panel_left_expand20_regular as left_drawer_icon
from myfasthtml.icons.fluent_p2 import panel_right_expand20_regular as right_drawer_icon
@@ -33,7 +35,7 @@ class LayoutState(DbObject):
class Commands(BaseCommands):
def toggle_drawer(self, side: Literal["left", "right"]):
return Command("ToggleDrawer", "Toggle main layout drawer", self._owner.toggle_drawer, side)
return Command("ToggleDrawer", f"Toggle {side} layout drawer", self._owner.toggle_drawer, side)
def update_drawer_width(self, side: Literal["left", "right"]):
"""
@@ -64,19 +66,25 @@ class Layout(SingleInstance):
right_drawer (bool): Whether to include a right drawer
"""
class DrawerContent:
def __init__(self, owner, side: Literal["left", "right"]):
class Content:
def __init__(self, owner):
self._owner = owner
self.side = side
self._content = []
self._ids = set()
def append(self, content):
def add(self, content):
content_id = get_id(content)
if content_id in self._ids:
return
self._content.append(content)
if content_id is not None:
self._ids.add(content_id)
def get_content(self):
return self._content
def __init__(self, session, app_name):
def __init__(self, session, app_name, parent=None):
"""
Initialize the Layout component.
@@ -85,17 +93,20 @@ class Layout(SingleInstance):
left_drawer (bool): Enable left drawer. Default is True.
right_drawer (bool): Enable right drawer. Default is True.
"""
super().__init__(session, Ids.Layout)
super().__init__(session, Ids.Layout, parent)
self.app_name = app_name
# Content storage
self._header_content = None
self._footer_content = None
self._main_content = None
self._state = LayoutState(self)
self._boundaries = Boundaries(session, self)
self.commands = Commands(self)
self.left_drawer = self.DrawerContent(self, "left")
self.right_drawer = self.DrawerContent(self, "right")
self.left_drawer = self.Content(self)
self.right_drawer = self.Content(self)
self.header_left = self.Content(self)
self.header_right = self.Content(self)
self.footer_left = self.Content(self)
self.footer_right = self.Content(self)
def set_footer(self, content):
"""
@@ -159,8 +170,16 @@ class Layout(SingleInstance):
Header: FastHTML Header component
"""
return Header(
self._mk_left_drawer_icon(),
InstancesManager.get(self._session, Ids.UserProfile, UserProfile),
Div( # left
self._mk_left_drawer_icon(),
*self.header_left.get_content(),
cls="flex gap-1"
),
Div( # right
*self.header_right.get_content(),
InstancesManager.get(self._session, Ids.UserProfile, UserProfile),
cls="flex gap-1"
),
cls="mf-layout-header"
)
@@ -281,4 +300,4 @@ class Layout(SingleInstance):
Returns:
Div: The rendered layout
"""
return self.render()
return self.render()

View File

@@ -0,0 +1,91 @@
import logging
from typing import Callable, Any
from fasthtml.components import *
from myfasthtml.controls.BaseCommands import BaseCommands
from myfasthtml.controls.helpers import Ids, mk
from myfasthtml.core.commands import Command
from myfasthtml.core.instances import MultipleInstance, BaseInstance
from myfasthtml.core.matching_utils import subsequence_matching, fuzzy_matching
logger = logging.getLogger("Search")
class Commands(BaseCommands):
def search(self):
return (Command("Search", f"Search {self._owner.items_names}", self._owner.on_search).
htmx(target=f"#{self._owner.get_id()}-results",
trigger="keyup changed delay:300ms",
swap="innerHTML"))
class Search(MultipleInstance):
def __init__(self,
parent: BaseInstance,
_id=None,
items_names=None, # what is the name of the items to filter
items=None, # first set of items to filter
get_attr: Callable[[Any], str] = None, # items is a list of objects: how to get the str to filter
template: Callable[[Any], Any] = None): # once filtered, what to render ?
"""
Represents a component for managing and filtering a list of items based on specific criteria.
This class initializes with a session, an optional identifier, a list of item names,
a callable for extracting a string value from items, and a template callable for rendering
the filtered items. It provides functionality to handle and organize item-based operations.
:param session: The session object to maintain state or context across operations.
:param _id: Optional identifier for the component.
:param items: An optional list of names for the items to be filtered.
:param get_attr: Callable function to extract a string value from an item for filtering. Defaults to a
function that returns the item as is.
:param template: Callable function to render the filtered items. Defaults to a Div rendering function.
"""
super().__init__(Ids.Search, parent, _id=_id)
self.items_names = items_names or ''
self.items = items or []
self.filtered = self.items.copy()
self.get_attr = get_attr or (lambda x: x)
self.template = template or Div
self.commands = Commands(self)
def set_items(self, items):
self.items = items
self.filtered = self.items.copy()
return self
def on_search(self, query):
logger.debug(f"on_search {query=}")
self.search(query)
return tuple(self._mk_search_results())
def search(self, query):
logger.debug(f"search {query=}")
if query is None or query.strip() == "":
self.filtered = self.items.copy()
else:
res_seq = subsequence_matching(query, self.items, get_attr=self.get_attr)
res_fuzzy = fuzzy_matching(query, self.items, get_attr=self.get_attr)
self.filtered = res_seq + res_fuzzy
return self.filtered
def _mk_search_results(self):
return [self.template(item) for item in self.filtered]
def render(self):
return Div(
mk.mk(Input(name="query", id=f"{self._id}-search", type="text", placeholder="Search...", cls="input input-xs"),
command=self.commands.search()),
Div(
*self._mk_search_results(),
id=f"{self._id}-results",
cls="mf-search-results",
),
id=f"{self._id}",
)
def __ft__(self):
return self.render()

View File

@@ -1,25 +1,47 @@
import logging
import uuid
from dataclasses import dataclass
from typing import Any
from fasthtml.common import Div, Button, Span
from fasthtml.common import Div, Span
from fasthtml.xtend import Script
from myfasthtml.controls.BaseCommands import BaseCommands
from myfasthtml.controls.Search import Search
from myfasthtml.controls.VisNetwork import VisNetwork
from myfasthtml.controls.helpers import Ids, mk
from myfasthtml.core.commands import Command
from myfasthtml.core.dbmanager import DbObject
from myfasthtml.core.instances import MultipleInstance, BaseInstance
from myfasthtml.icons.fluent_p3 import dismiss_circle16_regular
from myfasthtml.core.instances_helper import InstancesHelper
from myfasthtml.icons.fluent_p1 import tabs24_regular
from myfasthtml.icons.fluent_p3 import dismiss_circle16_regular, tab_add24_regular
logger = logging.getLogger("TabsManager")
vis_nodes = [
{"id": 1, "label": "Node 1"},
{"id": 2, "label": "Node 2"},
{"id": 3, "label": "Node 3"},
{"id": 4, "label": "Node 4"},
{"id": 5, "label": "Node 5"}
]
vis_edges = [
{"from": 1, "to": 3},
{"from": 1, "to": 2},
{"from": 2, "to": 4},
{"from": 2, "to": 5},
{"from": 3, "to": 3}
]
# Structure de tabs:
# {
# "tab-uuid-1": {
# "label": "Users",
# "component_type": "UsersPanel",
# "component_id": "UsersPanel-abc123",
# },
# "tab-uuid-2": { ... }
# }
@dataclass
class Boundaries:
"""Store component boundaries"""
width: int = 1020
height: int = 782
class TabsManagerState(DbObject):
def __init__(self, owner):
@@ -36,23 +58,74 @@ class TabsManagerState(DbObject):
class Commands(BaseCommands):
def show_tab(self, tab_id):
return Command(f"{self._prefix}SowTab",
return Command(f"{self._prefix}ShowTab",
"Activate or show a specific tab",
self._owner.show_tab, tab_id).htmx(target=f"#{self._id}", swap="outerHTML")
self._owner.show_tab, tab_id).htmx(target=f"#{self._id}-controller", swap="outerHTML")
def close_tab(self, tab_id):
return Command(f"{self._prefix}CloseTab",
"Close a specific tab",
self._owner.close_tab, tab_id).htmx(target=f"#{self._id}", swap="outerHTML")
def add_tab(self, label: str, component: Any, auto_increment=False):
return (Command(f"{self._prefix}AddTab",
"Add a new tab",
self._owner.on_new_tab, label, component, auto_increment).
htmx(target=f"#{self._id}-controller"))
class TabsManager(MultipleInstance):
def __init__(self, session, _id=None):
super().__init__(session, Ids.TabsManager, _id=_id)
_tab_count = 0
def __init__(self, parent, _id=None):
super().__init__(Ids.TabsManager, parent, _id=_id)
self._state = TabsManagerState(self)
self.commands = Commands(self)
self._boundaries = Boundaries()
self._search = Search(self,
items=self._get_tab_list(),
get_attr=lambda x: x["label"],
template=self._mk_tab_button)
logger.debug(f"TabsManager created with id: {self._id}")
logger.debug(f" tabs : {self._get_ordered_tabs()}")
logger.debug(f" active tab : {self._state.active_tab}")
def get_state(self):
return self._state
def on_new_tab(self, label: str, component: Any):
self.add_tab(label, component)
return self
def _get_ordered_tabs(self):
return {tab_id: self._state.tabs.get(tab_id, None) for tab_id in self._state.tabs_order}
def _get_tab_content(self, tab_id):
if tab_id not in self._state.tabs:
return None
tab_config = self._state.tabs[tab_id]
if tab_config["component_type"] is None:
return None
return InstancesHelper.dynamic_get(self, tab_config["component_type"], tab_config["component_id"])
@staticmethod
def _get_tab_count():
res = TabsManager._tab_count
TabsManager._tab_count += 1
return res
def on_new_tab(self, label: str, component: Any, auto_increment=False):
logger.debug(f"on_new_tab {label=}, {component=}, {auto_increment=}")
if auto_increment:
label = f"{label}_{self._get_tab_count()}"
component = component or VisNetwork(self, nodes=vis_nodes, edges=vis_edges)
tab_id = self._tab_already_exists(label, component)
if tab_id:
return self.show_tab(tab_id)
tab_id = self.add_tab(label, component)
return (
self._mk_tabs_controller(),
self._wrap_tab_content(self._mk_tab_content(tab_id, component)),
self._mk_tabs_header_wrapper(True),
)
def add_tab(self, label: str, component: Any, activate: bool = True) -> str:
"""
@@ -66,24 +139,18 @@ class TabsManager(MultipleInstance):
Returns:
tab_id: The UUID of the tab (new or existing)
"""
logger.debug(f"add_tab {label=}, component={component}, activate={activate}")
# copy the state to avoid multiple database call
state = self._state.copy()
# Extract component ID if the component has a get_id() method
component_type, component_id = None, None
if isinstance(component, BaseInstance):
component_type = type(component).__name__
component_type = component.get_prefix() if isinstance(component, BaseInstance) else type(component).__name__
component_id = component.get_id()
# Check if a tab with the same component_type, component_id AND label already exists
existing_tab_id = None
if component_id is not None:
for tab_id, tab_data in state.tabs.items():
if (tab_data.get('component_type') == component_type and
tab_data.get('component_id') == component_id and
tab_data.get('label') == label):
existing_tab_id = tab_id
break
existing_tab_id = self._tab_already_exists(label, component)
if existing_tab_id:
# Update existing tab (only the component instance in memory)
@@ -95,6 +162,7 @@ class TabsManager(MultipleInstance):
# Add tab metadata to state
state.tabs[tab_id] = {
'id': tab_id,
'label': label,
'component_type': component_type,
'component_id': component_id
@@ -112,87 +180,224 @@ class TabsManager(MultipleInstance):
# finally, update the state
self._state.update(state)
self._search.set_items(self._get_tab_list())
return tab_id
def show_tab(self, tab_id):
logger.debug(f"show_tab {tab_id=}")
if tab_id not in self._state.tabs:
logger.debug(f" Tab not found.")
return None
logger.debug(f" Tab label is: {self._state.tabs[tab_id]['label']}")
self._state.active_tab = tab_id
if tab_id not in self._state._tabs_content:
logger.debug(f" Content does not exist. Creating it.")
content = self._get_tab_content(tab_id)
tab_content = self._mk_tab_content(tab_id, content)
self._state._tabs_content[tab_id] = tab_content
return self._mk_tabs_controller(), self._wrap_tab_content(tab_content)
else:
logger.debug(f" Content already exists. Just switch.")
return self._mk_tabs_controller()
def close_tab(self, tab_id: str):
"""
Close a tab and remove it from the tabs manager.
Args:
tab_id: ID of the tab to close
Returns:
Self for chaining
"""
logger.debug(f"close_tab {tab_id=}")
if tab_id not in self._state.tabs:
return self
# Copy state
state = self._state.copy()
# Remove from tabs and order
del state.tabs[tab_id]
state.tabs_order.remove(tab_id)
# Remove from content
if tab_id in state._tabs_content:
del state._tabs_content[tab_id]
# If closing active tab, activate another one
if state.active_tab == tab_id:
if state.tabs_order:
# Activate the first remaining tab
state.active_tab = state.tabs_order[0]
else:
state.active_tab = None
# Update state
self._state.update(state)
self._search.set_items(self._get_tab_list())
return self
def _mk_tab_button(self, tab_id: str, tab_data: dict):
def add_tab_btn(self):
return mk.icon(tab_add24_regular,
id=f"{self._id}-add-tab-btn",
cls="mf-tab-btn",
command=self.commands.add_tab(f"Untitled",
None,
True))
def _mk_tabs_controller(self):
return Div(
Div(id=f"{self._id}-controller", data_active_tab=f"{self._state.active_tab}"),
Script(f'updateTabs("{self._id}-controller");'),
)
def _mk_tabs_header_wrapper(self, oob=False):
# Create visible tab buttons
visible_tab_buttons = [
self._mk_tab_button(self._state.tabs[tab_id])
for tab_id in self._state.tabs_order
if tab_id in self._state.tabs
]
header_content = [*visible_tab_buttons]
return Div(
Div(*header_content, cls="mf-tabs-header", id=f"{self._id}-header"),
self._mk_show_tabs_menu(),
id=f"{self._id}-header-wrapper",
cls="mf-tabs-header-wrapper",
hx_swap_oob="true" if oob else None
)
def _mk_tab_button(self, tab_data: dict, in_dropdown: bool = False):
"""
Create a single tab button with its label and close button.
Args:
tab_id: Unique identifier for the tab
tab_data: Dictionary containing tab information (label, component_type, etc.)
in_dropdown: Whether this tab is rendered in the dropdown menu
Returns:
Button element representing the tab
"""
tab_id = tab_data["id"]
is_active = tab_id == self._state.active_tab
return Button(
mk.mk(Span(tab_data.get("label", "Untitled"), cls="mf-tab-label"), command=self.commands.show_tab(tab_id)),
close_btn = mk.mk(
Span(dismiss_circle16_regular, cls="mf-tab-close-btn"),
cls=f"mf-tab-button {'mf-tab-active' if is_active else ''}",
command=self.commands.close_tab(tab_id)
)
tab_label = mk.mk(
Span(tab_data.get("label", "Untitled"), cls="mf-tab-label"),
command=self.commands.show_tab(tab_id)
)
extra_cls = "mf-tab-in-dropdown" if in_dropdown else ""
return Div(
tab_label,
close_btn,
cls=f"mf-tab-button {extra_cls} {'mf-tab-active' if is_active else ''}",
data_tab_id=tab_id,
data_manager_id=self._id
)
def _mk_tabs_header(self):
"""
Create the tabs header containing all tab buttons.
Returns:
Div element containing all tab buttons
"""
tab_buttons = [
self._mk_tab_button(tab_id, self._state.tabs[tab_id])
for tab_id in self._state.tabs_order
if tab_id in self._state.tabs
]
return Div(
*tab_buttons,
cls="mf-tabs-header",
id=f"{self._id}-header"
)
def _mk_tab_content(self):
def _mk_tab_content_wrapper(self):
"""
Create the active tab content area.
Returns:
Div element containing the active tab content or empty container
"""
content = None
if self._state.active_tab and self._state.active_tab in self._state._tabs_content:
component = self._state._tabs_content[self._state.active_tab]
content = component
if self._state.active_tab:
active_tab = self._state.active_tab
if active_tab in self._state._tabs_content:
tab_content = self._state._tabs_content[active_tab]
else:
content = self._get_tab_content(active_tab)
tab_content = self._mk_tab_content(active_tab, content)
self._state._tabs_content[active_tab] = tab_content
else:
tab_content = self._mk_tab_content(None, None)
return Div(
content if content else Div("No active tab", cls="mf-empty-content"),
cls="mf-tab-content",
id=f"{self._id}-content"
tab_content,
cls="mf-tab-content-wrapper",
id=f"{self._id}-content-wrapper",
)
def _mk_tab_content(self, tab_id: str, content):
is_active = tab_id == self._state.active_tab
return Div(
content if content else Div("No Content", cls="mf-empty-content"),
cls=f"mf-tab-content {'hidden' if not is_active else ''}", # ← ici
id=f"{self._id}-{tab_id}-content",
)
def _mk_show_tabs_menu(self):
return Div(
mk.icon(tabs24_regular,
size="32",
tabindex="0",
role="button",
cls="btn btn-xs"),
Div(
self._search,
tabindex="-1",
cls="dropdown-content menu w-52 rounded-box bg-base-300 shadow-xl"
),
cls="dropdown dropdown-end"
)
def _wrap_tab_content(self, tab_content):
return Div(
tab_content,
hx_swap_oob=f"beforeend:#{self._id}-content-wrapper",
)
def _tab_already_exists(self, label, component):
if not isinstance(component, BaseInstance):
return None
component_type = component.get_prefix() if isinstance(component, BaseInstance) else type(component).__name__
component_id = component.get_id()
if component_id is not None:
for tab_id, tab_data in self._state.tabs.items():
if (tab_data.get('component_type') == component_type and
tab_data.get('component_id') == component_id and
tab_data.get('label') == label):
return tab_id
return None
def _get_tab_list(self):
return [self._state.tabs[tab_id] for tab_id in self._state.tabs_order if tab_id in self._state.tabs]
def update_boundaries(self):
return Script(f"updateBoundaries('{self._id}');")
def render(self):
"""
Render the complete TabsManager component.
Returns:
Div element containing tabs header and content area
Div element containing tabs header, content area, and resize script
"""
return Div(
self._mk_tabs_header(),
self._mk_tab_content(),
self._mk_tabs_controller(),
self._mk_tabs_header_wrapper(),
self._mk_tab_content_wrapper(),
cls="mf-tabs-manager",
id=self._id
id=self._id,
)
def __ft__(self):

View File

@@ -35,8 +35,8 @@ class Commands(BaseCommands):
class UserProfile(SingleInstance):
def __init__(self, session):
super().__init__(session, Ids.UserProfile)
def __init__(self, session, parent=None):
super().__init__(session, Ids.UserProfile, parent)
self._state = UserProfileState(self)
self._commands = Commands(self)

View File

@@ -0,0 +1,95 @@
import json
import logging
from fasthtml.components import Script, Div
from myfasthtml.controls.helpers import Ids
from myfasthtml.core.dbmanager import DbObject
from myfasthtml.core.instances import MultipleInstance
logger = logging.getLogger("VisNetwork")
class VisNetworkState(DbObject):
def __init__(self, owner):
super().__init__(owner.get_session(), owner.get_id())
with self.initializing():
# persisted in DB
self.nodes: list = []
self.edges: list = []
self.options: dict = {
"autoResize": True,
"interaction": {
"dragNodes": True,
"zoomView": True,
"dragView": True,
},
"physics": {"enabled": True}
}
class VisNetwork(MultipleInstance):
def __init__(self, parent, _id=None, nodes=None, edges=None, options=None):
super().__init__(Ids.VisNetwork, parent, _id=_id)
logger.debug(f"VisNetwork created with id: {self._id}")
self._state = VisNetworkState(self)
self._update_state(nodes, edges, options)
def _update_state(self, nodes, edges, options):
logger.debug(f"Updating VisNetwork state with {nodes=}, {edges=}, {options=}")
if not nodes and not edges and not options:
return
state = self._state.copy()
if nodes is not None:
state.nodes = nodes
if edges is not None:
state.edges = edges
if options is not None:
state.options = options
self._state.update(state)
def render(self):
# Serialize nodes and edges to JSON
# This preserves all properties (color, shape, size, etc.) that are present
js_nodes = ",\n ".join(
json.dumps(node) for node in self._state.nodes
)
js_edges = ",\n ".join(
json.dumps(edge) for edge in self._state.edges
)
# Convert Python options to JS
js_options = json.dumps(self._state.options, indent=2)
return (
Div(
id=self._id,
cls="mf-vis",
),
# The script initializing Vis.js
Script(f"""
(function() {{
const container = document.getElementById("{self._id}");
const nodes = new vis.DataSet([
{js_nodes}
]);
const edges = new vis.DataSet([
{js_edges}
]);
const data = {{
nodes: nodes,
edges: edges
}};
const options = {js_options};
const network = new vis.Network(container, data, options);
}})();
""")
)
def __ft__(self):
return self.render()

View File

@@ -8,10 +8,15 @@ from myfasthtml.core.utils import merge_classes
class Ids:
# Please keep the alphabetical order
AuthProxy = "mf-auth-proxy"
Boundaries = "mf-boundaries"
DbManager = "mf-dbmanager"
InstancesDebugger = "mf-instances-debugger"
Layout = "mf-layout"
Root = "mf-root"
Search = "mf-search"
TabsManager = "mf-tabs-manager"
UserProfile = "mf-user-profile"
VisNetwork = "mf-vis-network"
class mk:

View File

@@ -1,11 +1,11 @@
from myfasthtml.auth.utils import login_user, save_user_info, register_user
from myfasthtml.controls.helpers import Ids
from myfasthtml.core.instances import special_session, UniqueInstance
from myfasthtml.core.instances import UniqueInstance, RootInstance
class AuthProxy(UniqueInstance):
def __init__(self, base_url: str = None):
super().__init__(special_session, Ids.AuthProxy)
super().__init__(Ids.AuthProxy, RootInstance)
self._base_url = base_url
def login_user(self, email: str, password: str):

View File

@@ -44,7 +44,8 @@ class BaseCommand:
def execute(self, client_response: dict = None):
raise NotImplementedError
def htmx(self, target="this", swap="innerHTML"):
def htmx(self, target="this", swap="outerHTML", trigger=None):
# Note that the default value is the same than in get_htmx_params()
if target is None:
self._htmx_extra["hx-swap"] = "none"
elif target != "this":
@@ -52,8 +53,12 @@ class BaseCommand:
if swap is None:
self._htmx_extra["hx-swap"] = "none"
elif swap != "innerHTML":
elif swap != "outerHTML":
self._htmx_extra["hx-swap"] = swap
if trigger is not None:
self._htmx_extra["hx-trigger"] = trigger
return self
def bind_ft(self, ft):
@@ -86,6 +91,10 @@ class BaseCommand:
return self
@property
def url(self):
return f"{ROUTE_ROOT}{Routes.Commands}?c_id={self.id}"
def __str__(self):
return f"Command({self.name})"
@@ -116,6 +125,19 @@ class Command(BaseCommand):
self.args = args
self.kwargs = kwargs
def _convert(self, key, value):
if key in self.callback_parameters:
param = self.callback_parameters[key]
if param.annotation == bool:
return value == "true"
elif param.annotation == int:
return int(value)
elif param.annotation == float:
return float(value)
elif param.annotation == list:
return value.split(",")
return value
def execute(self, client_response: dict = None):
ret_from_bindings = []
@@ -129,7 +151,7 @@ class Command(BaseCommand):
if client_response:
for k, v in client_response.items():
if k in self.callback_parameters:
new_kwargs[k] = v
new_kwargs[k] = self._convert(k, v)
if 'client_response' in self.callback_parameters:
new_kwargs['client_response'] = client_response
@@ -141,8 +163,8 @@ class Command(BaseCommand):
# Set the hx-swap-oob attribute on all elements returned by the callback
if isinstance(ret, (list, tuple)):
for r in ret[1:]:
if hasattr(r, 'attrs'):
r.attrs["hx-swap-oob"] = "true"
if hasattr(r, 'attrs') and r.get("id", None) is not None:
r.attrs["hx-swap-oob"] = r.attrs.get("hx-swap-oob", "true")
if not ret_from_bindings:
return ret

View File

@@ -9,8 +9,8 @@ from myfasthtml.core.utils import retrieve_user_info
class DbManager(SingleInstance):
def __init__(self, session, root=".myFastHtmlDb", auto_register: bool = True):
super().__init__(session, Ids.DbManager, auto_register=auto_register)
def __init__(self, session, parent=None, root=".myFastHtmlDb", auto_register: bool = True):
super().__init__(session, Ids.DbManager, parent, auto_register=auto_register)
self.db = DbEngine(root=root)
def save(self, entry, obj):

View File

@@ -1,4 +1,5 @@
import uuid
from typing import Self
from myfasthtml.controls.helpers import Ids
@@ -17,10 +18,11 @@ class BaseInstance:
Base class for all instances (manageable by InstancesManager)
"""
def __init__(self, session: dict, prefix: str, _id: str, auto_register: bool = True):
def __init__(self, session: dict, prefix: str, _id: str, parent: Self, auto_register: bool = True):
self._session = session
self._id = _id
self._prefix = prefix
self._parent = parent
if auto_register:
InstancesManager.register(session, self)
@@ -32,15 +34,17 @@ class BaseInstance:
def get_prefix(self):
return self._prefix
def get_parent(self):
return self._parent
class SingleInstance(BaseInstance):
"""
Base class for instances that can only have one instance at a time.
"""
def __init__(self, session: dict, prefix: str, auto_register: bool = True):
super().__init__(session, prefix, prefix, auto_register)
def __init__(self, session: dict, prefix: str, parent, auto_register: bool = True):
super().__init__(session, prefix, prefix, parent, auto_register)
class UniqueInstance(BaseInstance):
@@ -49,8 +53,8 @@ class UniqueInstance(BaseInstance):
Does not throw exception if the instance already exists, it simply overwrites it.
"""
def __init__(self, session: dict, prefix: str, auto_register: bool = True):
super().__init__(session, prefix, prefix, auto_register)
def __init__(self, prefix: str, parent: BaseInstance, auto_register: bool = True):
super().__init__(parent.get_session(), prefix, prefix, parent, auto_register)
self._prefix = prefix
@@ -59,8 +63,8 @@ class MultipleInstance(BaseInstance):
Base class for instances that can have multiple instances at a time.
"""
def __init__(self, session: dict, prefix: str, auto_register: bool = True, _id=None):
super().__init__(session, prefix, f"{prefix}-{_id or str(uuid.uuid4())}", auto_register)
def __init__(self, prefix: str, parent: BaseInstance, auto_register: bool = True, _id=None):
super().__init__(parent.get_session(), prefix, _id or f"{prefix}-{str(uuid.uuid4())}", parent, auto_register)
self._prefix = prefix
@@ -84,12 +88,13 @@ class InstancesManager:
return instance
@staticmethod
def get(session: dict, instance_id: str, instance_type: type = None, *args, **kwargs):
def get(session: dict, instance_id: str, instance_type: type = None, parent: BaseInstance = None, *args, **kwargs):
"""
Get or create an instance of the given type (from its id)
:param session:
:param instance_id:
:param instance_type:
:param parent:
:param args:
:param kwargs:
:return:
@@ -100,7 +105,9 @@ class InstancesManager:
return InstancesManager.instances[key]
except KeyError:
if instance_type:
return instance_type(session, *args, **kwargs) # it will be automatically registered
if not issubclass(instance_type, SingleInstance):
assert parent is not None, "Parent instance must be provided if not SingleInstance"
return instance_type(session, parent=parent, *args, **kwargs) # it will be automatically registered
else:
raise
@@ -119,3 +126,6 @@ class InstancesManager:
@staticmethod
def reset():
return InstancesManager.instances.clear()
RootInstance = SingleInstance(special_session, Ids.Root, None)

View File

@@ -0,0 +1,12 @@
from myfasthtml.controls.VisNetwork import VisNetwork
from myfasthtml.controls.helpers import Ids
from myfasthtml.core.instances import BaseInstance
class InstancesHelper:
@staticmethod
def dynamic_get(parent: BaseInstance, component_type: str, instance_id: str):
if component_type == Ids.VisNetwork:
return VisNetwork(parent, _id=instance_id)
return None

View File

@@ -0,0 +1,85 @@
from difflib import SequenceMatcher
from typing import Any
def _is_subsequence(query: str, target: str) -> tuple[bool, float]:
"""
Determines if a query string is a subsequence of a target string and calculates
a score based on the compactness of the match. The match is case-insensitive.
The function iterates through each character of the query and checks if it
exists in the target string while maintaining the order. If all characters of
the query are found in order, it calculates a score based on the smallest
window in the target that contains all the matched characters.
:param query: The query string to check as a subsequence.
:param target: The target string in which to find the subsequence.
:return: A tuple where the first value is a boolean indicating if a valid
subsequence exists, and the second value is a float representing the
compactness score of the match.
:rtype: tuple[bool, float]
"""
query = query.lower()
target = target.lower()
positions = []
idx = 0
for char in query:
idx = target.find(char, idx)
if idx == -1:
return False, 0.0
positions.append(idx)
idx += 1
# Smallest window containing all matched chars
window_size = positions[-1] - positions[0] + 1
# Score: ratio of query length vs window size (compactness)
score = len(query) / window_size
return True, score
def fuzzy_matching(query: str, choices: list[Any], similarity_threshold: float = 0.7, get_attr=None):
"""
Perform fuzzy matching on a list of items to find the items that are similar
to the given query based on a similarity threshold.
:param query: The search query to be matched, provided as a string.
:param choices: A list of strings representing the items to be compared against the query.
:param similarity_threshold: A float value representing the minimum similarity score
(between 0 and 1) an item needs to achieve to be considered a match. Defaults to 0.7.
:param get_attr: When choice is a object, give the property to use
:return: A list of strings containing the items from the input list that meet or exceed
the similarity threshold, sorted in descending order of similarity.
"""
get_attr = get_attr or (lambda x: x)
matches = []
for file_doc in choices:
# Calculate similarity between search term and filename
similarity = SequenceMatcher(None, query.lower(), get_attr(file_doc).lower()).ratio()
if similarity >= similarity_threshold:
matches.append((file_doc, similarity))
# Sort by similarity score (highest first)
matches.sort(key=lambda x: x[1], reverse=True)
# Return only the FileDocument objects
return [match[0] for match in matches]
def subsequence_matching(query: str, choices: list[Any], get_attr=None):
get_attr = get_attr or (lambda x: x)
matches = []
for item in choices:
matched, score = _is_subsequence(query, get_attr(item))
if matched:
matches.append((item, score))
# Sort by score (highest first)
matches.sort(key=lambda x: x[1], reverse=True)
# Return only the FileDocument objects
return [match[0] for match in matches]

View File

@@ -0,0 +1,238 @@
from collections.abc import Callable
def from_nested_dict(trees: list[dict]) -> tuple[list, list]:
"""
Convert a list of nested dictionaries to vis.js nodes and edges format.
Args:
trees: List of nested dictionaries where keys are node names and values are
dictionaries of children (e.g., [{"root1": {"child1": {}}}, {"root2": {}}])
Returns:
tuple: (nodes, edges) where:
- nodes: list of dicts with auto-incremented numeric IDs
- edges: list of dicts with 'from' and 'to' keys
Example:
>>> trees = [{"root1": {"child1": {}}}, {"root2": {"child2": {}}}]
>>> nodes, edges = from_nested_dict(trees)
>>> # nodes = [{"id": 1, "label": "root1"}, {"id": 2, "label": "child1"}, ...]
"""
nodes = []
edges = []
node_id_counter = 1
node_id_map = {} # maps node_label -> node_id
def traverse(subtree: dict, parent_id: int | None = None):
nonlocal node_id_counter
for node_label, children in subtree.items():
# Create node with auto-incremented ID
current_id = node_id_counter
node_id_map[node_label] = current_id
nodes.append({
"id": current_id,
"label": node_label
})
node_id_counter += 1
# Create edge from parent to current node
if parent_id is not None:
edges.append({
"from": parent_id,
"to": current_id
})
# Recursively process children
if children:
traverse(children, parent_id=current_id)
# Process each tree in the list
for tree in trees:
traverse(tree)
return nodes, edges
def from_tree_with_metadata(
trees: list[dict],
id_getter: Callable = None,
label_getter: Callable = None,
children_getter: Callable = None
) -> tuple[list, list]:
"""
Convert a list of trees with metadata to vis.js nodes and edges format.
Args:
trees: List of dictionaries with 'id', 'label', and 'children' keys
(e.g., [{"id": "root1", "label": "Root 1", "children": [...]}, ...])
id_getter: Optional callback to extract node ID from dict
Default: lambda n: n.get("id")
label_getter: Optional callback to extract node label from dict
Default: lambda n: n.get("label", "")
children_getter: Optional callback to extract children list from dict
Default: lambda n: n.get("children", [])
Returns:
tuple: (nodes, edges) where:
- nodes: list of dicts with IDs from tree or auto-incremented
- edges: list of dicts with 'from' and 'to' keys
Example:
>>> trees = [
... {
... "id": "root1",
... "label": "Root Node 1",
... "children": [
... {"id": "child1", "label": "Child 1", "children": []}
... ]
... },
... {"id": "root2", "label": "Root Node 2"}
... ]
>>> nodes, edges = from_tree_with_metadata(trees)
"""
# Default getters
if id_getter is None:
id_getter = lambda n: n.get("id")
if label_getter is None:
label_getter = lambda n: n.get("label", "")
if children_getter is None:
children_getter = lambda n: n.get("children", [])
nodes = []
edges = []
node_id_counter = 1
def traverse(node_dict: dict, parent_id: int | str | None = None):
nonlocal node_id_counter
# Extract ID (use provided or auto-increment)
node_id = id_getter(node_dict)
if node_id is None:
node_id = node_id_counter
node_id_counter += 1
# Extract label
node_label = label_getter(node_dict)
# Create node
nodes.append({
"id": node_id,
"label": node_label
})
# Create edge from parent to current node
if parent_id is not None:
edges.append({
"from": parent_id,
"to": node_id
})
# Recursively process children
children = children_getter(node_dict)
if children:
for child in children:
traverse(child, parent_id=node_id)
# Process each tree in the list
for tree in trees:
traverse(tree)
return nodes, edges
def from_parent_child_list(
items: list,
id_getter: callable = None,
label_getter: callable = None,
parent_getter: callable = None,
ghost_color: str = "#ff9999"
) -> tuple[list, list]:
"""
Convert a list of items with parent references to vis.js nodes and edges format.
Args:
items: List of items (dicts or objects) with parent references
(e.g., [{"id": "child", "parent": "root", "label": "Child"}, ...])
id_getter: Optional callback to extract node ID from item
Default: lambda item: item.get("id")
label_getter: Optional callback to extract node label from item
Default: lambda item: item.get("label", "")
parent_getter: Optional callback to extract parent ID from item
Default: lambda item: item.get("parent")
ghost_color: Color to use for ghost nodes (nodes referenced as parents but not in list)
Default: "#ff9999" (light red)
Returns:
tuple: (nodes, edges) where:
- nodes: list of dicts with IDs from items, ghost nodes have color property
- edges: list of dicts with 'from' and 'to' keys
Note:
- Nodes with parent=None or parent="" are treated as root nodes
- If a parent is referenced but doesn't exist in items, a ghost node is created
with the ghost_color applied
Example:
>>> items = [
... {"id": "root", "label": "Root"},
... {"id": "child1", "parent": "root", "label": "Child 1"},
... {"id": "child2", "parent": "unknown", "label": "Child 2"}
... ]
>>> nodes, edges = from_parent_child_list(items)
>>> # "unknown" will be created as a ghost node with color="#ff9999"
"""
# Default getters
if id_getter is None:
id_getter = lambda item: item.get("id")
if label_getter is None:
label_getter = lambda item: item.get("label", "")
if parent_getter is None:
parent_getter = lambda item: item.get("parent")
nodes = []
edges = []
# Track all existing node IDs
existing_ids = set()
# First pass: create nodes for all items
for item in items:
node_id = id_getter(item)
node_label = label_getter(item)
existing_ids.add(node_id)
nodes.append({
"id": node_id,
"label": node_label
})
# Track ghost nodes to avoid duplicates
ghost_nodes = set()
# Second pass: create edges and identify ghost nodes
for item in items:
node_id = id_getter(item)
parent_id = parent_getter(item)
# Skip if no parent or parent is empty string or None
if parent_id is None or parent_id == "":
continue
# Create edge from parent to child
edges.append({
"from": parent_id,
"to": node_id
})
# Check if parent exists, if not create ghost node
if parent_id not in existing_ids and parent_id not in ghost_nodes:
ghost_nodes.add(parent_id)
nodes.append({
"id": parent_id,
"label": str(parent_id), # Use ID as label for ghost nodes
"color": ghost_color
})
return nodes, edges

View File

@@ -223,116 +223,15 @@ def debug_session(session):
return session.get("user_info", {}).get("email", "** UNKNOWN USER **")
import inspect
from typing import Optional
def build_args_kwargs(
parameters,
values,
default_args: Optional[list] = None,
default_kwargs: Optional[dict] = None
):
"""
Build (args, kwargs) from a sequence or dict of inspect.Parameter and a dict of values.
- POSITIONAL_ONLY and POSITIONAL_OR_KEYWORD fill `args`
- KEYWORD_ONLY fill `kwargs`
- VAR_POSITIONAL (*args) accepts list/tuple values
- VAR_KEYWORD (**kwargs) accepts dict values
- If not found, fallback to default_args / default_kwargs when provided
- Raises ValueError for missing required or unknown parameters
"""
if not isinstance(parameters, dict):
parameters = {p.name: p for p in parameters}
ordered_params = list(parameters.values())
has_var_positional = any(p.kind == inspect.Parameter.VAR_POSITIONAL for p in ordered_params)
has_var_keyword = any(p.kind == inspect.Parameter.VAR_KEYWORD for p in ordered_params)
args = []
kwargs = {}
consumed_names = set()
default_args = default_args or []
default_kwargs = default_kwargs or {}
# 1 Handle positional parameters
positional_params = [
p for p in ordered_params
if p.kind in (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD)
]
for i, p in enumerate(positional_params):
if p.name in values:
args.append(values[p.name])
consumed_names.add(p.name)
elif i < len(default_args):
args.append(default_args[i])
elif p.default is not inspect.Parameter.empty:
args.append(p.default)
else:
raise ValueError(f"Missing required positional argument: {p.name}")
# 2 Handle *args
for p in ordered_params:
if p.kind == inspect.Parameter.VAR_POSITIONAL:
if p.name in values:
val = values[p.name]
if not isinstance(val, (list, tuple)):
raise ValueError(f"*{p.name} must be a list or tuple, got {type(val).__name__}")
args.extend(val)
consumed_names.add(p.name)
elif len(default_args) > len(positional_params):
# Add any remaining default_args beyond fixed positionals
args.extend(default_args[len(positional_params):])
# 3 Handle keyword-only parameters
for p in ordered_params:
if p.kind == inspect.Parameter.KEYWORD_ONLY:
if p.name in values:
kwargs[p.name] = values[p.name]
consumed_names.add(p.name)
elif p.name in default_kwargs:
kwargs[p.name] = default_kwargs[p.name]
elif p.default is not inspect.Parameter.empty:
kwargs[p.name] = p.default
else:
raise ValueError(f"Missing required keyword-only argument: {p.name}")
# 4 Handle **kwargs
for p in ordered_params:
if p.kind == inspect.Parameter.VAR_KEYWORD:
if p.name in values:
val = values[p.name]
if not isinstance(val, dict):
raise ValueError(f"**{p.name} must be a dict, got {type(val).__name__}")
kwargs.update(val)
consumed_names.add(p.name)
# Merge any unmatched names if **kwargs exists
remaining = {
k: v for k, v in values.items()
if k not in consumed_names and k not in parameters
}
# Also merge default_kwargs not used yet
for k, v in default_kwargs.items():
if k not in kwargs:
kwargs[k] = v
kwargs.update(remaining)
break
# 5 Handle unknown / unexpected parameters (if no **kwargs)
if not has_var_keyword:
unexpected = [k for k in values if k not in consumed_names and k in parameters]
if unexpected:
raise ValueError(f"Unexpected parameters: {unexpected}")
extra = [k for k in values if k not in consumed_names and k not in parameters]
if extra:
raise ValueError(f"Unknown parameters: {extra}")
return args, kwargs
def get_id(obj):
if isinstance(obj, str):
return obj
elif hasattr(obj, "id"):
return obj.id
elif hasattr(obj, "get_id"):
return obj.get_id()
else:
return str(obj)
@utils_rt(Routes.Commands)

View File

@@ -385,7 +385,7 @@ down_to_bottom = NotStr('''<svg name="carbon-DownToBottom" xmlns="http://www.w3.
basketball = NotStr('''<svg name="carbon-Basketball" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" viewBox="0 0 32 32"><path d="M16 2a14 14 0 1 0 14 14A14.016 14.016 0 0 0 16 2zm11.95 13H22.04a14.409 14.409 0 0 1 2.738-7.153A11.94 11.94 0 0 1 27.95 15zM17 15V4.05a11.918 11.918 0 0 1 6.287 2.438A16.265 16.265 0 0 0 20.04 15zm-2 0h-3.04a16.265 16.265 0 0 0-3.247-8.512A11.918 11.918 0 0 1 15 4.051zm0 2v10.95a11.918 11.918 0 0 1-6.287-2.438A16.265 16.265 0 0 0 11.96 17zm2 0h3.04a16.265 16.265 0 0 0 3.248 8.512A11.918 11.918 0 0 1 17 27.949zM7.22 7.847A14.409 14.409 0 0 1 9.96 15H4.051a11.94 11.94 0 0 1 3.17-7.153zM4.05 17H9.96a14.409 14.409 0 0 1-2.738 7.153A11.94 11.94 0 0 1 4.05 17zm20.73 7.153A14.409 14.409 0 0 1 22.04 17h5.908a11.94 11.94 0 0 1-3.17 7.153z" fill="currentColor" /></svg>''')
thunderstorm_scattered_night = NotStr('''<svg name="carbon-ThunderstormScatteredNight" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" viewBox="0 0 32 32"> <path d="M13.338 30l-1.736-1l2.287-4H10l3.993-7l1.737 1l-2.284 4h3.891l-3.999 7z" fill="currentColor" /> <path d="M29.844 13.035a1.52 1.52 0 0 0-1.231-.866a5.356 5.356 0 0 1-3.41-1.716A6.465 6.465 0 0 1 23.92 4.06a1.604 1.604 0 0 0-.3-1.546a1.455 1.455 0 0 0-1.36-.492l-.019.004a7.854 7.854 0 0 0-6.105 6.48A7.372 7.372 0 0 0 13.5 8a7.551 7.551 0 0 0-7.15 5.244A5.993 5.993 0 0 0 8 25v-2a3.993 3.993 0 0 1-.673-7.93l.663-.112l.145-.656a5.496 5.496 0 0 1 10.73 0l.145.656l.663.113A3.993 3.993 0 0 1 19 23v2a5.955 5.955 0 0 0 5.88-7.146a7.502 7.502 0 0 0 4.867-3.3a1.537 1.537 0 0 0 .097-1.52zm-5.693 2.918a5.966 5.966 0 0 0-3.502-2.709a7.508 7.508 0 0 0-2.62-3.694a6.008 6.008 0 0 1 3.77-5.333a8.458 8.458 0 0 0 1.939 7.596a7.404 7.404 0 0 0 3.902 2.228a5.442 5.442 0 0 1-3.489 1.912z" fill="currentColor" /></svg>''')
letter_uu = NotStr('''<svg name="carbon-LetterUu" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" viewBox="0 0 32 32"> <path d="M23 23h-4a2 2 0 0 1-2-2v-8h2v8h4v-8h2v8a2 2 0 0 1-2 2z" fill="currentColor" /> <path d="M13 23H9a2 2 0 0 1-2-2V9h2v12h4V9h2v12a2 2 0 0 1-2 2z" fill="currentColor" /></svg>''')
continue = NotStr('''<svg name="carbon-Continue" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" viewBox="0 0 32 32"> <path d="M10 28a1 1 0 0 1-1-1V5a1 1 0 0 1 1.501-.865l19 11a1 1 0 0 1 0 1.73l-19 11A.998.998 0 0 1 10 28zm1-21.266v18.532L27 16z" fill="currentColor" /> <path d="M4 4h2v24H4z" fill="currentColor" /></svg>''')
continue_ = NotStr('''<svg name="carbon-Continue" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" viewBox="0 0 32 32"> <path d="M10 28a1 1 0 0 1-1-1V5a1 1 0 0 1 1.501-.865l19 11a1 1 0 0 1 0 1.73l-19 11A.998.998 0 0 1 10 28zm1-21.266v18.532L27 16z" fill="currentColor" /> <path d="M4 4h2v24H4z" fill="currentColor" /></svg>''')
login = NotStr('''<svg name="carbon-Login" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" viewBox="0 0 32 32"> <path d="M26 30H14a2 2 0 0 1-2-2v-3h2v3h12V4H14v3h-2V4a2 2 0 0 1 2-2h12a2 2 0 0 1 2 2v24a2 2 0 0 1-2 2z" fill="currentColor" /> <path d="M14.59 20.59L18.17 17H4v-2h14.17l-3.58-3.59L16 10l6 6l-6 6l-1.41-1.41z" fill="currentColor" /></svg>''')
favorite = NotStr('''<svg name="carbon-Favorite" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" viewBox="0 0 32 32"><path d="M22.45 6a5.47 5.47 0 0 1 3.91 1.64a5.7 5.7 0 0 1 0 8L16 26.13L5.64 15.64a5.7 5.7 0 0 1 0-8a5.48 5.48 0 0 1 7.82 0l2.54 2.6l2.53-2.58A5.44 5.44 0 0 1 22.45 6m0-2a7.47 7.47 0 0 0-5.34 2.24L16 7.36l-1.11-1.12a7.49 7.49 0 0 0-10.68 0a7.72 7.72 0 0 0 0 10.82L16 29l11.79-11.94a7.72 7.72 0 0 0 0-10.82A7.49 7.49 0 0 0 22.45 4z" fill="currentColor" /></svg>''')
cd_archive = NotStr('''<svg name="carbon-CdArchive" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" viewBox="0 0 32 32"> <path d="M16 28a12 12 0 1 1 12-12a12 12 0 0 1-12 12zm0-22a10 10 0 1 0 10 10A10 10 0 0 0 16 6z" fill="currentColor" /> <path d="M16 22a6 6 0 1 1 6-6a6 6 0 0 1-6 6zm0-10a4 4 0 1 0 4 4a4 4 0 0 0-4-4z" fill="currentColor" /> <circle cx="16" cy="16" r="2" fill="currentColor" /></svg>''')

View File

@@ -31,6 +31,7 @@ def get_asset_content(filename):
def create_app(daisyui: Optional[bool] = True,
vis: Optional[bool] = True,
protect_routes: Optional[bool] = True,
mount_auth_app: Optional[bool] = False,
base_url: Optional[str] = None,
@@ -63,6 +64,11 @@ def create_app(daisyui: Optional[bool] = True,
Script(src="/myfasthtml/tailwindcss-browser@4.js"),
]
if vis:
hdrs += [
Script(src="/myfasthtml/vis-network.min.js"),
]
beforeware = create_auth_beforeware() if protect_routes else None
app, rt = fasthtml.fastapp.fast_app(before=beforeware, hdrs=tuple(hdrs), **kwargs)
@@ -96,7 +102,7 @@ def create_app(daisyui: Optional[bool] = True,
if mount_auth_app:
# Setup authentication routes
setup_auth_routes(app, rt, base_url=base_url)
# create the AuthProxy instance
AuthProxy(base_url) # using the auto register mechanism to expose it

View File

@@ -1,5 +1,7 @@
import pytest
from myfasthtml.core.instances import SingleInstance
@pytest.fixture(scope="session")
def session():
@@ -14,3 +16,8 @@ def session():
'updated_at': '2025-11-10T15:52:59.006213'
}
}
@pytest.fixture(scope="session")
def root_instance(session):
return SingleInstance(session, "TestRoot", None)

View File

@@ -1,5 +1,6 @@
import pytest
from fasthtml.components import *
from fasthtml.xtend import Script
from myfasthtml.controls.TabsManager import TabsManager
from myfasthtml.core.instances import InstancesManager
@@ -8,8 +9,8 @@ from .conftest import session
@pytest.fixture()
def tabs_manager(session):
yield TabsManager(session)
def tabs_manager(root_instance):
yield TabsManager(root_instance)
InstancesManager.reset()
@@ -20,11 +21,12 @@ class TestTabsManagerBehaviour:
assert from_instance_manager == tabs_manager
def test_i_can_add_tab(self, tabs_manager):
tab_id = tabs_manager.add_tab("Users", Div("Content 1"))
tab_id = tabs_manager.add_tab("Tab1", Div("Content 1"))
assert tab_id is not None
assert tab_id in tabs_manager.get_state().tabs
assert tabs_manager.get_state().tabs[tab_id]["label"] == "Users"
assert tabs_manager.get_state().tabs[tab_id]["label"] == "Tab1"
assert tabs_manager.get_state().tabs[tab_id]["id"] == tab_id
assert tabs_manager.get_state().tabs[tab_id]["component_type"] is None # Div is not BaseInstance
assert tabs_manager.get_state().tabs[tab_id]["component_id"] is None # Div is not BaseInstance
assert tabs_manager.get_state().tabs_order == [tab_id]
@@ -37,34 +39,57 @@ class TestTabsManagerBehaviour:
assert len(tabs_manager.get_state().tabs) == 2
assert tabs_manager.get_state().tabs_order == [tab_id1, tab_id2]
assert tabs_manager.get_state().active_tab == tab_id2
def test_i_can_show_tab(self, tabs_manager):
tab_id1 = tabs_manager.add_tab("Tab1", Div("Content 1"))
tab_id2 = tabs_manager.add_tab("Tab2", Div("Content 2"))
assert tabs_manager.get_state().active_tab == tab_id2 # last crated tab is active
tabs_manager.show_tab(tab_id1)
assert tabs_manager.get_state().active_tab == tab_id1
def test_i_can_close_tab(self, tabs_manager):
tab_id1 = tabs_manager.add_tab("Tab1", Div("Content 1"))
tab_id2 = tabs_manager.add_tab("Tab2", Div("Content 2"))
tab_id3 = tabs_manager.add_tab("Tab3", Div("Content 3"))
tabs_manager.close_tab(tab_id2)
assert len(tabs_manager.get_state().tabs) == 2
assert [tab_id for tab_id in tabs_manager.get_state().tabs] == [tab_id1, tab_id3]
assert tabs_manager.get_state().tabs_order == [tab_id1, tab_id3]
assert tabs_manager.get_state().active_tab == tab_id3 # last tab stays active
def test_i_still_have_an_active_tab_after_close(self, tabs_manager):
tab_id1 = tabs_manager.add_tab("Tab1", Div("Content 1"))
tab_id2 = tabs_manager.add_tab("Tab2", Div("Content 2"))
tab_id3 = tabs_manager.add_tab("Tab3", Div("Content 3"))
tabs_manager.close_tab(tab_id3) # close the currently active tab
assert tabs_manager.get_state().active_tab == tab_id1 # default to the first tab
class TestTabsManagerRender:
def test_i_can_render_when_no_tabs(self, tabs_manager):
res = tabs_manager.render()
expected = Div(
Div(NoChildren(), id=f"{tabs_manager.get_id()}-header"),
Div(id=f"{tabs_manager.get_id()}-content"),
id=tabs_manager.get_id(),
)
assert matches(res, expected)
def test_i_can_render_when_one_tab(self, tabs_manager):
tabs_manager.add_tab("Users", Div("Content 1"))
res = tabs_manager.render()
expected = Div(
Div(
Button(),
id=f"{tabs_manager.get_id()}-header"
Div(id=f"{tabs_manager.get_id()}-controller"),
Script(f'updateTabs("{tabs_manager.get_id()}-controller");'),
),
Div(
Div("Content 1")
Div(NoChildren(), id=f"{tabs_manager.get_id()}-header"),
id=f"{tabs_manager.get_id()}-header-wrapper"
),
Div(
Div(id=f"{tabs_manager.get_id()}-None-content"),
id=f"{tabs_manager.get_id()}-content-wrapper"
),
id=tabs_manager.get_id(),
)
assert matches(res, expected)
def test_i_can_render_when_multiple_tabs(self, tabs_manager):
@@ -75,13 +100,22 @@ class TestTabsManagerRender:
expected = Div(
Div(
Button(),
Button(),
Button(),
id=f"{tabs_manager.get_id()}-header"
Div(id=f"{tabs_manager.get_id()}-controller"),
Script(f'updateTabs("{tabs_manager.get_id()}-controller");'),
),
Div(
Div("Content 3")
Div(
Div(), # tab_button
Div(), # tab_button
Div(), # tab_button
id=f"{tabs_manager.get_id()}-header"
),
id=f"{tabs_manager.get_id()}-header-wrapper"
),
Div(
Div("Content 3"), # active tab content
# Lasy loading for the other contents
id=f"{tabs_manager.get_id()}-content-wrapper"
),
id=tabs_manager.get_id(),
)

View File

@@ -104,8 +104,8 @@ class TestCommandBind:
assert matches(updated, expected)
@pytest.mark.parametrize("return_values", [
[Div(), Div(), "hello", Div()], # list
(Div(), Div(), "hello", Div()) # tuple
[Div(), Div(id="id1"), "hello", Div(id="id2")], # list
(Div(), Div(id="id1"), "hello", Div(id="id2")) # tuple
])
def test_swap_oob_is_automatically_set_when_multiple_elements_are_returned(self, return_values):
"""Test that hx-swap-oob is automatically set, but not for the first."""
@@ -157,3 +157,29 @@ class TestCommandExecute:
command = Command('test', 'Command description', callback_with_param)
command.execute(client_response={"number": "10"})
def test_swap_oob_is_added_when_multiple_elements_are_returned(self):
"""Test that hx-swap-oob is automatically set, but not for the first."""
def another_callback():
return Div(id="first"), Div(id="second"), "hello", Div(id="third")
command = Command('test', 'Command description', another_callback)
res = command.execute()
assert "hx-swap-oob" not in res[0].attrs
assert res[1].attrs["hx-swap-oob"] == "true"
assert res[3].attrs["hx-swap-oob"] == "true"
def test_swap_oob_is_not_added_when_there_no_id(self):
"""Test that hx-swap-oob is automatically set, but not for the first."""
def another_callback():
return Div(id="first"), Div(), "hello", Div()
command = Command('test', 'Command description', another_callback)
res = command.execute()
assert "hx-swap-oob" not in res[0].attrs
assert "hx-swap-oob" not in res[1].attrs
assert "hx-swap-oob" not in res[3].attrs

View File

@@ -0,0 +1,105 @@
from dataclasses import dataclass
from myfasthtml.core.matching_utils import fuzzy_matching, subsequence_matching
class TestFuzzyMatching:
def test_i_can_find_exact_match_with_fuzzy(self):
# Exact match should always pass
choices = ["hello"]
result = fuzzy_matching("hello", choices)
assert len(result) == 1
assert result[0] == "hello"
def test_i_can_find_close_match_with_fuzzy(self):
# "helo.txt" should match "hello.txt" with high similarity
choices = ["hello"]
result = fuzzy_matching("helo", choices, similarity_threshold=0.7)
assert len(result) == 1
assert result[0] == "hello"
def test_i_cannot_find_dissimilar_match_with_fuzzy(self):
# "world.txt" should not match "hello.txt"
choices = ["hello"]
result = fuzzy_matching("world", choices, similarity_threshold=0.7)
assert len(result) == 0
def test_i_can_sort_by_similarity_in_fuzzy(self):
# hello has a higher similarity than helo
choices = [
"hello",
"helo",
]
result = fuzzy_matching("hello", choices, similarity_threshold=0.7)
assert result == ["hello", "helo"]
def test_i_can_find_on_object(self):
@dataclass
class DummyObject:
value: str
id: str
choices = [
DummyObject("helo", "1"),
DummyObject("hello", "2"),
DummyObject("xyz", "3"),
]
result = fuzzy_matching("hello", choices, get_attr=lambda x: x.value)
assert len(result) == 2
assert result == [DummyObject("hello", "2"), DummyObject("helo", "1")]
class TestSubsequenceMatching:
def test_i_can_match_subsequence_simple(self):
# "abg" should match "AlphaBetaGamma"
choices = ["AlphaBetaGamma"]
result = subsequence_matching("abg", choices)
assert len(result) == 1
assert result[0] == "AlphaBetaGamma"
def test_i_can_match_subsequence_simple_case_insensitive(self):
# "abg" should match "alphabetagamma"
choices = ["alphabetagamma"]
result = subsequence_matching("abg", choices)
assert len(result) == 1
assert result[0] == "alphabetagamma"
def test_i_cannot_match_wrong_order_subsequence(self):
# the order is wrong
choices = ["AlphaBetaGamma"]
result = subsequence_matching("gba", choices)
assert len(result) == 0
def test_i_can_match_multiple_documents_subsequence(self):
# "abg" should match both filenames, but "AlphaBetaGamma" has a higher score
choices = [
"AlphaBetaGamma",
"HalleBerryIsGone",
]
result = subsequence_matching("abg", choices)
assert len(result) == 2
assert result[0] == "AlphaBetaGamma"
assert result[1] == "HalleBerryIsGone"
def test_i_cannot_match_unrelated_subsequence(self):
# "xyz" should not match any file
choices = ["AlphaBetaGamma"]
result = subsequence_matching("xyz", choices)
assert len(result) == 0
def test_i_can_match_on_object(self):
@dataclass
class DummyObject:
value: str
id: str
choices = [
DummyObject("HalleBerryIsGone", "1"),
DummyObject("AlphaBetaGamma", "2"),
DummyObject("xyz", "3"),
]
result = subsequence_matching("abg", choices, get_attr=lambda x: x.value)
assert len(result) == 2
assert result == [DummyObject("AlphaBetaGamma", "2"), DummyObject("HalleBerryIsGone", "1")]

View File

@@ -0,0 +1,515 @@
from myfasthtml.core.network_utils import from_nested_dict, from_tree_with_metadata, from_parent_child_list
class TestFromNestedDict:
def test_i_can_convert_single_root_node(self):
"""Test conversion of a single root node without children."""
trees = [{"root": {}}]
nodes, edges = from_nested_dict(trees)
assert len(nodes) == 1
assert nodes[0] == {"id": 1, "label": "root"}
assert len(edges) == 0
def test_i_can_convert_tree_with_one_level_children(self):
"""Test conversion with direct children, verifying edge creation."""
trees = [{"root": {"child1": {}, "child2": {}}}]
nodes, edges = from_nested_dict(trees)
assert len(nodes) == 3
assert nodes[0] == {"id": 1, "label": "root"}
assert nodes[1] == {"id": 2, "label": "child1"}
assert nodes[2] == {"id": 3, "label": "child2"}
assert len(edges) == 2
assert {"from": 1, "to": 2} in edges
assert {"from": 1, "to": 3} in edges
def test_i_can_convert_tree_with_multiple_levels(self):
"""Test recursive conversion with multiple levels of nesting."""
trees = [
{
"root": {
"child1": {
"grandchild1": {},
"grandchild2": {}
},
"child2": {}
}
}
]
nodes, edges = from_nested_dict(trees)
assert len(nodes) == 5
assert len(edges) == 4
# Verify hierarchy
assert {"from": 1, "to": 2} in edges # root -> child1
assert {"from": 1, "to": 5} in edges # root -> child2
assert {"from": 2, "to": 3} in edges # child1 -> grandchild1
assert {"from": 2, "to": 4} in edges # child1 -> grandchild2
def test_i_can_generate_auto_incremented_ids(self):
"""Test that IDs start at 1 and increment correctly."""
trees = [{"a": {"b": {"c": {}}}}]
nodes, edges = from_nested_dict(trees)
ids = [node["id"] for node in nodes]
assert ids == [1, 2, 3]
def test_i_can_use_dict_keys_as_labels(self):
"""Test that dictionary keys become node labels."""
trees = [{"RootNode": {"ChildNode": {}}}]
nodes, edges = from_nested_dict(trees)
assert nodes[0]["label"] == "RootNode"
assert nodes[1]["label"] == "ChildNode"
def test_i_can_convert_empty_list(self):
"""Test that empty list returns empty nodes and edges."""
trees = []
nodes, edges = from_nested_dict(trees)
assert nodes == []
assert edges == []
def test_i_can_convert_multiple_root_nodes(self):
"""Test conversion with multiple independent trees."""
trees = [
{"root1": {"child1": {}}},
{"root2": {"child2": {}}}
]
nodes, edges = from_nested_dict(trees)
assert len(nodes) == 4
assert nodes[0] == {"id": 1, "label": "root1"}
assert nodes[1] == {"id": 2, "label": "child1"}
assert nodes[2] == {"id": 3, "label": "root2"}
assert nodes[3] == {"id": 4, "label": "child2"}
# Verify edges connect within trees, not across
assert len(edges) == 2
assert {"from": 1, "to": 2} in edges
assert {"from": 3, "to": 4} in edges
def test_i_can_maintain_id_sequence_across_multiple_trees(self):
"""Test that ID counter continues across multiple trees."""
trees = [
{"tree1": {}},
{"tree2": {}},
{"tree3": {}}
]
nodes, edges = from_nested_dict(trees)
ids = [node["id"] for node in nodes]
assert ids == [1, 2, 3]
class TestFromTreeWithMetadata:
def test_i_can_convert_single_node_with_metadata(self):
"""Test basic conversion with explicit id and label."""
trees = [{"id": "root", "label": "Root Node"}]
nodes, edges = from_tree_with_metadata(trees)
assert len(nodes) == 1
assert nodes[0] == {"id": "root", "label": "Root Node"}
assert len(edges) == 0
def test_i_can_preserve_string_ids_from_metadata(self):
"""Test that string IDs from the tree are preserved."""
trees = [
{
"id": "root_id",
"label": "Root",
"children": [
{"id": "child_id", "label": "Child"}
]
}
]
nodes, edges = from_tree_with_metadata(trees)
assert nodes[0]["id"] == "root_id"
assert nodes[1]["id"] == "child_id"
assert edges[0] == {"from": "root_id", "to": "child_id"}
def test_i_can_auto_increment_when_id_is_missing(self):
"""Test fallback to auto-increment when ID is not provided."""
trees = [
{
"label": "Root",
"children": [
{"label": "Child1"},
{"id": "child2_id", "label": "Child2"}
]
}
]
nodes, edges = from_tree_with_metadata(trees)
assert nodes[0]["id"] == 1 # auto-incremented
assert nodes[1]["id"] == 2 # auto-incremented
assert nodes[2]["id"] == "child2_id" # preserved
def test_i_can_convert_tree_with_children(self):
"""Test handling of children list."""
trees = [
{
"id": "root",
"label": "Root",
"children": [
{
"id": "child1",
"label": "Child 1",
"children": [
{"id": "grandchild", "label": "Grandchild"}
]
},
{"id": "child2", "label": "Child 2"}
]
}
]
nodes, edges = from_tree_with_metadata(trees)
assert len(nodes) == 4
assert len(edges) == 3
assert {"from": "root", "to": "child1"} in edges
assert {"from": "root", "to": "child2"} in edges
assert {"from": "child1", "to": "grandchild"} in edges
def test_i_can_use_custom_id_getter(self):
"""Test custom callback for extracting node ID."""
trees = [
{
"node_id": "custom_root",
"label": "Root"
}
]
def custom_id_getter(node):
return node.get("node_id")
nodes, edges = from_tree_with_metadata(
trees,
id_getter=custom_id_getter
)
assert nodes[0]["id"] == "custom_root"
def test_i_can_use_custom_label_getter(self):
"""Test custom callback for extracting node label."""
trees = [
{
"id": "root",
"name": "Custom Label"
}
]
def custom_label_getter(node):
return node.get("name", "")
nodes, edges = from_tree_with_metadata(
trees,
label_getter=custom_label_getter
)
assert nodes[0]["label"] == "Custom Label"
def test_i_can_use_custom_children_getter(self):
"""Test custom callback for extracting children."""
trees = [
{
"id": "root",
"label": "Root",
"kids": [
{"id": "child", "label": "Child"}
]
}
]
def custom_children_getter(node):
return node.get("kids", [])
nodes, edges = from_tree_with_metadata(
trees,
children_getter=custom_children_getter
)
assert len(nodes) == 2
assert nodes[1]["id"] == "child"
def test_i_can_handle_missing_label_with_default(self):
"""Test that missing label returns empty string."""
trees = [{"id": "root"}]
nodes, edges = from_tree_with_metadata(trees)
assert nodes[0]["label"] == ""
def test_i_can_handle_missing_children_with_default(self):
"""Test that missing children returns empty list (no children processed)."""
trees = [{"id": "root", "label": "Root"}]
nodes, edges = from_tree_with_metadata(trees)
assert len(nodes) == 1
assert len(edges) == 0
def test_i_can_convert_multiple_root_trees(self):
"""Test conversion with multiple independent trees with metadata."""
trees = [
{
"id": "root1",
"label": "Root 1",
"children": [
{"id": "child1", "label": "Child 1"}
]
},
{
"id": "root2",
"label": "Root 2",
"children": [
{"id": "child2", "label": "Child 2"}
]
}
]
nodes, edges = from_tree_with_metadata(trees)
assert len(nodes) == 4
assert nodes[0]["id"] == "root1"
assert nodes[1]["id"] == "child1"
assert nodes[2]["id"] == "root2"
assert nodes[3]["id"] == "child2"
# Verify edges connect within trees, not across
assert len(edges) == 2
assert {"from": "root1", "to": "child1"} in edges
assert {"from": "root2", "to": "child2"} in edges
def test_i_can_maintain_id_counter_across_multiple_trees_when_missing_ids(self):
"""Test that auto-increment counter continues across multiple trees."""
trees = [
{"label": "Tree1"},
{"label": "Tree2"},
{"label": "Tree3"}
]
nodes, edges = from_tree_with_metadata(trees)
assert nodes[0]["id"] == 1
assert nodes[1]["id"] == 2
assert nodes[2]["id"] == 3
def test_i_can_convert_empty_list(self):
"""Test that empty list returns empty nodes and edges."""
trees = []
nodes, edges = from_tree_with_metadata(trees)
assert nodes == []
assert edges == []
class TestFromParentChildList:
def test_i_can_convert_single_root_node_without_parent(self):
"""Test conversion of a single root node without parent."""
items = [{"id": "root", "label": "Root"}]
nodes, edges = from_parent_child_list(items)
assert len(nodes) == 1
assert nodes[0] == {"id": "root", "label": "Root"}
assert len(edges) == 0
def test_i_can_convert_simple_parent_child_relationship(self):
"""Test conversion with basic parent-child relationship."""
items = [
{"id": "root", "label": "Root"},
{"id": "child", "parent": "root", "label": "Child"}
]
nodes, edges = from_parent_child_list(items)
assert len(nodes) == 2
assert {"id": "root", "label": "Root"} in nodes
assert {"id": "child", "label": "Child"} in nodes
assert len(edges) == 1
assert edges[0] == {"from": "root", "to": "child"}
def test_i_can_convert_multiple_children_with_same_parent(self):
"""Test that one parent can have multiple children."""
items = [
{"id": "root", "label": "Root"},
{"id": "child1", "parent": "root", "label": "Child 1"},
{"id": "child2", "parent": "root", "label": "Child 2"}
]
nodes, edges = from_parent_child_list(items)
assert len(nodes) == 3
assert len(edges) == 2
assert {"from": "root", "to": "child1"} in edges
assert {"from": "root", "to": "child2"} in edges
def test_i_can_convert_multi_level_hierarchy(self):
"""Test conversion with multiple levels (root -> child -> grandchild)."""
items = [
{"id": "root", "label": "Root"},
{"id": "child", "parent": "root", "label": "Child"},
{"id": "grandchild", "parent": "child", "label": "Grandchild"}
]
nodes, edges = from_parent_child_list(items)
assert len(nodes) == 3
assert len(edges) == 2
assert {"from": "root", "to": "child"} in edges
assert {"from": "child", "to": "grandchild"} in edges
def test_i_can_handle_parent_none_as_root(self):
"""Test that parent=None identifies a root node."""
items = [
{"id": "root", "parent": None, "label": "Root"},
{"id": "child", "parent": "root", "label": "Child"}
]
nodes, edges = from_parent_child_list(items)
assert len(nodes) == 2
assert len(edges) == 1
assert edges[0] == {"from": "root", "to": "child"}
def test_i_can_handle_parent_empty_string_as_root(self):
"""Test that parent='' identifies a root node."""
items = [
{"id": "root", "parent": "", "label": "Root"},
{"id": "child", "parent": "root", "label": "Child"}
]
nodes, edges = from_parent_child_list(items)
assert len(nodes) == 2
assert len(edges) == 1
assert edges[0] == {"from": "root", "to": "child"}
def test_i_can_create_ghost_node_for_missing_parent(self):
"""Test automatic creation of ghost node when parent doesn't exist."""
items = [
{"id": "child", "parent": "missing_parent", "label": "Child"}
]
nodes, edges = from_parent_child_list(items)
assert len(nodes) == 2
# Find the ghost node
ghost_node = [n for n in nodes if n["id"] == "missing_parent"][0]
assert ghost_node is not None
assert len(edges) == 1
assert edges[0] == {"from": "missing_parent", "to": "child"}
def test_i_can_apply_ghost_color_to_missing_parent(self):
"""Test that ghost nodes have the default ghost color."""
items = [
{"id": "child", "parent": "ghost", "label": "Child"}
]
nodes, edges = from_parent_child_list(items)
ghost_node = [n for n in nodes if n["id"] == "ghost"][0]
assert "color" in ghost_node
assert ghost_node["color"] == "#ff9999"
def test_i_can_use_custom_ghost_color(self):
"""Test that custom ghost_color parameter is applied."""
items = [
{"id": "child", "parent": "ghost", "label": "Child"}
]
nodes, edges = from_parent_child_list(items, ghost_color="#0000ff")
ghost_node = [n for n in nodes if n["id"] == "ghost"][0]
assert ghost_node["color"] == "#0000ff"
def test_i_can_create_multiple_ghost_nodes(self):
"""Test handling of multiple missing parents."""
items = [
{"id": "child1", "parent": "ghost1", "label": "Child 1"},
{"id": "child2", "parent": "ghost2", "label": "Child 2"}
]
nodes, edges = from_parent_child_list(items)
assert len(nodes) == 4 # 2 real + 2 ghost
ghost_ids = [n["id"] for n in nodes if "color" in n]
assert "ghost1" in ghost_ids
assert "ghost2" in ghost_ids
def test_i_can_avoid_duplicate_ghost_nodes(self):
"""Test that same missing parent creates only one ghost node."""
items = [
{"id": "child1", "parent": "ghost", "label": "Child 1"},
{"id": "child2", "parent": "ghost", "label": "Child 2"}
]
nodes, edges = from_parent_child_list(items)
assert len(nodes) == 3 # 2 real + 1 ghost
ghost_nodes = [n for n in nodes if n["id"] == "ghost"]
assert len(ghost_nodes) == 1
assert len(edges) == 2
assert {"from": "ghost", "to": "child1"} in edges
assert {"from": "ghost", "to": "child2"} in edges
def test_i_can_use_custom_id_getter(self):
"""Test custom callback for extracting node ID."""
items = [
{"node_id": "root", "label": "Root"}
]
def custom_id_getter(item):
return item.get("node_id")
nodes, edges = from_parent_child_list(
items,
id_getter=custom_id_getter
)
assert nodes[0]["id"] == "root"
def test_i_can_use_custom_label_getter(self):
"""Test custom callback for extracting node label."""
items = [
{"id": "root", "name": "Custom Label"}
]
def custom_label_getter(item):
return item.get("name", "")
nodes, edges = from_parent_child_list(
items,
label_getter=custom_label_getter
)
assert nodes[0]["label"] == "Custom Label"
def test_i_can_use_custom_parent_getter(self):
"""Test custom callback for extracting parent ID."""
items = [
{"id": "root", "label": "Root"},
{"id": "child", "parent_id": "root", "label": "Child"}
]
def custom_parent_getter(item):
return item.get("parent_id")
nodes, edges = from_parent_child_list(
items,
parent_getter=custom_parent_getter
)
assert len(edges) == 1
assert edges[0] == {"from": "root", "to": "child"}
def test_i_can_handle_empty_list(self):
"""Test that empty list returns empty nodes and edges."""
items = []
nodes, edges = from_parent_child_list(items)
assert nodes == []
assert edges == []
def test_i_can_use_id_as_label_for_ghost_nodes(self):
"""Test that ghost nodes use their ID as label by default."""
items = [
{"id": "child", "parent": "ghost_parent", "label": "Child"}
]
nodes, edges = from_parent_child_list(items)
ghost_node = [n for n in nodes if n["id"] == "ghost_parent"][0]
assert ghost_node["label"] == "ghost_parent"