2 Commits

26 changed files with 2807 additions and 775 deletions

View File

@@ -0,0 +1,116 @@
# DataGrid Refactoring
## Objective
Clearly separate data management and rendering responsibilities in the DataGrid system.
The current architecture mixes data mutation, formula computation, and rendering in the
same `DataGrid` class, which complicates cross-table formula management and code reasoning.
## Guiding Principles
- `DataService` can exist without rendering. The reverse is not true.
- All data mutations go through `DataService`.
- Columns have two facets: data semantics (`ColumnDefinition`) and UI presentation (`ColumnUiState`).
- No more parent hierarchy where avoidable — access via `InstancesManager.get_by_type()`.
- The persistence key is `grid_id` (stable), not `table_name` (can change over time).
---
## New Classes (`core/data/`)
### `DataServicesManager` — SingleInstance
- Owns the `FormulaEngine` (cross-table formula coordination)
- Creates `DataService` instances on demand from `DataGridsManager`
- Provides access to `DataService` instances by `grid_id`
- Provides the resolver callback for `FormulaEngine`: `grid_id → DataStore`
### `DataService` — companion to `DataGrid`
- Owns `DataStore` and `list[ColumnDefinition]`
- Holds a reference to `DataServicesManager` for `FormulaEngine` access
- Methods: `load_dataframe(df)`, `add_row()`, `add_column()`, `set_data(col_id, row_index, value)`
- Mutations call `mark_data_changed()` → set dirty flag
- `ensure_ready()` → recalculates formulas if dirty (called by `mk_body_content_page()`)
- Can exist without any rendering
### `DataStore` — renamed from `DatagridStore`
- Pure persistence: `ne_df`, `ns_fast_access`, `ns_row_data`, `ns_total_rows`
- `DbObject` with no business logic
### `ColumnDefinition`
- Data semantics: `col_id`, `title`, `type`, `formula`, `col_index`
---
## Modified Classes
### `DataGridsRegistry` — streamlined
- Persistence only: `put()`, `remove()`, `get_all_entries()`
- **Loses**: `get_columns()`, `get_column_type()`, `get_column_values()`, `get_row_count()`
### `DatagridMetadataProvider` — becomes a concrete SingleInstance
- No longer abstract / interface (only one concrete implementation exists)
- Reads from `DataServicesManager` and `DataGridsRegistry`
- Holds: `style_presets`, `formatter_presets`, `all_tables_formats`
- Exposes: `list_tables()`, `list_columns()`, `list_column_values()`, `get_column_type()`,
`list_style_presets()`, `list_format_presets()`
### `DataGridsManager` — pure UI
- **Keeps**: `TreeView`, `TabsManager`, document state, `Commands`
- **Loses**: `FormulaEngine`, presets, `DatagridMetadataProvider`, `_resolve_store_for_table()`
### `DataGrid` — pure rendering
- **Keeps**: `mk_*`, `render()`, `__ft__()`, `_state`, `_settings`
- **Keeps**: `_apply_sort()`, `_apply_filter()`, `_get_filtered_df()`
- **Loses**: `add_new_row()`, `add_new_column()`, `init_from_dataframe()`,
`_recalculate_formulas()`, `_register_existing_formulas()`, `_df_store`
- Accesses its `DataService` via its `grid_id`:
`InstancesManager.get_by_type(DataServicesManager).get_service(grid_id)`
- `mk_body_content_page()` calls `data_service.ensure_ready()` before rendering
### `DatagridState`
- `columns` changes from `list[DataGridColumnState]``list[ColumnUiState]`
- Everything else remains unchanged
### `DataGridColumnState` — split into two classes
| Class | Belongs to | Fields |
|---|---|---|
| `ColumnDefinition` | `DataService` | `col_id`, `title`, `type`, `formula`, `col_index` |
| `ColumnUiState` | `DatagridState` | `col_id`, `width`, `visible`, `format` |
---
## Structural Fix
**Current bug**: `mark_data_changed()` is defined in `FormulaEngine` but is never called
by `DataGrid`. Formulas are only recalculated defensively at render time.
**After refactoring**:
- Every mutation in `DataService` calls `mark_data_changed()` → dirty flag set
- `mk_body_content_page()` calls `data_service.ensure_ready()` → recalculates if dirty
- Multiple mutations before a render = a single recalculation
---
## Progress Tracking
- [x] Create `DataStore` (rename `DatagridStore`)
- [x] Create `ColumnDefinition`
- [x] Create `DataService`
- [x] Create `DataServicesManager`
- [x] Refactor `DataGridsRegistry` (streamline)
- [x] Refactor `DatagridMetadataProvider` (make concrete)
- [x] Refactor `DataGridsManager` (pure UI)
- [x] Refactor `DataGrid` (pure rendering, split `DataGridColumnState`)
- [x] Update tests
- [ ] Remove `init_from_dataframe` from `DataGrid` (kept temporarily for transition)
- [ ] Full split of `DataGridColumnState` into `ColumnDefinition` + `ColumnUiState` in `DatagridState`

View File

@@ -1,9 +1,7 @@
import json
import logging.config
import pandas as pd
import yaml
from dbengine.handlers import BaseRefHandler, handlers
from dbengine.handlers import handlers
from fasthtml import serve
from fasthtml.components import Div
@@ -15,7 +13,6 @@ from myfasthtml.controls.InstancesDebugger import InstancesDebugger
from myfasthtml.controls.Keyboard import Keyboard
from myfasthtml.controls.Layout import Layout
from myfasthtml.controls.TabsManager import TabsManager
from myfasthtml.controls.TreeView import TreeView, TreeNode
from myfasthtml.controls.helpers import Ids, mk
from myfasthtml.core.dbengine_utils import DataFrameHandler
from myfasthtml.core.instances import UniqueInstance
@@ -39,53 +36,6 @@ app, rt = create_app(protect_routes=True,
base_url="http://localhost:5003")
def create_sample_treeview(parent):
"""
Create a sample TreeView with a file structure for testing.
Args:
parent: Parent instance for the TreeView
Returns:
TreeView: Configured TreeView instance with sample data
"""
tree_view = TreeView(parent, _id="-treeview")
# Create sample file structure
projects = TreeNode(label="Projects", type="folder")
tree_view.add_node(projects)
myfasthtml = TreeNode(label="MyFastHtml", type="folder")
tree_view.add_node(myfasthtml, parent_id=projects.id)
app_py = TreeNode(label="app.py", type="file")
tree_view.add_node(app_py, parent_id=myfasthtml.id)
readme = TreeNode(label="README.md", type="file")
tree_view.add_node(readme, parent_id=myfasthtml.id)
src_folder = TreeNode(label="src", type="folder")
tree_view.add_node(src_folder, parent_id=myfasthtml.id)
controls_py = TreeNode(label="controls.py", type="file")
tree_view.add_node(controls_py, parent_id=src_folder.id)
documents = TreeNode(label="Documents", type="folder")
tree_view.add_node(documents, parent_id=projects.id)
notes = TreeNode(label="notes.txt", type="file")
tree_view.add_node(notes, parent_id=documents.id)
todo = TreeNode(label="todo.md", type="file")
tree_view.add_node(todo, parent_id=documents.id)
# Expand all nodes to show the full structure
# tree_view.expand_all()
return tree_view
@rt("/")
def index(session):
session_instance = UniqueInstance(session=session,
@@ -120,19 +70,15 @@ def index(session):
btn_popup = mk.label("Popup",
command=add_tab("Popup", Dropdown(layout, "Content", button="button", _id="-dropdown")))
# Create TreeView with sample data
tree_view = create_sample_treeview(layout)
layout.header_left.add(tabs_manager.add_tab_btn())
layout.header_right.add(btn_show_right_drawer)
layout.left_drawer.add(btn_show_instances_debugger, "Debugger")
layout.left_drawer.add(btn_show_commands_debugger, "Debugger")
layout.left_drawer.add(btn_file_upload, "Test")
layout.left_drawer.add(btn_popup, "Test")
layout.left_drawer.add(tree_view, "TreeView")
# data grids
dgs_manager = DataGridsManager(layout, _id="-datagrids")
dgs_manager = DataGridsManager(session_instance)
layout.left_drawer.add_group("Documents", Div("Documents",
dgs_manager.mk_main_icons(),
cls="mf-layout-group flex gap-3"))

View File

@@ -5,10 +5,8 @@ from dataclasses import dataclass
from functools import lru_cache
from typing import Optional
import pandas as pd
from fasthtml.common import NotStr
from fasthtml.components import *
from pandas import DataFrame
from myfasthtml.controls.BaseCommands import BaseCommands
from myfasthtml.controls.CycleStateControl import CycleStateControl
@@ -17,23 +15,28 @@ from myfasthtml.controls.DataGridFormattingEditor import DataGridFormattingEdito
from myfasthtml.controls.DslEditor import DslEditorConf
from myfasthtml.controls.IconsHelper import IconsHelper
from myfasthtml.controls.Keyboard import Keyboard
from myfasthtml.controls.Mouse import Mouse
from myfasthtml.controls.Panel import Panel, PanelConf
from myfasthtml.controls.Query import Query, QUERY_FILTER
from myfasthtml.controls.datagrid_objects import DataGridColumnState, DataGridRowState, \
DatagridSelectionState, DataGridHeaderFooterConf, DatagridEditionState
from myfasthtml.controls.helpers import mk, column_type_defaults
from myfasthtml.controls.datagrid_objects import DataGridColumnState, DataGridRowUiState, \
DatagridSelectionState, DataGridHeaderFooterConf, DatagridEditionState, DataGridColumnUiState, \
DataGridRowSelectionColumnState
from myfasthtml.controls.helpers import mk
from myfasthtml.core.commands import Command
from myfasthtml.core.constants import ColumnType, ROW_INDEX_ID, FooterAggregation, DATAGRID_PAGE_SIZE, FILTER_INPUT_CID, \
ROW_SELECTION_ID
from myfasthtml.core.constants import ColumnType, FooterAggregation, DATAGRID_PAGE_SIZE, FILTER_INPUT_CID
from myfasthtml.core.data.ColumnDefinition import ColumnDefinition
from myfasthtml.core.data.DataService import DataService
from myfasthtml.core.data.DataServicesManager import DataServicesManager
from myfasthtml.core.dbmanager import DbObject
from myfasthtml.core.dsls import DslsManager
from myfasthtml.core.formatting.dsl.completion.FormattingCompletionEngine import FormattingCompletionEngine
from myfasthtml.core.formatting.dsl.completion.provider import DatagridMetadataProvider
from myfasthtml.core.formatting.dsl.definition import FormattingDSL
from myfasthtml.core.formatting.dsl.parser import DSLParser
from myfasthtml.core.formatting.engine import FormattingEngine
from myfasthtml.core.instances import MultipleInstance
from myfasthtml.core.instances import MultipleInstance, InstancesManager
from myfasthtml.core.optimized_ft import OptimizedDiv
from myfasthtml.core.utils import make_safe_id, merge_classes, make_unique_safe_id, is_null
from myfasthtml.core.utils import merge_classes, is_null
from myfasthtml.icons.carbon import row, column, grid
from myfasthtml.icons.fluent import checkbox_unchecked16_regular
from myfasthtml.icons.fluent_p2 import checkbox_checked16_regular, column_edit20_regular, add12_filled
@@ -70,8 +73,8 @@ class DatagridState(DbObject):
super().__init__(owner, name=f"{owner.get_id()}#state", save_state=save_state)
self.sidebar_visible: bool = False
self.selected_view: str = None
self.columns: list[DataGridColumnState] = []
self.rows: list[DataGridRowState] = [] # only the rows that have a specific state
self.columns: list[DataGridColumnUiState] = []
self.rows: list[DataGridRowUiState] = [] # only the rows that have a specific state
self.headers: list[DataGridHeaderFooterConf] = []
self.footers: list[DataGridHeaderFooterConf] = []
self.sorted: list = []
@@ -101,20 +104,6 @@ class DatagridSettings(DbObject):
self.enable_edition: bool = True
class DatagridStore(DbObject):
"""
Store Dataframes
"""
def __init__(self, owner, save_state):
with self.initializing():
super().__init__(owner, name=f"{owner.get_id()}#df", save_state=save_state)
self.ne_df = None
self.ns_fast_access = None
self.ns_row_data = None
self.ns_total_rows = None
class Commands(BaseCommands):
def get_page(self, page_index: int):
return Command("GetPage",
@@ -231,11 +220,15 @@ class DataGrid(MultipleInstance):
name, namespace = (conf.name, conf.namespace) if conf else ("No name", "__default__")
self._settings = DatagridSettings(self, save_state=save_state, name=name, namespace=namespace)
self._state = DatagridState(self, save_state=self._settings.save_state)
self._df_store = DatagridStore(self, save_state=self._settings.save_state)
self._formatting_engine = FormattingEngine()
self._columns = None
self.commands = Commands(self)
self.init_from_dataframe(self._df_store.ne_df, init_state=False) # data comes from DatagridStore
# Obtain DataService from DataServicesManager (no parent hierarchy)
data_services_manager = InstancesManager.get_by_type(self._session, DataServicesManager)
data_service_id = self.get_data_service_id_from_data_grid_id(self._id)
self._data_service = data_services_manager.restore_service(data_service_id)
self._init_columns()
# add Panel
self._panel = Panel(self, conf=PanelConf(show_display_right=False), _id="-panel")
@@ -253,15 +246,17 @@ class DataGrid(MultipleInstance):
# add Selection Selector
selection_types = {
"cell": mk.icon(grid, tooltip="Cell selection"), # default
"row": mk.icon(row, tooltip="Row selection"),
"column": mk.icon(column, tooltip="Column selection"),
"cell": mk.icon(grid, tooltip="Cell selection")
}
self._selection_mode_selector = CycleStateControl(self,
controls=selection_types,
save_state=False,
_id="-cycle_state")
self._selection_mode_selector.bind_command("CycleState", self.commands.change_selection_mode())
if self._state.selection.selection_mode is None:
self.change_selection_mode()
# add columns manager
self._columns_manager = DataGridColumnsManager(self)
@@ -269,7 +264,8 @@ class DataGrid(MultipleInstance):
self._columns_manager.bind_command("SaveColumnDetails", self.commands.on_column_changed())
if self._settings.enable_formatting:
completion_engine = FormattingCompletionEngine(self._parent, self.get_table_name())
provider = InstancesManager.get_by_type(self._session, DatagridMetadataProvider, None)
completion_engine = FormattingCompletionEngine(provider, self.get_table_name())
editor_conf = DslEditorConf(engine_id=completion_engine.get_id())
dsl = FormattingDSL()
self._formatting_editor = DataGridFormattingEditor(self,
@@ -301,7 +297,13 @@ class DataGrid(MultipleInstance):
@property
def _df(self):
return self._df_store.ne_df
if self._data_service is None:
return None
return self._data_service.get_store().ne_df
@property
def _fast_access(self):
return self._data_service.get_store().ns_fast_access
def _apply_sort(self, df):
if df is None:
@@ -344,7 +346,8 @@ class DataGrid(MultipleInstance):
df = self._df.copy()
df = self._apply_sort(df) # need to keep the real type to sort
df = self._apply_filter(df)
self._df_store.ns_total_rows = len(df)
if self._data_service is not None:
self._data_service.get_store().ns_total_rows = len(df)
return df
@@ -375,36 +378,6 @@ class DataGrid(MultipleInstance):
self._state.selection.selected = pos
self._state.save()
def _register_existing_formulas(self) -> None:
"""
Re-register all formula columns with the FormulaEngine.
Called after data reload to ensure the engine knows about all
formula columns and their expressions.
"""
engine = self.get_formula_engine()
if engine is None:
return
table = self.get_table_name()
for col_def in self._state.columns:
if col_def.formula:
try:
engine.set_formula(table, col_def.col_id, col_def.formula)
except Exception as e:
logger.warning("Failed to register formula for %s.%s: %s", table, col_def.col_id, e)
def _recalculate_formulas(self) -> None:
"""
Recalculate dirty formula columns before rendering.
Called at the start of mk_body_content_page() to ensure formula
columns are up-to-date before cells are rendered.
"""
engine = self.get_formula_engine()
if engine is None:
return
engine.recalculate_if_needed(self.get_table_name(), self._df_store)
def _get_format_rules(self, col_pos, row_index, col_def):
"""
Get format rules for a cell, returning only the most specific level defined.
@@ -440,197 +413,55 @@ class DataGrid(MultipleInstance):
if self._state.table_format:
return self._state.table_format
# Get global tables formatting from manager
return self._parent.all_tables_formats
# Get global tables formatting from DatagridMetadataProvider
provider = InstancesManager.get_by_type(self._session, DatagridMetadataProvider, None)
return provider.all_tables_formats if provider is not None else []
def _init_columns(self):
self._columns = self._state.columns.copy()
# Populate UI state from DataService columns when creating a new grid
columns_defs = self._data_service.columns
columns = []
if self._state.columns:
# we need to make sure that we match the DataGridColumnUiState and the ColumnDefinition
for col_ui_state in self._state.columns:
col_def = [col_def for col_def in columns_defs if col_def.col_id == col_ui_state.col_id][0]
columns.append(DataGridColumnState(col_def, col_ui_state))
self._columns = columns
else:
# init the ui states
ui_states = [DataGridColumnUiState(col_def.col_id) for col_def in columns_defs]
self._state.columns = ui_states # this saves the state of the columns
self._columns = [DataGridColumnState(col_def, col_ui_state)
for col_def, col_ui_state in zip(columns_defs, ui_states)]
if self._settings.enable_edition:
self._columns.insert(0, DataGridColumnState(ROW_SELECTION_ID,
-1,
"",
ColumnType.RowSelection_))
def init_from_dataframe(self, df, init_state=True):
def _get_column_type(dtype):
if pd.api.types.is_integer_dtype(dtype):
return ColumnType.Number
elif pd.api.types.is_float_dtype(dtype):
return ColumnType.Number
elif pd.api.types.is_bool_dtype(dtype):
return ColumnType.Bool
elif pd.api.types.is_datetime64_any_dtype(dtype):
return ColumnType.Datetime
else:
return ColumnType.Text # Default to Text if no match
def _init_columns(_df):
columns = [DataGridColumnState(make_safe_id(col_id),
col_index,
col_id,
_get_column_type(self._df[make_safe_id(col_id)].dtype))
for col_index, col_id in enumerate(_df.columns)]
return columns
def _init_fast_access(_df):
"""
Generates a fast-access dictionary for a DataFrame.
This method converts the columns of the provided DataFrame into NumPy arrays
and stores them as values in a dictionary, using the column names as keys.
This allows for efficient access to the data stored in the DataFrame.
Args:
_df (DataFrame): The input pandas DataFrame whose columns are to be converted
into a dictionary of NumPy arrays.
Returns:
dict: A dictionary where the keys are the column names of the input DataFrame
and the values are the corresponding column values as NumPy arrays.
"""
if _df is None:
return {}
res = {col: _df[col].to_numpy() for col in _df.columns}
res[ROW_INDEX_ID] = _df.index.to_numpy()
return res
def _init_row_data(_df):
"""
Generates a list of row data dictionaries for column references in formatting conditions.
Each dict contains {col_id: value} for a single row, used by FormattingEngine
to evaluate conditions that reference other columns (e.g., {"col": "budget"}).
Args:
_df (DataFrame): The input pandas DataFrame.
Returns:
list[dict]: A list where each element is a dict of column values for that row.
"""
if _df is None:
return []
return _df.to_dict(orient='records')
if df is not None:
self._df_store.ne_df = df
if init_state:
self._df.columns = self._df.columns.map(make_safe_id) # make sure column names are trimmed
self._df_store.save()
self._state.rows = [] # sparse: only rows with non-default state are stored
self._state.columns = _init_columns(df) # use df not self._df to keep the original title
self._init_columns()
self._df_store.ns_fast_access = _init_fast_access(self._df)
self._df_store.ns_row_data = _init_row_data(self._df)
self._df_store.ns_total_rows = len(self._df) if self._df is not None else 0
self._register_existing_formulas()
return self
self._columns.insert(0, DataGridRowSelectionColumnState())
def add_new_column(self, col_def: DataGridColumnState) -> None:
"""Add a new column to the DataGrid.
For Formula columns, only _state.columns is updated; the FormulaEngine
computes values on demand via recalculate_if_needed().
For other column types, also adds the column to the DataFrame and updates
ns_fast_access and ns_row_data incrementally with type-appropriate defaults.
"""Add a new column, delegating data mutation to DataService.
Args:
col_def: Column definition with title and type already set.
col_id is derived from title via make_safe_id.
col_def: Column definition with title and type set. col_id will be
assigned by DataService.
"""
col_def.col_id = make_unique_safe_id(col_def.title, [c.col_id for c in self._state.columns])
if col_def.type == ColumnType.Formula:
col_def.col_index = -1
self._state.columns.append(col_def)
return
default_value = column_type_defaults.get(col_def.type, "")
col_def.col_index = len(self._df.columns) if self._df is not None else 0
self._state.columns.append(col_def)
if self._df is not None:
self._df_store.ne_df[col_def.col_id] = default_value
self._df_store.ns_fast_access[col_def.col_id] = self._df_store.ne_df[col_def.col_id].to_numpy()
for row_dict in self._df_store.ns_row_data:
row_dict[col_def.col_id] = default_value
self._df_store.save()
def add_new_row(self, row_data: dict = None) -> None:
"""Add a new row to the DataGrid with incremental updates.
Creates default values based on column types and handles formula columns.
Updates ns_fast_access and ns_row_data incrementally for better performance.
Args:
row_data: Optional dict with initial values. If None, uses type defaults.
"""
import numpy as np
def _get_default_value(col_def: DataGridColumnState):
"""Get default value for a column based on its type."""
if col_def.type == ColumnType.Formula:
return None # Will be calculated by FormulaEngine
return column_type_defaults.get(col_def.type, "")
if row_data is None:
# Create default values for all non-formula, non-selection columns
row_data = {}
for col in self._state.columns:
if col.type not in (ColumnType.Formula, ColumnType.RowSelection_):
row_data[col.col_id] = _get_default_value(col)
# 1. Add row to DataFrame (only non-formula columns)
new_index = len(self._df)
self._df.loc[new_index] = row_data
# 2. Incremental update of ns_fast_access
for col_id, value in row_data.items():
if col_id in self._df_store.ns_fast_access:
self._df_store.ns_fast_access[col_id] = np.append(
self._df_store.ns_fast_access[col_id],
value
)
else:
# First value for this column (rare case)
self._df_store.ns_fast_access[col_id] = np.array([value])
# Update row index
if ROW_INDEX_ID in self._df_store.ns_fast_access:
self._df_store.ns_fast_access[ROW_INDEX_ID] = np.append(
self._df_store.ns_fast_access[ROW_INDEX_ID],
new_index
if self._data_service is not None:
data_col = ColumnDefinition(
col_id="",
col_index=col_def.col_index,
title=col_def.title,
type=col_def.type,
formula=col_def.formula,
)
else:
self._df_store.ns_fast_access[ROW_INDEX_ID] = np.array([new_index])
# 3. Incremental update of ns_row_data
self._df_store.ns_row_data.append(row_data.copy())
# 4. Handle formula columns
engine = self.get_formula_engine()
if engine is not None:
table_name = self.get_table_name()
# Check if there are any formula columns
has_formulas = any(col.type == ColumnType.Formula for col in self._state.columns)
if has_formulas:
try:
# Recalculate all formulas (engine handles efficiency)
engine.recalculate_if_needed(table_name, self._df_store)
except Exception as e:
logger.warning(f"Failed to calculate formulas for new row: {e}")
# 5. Update total row count
self._df_store.ns_total_rows = len(self._df)
# Save changes
self._df_store.save()
self._data_service.add_column(data_col)
col_def.col_id = data_col.col_id
col_def.col_index = data_col.col_index
self._state.columns.append(col_def)
self._init_columns()
self._state.save()
def move_column(self, source_col_id: str, target_col_id: str):
"""Move column to new position. Called via Command from JS."""
@@ -676,7 +507,7 @@ class DataGrid(MultipleInstance):
Returns:
Optimal width in pixels (between 50 and 500)
"""
col_def = next((c for c in self._state.columns if c.col_id == col_id), None)
col_def = next((c for c in self._columns if c.col_id == col_id), None)
if not col_def:
logger.warning(f"calculate_optimal_column_width: column not found {col_id=}")
return 150 # default width
@@ -686,8 +517,8 @@ class DataGrid(MultipleInstance):
# Max data length
max_data_length = 0
if col_id in self._df_store.ns_fast_access:
col_array = self._df_store.ns_fast_access[col_id]
if col_id in self._fast_access:
col_array = self._fast_access[col_id]
if col_array is not None and len(col_array) > 0:
max_data_length = max(len(str(v)) for v in col_array)
@@ -774,7 +605,6 @@ class DataGrid(MultipleInstance):
def change_selection_mode(self):
logger.debug(f"change_selection_mode")
new_state = self._selection_mode_selector.get_state()
logger.debug(f" {new_state=}")
self._state.selection.selection_mode = new_state
self._state.save()
return self.render_partial()
@@ -812,7 +642,8 @@ class DataGrid(MultipleInstance):
self._state.save()
def handle_add_row(self):
self.add_new_row() # Add row to data
if self._data_service is not None:
self._data_service.add_row()
row_index = len(self._df) - 1 # Index of the newly added row
len_columns_1 = len(self._columns) - 1
rows = [self.mk_row(row_index, None, len_columns_1)]
@@ -835,8 +666,17 @@ class DataGrid(MultipleInstance):
return f"{self._settings.namespace}.{self._settings.name}" if self._settings.namespace else self._settings.name
def get_formula_engine(self):
"""Return the FormulaEngine from the DataGridsManager, if available."""
return self._parent.get_formula_engine()
"""Return the shared FormulaEngine from DataServicesManager."""
manager = InstancesManager.get_by_type(self._session, DataServicesManager, None)
return manager.get_formula_engine() if manager is not None else None
@staticmethod
def get_grid_id_from_data_service_id(data_service_id):
return data_service_id.replace(DataService.compute_prefix(), DataGrid.compute_prefix(), 1)
@staticmethod
def get_data_service_id_from_data_grid_id(datagrid_id):
return datagrid_id.replace(DataGrid.compute_prefix(), DataService.compute_prefix(), 1)
def mk_headers(self):
resize_cmd = self.commands.set_column_width()
@@ -917,7 +757,7 @@ class DataGrid(MultipleInstance):
# Formula column: safe read — ns_fast_access entry may not exist yet if the
# engine hasn't run its first recalculation pass.
if column_type == ColumnType.Formula:
col_array = self._df_store.ns_fast_access.get(col_def.col_id)
col_array = self._fast_access.get(col_def.col_id)
if col_array is None or row_index >= len(col_array):
return NotStr('<span class="dt2-cell-content-text truncate">—</span>')
value = col_array[row_index]
@@ -932,7 +772,7 @@ class DataGrid(MultipleInstance):
else:
column_type = ColumnType.Text
else:
value = self._df_store.ns_fast_access[col_def.col_id][row_index]
value = self._fast_access[col_def.col_id][row_index]
# Boolean type - uses cached HTML (only 2 possible values)
if column_type == ColumnType.Bool:
@@ -981,7 +821,7 @@ class DataGrid(MultipleInstance):
if col_def.type == ColumnType.RowSelection_:
return OptimizedDiv(cls="dt2-row-selection")
col_array = self._df_store.ns_fast_access.get(col_def.col_id)
col_array = self._fast_access.get(col_def.col_id)
value = col_array[row_index] if col_array is not None and row_index < len(col_array) else None
content = self.mk_body_cell_content(col_pos, row_index, col_def, filter_keyword_lower)
@@ -1013,7 +853,8 @@ class DataGrid(MultipleInstance):
OPTIMIZED: Extract filter keyword once instead of 10,000 times.
OPTIMIZED: Uses OptimizedDiv for rows instead of Div for faster rendering.
"""
self._recalculate_formulas()
if self._data_service is not None:
self._data_service.ensure_ready()
df = self._get_filtered_df()
if df is None:
return []
@@ -1026,7 +867,8 @@ class DataGrid(MultipleInstance):
len_columns_1 = len(self._columns) - 1
rows = [self.mk_row(row_index, filter_keyword_lower, len_columns_1) for row_index in df.index[start:end]]
if self._df_store.ns_total_rows > end:
store = self._data_service.get_store() if self._data_service is not None else None
if store is not None and store.ns_total_rows > end:
rows[-1].attrs.extend(self.commands.get_page(page_index + 1).get_htmx_params(escaped=True))
self._mk_append_add_row_if_needed(rows)
@@ -1184,7 +1026,7 @@ class DataGrid(MultipleInstance):
)
def render(self):
if self._df_store.ne_df is None:
if self._df is None:
return Div("No data to display !")
return Div(
@@ -1201,8 +1043,10 @@ class DataGrid(MultipleInstance):
cls="flex items-center justify-between mb-2"),
self._panel.set_main(self.mk_table_wrapper()),
Script(f"initDataGrid('{self._id}');"),
# Mouse(self, combinations=self._mouse_support, _id="-mouse"),
Keyboard(self, combinations=self._key_support, _id="-keyboard"),
Div(
Mouse(self, combinations=self._mouse_support, _id="-mouse"),
Keyboard(self, combinations=self._key_support, _id="-keyboard"),
),
id=self._id,
cls="grid",
style="height: 100%; grid-template-rows: auto 1fr;"

View File

@@ -159,24 +159,20 @@ class DataGridColumnsManager(MultipleInstance):
return self.mk_column_details(col_def)
def _register_formula(self, col_def) -> None:
"""Register or remove a formula column with the FormulaEngine.
"""Register or remove a formula column with the FormulaEngine via DataService.
Registers only when col_def.type is Formula and the formula text is
non-empty. Removes the formula in all other cases so the engine stays
consistent with the column definition.
"""
engine = self._parent.get_formula_engine()
if engine is None:
data_service = getattr(self._parent, "_data_service", None)
if data_service is None:
return
table = self._parent.get_table_name()
if col_def.type == ColumnType.Formula and col_def.formula:
try:
engine.set_formula(table, col_def.col_id, col_def.formula)
logger.debug("Registered formula for %s.%s", table, col_def.col_id)
except Exception as e:
logger.warning("Formula error for %s.%s: %s", table, col_def.col_id, e)
data_service.register_formula(col_def.col_id, col_def.formula)
logger.debug("Registered formula for col %s", col_def.col_id)
else:
engine.remove_formula(table, col_def.col_id)
data_service.remove_formula(col_def.col_id)
def mk_column_label(self, col_def: DataGridColumnState):
return Div(

View File

@@ -2,7 +2,7 @@ import logging
from collections import defaultdict
from myfasthtml.controls.DslEditor import DslEditor
from myfasthtml.controls.datagrid_objects import DataGridRowState
from myfasthtml.controls.datagrid_objects import DataGridRowUiState
from myfasthtml.core.formatting.dsl import parse_dsl, DSLSyntaxError, ColumnScope, RowScope, CellScope, TableScope, TablesScope
from myfasthtml.core.instances import InstancesManager
@@ -111,7 +111,7 @@ class DataGridFormattingEditor(DslEditor):
for row_index, rules in rows_rules.items():
row_state = next((r for r in state.rows if r.row_id == row_index), None)
if row_state is None:
row_state = DataGridRowState(row_id=row_index)
row_state = DataGridRowUiState(row_id=row_index)
state.rows.append(row_state)
row_state.format = rules

View File

@@ -13,10 +13,8 @@ from myfasthtml.controls.TreeView import TreeView, TreeNode, TreeViewConf
from myfasthtml.controls.helpers import mk
from myfasthtml.core.DataGridsRegistry import DataGridsRegistry
from myfasthtml.core.commands import Command
from myfasthtml.core.data.DataServicesManager import DataServicesManager
from myfasthtml.core.dbmanager import DbObject
from myfasthtml.core.formatting.dsl.completion.provider import DatagridMetadataProvider
from myfasthtml.core.formatting.presets import DEFAULT_STYLE_PRESETS, DEFAULT_FORMATTER_PRESETS
from myfasthtml.core.formula.engine import FormulaEngine
from myfasthtml.core.instances import InstancesManager, SingleInstance
from myfasthtml.icons.fluent_p1 import table_add20_regular
from myfasthtml.icons.fluent_p3 import folder_open20_regular
@@ -50,7 +48,7 @@ class Commands(BaseCommands):
return Command("NewGrid",
"New grid",
self._owner,
self._owner.new_grid).htmx(target=f"#{self._owner._tree.get_id()}")
self._owner.handle_new_grid).htmx(target=f"#{self._owner._tree.get_id()}")
def open_from_excel(self, tab_id, file_upload):
return Command("OpenFromExcel",
@@ -81,11 +79,16 @@ class Commands(BaseCommands):
key="DeleteNode")
class DataGridsManager(SingleInstance, DatagridMetadataProvider):
class DataGridsManager(SingleInstance):
"""UI manager for DataGrids.
Responsible for the visual organisation of DataGrids: TreeView, TabsManager,
and document lifecycle (create, open, delete). All data concerns are handled
by DataServicesManager and DataService.
"""
def __init__(self, parent, _id=None):
if not getattr(self, "_is_new_instance", False):
# Skip __init__ if instance already existed
return
super().__init__(parent, _id=_id)
self.commands = Commands(self)
@@ -96,53 +99,47 @@ class DataGridsManager(SingleInstance, DatagridMetadataProvider):
self._tabs_manager = InstancesManager.get_by_type(self._session, TabsManager, None)
self._registry = DataGridsRegistry(parent)
# Global presets shared across all DataGrids
self.style_presets: dict = DEFAULT_STYLE_PRESETS.copy()
self.formatter_presets: dict = DEFAULT_FORMATTER_PRESETS.copy()
self.all_tables_formats: list = []
# Formula engine shared across all DataGrids in this session
self._formula_engine = FormulaEngine(
registry_resolver=self._resolve_store_for_table
)
# Data layer — session-scoped singletons
self._data_services_manager = DataServicesManager(self._parent)
def upload_from_source(self):
file_upload = FileUpload(self)
tab_id = self._tabs_manager.create_tab("Upload Datagrid", file_upload)
file_upload.set_on_ok(self.commands.open_from_excel(tab_id, file_upload))
return self._tabs_manager.show_tab(tab_id)
# ------------------------------------------------------------------
# Grid lifecycle
# ------------------------------------------------------------------
def _create_and_register_grid(self, namespace: str, name: str, df: pd.DataFrame) -> DataGrid:
"""
Create and register a DataGrid.
"""Create a DataGrid and its companion DataService, then register both.
Args:
namespace: Grid namespace
name: Grid name
df: DataFrame to initialize the grid with
namespace: Grid namespace.
name: Grid name.
df: DataFrame to initialise the grid with.
Returns:
Created DataGrid instance
Created DataGrid instance.
"""
table_name = f"{namespace}.{name}" if namespace else name
data_service = self._data_services_manager.create_service(table_name, save_state=True)
data_service.load_dataframe(df)
grid_id = DataGrid.get_grid_id_from_data_service_id(data_service.get_id())
dg_conf = DatagridConf(namespace=namespace, name=name)
dg = DataGrid(self, conf=dg_conf, save_state=True)
dg.init_from_dataframe(df)
dg = DataGrid(self, conf=dg_conf, save_state=True, _id=grid_id)
self._registry.put(namespace, name, dg.get_id())
return dg
def _create_document(self, namespace: str, name: str, datagrid: DataGrid, tab_id: str = None) -> tuple[
str, DocumentDefinition]:
"""
Create a DocumentDefinition and its associated tab.
"""Create a DocumentDefinition and its associated tab.
Args:
namespace: Document namespace
name: Document name
datagrid: Associated DataGrid instance
tab_id: Optional existing tab ID. If None, creates a new tab
namespace: Document namespace.
name: Document name.
datagrid: Associated DataGrid instance.
tab_id: Optional existing tab ID. If None, creates a new tab.
Returns:
Tuple of (tab_id, document)
Tuple of (tab_id, document).
"""
if tab_id is None:
tab_id = self._tabs_manager.create_tab(name, datagrid)
@@ -159,15 +156,14 @@ class DataGridsManager(SingleInstance, DatagridMetadataProvider):
return tab_id, document
def _add_document_to_tree(self, document: DocumentDefinition, parent_id: str) -> TreeNode:
"""
Add a document to the tree view.
"""Add a document node to the tree view.
Args:
document: Document to add
parent_id: Parent node ID in the tree
document: Document to add.
parent_id: Parent node ID in the tree.
Returns:
Created TreeNode
Created TreeNode.
"""
tree_node = TreeNode(
id=document.document_id,
@@ -179,8 +175,17 @@ class DataGridsManager(SingleInstance, DatagridMetadataProvider):
self._tree.add_node(tree_node, parent_id=parent_id)
return tree_node
def new_grid(self):
# Determine parent folder
# ------------------------------------------------------------------
# Commands handlers
# ------------------------------------------------------------------
def upload_from_source(self):
file_upload = FileUpload(self)
tab_id = self._tabs_manager.create_tab("Upload Datagrid", file_upload)
file_upload.set_on_ok(self.commands.open_from_excel(tab_id, file_upload))
return self._tabs_manager.show_tab(tab_id)
def handle_new_grid(self):
selected_id = self._tree.get_selected_id()
if selected_id is None:
parent_id = self._tree.ensure_path("Untitled")
@@ -188,52 +193,40 @@ class DataGridsManager(SingleInstance, DatagridMetadataProvider):
node = self._tree._state.items[selected_id]
if node.type == "folder":
parent_id = selected_id
else: # leaf
else:
parent_id = node.parent
# Get namespace and generate unique name
namespace = self._tree._state.items[parent_id].label
namespace = self._tree.get_state().items[parent_id].label
name = self._generate_unique_sheet_name(parent_id)
# Create and register DataGrid
dg = self._create_and_register_grid(namespace, name, pd.DataFrame())
# Create document and tab
tab_id, document = self._create_document(namespace, name, dg)
# Add to tree
self._add_document_to_tree(document, parent_id)
# UI-specific handling: open parent, select node, start rename
if parent_id not in self._tree._state.opened:
self._tree._state.opened.append(parent_id)
self._tree._state.selected = document.document_id
if parent_id not in self._tree.get_state().opened:
self._tree.get_state().opened.append(parent_id)
self._tree.get_state().selected = document.document_id
self._tree._start_rename(document.document_id)
return self._tree, self._tabs_manager.show_tab(tab_id)
def _generate_unique_sheet_name(self, parent_id: str) -> str:
children = self._tree._state.items[parent_id].children
existing_labels = {self._tree._state.items[c].label for c in children}
children = self._tree.get_state().items[parent_id].children
existing_labels = {self._tree.get_state().items[c].label for c in children}
n = 1
while f"Sheet{n}" in existing_labels:
n += 1
return f"Sheet{n}"
def open_from_excel(self, tab_id, file_upload: FileUpload):
# Read Excel data
excel_content = file_upload.get_content()
df = pd.read_excel(BytesIO(excel_content), file_upload.get_sheet_name())
namespace = file_upload.get_file_basename()
name = file_upload.get_sheet_name()
# Create and register DataGrid
dg = self._create_and_register_grid(namespace, name, df)
# Create document with existing tab
tab_id, document = self._create_document(namespace, name, dg, tab_id=tab_id)
# Add to tree
parent_id = self._tree.ensure_path(document.namespace)
self._add_document_to_tree(document, parent_id)
@@ -243,80 +236,61 @@ class DataGridsManager(SingleInstance, DatagridMetadataProvider):
document_id = self._tree.get_bag(node_id)
try:
document = next(filter(lambda x: x.document_id == document_id, self._state.elements))
dg = DataGrid(self, _id=document.datagrid_id) # reload the state & settings
dg = DataGrid(self, _id=document.datagrid_id)
return self._tabs_manager.show_or_create_tab(document.tab_id, document.name, dg)
except StopIteration:
# the selected node is not a document (it's a folder)
return None
def delete_grid(self, node_id):
"""
Delete a grid and all its associated resources.
"""Delete a grid and all its associated resources.
This method is called BEFORE TreeView._delete_node() to ensure we can
access the node's bag to retrieve the document_id.
Called BEFORE TreeView._delete_node() so we can access the node bag.
Args:
node_id: ID of the TreeView node to delete
node_id: ID of the TreeView node to delete.
Returns:
None (TreeView will handle the node removal)
List of UI updates, or None.
"""
document_id = self._tree.get_bag(node_id)
if document_id is None:
# Node is a folder, not a document - nothing to clean up
return None
res = []
try:
# Find the document
document = next(filter(lambda x: x.document_id == document_id, self._state.elements))
# Get the DataGrid instance
dg = DataGrid(self, _id=document.datagrid_id)
# Close the tab
close_tab_res = self._tabs_manager.close_tab(document.tab_id)
res.append(close_tab_res)
# Remove from registry
self._registry.remove(document.datagrid_id)
self._data_services_manager.remove_service(document.datagrid_id)
# Clean up DataGrid (delete DBEngine entries)
dg.delete()
# Remove from InstancesManager
InstancesManager.remove(self._session, document.datagrid_id)
# Remove DocumentDefinition from state
self._state.elements = [d for d in self._state.elements if d.document_id != document_id]
self._state.save()
except StopIteration:
# Document not found - already deleted or invalid state
pass
return res
def create_tab_content(self, tab_id):
"""
Recreate the content for a tab managed by this DataGridsManager.
Called by TabsManager when the content is not in cache (e.g., after restart).
"""Recreate tab content after restart.
Args:
tab_id: ID of the tab to recreate content for
tab_id: ID of the tab to recreate.
Returns:
The recreated component (Panel with DataGrid)
DataGrid instance for the tab.
"""
# Find the document associated with this tab
document = next((d for d in self._state.elements if d.tab_id == tab_id), None)
if document is None:
raise ValueError(f"No document found for tab {tab_id}")
# Recreate the DataGrid with its saved state
dg = DataGrid(self, _id=document.datagrid_id) # reload the state & settings
dg = DataGrid(self, _id=document.datagrid_id)
return dg
def clear_tree(self):
@@ -324,106 +298,22 @@ class DataGridsManager(SingleInstance, DatagridMetadataProvider):
self._tree.clear()
return self._tree
# === DatagridMetadataProvider ===
def list_tables(self):
return self._registry.get_all_tables()
def list_columns(self, table_name):
return self._registry.get_columns(table_name)
def list_column_values(self, table_name, column_name):
return self._registry.get_column_values(table_name, column_name)
def get_row_count(self, table_name):
return self._registry.get_row_count(table_name)
def get_column_type(self, table_name, column_name):
return self._registry.get_column_type(table_name, column_name)
def list_style_presets(self) -> list[str]:
return list(self.style_presets.keys())
def list_format_presets(self) -> list[str]:
return list(self.formatter_presets.keys())
def _resolve_store_for_table(self, table_name: str):
"""
Resolve the DatagridStore for a given table name.
Used by FormulaEngine as the registry_resolver callback.
Args:
table_name: Full table name in ``"namespace.name"`` format.
Returns:
DatagridStore instance or None if not found.
"""
try:
as_fullname_dict = self._registry._get_entries_as_full_name_dict()
grid_id = as_fullname_dict.get(table_name)
if grid_id is None:
return None
datagrid = InstancesManager.get(self._session, grid_id, None)
if datagrid is None:
return None
return datagrid._df_store
except Exception:
return None
def get_style_presets(self) -> dict:
"""Get the global style presets."""
return self.style_presets
def get_formatter_presets(self) -> dict:
"""Get the global formatter presets."""
return self.formatter_presets
def get_formula_engine(self) -> FormulaEngine:
"""The FormulaEngine shared across all DataGrids in this session."""
return self._formula_engine
def add_style_preset(self, name: str, preset: dict):
"""
Add or update a style preset.
Args:
name: Preset name (e.g., "custom_highlight")
preset: Dict with CSS properties (e.g., {"background-color": "yellow", "color": "black"})
"""
self.style_presets[name] = preset
def add_formatter_preset(self, name: str, preset: dict):
"""
Add or update a formatter preset.
Args:
name: Preset name (e.g., "custom_currency")
preset: Dict with formatter config (e.g., {"type": "number", "prefix": "CHF ", "precision": 2})
"""
self.formatter_presets[name] = preset
def remove_style_preset(self, name: str):
"""Remove a style preset."""
if name in self.style_presets:
del self.style_presets[name]
def remove_formatter_preset(self, name: str):
"""Remove a formatter preset."""
if name in self.formatter_presets:
del self.formatter_presets[name]
# === UI ===
# ------------------------------------------------------------------
# UI
# ------------------------------------------------------------------
def mk_main_icons(self):
return Div(
mk.icon(folder_open20_regular, tooltip="Upload from source", command=self.commands.upload_from_source()),
mk.icon(table_add20_regular, tooltip="New grid", command=self.commands.new_grid()),
mk.icon(folder_open20_regular, tooltip="Upload from source",
command=self.commands.upload_from_source()),
mk.icon(table_add20_regular, tooltip="New grid",
command=self.commands.new_grid()),
cls="flex"
)
def _mk_tree(self):
conf = TreeViewConf(add_leaf=False, icons={"folder": "database20_regular", "excel": "table20_regular"})
conf = TreeViewConf(add_leaf=False,
icons={"folder": "database20_regular", "excel": "table20_regular"})
tree = TreeView(self, conf=conf, _id="-treeview")
for element in self._state.elements:
parent_id = tree.ensure_path(element.namespace, node_type="folder")

View File

@@ -293,6 +293,9 @@ class TreeView(MultipleInstance):
return self._state.items[node_id].bag
except KeyError:
return None
def get_state(self) -> TreeViewState:
return self._state
def _toggle_node(self, node_id: str):
"""Toggle expand/collapse state of a node."""

View File

@@ -1,10 +1,11 @@
from dataclasses import dataclass, field
from myfasthtml.core.constants import ColumnType, DATAGRID_DEFAULT_COLUMN_WIDTH, ViewType
from myfasthtml.core.constants import DATAGRID_DEFAULT_COLUMN_WIDTH, ViewType, ROW_SELECTION_ID, ColumnType
from myfasthtml.core.data.ColumnDefinition import ColumnDefinition
@dataclass
class DataGridRowState:
class DataGridRowUiState:
row_id: int
visible: bool = True
height: int | None = None
@@ -12,19 +13,32 @@ class DataGridRowState:
@dataclass
class DataGridColumnState:
col_id: str # name of the column: cannot be changed
col_index: int # index of the column in the dataframe: cannot be changed
title: str = None
type: ColumnType = ColumnType.Text
class DataGridColumnUiState:
"""UI presentation state of a DataGrid column.
Holds only the visual properties of a column. Keyed by col_id to match
the corresponding ColumnDefinition held by DataService.
Attributes:
col_id: Column identifier. Must match the col_id in ColumnDefinition.
width: Column width in pixels.
visible: Whether the column is displayed.
format: List of format rules applied during rendering.
"""
col_id: str
visible: bool = True
width: int = DATAGRID_DEFAULT_COLUMN_WIDTH
format: list = field(default_factory=list) #
formula: str = "" # formula expression for ColumnType.Formula columns
format: list = field(default_factory=list)
def copy(self):
props = {k: v for k, v in self.__dict__.items() if not k.startswith("_")}
return DataGridColumnState(**props)
def copy(self) -> "DataGridColumnUiState":
"""Return a shallow copy of this state."""
return DataGridColumnUiState(
col_id=self.col_id,
width=self.width,
visible=self.visible,
format=list(self.format),
)
@dataclass
@@ -56,4 +70,71 @@ class DataGridHeaderFooterConf:
class DatagridView:
name: str
type: ViewType = ViewType.Table
columns: list[DataGridColumnState] = None
columns: list[DataGridColumnUiState] = None
class DataGridColumnState:
def __init__(self, col_def: ColumnDefinition, col_ui_state: DataGridColumnUiState):
self._col_def = col_def
self._col_ui_state = col_ui_state
@property
def col_id(self):
return self._col_def.col_id
@property
def col_index(self):
return self._col_def.col_index
@property
def title(self):
return self._col_def.title
@property
def type(self):
return self._col_def.type
@property
def visible(self):
return self._col_ui_state.visible
@property
def width(self):
return self._col_ui_state.width
@property
def format(self):
return self._col_ui_state.format
class DataGridRowSelectionColumnState(DataGridColumnState):
def __init__(self):
super().__init__(None, None)
@property
def col_id(self):
return ROW_SELECTION_ID
@property
def col_index(self):
return -1
@property
def title(self):
return ""
@property
def type(self):
return ColumnType.RowSelection_
@property
def visible(self):
return True # not used
@property
def width(self):
return 24 # not used
@property
def format(self):
return ""

View File

@@ -37,75 +37,10 @@ class DataGridsRegistry(SingleInstance):
del all_entries[datagrid_id]
self._db_manager.save(DATAGRIDS_REGISTRY_ENTRY_KEY, all_entries)
def get_all_tables(self):
all_entries = self._get_all_entries()
return [f"{namespace}.{name}" for (namespace, name) in all_entries.values()]
def get_columns(self, table_name):
try:
as_fullname_dict = self._get_entries_as_full_name_dict()
grid_id = as_fullname_dict[table_name]
# load datagrid state
state_id = f"{grid_id}#state"
state = self._db_manager.load(state_id)
return [c.col_id for c in state["columns"]] if state else []
except KeyError:
return []
def get_column_values(self, table_name, column_name):
try:
as_fullname_dict = self._get_entries_as_full_name_dict()
grid_id = as_fullname_dict[table_name]
# load dataframe from dedicated store
store = self._db_manager.load(f"{grid_id}#df")
df = store["ne_df"] if store else None
return df[column_name].tolist() if df is not None else []
except KeyError:
return []
def get_row_count(self, table_name):
try:
as_fullname_dict = self._get_entries_as_full_name_dict()
grid_id = as_fullname_dict[table_name]
# load dataframe from dedicated store
store = self._db_manager.load(f"{grid_id}#df")
df = store["ne_df"] if store else None
return len(df) if df is not None else 0
except KeyError:
return 0
def get_column_type(self, table_name, column_name):
"""
Get the type of a column.
def get_all_entries(self) -> dict:
"""Return all registry entries as {datagrid_id: (namespace, name)}."""
return self._get_all_entries()
Args:
table_name: The DataGrid name
column_name: The column name
Returns:
ColumnType enum value or None if not found
"""
try:
as_fullname_dict = self._get_entries_as_full_name_dict()
grid_id = as_fullname_dict[table_name]
# load datagrid state
state_id = f"{grid_id}#state"
state = self._db_manager.load(state_id)
if state and "columns" in state:
for col in state["columns"]:
if col.col_id == column_name:
return col.type
return None
except KeyError:
return None
def _get_all_entries(self):
return {k: v for k, v in self._db_manager.load(DATAGRIDS_REGISTRY_ENTRY_KEY).items()
if not k.startswith("__")}

View File

@@ -0,0 +1,37 @@
from dataclasses import dataclass, field
from myfasthtml.core.constants import ColumnType
@dataclass
class ColumnDefinition:
"""Data semantics of a DataGrid column.
Holds the structural and computational properties of a column.
Does not contain any UI-related attributes (width, visibility, formatting).
Those are stored in ColumnUiState within DatagridState.
Attributes:
col_id: Unique identifier for the column. Cannot be changed after creation.
col_index: Index of the column in the DataFrame. -1 for virtual columns
(Formula, RowIndex).
title: Display title of the column.
type: Column data type, determines rendering and mutation behaviour.
formula: DSL expression for ColumnType.Formula columns. Empty string otherwise.
"""
col_id: str
col_index: int
title: str = None
type: ColumnType = ColumnType.Text
formula: str = ""
def copy(self) -> "ColumnDefinition":
"""Return a shallow copy of this definition."""
return ColumnDefinition(
col_id=self.col_id,
col_index=self.col_index,
title=self.title,
type=self.type,
formula=self.formula,
)

View File

@@ -0,0 +1,386 @@
import logging
from typing import Optional
import numpy as np
import pandas as pd
from myfasthtml.core.constants import ColumnType, ROW_INDEX_ID
from myfasthtml.core.data.ColumnDefinition import ColumnDefinition
from myfasthtml.core.dbmanager import DbObject
from myfasthtml.core.instances import MultipleInstance
from myfasthtml.core.utils import make_safe_id, make_unique_safe_id
logger = logging.getLogger(__name__)
_COLUMN_TYPE_DEFAULTS = {
ColumnType.Number: 0,
ColumnType.Text: "",
ColumnType.Bool: False,
ColumnType.Datetime: pd.NaT,
}
class DataStore(DbObject):
"""Persistent storage for a DataGrid's tabular data.
Holds the DataFrame and its derived caches used for rendering and formula
evaluation. Contains no business logic — all mutations are performed by
DataService.
Attributes:
ne_df: The pandas DataFrame. Source of truth for non-formula columns.
ns_fast_access: Dict mapping col_id to a numpy array. O(1) column
lookup used by FormulaEngine and rendering.
ns_row_data: List of row dicts built from ns_fast_access. Used by
FormattingEngine for rule evaluation.
ns_total_rows: Cached total row count after filtering.
"""
def __init__(self, owner, save_state: bool = True):
with self.initializing():
super().__init__(owner, name=f"{owner.get_id()}#store", save_state=save_state)
self.ne_df = None
self.ns_fast_access = None
self.ns_row_data = None
self.ns_total_rows = None
class DataServiceState(DbObject):
"""Persistent state for DataService.
Stores the column definitions and the table name associated with the
DataGrid. Persists across sessions via DbObject.
Attributes:
columns: Ordered list of column data definitions.
table_name: Fully qualified table name used by FormulaEngine
(format: "namespace.name" or "name").
"""
def __init__(self, owner, save_state: bool = True):
with self.initializing():
super().__init__(owner, name="#state", save_state=save_state)
self.columns: list[ColumnDefinition] = []
self.table_name: str = ""
class DataService(MultipleInstance):
"""Data companion to DataGrid.
Owns the DataStore and the list of ColumnDefinition objects for one
DataGrid. All data mutations go through this class. Holds a reference to
DataServicesManager to access the shared FormulaEngine.
This class can exist and operate independently of any rendering component.
Attributes:
_state: Persistent state (columns, table_name).
_store: Persistent storage (DataFrame, caches).
"""
def __init__(self, parent, _id: Optional[str] = None, save_state: bool = True):
super().__init__(parent, _id=_id)
self._state = DataServiceState(self, save_state=save_state)
self._store = DataStore(self, save_state=save_state)
self._init_store()
@property
def columns(self) -> list[ColumnDefinition]:
"""Return the list of column definitions."""
return self._state.columns
@property
def table_name(self) -> str:
"""Return the fully qualified table name used by FormulaEngine."""
return self._state.table_name
def set_table_name(self, table_name: str) -> None:
"""Update the table name (e.g. after a rename)."""
self._state.table_name = table_name
def get_store(self) -> DataStore:
"""Return the underlying DataStore."""
return self._store
def get_formula_engine(self):
"""Return the shared FormulaEngine from DataServicesManager."""
return self._parent.get_formula_engine()
# ------------------------------------------------------------------
# Data initialisation
# ------------------------------------------------------------------
def load_dataframe(self, df: pd.DataFrame, init_columns: bool = True) -> None:
"""Load a DataFrame into the store and initialise caches.
Args:
df: Source DataFrame. Column names are normalised to safe IDs.
init_columns: When True, build ColumnDefinition list from the
DataFrame columns and register any existing formula columns
with the FormulaEngine.
"""
if df is None:
return
df.columns = df.columns.map(make_safe_id)
self._store.ne_df = df
if init_columns:
self._state.columns = self._build_column_definitions(df)
self._state.save()
self._init_store()
# ------------------------------------------------------------------
# Mutations
# ------------------------------------------------------------------
def add_column(self, col_def: ColumnDefinition) -> None:
"""Add a new column to the DataGrid data layer.
Assigns a unique safe col_id from the title. For Formula and RowIndex
columns, no DataFrame column is created. For all other types, a column
with a type-appropriate default value is added to the DataFrame.
Args:
col_def: Column definition. col_id will be set by this method.
"""
col_def.col_id = make_unique_safe_id(
col_def.title, [c.col_id for c in self._state.columns]
)
if col_def.type == ColumnType.Formula:
col_def.col_index = -1
self._state.columns.append(col_def)
self._state.save()
return
if col_def.type == ColumnType.RowIndex:
col_def.col_index = -1
self._state.columns.append(col_def)
if self._store.ne_df is not None:
self._store.ns_fast_access[col_def.col_id] = (
self._store.ne_df.index.to_numpy()
)
self._state.save()
self._store.save()
return
default_value = _COLUMN_TYPE_DEFAULTS.get(col_def.type, "")
col_def.col_index = (
len(self._store.ne_df.columns) if self._store.ne_df is not None else 0
)
self._state.columns.append(col_def)
if self._store.ne_df is not None:
self._store.ne_df[col_def.col_id] = default_value
self._store.ns_fast_access[col_def.col_id] = (
self._store.ne_df[col_def.col_id].to_numpy()
)
for row_dict in self._store.ns_row_data:
row_dict[col_def.col_id] = default_value
self._state.save()
self._store.save()
self._mark_changed(col_def.col_id)
def add_row(self, row_data: Optional[dict] = None) -> None:
"""Append a new row with incremental cache updates.
Creates default values for all non-virtual columns when row_data is
not provided. Marks formula columns dirty so ensure_ready() will
recalculate them on the next render.
Args:
row_data: Optional dict of {col_id: value}. Defaults to
type-appropriate values for each column.
"""
if self._store.ne_df is None:
return
new_index = len(self._store.ne_df)
if row_data is None:
row_data = {}
for col in self._state.columns:
if col.type not in (ColumnType.Formula, ColumnType.RowSelection_):
value = (
new_index
if col.type == ColumnType.RowIndex
else _COLUMN_TYPE_DEFAULTS.get(col.type, "")
)
row_data[col.col_id] = value
self._store.ne_df.loc[new_index] = row_data
for col_id, value in row_data.items():
if col_id in self._store.ns_fast_access:
self._store.ns_fast_access[col_id] = np.append(
self._store.ns_fast_access[col_id], value
)
else:
self._store.ns_fast_access[col_id] = np.array([value])
self._store.ns_row_data.append(row_data.copy())
self._store.ns_total_rows = len(self._store.ne_df)
self._store.save()
self._mark_all_formula_columns_dirty()
def set_data(self, col_id: str, row_index: int, value) -> None:
"""Update a single cell value.
Updates the DataFrame, fast-access cache, and row data dict, then
marks dependent formula columns dirty.
Args:
col_id: Column identifier.
row_index: Zero-based row index.
value: New cell value.
"""
if self._store.ne_df is None:
return
self._store.ne_df.at[row_index, col_id] = value
if self._store.ns_fast_access and col_id in self._store.ns_fast_access:
self._store.ns_fast_access[col_id][row_index] = value
if self._store.ns_row_data and row_index < len(self._store.ns_row_data):
self._store.ns_row_data[row_index][col_id] = value
self._store.save()
self._mark_changed(col_id, rows=[row_index])
# ------------------------------------------------------------------
# Formula management
# ------------------------------------------------------------------
def register_formula(self, col_id: str, formula_text: str) -> None:
"""Register or update a formula for a column with the FormulaEngine.
Args:
col_id: Column identifier.
formula_text: DSL formula expression.
"""
engine = self.get_formula_engine()
if engine is None:
return
try:
engine.set_formula(self._state.table_name, col_id, formula_text)
except Exception as e:
logger.warning("Failed to register formula for %s.%s: %s",
self._state.table_name, col_id, e)
def remove_formula(self, col_id: str) -> None:
"""Remove a formula for a column from the FormulaEngine.
Args:
col_id: Column identifier.
"""
engine = self.get_formula_engine()
if engine is None:
return
engine.remove_formula(self._state.table_name, col_id)
def ensure_ready(self) -> None:
"""Recalculate dirty formula columns before rendering.
Called by DataGrid.mk_body_content_page() to ensure formula columns
are up-to-date. No-op when no columns are dirty.
"""
engine = self.get_formula_engine()
if engine is None:
return
engine.recalculate_if_needed(self._state.table_name, self._store)
# ------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------
def _init_store(self):
df = self._store.ne_df
if df is None:
return
self._store.ns_fast_access = self._build_fast_access(df)
self._store.ns_row_data = df.to_dict(orient="records")
self._store.ns_total_rows = len(df)
self._store.save()
self._register_existing_formulas()
def _build_column_definitions(self, df: pd.DataFrame) -> list[ColumnDefinition]:
"""Build ColumnDefinition objects from DataFrame columns.
Args:
df: Source DataFrame with normalised column names.
Returns:
Ordered list of ColumnDefinition objects.
"""
return [
ColumnDefinition(
col_id=make_safe_id(col_id),
col_index=col_index,
title=col_id,
type=self._infer_column_type(df[make_safe_id(col_id)].dtype),
)
for col_index, col_id in enumerate(df.columns)
]
@staticmethod
def _infer_column_type(dtype) -> ColumnType:
"""Infer ColumnType from a pandas dtype."""
if pd.api.types.is_integer_dtype(dtype):
return ColumnType.Number
if pd.api.types.is_float_dtype(dtype):
return ColumnType.Number
if pd.api.types.is_bool_dtype(dtype):
return ColumnType.Bool
if pd.api.types.is_datetime64_any_dtype(dtype):
return ColumnType.Datetime
return ColumnType.Text
@staticmethod
def _build_fast_access(df: pd.DataFrame) -> dict:
"""Build ns_fast_access from a DataFrame.
Args:
df: Source DataFrame.
Returns:
Dict mapping col_id to numpy array, plus ROW_INDEX_ID.
"""
result = {col: df[col].to_numpy() for col in df.columns}
result[ROW_INDEX_ID] = df.index.to_numpy()
return result
def _register_existing_formulas(self) -> None:
"""Re-register all formula columns with the FormulaEngine."""
engine = self.get_formula_engine()
if engine is None:
return
for col_def in self._state.columns:
if col_def.formula:
self.register_formula(col_def.col_id, col_def.formula)
def _mark_changed(self, col_id: str, rows: Optional[list[int]] = None) -> None:
"""Notify FormulaEngine that a column's data has changed.
Args:
col_id: Changed column identifier.
rows: Optional list of changed row indices. None means all rows.
"""
engine = self.get_formula_engine()
if engine is None:
return
engine.mark_data_changed(self._state.table_name, col_id, rows)
def _mark_all_formula_columns_dirty(self) -> None:
"""Mark all formula columns dirty after a structural change (e.g. add_row)."""
engine = self.get_formula_engine()
if engine is None:
return
table = self._state.table_name
for col in self._state.columns:
if col.type == ColumnType.Formula and col.formula:
engine.mark_data_changed(table, col.col_id)

View File

@@ -0,0 +1,121 @@
import logging
from typing import Optional
from myfasthtml.core.data.DataService import DataService
from myfasthtml.core.formula.engine import FormulaEngine
from myfasthtml.core.instances import SingleInstance
logger = logging.getLogger(__name__)
class DataServicesManager(SingleInstance):
"""Session-scoped manager for all DataService instances.
Owns the shared FormulaEngine and acts as the single entry point for
creating and retrieving DataService instances. Provides the resolver
callback that allows the FormulaEngine to access any table's DataStore
by table name.
Access pattern (from any component):
manager = InstancesManager.get_by_type(session, DataServicesManager)
service = manager.get_service(grid_id)
"""
def __init__(self, parent=None, _id: Optional[str] = None):
if not getattr(self, "_is_new_instance", False):
return
super().__init__(parent, _id)
self._services: dict[str, DataService] = {}
self._formula_engine = FormulaEngine(registry_resolver=self._resolve_store_for_table)
# ------------------------------------------------------------------
# Service lifecycle
# ------------------------------------------------------------------
def create_service(self, table_name: str, _id=None, save_state: bool = True) -> DataService:
"""Create and register a new DataService for a DataGrid.
Called by DataGridsManager when a new grid is created.
Args:
table_name: Fully qualified table name ("namespace.name" or "name").
save_state: Whether to persist the DataService state to DB.
_id: Unique identifier of the DataGrid.
Returns:
The newly created DataService instance.
"""
service = DataService(self, _id=_id, save_state=save_state)
service.set_table_name(table_name)
self._services[service.get_id()] = service
logger.debug(f"DataService created for '{table_name}' (grid_id={service.get_id()})")
return service
def get_service(self, grid_id: str) -> Optional[DataService]:
"""Return the DataService for a given grid_id.
Args:
grid_id: Unique identifier of the DataGrid.
Returns:
DataService instance, or None if not found.
"""
return self._services.get(grid_id)
def restore_service(self, grid_id: str) -> Optional[DataService]:
"""Restore a DataService from persisted state on session restart.
Called by DataGrid on restart to re-attach its DataService.
The DataService state (columns, table_name) and DataStore (DataFrame)
are loaded from DB automatically via DbObject.
Args:
grid_id: Unique identifier of the DataGrid.
Returns:
The restored DataService instance.
"""
if grid_id in self._services:
return self._services[grid_id]
service = DataService(self, _id=grid_id)
self._services[grid_id] = service
logger.debug("DataService restored for grid_id=%s", grid_id)
return service
def remove_service(self, grid_id: str) -> None:
"""Unregister and discard a DataService.
Called by DataGridsManager when a grid is deleted.
Args:
grid_id: Unique identifier of the DataGrid.
"""
self._services.pop(grid_id, None)
logger.debug("DataService removed for grid_id=%s", grid_id)
# ------------------------------------------------------------------
# FormulaEngine
# ------------------------------------------------------------------
def get_formula_engine(self) -> FormulaEngine:
"""Return the shared FormulaEngine for this session."""
return self._formula_engine
def _resolve_store_for_table(self, table_name: str):
"""Resolve the DataStore for a given table name.
Used by FormulaEngine as the registry_resolver callback for
cross-table formula evaluation.
Args:
table_name: Fully qualified table name ("namespace.name").
Returns:
DataStore instance, or None if the table is not found.
"""
for service in self._services.values():
if service.table_name == table_name:
return service.get_store()
logger.warning(f"DataServicesManager: table '{table_name}' not found")
return None

View File

View File

@@ -1,89 +1,203 @@
"""
Metadata provider for DataGrid formatting DSL autocompletion.
Provides access to DataGrid metadata (columns, values, row counts)
for context-aware autocompletion.
Provides access to DataGrid metadata (columns, values, row counts, presets)
for context-aware autocompletion. Delegates live data queries to
DataServicesManager and holds global formatting presets.
"""
from typing import Any
import logging
from typing import Any, Optional
from myfasthtml.core.data.DataServicesManager import DataServicesManager
from myfasthtml.core.dsl.base_provider import BaseMetadataProvider
from myfasthtml.core.formatting.presets import DEFAULT_FORMATTER_PRESETS, DEFAULT_STYLE_PRESETS
from myfasthtml.core.instances import SingleInstance, InstancesManager
logger = logging.getLogger(__name__)
class DatagridMetadataProvider(BaseMetadataProvider):
"""
Protocol for providing DataGrid metadata to the autocompletion engine.
class DatagridMetadataProvider(SingleInstance, BaseMetadataProvider):
"""Concrete session-scoped metadata provider for DataGrid DSL engines.
Implementations must provide access to:
- Available DataGrids (tables)
- Column names for each DataGrid
- Distinct values for each column
- Row count for each DataGrid
- Style and format presets
Implements BaseMetadataProvider by delegating live data queries to
DataServicesManager. Also holds the global formatting presets and the
all_tables_formats rule applied to every table.
DataGrid names follow the pattern namespace.name (multi-level namespaces).
"""
def list_tables(self) -> list[str]:
Access pattern (from any component):
provider = InstancesManager.get_by_type(session, DatagridMetadataProvider)
Attributes:
style_presets: Dict of named style presets available in the DSL.
formatter_presets: Dict of named formatter presets available in the DSL.
all_tables_formats: Global format rules applied to all tables.
"""
Return the list of available DataGrid names.
Returns:
List of DataGrid names (e.g., ["app.orders", "app.customers"])
"""
...
def list_columns(self, table_name: str) -> list[str]:
"""
Return the column names for a specific DataGrid.
def __init__(self, parent=None, session: Optional[dict] = None,
_id: Optional[str] = None):
super().__init__(parent, session, _id)
with self.initializing():
self.style_presets: dict = DEFAULT_STYLE_PRESETS.copy()
self.formatter_presets: dict = DEFAULT_FORMATTER_PRESETS.copy()
self.all_tables_formats: list = []
Args:
table_name: The DataGrid name
# ------------------------------------------------------------------
# Table and column metadata — delegated to DataServicesManager
# ------------------------------------------------------------------
Returns:
List of column names (e.g., ["id", "amount", "status"])
"""
...
def list_column_values(self, table_name, column_name: str) -> list[Any]:
"""
Return the distinct values for a column in the current DataGrid.
def list_tables(self) -> list[str]:
"""Return the list of all registered table names.
This is used to suggest values in conditions like `value == |`.
Returns:
List of table names in "namespace.name" format.
"""
manager = self._get_data_services_manager()
if manager is None:
return []
return [s.table_name for s in manager._services.values() if s.table_name]
Args:
column_name: The column name
def list_columns(self, table_name: str) -> list[str]:
"""Return the column identifiers for a table.
Returns:
List of distinct values in the column
"""
...
def get_row_count(self, table_name: str) -> int:
"""
Return the number of rows in a DataGrid.
Args:
table_name: Fully qualified table name.
Used to suggest row indices for row scope and cell scope.
Returns:
List of col_id strings.
"""
service = self._get_service(table_name)
if service is None:
return []
return [c.col_id for c in service.columns]
Args:
table_name: The DataGrid name
def list_column_values(self, table_name: str, column_name: str) -> list[Any]:
"""Return the distinct values present in a column.
Returns:
Number of rows
"""
...
Args:
table_name: Fully qualified table name.
column_name: Column identifier.
def get_column_type(self, table_name: str, column_name: str):
"""
Return the type of a column.
Returns:
List of distinct values, empty list if not found.
"""
service = self._get_service(table_name)
if service is None:
return []
store = service.get_store()
if store.ne_df is None or column_name not in store.ne_df.columns:
return []
return store.ne_df[column_name].dropna().unique().tolist()
Used to filter suggestions based on column type.
def get_row_count(self, table_name: str) -> int:
"""Return the number of rows in a table.
Args:
table_name: The DataGrid name
column_name: The column name
Args:
table_name: Fully qualified table name.
Returns:
ColumnType enum value or None if not found
"""
...
Returns:
Row count, or 0 if not found.
"""
service = self._get_service(table_name)
if service is None:
return 0
store = service.get_store()
return store.ns_total_rows or 0
def get_column_type(self, table_name: str, column_name: str):
"""Return the ColumnType for a column.
Args:
table_name: Fully qualified table name.
column_name: Column identifier.
Returns:
ColumnType enum value, or None if not found.
"""
service = self._get_service(table_name)
if service is None:
return None
for col in service.columns:
if col.col_id == column_name:
return col.type
return None
# ------------------------------------------------------------------
# Preset metadata — held locally
# ------------------------------------------------------------------
def list_style_presets(self) -> list[str]:
"""Return the names of all registered style presets."""
return list(self.style_presets.keys())
def list_format_presets(self) -> list[str]:
"""Return the names of all registered formatter presets."""
return list(self.formatter_presets.keys())
def get_style_presets(self) -> dict:
"""Return the full style presets dict."""
return self.style_presets
def get_formatter_presets(self) -> dict:
"""Return the full formatter presets dict."""
return self.formatter_presets
def add_style_preset(self, name: str, preset: dict) -> None:
"""Add or update a named style preset.
Args:
name: Preset name.
preset: Style definition dict.
"""
self.style_presets[name] = preset
def add_formatter_preset(self, name: str, preset: dict) -> None:
"""Add or update a named formatter preset.
Args:
name: Preset name.
preset: Formatter definition dict.
"""
self.formatter_presets[name] = preset
def remove_style_preset(self, name: str) -> None:
"""Remove a style preset by name.
Args:
name: Preset name to remove.
"""
self.style_presets.pop(name, None)
def remove_formatter_preset(self, name: str) -> None:
"""Remove a formatter preset by name.
Args:
name: Preset name to remove.
"""
self.formatter_presets.pop(name, None)
# ------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------
def _get_data_services_manager(self) -> Optional[DataServicesManager]:
"""Return the DataServicesManager for this session."""
return InstancesManager.get_by_type(
self._session, DataServicesManager, default=None
)
def _get_service(self, table_name: str):
"""Return the DataService for a given table name.
Args:
table_name: Fully qualified table name.
Returns:
DataService instance, or None if not found.
"""
manager = self._get_data_services_manager()
if manager is None:
return None
for service in manager._services.values():
if service.table_name == table_name:
return service
return None

View File

@@ -75,7 +75,7 @@ FORMULA_GRAMMAR = r"""
where_clause: TABLE_COL_REF "=" COL_NAME
// TABLE_COL_REF matches "TableName.ColumnName" (dot-separated, no spaces)
TABLE_COL_REF: /[A-Za-z_][A-Za-z0-9_]*\.[A-Za-z_][A-Za-z0-9_]*/
TABLE_COL_REF: /[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)+/
COL_NAME: /[A-Za-z_][A-Za-z0-9_ ]*/
// ==================== Functions ====================

View File

@@ -171,16 +171,16 @@ class FormulaTransformer(Transformer):
# ==================== References ====================
def cross_ref_simple(self, items):
"""{ Table.Column }"""
"""{ Table.Column } or { namespace.Table.Column }"""
table_col = str(items[0])
table, column = table_col.split(".", 1)
table, column = table_col.rsplit(".", 1)
return CrossTableRef(table=table, column=column)
def cross_ref_where(self, items):
"""{ Table.Column WHERE remote_table.remote_col = local_col }"""
table_col = str(items[0])
where = items[1]
table, column = table_col.split(".", 1)
table, column = table_col.rsplit(".", 1)
return CrossTableRef(table=table, column=column, where_clause=where)
def column_ref(self, items):
@@ -192,7 +192,7 @@ class FormulaTransformer(Transformer):
"""TABLE_COL_REF = COL_NAME"""
remote_table_col = str(items[0])
local_col = str(items[1]).strip()
remote_table, remote_col = remote_table_col.split(".", 1)
remote_table, remote_col = remote_table_col.rsplit(".", 1)
return WhereClause(
remote_table=remote_table,
remote_column=remote_col,

View File

@@ -12,6 +12,7 @@ from typing import Any, Callable, Optional
import numpy as np
from myfasthtml.core.dsl.exceptions import DSLSyntaxError
from .dataclasses import FormulaDefinition, WhereClause
from .dependency_graph import DependencyGraph
from .dsl.parser import get_parser
@@ -43,8 +44,8 @@ def parse_formula(text: str) -> FormulaDefinition | None:
parser = get_parser()
tree = parser.parse(text)
if tree is None:
return None
raise DSLSyntaxError(message=f"Formula could not be parsed: '{text}'")
transformer = FormulaTransformer()
formula = transformer.transform(tree)
formula.source_text = text

View File

@@ -0,0 +1,955 @@
import pandas as pd
import pytest
from fasthtml.components import Div, Script
from myfasthtml.controls.DataGrid import DataGrid, DatagridConf
from myfasthtml.controls.DataGridsManager import DataGridsManager
from myfasthtml.controls.TabsManager import TabsManager
from myfasthtml.core.constants import ColumnType
from myfasthtml.core.data.DataServicesManager import DataServicesManager
from myfasthtml.core.instances import InstancesManager
from myfasthtml.test.matcher import AnyValue, Contains, NoChildren, find, find_one, matches, TestLabel, TestObject
@pytest.fixture
def datagrids_manager(root_instance):
"""Create a DataGridsManager instance for testing."""
InstancesManager.reset()
TabsManager(root_instance) # just define it
return DataGridsManager(root_instance)
@pytest.fixture
def datagrid(datagrids_manager):
"""Create an empty DataGrid for testing."""
return DataGrid(datagrids_manager)
@pytest.fixture
def datagrid_with_data(datagrids_manager):
"""Create a DataGrid loaded with a sample DataFrame (name, age, active columns)."""
session = datagrids_manager._session
dsm = InstancesManager.get_by_type(session, DataServicesManager)
df = pd.DataFrame({
"name": ["Alice", "Bob", "Charlie"],
"age": [25, 30, 35],
"active": [True, False, True],
})
data_service = dsm.create_service("test.grid1", save_state=False)
data_service.load_dataframe(df)
grid_id = DataGrid.get_grid_id_from_data_service_id(data_service.get_id())
conf = DatagridConf(namespace="test", name="grid1")
return DataGrid(datagrids_manager, conf=conf, save_state=False, _id=grid_id)
@pytest.fixture
def datagrid_with_full_data(datagrids_manager):
"""Create a DataGrid covering all basic column types: Text, Number and Bool.
Designed to be extended with additional column types (Datetime, Enum, Formula)
as they are added to the test suite.
"""
session = datagrids_manager._session
dsm = InstancesManager.get_by_type(session, DataServicesManager)
df = pd.DataFrame({
"name": ["Alice", "Bob", "Charlie"], # Text
"age": [25, 30, 35], # Number
"active": [True, False, True], # Bool
})
data_service = dsm.create_service("test.full_types", save_state=False)
data_service.load_dataframe(df)
grid_id = DataGrid.get_grid_id_from_data_service_id(data_service.get_id())
conf = DatagridConf(namespace="test", name="full_types")
dg = DataGrid(datagrids_manager, conf=conf, save_state=False, _id=grid_id)
# Assign distinct widths (name=150, age=80, active=120) so tests that rely
# on column widths are not masked by a uniform default value of 100px.
for col_ui_state, width in zip(dg._state.columns, [150, 80, 120]):
col_ui_state.width = width
dg._init_columns()
return dg
@pytest.fixture
def datagrid_no_edition(datagrid_with_data):
"""DataGrid with edition disabled (no RowSelection column, no add-column button)."""
dg = datagrid_with_data
dg._settings.enable_edition = False
dg._init_columns()
return dg
class TestDataGridBehaviour:
def test_i_can_create_empty_datagrid(self, datagrids_manager):
dg = DataGrid(datagrids_manager)
assert dg is not None
# ------------------------------------------------------------------
# Table Name
# ------------------------------------------------------------------
def test_i_can_get_table_name_with_namespace(self, datagrids_manager):
"""Test that get_table_name() returns 'namespace.name' when a namespace is set.
The dot-separated format is required by FormulaEngine for cross-table
reference resolution.
"""
conf = DatagridConf(namespace="reports", name="sales")
dg = DataGrid(datagrids_manager, conf=conf)
assert dg.get_table_name() == "reports.sales"
def test_i_can_get_table_name_without_namespace(self, datagrids_manager):
"""Test that get_table_name() returns just the name when namespace is absent.
Grids without a namespace use the plain name, which is valid for
single-namespace applications.
"""
conf = DatagridConf(name="employees")
dg = DataGrid(datagrids_manager, conf=conf)
assert dg.get_table_name() == "employees"
# ------------------------------------------------------------------
# Element ID Generation
# ------------------------------------------------------------------
@pytest.mark.parametrize("mode, pos, expected_template", [
("row", (2, 5), "trow_{id}-5"),
("column", (2, 5), "tcol_{id}-2"),
("cell", (2, 5), "tcell_{id}-2-5"),
])
def test_i_can_get_element_id_from_pos(self, datagrid, mode, pos, expected_template):
"""Test that _get_element_id_from_pos generates the correct element ID per selection mode.
Why these cases matter:
- 'row' mode: ID references the row index (pos[1]) to target HTMX row updates.
- 'column' mode: ID references the column index (pos[0]) for column highlighting.
- 'cell' mode: ID includes both indices for precise cell targeting and navigation.
"""
result = datagrid._get_element_id_from_pos(mode, pos)
expected = expected_template.format(id=datagrid._id)
assert result == expected
def test_i_can_get_element_id_when_pos_is_none(self, datagrid):
"""Test that _get_element_id_from_pos returns None when position is None.
None position means no cell is selected; the caller must receive None
to know that no DOM element should be targeted.
"""
assert datagrid._get_element_id_from_pos("cell", None) is None
# ------------------------------------------------------------------
# Element ID Parsing
# ------------------------------------------------------------------
def test_i_can_get_pos_from_cell_element_id(self, datagrid):
"""Test that _get_pos_from_element_id correctly parses (col, row) from a cell ID.
The position tuple (col, row) is used for cell navigation and selection
state tracking. Correct parsing is required for keyboard navigation and
mouse selection to target the right cell.
"""
element_id = f"tcell_{datagrid._id}-3-7"
assert datagrid._get_pos_from_element_id(element_id) == (3, 7)
def test_i_can_get_pos_returns_none_for_non_cell_id(self, datagrid):
"""Test that _get_pos_from_element_id returns None for row IDs and None input.
Row and column IDs don't carry a (col, row) position. Returning None
signals that no cell-level position can be derived.
"""
assert datagrid._get_pos_from_element_id(f"trow_{datagrid._id}-5") is None
assert datagrid._get_pos_from_element_id(None) is None
# ------------------------------------------------------------------
# Static ID Conversions
# ------------------------------------------------------------------
def test_i_can_convert_grid_id_to_data_service_id_and_back(self, datagrid):
"""Test that the grid↔data-service ID conversion is a perfect round-trip.
DataGrid and DataService share the same UUID but use different prefixes.
The round-trip property ensures neither conversion loses information, so
DataGrid can always locate its companion DataService from its own ID.
"""
grid_id = datagrid.get_id()
data_service_id = DataGrid.get_data_service_id_from_data_grid_id(grid_id)
assert data_service_id != grid_id
assert DataGrid.get_grid_id_from_data_service_id(data_service_id) == grid_id
# ------------------------------------------------------------------
# Column Management
# ------------------------------------------------------------------
def test_i_can_move_column(self, datagrid_with_data):
"""Test that move_column reorders _state.columns correctly.
Column order in _state.columns drives both header and body rendering.
Moving the last column to the first position verifies the pop-then-insert
logic for the source-after-target case.
"""
dg = datagrid_with_data
initial_order = [c.col_id for c in dg._state.columns]
last_col_id = initial_order[-1]
first_col_id = initial_order[0]
dg.move_column(last_col_id, first_col_id)
new_order = [c.col_id for c in dg._state.columns]
assert new_order[0] == last_col_id
# init_columns keeps the order
dg._init_columns()
assert [c.col_id for c in dg._columns][1:] == new_order
def test_i_cannot_move_column_when_column_not_found(self, datagrid_with_data):
"""Test that move_column does not alter state when a col_id is unknown.
An unknown column ID must be silently ignored (logged as a warning) so
that stale JS events cannot corrupt the column order.
"""
dg = datagrid_with_data
initial_order = [c.col_id for c in dg._state.columns]
dg.move_column("nonexistent_col", initial_order[0])
assert [c.col_id for c in dg._state.columns] == initial_order
def test_i_can_move_column_to_same_position(self, datagrid_with_data):
"""Test that move_column is a no-op when source and target are the same column.
Dropping a column header back onto itself must not change the order,
preventing unnecessary state saves and re-renders.
"""
dg = datagrid_with_data
initial_order = [c.col_id for c in dg._state.columns]
first_col_id = initial_order[0]
dg.move_column(first_col_id, first_col_id)
assert [c.col_id for c in dg._state.columns] == initial_order
def test_i_can_set_column_width(self, datagrid_with_data):
"""Test that handle_set_column_width persists the new pixel width in state.
Column width in _state.columns is the source of truth for header and
body cell sizing. A correct update ensures the resized width survives
the next render cycle.
"""
dg = datagrid_with_data
col_id = dg._state.columns[0].col_id
dg.handle_set_column_width(col_id, "300")
col = next(c for c in dg._state.columns if c.col_id == col_id)
assert col.width == 300
# ------------------------------------------------------------------
# Column Width Calculation
# ------------------------------------------------------------------
def test_i_can_calculate_optimal_column_width_returns_default_for_unknown_column(self, datagrid):
"""Test that calculate_optimal_column_width returns 150 for an unknown column.
A safe default prevents layout breakage when a col_id from JS does not
match any known column (e.g. after a column was deleted mid-session).
"""
result = datagrid.calculate_optimal_column_width("nonexistent_col")
assert result == 150
@pytest.mark.parametrize("col_title, expected_width", [
("name", 86), # data dominates: max("name"=4, "Charlie"=7) → 7*8+30=86
("age", 54), # title dominates: max("age"=3, "35"=2) → 3*8+30=54
("active", 78), # title dominates: max("active"=6, "False"=5) → 6*8+30=78
])
def test_i_can_calculate_optimal_column_width_with_data(self, datagrid_with_data, col_title, expected_width):
"""Test that calculate_optimal_column_width returns the correct pixel width based on content.
The formula max(title_length, max_data_length) * 8 + 30 must produce the
right width for each column so that the auto-size feature fits all visible
content without truncation.
Why these cases matter:
- 'name' (data dominates): "Charlie" (7 chars) > "name" (4 chars) → 86px
- 'age' (title dominates): "age" (3 chars) > "35" (2 chars) → 54px
- 'active' (title dominates): "active" (6 chars) > "False" (5 chars) → 78px
"""
dg = datagrid_with_data
col = next(c for c in dg._columns if c.title == col_title)
result = dg.calculate_optimal_column_width(col.col_id)
assert result == expected_width
# ------------------------------------------------------------------
# Selection and Interaction
# ------------------------------------------------------------------
def test_i_can_on_key_pressed_esc_clears_selection(self, datagrid):
"""Test that pressing ESC resets both the focused cell and extra selections.
ESC is the standard 'deselect all' shortcut. Both selected and
extra_selected must be cleared so the grid visually deselects everything
and subsequent navigation starts from a clean state.
"""
dg = datagrid
dg._state.selection.selected = (1, 2)
dg._state.selection.extra_selected.append(("range", (0, 0, 2, 2)))
dg.on_key_pressed("esc", has_focus=True, is_inside=True)
assert dg._state.selection.selected is None
assert dg._state.selection.extra_selected == []
def test_i_can_on_click_outside_does_not_update_position(self, datagrid):
"""Test that a click outside the grid does not change the selected position.
Clicks on surrounding UI elements must not accidentally move the grid
cursor, so is_inside=False must be a complete no-op for selection state.
"""
dg = datagrid
dg._state.selection.selected = (1, 2)
dg.on_click("click", is_inside=False, cell_id=f"tcell_{dg._id}-1-2")
assert dg._state.selection.selected == (1, 2)
def test_i_can_on_click_on_cell_updates_position(self, datagrid):
"""Test that clicking a cell sets selection.selected to the cell's (col, row) position.
The selected position drives the focus highlight rendered by
mk_selection_manager. Correct parsing and storage ensures the right cell
is visually highlighted after each click.
"""
dg = datagrid
cell_id = f"tcell_{dg._id}-2-5"
dg.on_click("click", is_inside=True, cell_id=cell_id)
assert dg._state.selection.selected == (2, 5)
def test_i_can_change_selection_mode(self, datagrid):
"""Test that change_selection_mode updates selection_mode from the CycleStateControl.
The selection mode (row / column / cell) determines how clicks and
keyboard events interpret the selected position. The grid must store
the mode chosen by the cycle selector so rendering and JS handlers
receive a consistent value.
"""
dg = datagrid
assert dg._state.selection.selection_mode == "cell"
dg._selection_mode_selector.cycle_state()
dg.change_selection_mode()
assert dg._state.selection.selection_mode == "row"
dg._selection_mode_selector.cycle_state()
dg.change_selection_mode()
assert dg._state.selection.selection_mode == "column"
dg._selection_mode_selector.cycle_state()
dg.change_selection_mode()
assert dg._state.selection.selection_mode == "cell"
class TestDataGridRender:
# ------------------------------------------------------------------
# Global structure (UTR-11.1)
# ------------------------------------------------------------------
def test_no_data_layout_is_rendered(self, datagrid):
"""Test that DataGrid renders a placeholder when no DataFrame is loaded.
Why these elements matter:
- Div tag: render must return a valid element, not raise
- "No data to display !": gives clear feedback and prevents a crash
when the DataService has no loaded DataFrame
"""
html = datagrid.render()
assert matches(html, Div("No data to display !"))
def test_layout_is_rendered(self, datagrid_with_data):
"""Test that DataGrid renders all 4 main structural sections when data is loaded.
Why these elements matter:
- id=dg._id: root ID required for JS init (initDataGrid) and HTMX targeting
- cls Contains 'grid': CSS grid layout controls header/body row sizing
- child[0] Div: filter bar + toolbar icons
- child[1] DoNotCheck: Panel containing the scrollable table
- child[2] Script: initDataGrid call that activates JS behaviour
- child[3] DoNotCheck: Keyboard handler for in-grid shortcuts
"""
dg = datagrid_with_data
html = dg.render()
expected = Div(
Div(
TestObject("Query"), # filter bar
Div(), # toolbar icons
cls=Contains("flex"),
),
TestObject("Panel"), # Panel containing the table
Script(Contains("initDataGrid")), # initDataGrid script
Div(), # Mouse and Keyboard handler
id=dg._id,
cls=Contains("grid"),
)
assert matches(html, expected)
# ------------------------------------------------------------------
# Selection Manager
# ------------------------------------------------------------------
def test_i_can_render_selection_manager_with_no_selection(self, datagrid):
"""Test that the selection manager renders with no children when nothing is selected.
Why these elements matter:
- id=tsm_{id}: required by updateDatagridSelection JS to locate the manager
- NoChildren: an empty manager signals JS that nothing is selected,
preventing stale highlight artefacts between renders
"""
dg = datagrid
html = dg.mk_selection_manager()
expected = Div(
NoChildren(),
id=f"tsm_{dg._id}",
)
assert matches(html, expected)
def test_i_can_render_selection_manager_with_selected_cell(self, datagrid):
"""Test that the selection manager renders one focus element when a cell is selected.
Why these elements matter:
- Div child: each selected element becomes a child directive read by JS
- selection_type='focus': tells JS to apply the focus highlight style
- element_id: the cell DOM ID that JS will scroll to and highlight
"""
dg = datagrid
dg._state.selection.selected = (2, 5)
html = dg.mk_selection_manager()
expected = Div(
Div(selection_type="focus", element_id=f"tcell_{dg._id}-2-5"),
id=f"tsm_{dg._id}",
)
assert matches(html, expected)
@pytest.mark.parametrize("mode", ["row", "column", "range", None])
def test_i_can_render_selection_manager_selection_mode_attribute(self, datagrid, mode):
"""Test that the selection_mode attribute on the manager Div reflects the current mode.
Why this element matters:
- selection_mode attribute: read by updateDatagridSelection JS to know which
highlight strategy to apply (row stripe, column stripe, range rectangle, or
single cell focus). All four valid values must round-trip correctly.
"""
dg = datagrid
dg._state.selection.selection_mode = mode
html = dg.mk_selection_manager()
expected = Div(
id=f"tsm_{dg._id}",
selection_mode=f"{mode}",
)
assert matches(html, expected)
def test_i_can_render_extra_selected_row(self, datagrid):
"""Test that a row extra-selection entry renders as a Div with selection_type='row'.
Why these elements matter:
- selection_type='row': JS applies the row-stripe highlight to the entire row
- element_id: the DOM ID of the row element that JS will highlight
"""
dg = datagrid
row_element_id = f"trow_{dg._id}-3"
dg._state.selection.extra_selected.append(("row", row_element_id))
html = dg.mk_selection_manager()
expected = Div(
Div(selection_type="row", element_id=row_element_id),
id=f"tsm_{dg._id}",
)
assert matches(html, expected)
def test_i_can_render_extra_selected_column(self, datagrid):
"""Test that a column extra-selection entry renders as a Div with selection_type='column'.
Why these elements matter:
- selection_type='column': JS applies the column-stripe highlight to the entire column
- element_id: the DOM ID of the column header element that JS will highlight
"""
dg = datagrid
col_element_id = f"tcol_{dg._id}-2"
dg._state.selection.extra_selected.append(("column", col_element_id))
html = dg.mk_selection_manager()
expected = Div(
Div(selection_type="column", element_id=col_element_id),
id=f"tsm_{dg._id}",
)
assert matches(html, expected)
def test_i_can_render_extra_selected_range(self, datagrid):
"""Test that a range extra-selection entry renders with the tuple stringified as element_id.
Why these elements matter:
- selection_type='range': JS draws a rectangular highlight over the cell region
- element_id=str(tuple): the range bounds (min_col, min_row, max_col, max_row)
are passed as a string; JS parses this to locate all cells in the rectangle
"""
dg = datagrid
range_bounds = (0, 0, 2, 2)
dg._state.selection.extra_selected.append(("range", range_bounds))
html = dg.mk_selection_manager()
expected = Div(
Div(selection_type="range", element_id=f"{range_bounds}"),
id=f"tsm_{dg._id}",
)
assert matches(html, expected)
def test_i_can_render_selection_manager_with_multiple_extra_selected(self, datagrid):
"""Test that all extra_selected entries are rendered as individual children.
Why these elements matter:
- 3 children: each extra selection must produce exactly one Div child;
missing entries would cause JS to miss highlights, extra entries would
cause phantom highlights on unselected cells
"""
dg = datagrid
dg._state.selection.extra_selected.append(("row", f"trow_{dg._id}-1"))
dg._state.selection.extra_selected.append(("column", f"tcol_{dg._id}-0"))
dg._state.selection.extra_selected.append(("range", (1, 1, 3, 3)))
html = dg.mk_selection_manager()
children = find(html, Div(selection_type=AnyValue()))
assert len(children) == 3, "Each extra_selected entry must produce exactly one child Div"
def test_i_can_render_selection_manager_with_focus_and_extra_selected(self, datagrid):
"""Test that a focused cell and an extra selection are both rendered as children.
Why these elements matter:
- focus child: the primary selection must always be present when selected is set
- extra child: secondary selections must not replace the focus entry; both must
coexist so JS can apply focus highlight and secondary highlights simultaneously
"""
dg = datagrid
dg._state.selection.selected = (2, 5)
dg._state.selection.extra_selected.append(("row", f"trow_{dg._id}-5"))
html = dg.mk_selection_manager()
expected = Div(
Div(selection_type="focus", element_id=f"tcell_{dg._id}-2-5"),
Div(selection_type="row", element_id=f"trow_{dg._id}-5"),
id=f"tsm_{dg._id}",
)
assert matches(html, expected)
# ------------------------------------------------------------------
# Headers
# ------------------------------------------------------------------
def test_i_can_render_headers(self, datagrid_with_data):
"""Test that the header row renders with the correct ID and one cell per visible data column.
Why these elements matter:
- id=th_{id}: targeted by reset_column_width to swap the header after auto-size
- cls Contains 'dt2-header': CSS hook for header styling and sticky positioning
- 3 data cells: one resizable cell per data column; verifies all DataFrame
columns are represented (RowSelection and add-button columns excluded)
"""
dg = datagrid_with_data
html = dg.mk_headers()
# Step 1: Validate global header structure
expected = Div(
id=f"th_{dg._id}",
cls=Contains("dt2-header"),
)
assert matches(html, expected)
# Step 2: Count the visible data column headers
col_headers = find(html, Div(cls=Contains("dt2-cell", "dt2-resizable")))
assert len(col_headers) == 3, "Should have one resizable header cell per visible data column"
def test_i_can_render_row_selection_header_in_edition_mode(self, datagrid_with_data):
"""Test that a RowSelection header cell is rendered when edition mode is enabled.
Why these elements matter:
- dt2-row-selection: the selection checkbox column is only meaningful in edition
mode where rows can be individually selected for bulk operations; JS uses this
cell to anchor the row-selection toggle handler
- exactly 1 cell: a second dt2-row-selection would double the checkbox column
"""
dg = datagrid_with_data
html = dg.mk_headers()
row_sel_cells = find(html, Div(cls=Contains("dt2-row-selection")))
assert len(row_sel_cells) == 1, "Edition mode must render exactly one row-selection header cell"
def test_i_cannot_render_row_selection_header_without_edition_mode(self, datagrid_no_edition):
"""Test that no RowSelection header cell is rendered when edition mode is disabled.
Why this matters:
- Without edition, there is no row selection column in _columns; rendering one
would create an orphan cell misaligned with the body rows
"""
dg = datagrid_no_edition
html = dg.mk_headers()
row_sel_cells = find(html, Div(cls=Contains("dt2-row-selection")))
assert len(row_sel_cells) == 0, "Without edition mode, no row-selection header cell should be rendered"
def test_i_can_render_add_column_button_in_edition_mode(self, datagrid_with_data):
"""Test that the add-column button is appended to the header in edition mode.
Why this element matters:
- dt2-add-column: the '+' icon at the end of the header lets users add new
columns interactively; it must be present in edition mode and absent otherwise
to avoid exposing mutation UI in read-only grids
"""
dg = datagrid_with_data
html = dg.mk_headers()
add_col_cells = find(html, Div(cls=Contains("dt2-add-column")))
assert len(add_col_cells) == 1, "Edition mode must render exactly one add-column button"
def test_i_cannot_render_add_column_button_without_edition_mode(self, datagrid_no_edition):
"""Test that no add-column button is rendered when edition mode is disabled.
Why this matters:
- Read-only grids must not expose mutation controls; the absence of dt2-add-column
guarantees that the JS handler for toggling the column editor is never reachable
"""
dg = datagrid_no_edition
html = dg.mk_headers()
add_col_cells = find(html, Div(cls=Contains("dt2-add-column")))
assert len(add_col_cells) == 0, "Without edition mode, no add-column button should be rendered"
def test_i_can_render_headers_in_column_order(self, datagrid_with_data):
"""Test that resizable header cells appear in the same order as self._columns.
Why this matters:
- The header order drives column alignment with body rows; a mismatch between
header and body column order produces visually broken tables where header
labels do not correspond to the values beneath them
- data-col: JS drag-and-drop uses this attribute to identify the column being
moved; it must match the col_id in _columns for move_column to work correctly
"""
dg = datagrid_with_data
html = dg.mk_headers()
expected_order = [c.col_id for c in dg._columns if c.type != ColumnType.RowSelection_]
rendered_cells = find(html, Div(cls=Contains("dt2-cell", "dt2-resizable")))
rendered_order = [cell.attrs.get("data-col") for cell in rendered_cells]
assert rendered_order == expected_order, (
f"Header column order {rendered_order} does not match _columns order {expected_order}"
)
def test_i_can_render_hidden_column_is_not_rendered(self, datagrid_with_data):
"""Test that a hidden column produces no header cell.
Why this matters:
- Columns with visible=False must be completely absent from the header so that
body cell alignment is preserved; a hidden header cell would shift all
subsequent column headers one position to the right
"""
dg = datagrid_with_data
dg._state.columns[0].visible = False
dg._init_columns()
html = dg.mk_headers()
col_headers = find(html, Div(cls=Contains("dt2-cell", "dt2-resizable")))
assert len(col_headers) == 2, "Hiding one column must reduce the resizable header count from 3 to 2"
@pytest.mark.parametrize("col_title, expected_icon", [
("name", "text_field20_regular"), # Text column
("age", "number_row20_regular"), # Number column
("active", "checkbox_checked20_filled"), # Bool column
])
def test_i_can_render_header_cell_with_correct_icon_for_column_type(
self, datagrid_with_full_data, col_title, expected_icon):
"""Test that each column header renders the correct type icon in its label.
Why these elements matter:
- TestLabel(col_title, icon): the label inside _mk_header_name must carry the
right icon so the user can visually identify the column type at a glance
- data-col: used to locate the cell in the tree; ensures we test the right column
- Parametrization covers the three fundamental types (Text, Number, Bool) so
that adding a new type requires only a new row in the parametrize decorator
"""
dg = datagrid_with_full_data
col = next(c for c in dg._columns if c.title == col_title)
html = dg.mk_headers()
# Step 1: Find the header cell for this column
cell = find_one(html, Div(data_col=col.col_id, cls=Contains("dt2-cell", "dt2-resizable")))
# Step 2: Verify the header name section contains the correct icon
expected = Div(
Div(
TestLabel(col_title, icon=expected_icon),
),
)
assert matches(cell, expected)
@pytest.mark.parametrize("col_title, expected_width", [
("name", 150),
("age", 80),
("active", 120),
])
def test_i_can_render_header_cell_width_matches_state(
self, datagrid_with_full_data, col_title, expected_width):
"""Test that the style attribute of each header cell reflects its width in _columns.
Why these elements matter:
- style Contains 'width:{n}px': the inline style is the sole mechanism that
sizes each column; a mismatch between _columns state and the rendered style
would produce misaligned headers and body cells after a resize operation
- Distinct widths (150 / 80 / 120) in the fixture guarantee that a correct
match cannot happen by accident with a uniform default value
"""
dg = datagrid_with_full_data
col = next(c for c in dg._columns if c.title == col_title)
html = dg.mk_headers()
cell = find_one(html, Div(data_col=col.col_id, cls=Contains("dt2-cell", "dt2-resizable")))
assert matches(cell, Div(style=Contains(f"width:{expected_width}px")))
def test_i_can_render_header_resize_handle_has_correct_commands(self, datagrid_with_full_data):
"""Test that every resizable header cell contains a resize handle with the correct command IDs.
Why these elements matter:
- dt2-resize-handle: the DOM target for JS drag-to-resize; its absence completely
disables column resizing for the affected column
- data-command-id: JS fires this command on mouseup to persist the new width to
the server; an incorrect ID would silently discard every resize action
- data-reset-command-id: JS fires this command on double-click to auto-size the
column; an incorrect ID would break the double-click reset feature
- 3 handles: one per data column; a missing handle disables resize for that column
"""
dg = datagrid_with_full_data
resize_cmd_id = dg.commands.set_column_width().id
reset_cmd_id = dg.commands.reset_column_width().id
html = dg.mk_headers()
handles = find(html, Div(
cls=Contains("dt2-resize-handle"),
data_command_id=resize_cmd_id,
data_reset_command_id=reset_cmd_id,
))
assert len(handles) == 3, "Each data column must have exactly one resize handle with correct command IDs"
# ------------------------------------------------------------------
# Body
# ------------------------------------------------------------------
def test_i_can_render_body_wrapper(self, datagrid_with_data):
"""Test that the body wrapper renders with the correct ID and CSS class.
Why these elements matter:
- id=tb_{id}: targeted by get_page to append new rows during lazy loading
and by render_partial('body') to swap the whole body on filter/sort
- cls Contains 'dt2-body-container': CSS hook that enables the custom
JS scrollbar and overflow handling
"""
dg = datagrid_with_data
html = dg.mk_body_wrapper()
expected = Div(
id=f"tb_{dg._id}",
cls=Contains("dt2-body-container"),
)
assert matches(html, expected)
# ------------------------------------------------------------------
# Body rows
# ------------------------------------------------------------------
def test_i_can_render_body_row_count(self, datagrid_with_full_data):
"""Test that mk_body_content_page returns one row per DataFrame row plus the add-row button.
Why these elements matter:
- 3 rows: one per DataFrame row (Alice, Bob, Charlie); a missing row means data
was silently dropped during rendering
- 1 add-row button: required for edition mode; its absence disables row insertion
- total 4: the exact count prevents both missing and phantom rows
"""
dg = datagrid_with_full_data
rows = dg.mk_body_content_page(0)
assert len(rows) == 4, "Expected 3 data rows and 1 add-row button in edition mode"
def test_i_can_render_row_structure(self, datagrid_with_full_data):
"""Test that a rendered row carries the correct ID, CSS class and data-row attribute.
Why these elements matter:
- id=tr_{id}-{row_index}: targeted by handle_add_row (outerHTML swap) and lazy
loading; an incorrect ID breaks row-level HTMX updates
- cls Contains 'dt2-row': CSS hook for row styling and hover effects
- data_row: read by JS click handlers to identify which row was interacted with
"""
dg = datagrid_with_full_data
len_columns_1 = len(dg._columns) - 1
row = dg.mk_row(0, None, len_columns_1)
expected = Div(
id=f"tr_{dg._id}-0",
cls=Contains("dt2-row"),
data_row="0",
)
assert matches(row, expected)
def test_i_can_render_row_has_correct_cell_count(self, datagrid_with_full_data):
"""Test that each row renders exactly one cell per column in _columns.
Why this matters:
- cell count must equal column count: a mismatch shifts body cells out of
alignment with header cells, producing a visually broken table
- includes RowSelection_: the selection column must be present so the body
grid matches the header grid column by column
"""
dg = datagrid_with_full_data
len_columns_1 = len(dg._columns) - 1
row = dg.mk_row(0, None, len_columns_1)
assert len(row.children) == len(dg._columns), (
f"Row must have {len(dg._columns)} cells (one per column), got {len(row.children)}"
)
# ------------------------------------------------------------------
# Body cells
# ------------------------------------------------------------------
def test_i_can_render_body_cell_structure(self, datagrid_with_full_data):
"""Test that a data cell carries the correct ID, class, style and data-col attribute.
Why these elements matter:
- id=tcell_{id}-{col_pos}-{row_index}: parsed by _get_pos_from_element_id for
click/keyboard selection; an incorrect ID breaks cell-level navigation
- cls Contains 'dt2-cell': base CSS class for cell layout and borders
- style Contains 'width:{n}px': inline style that aligns the cell with its header;
a mismatch causes the column to appear misaligned after rendering
- data_col: read by JS resize drag to identify which column is being resized
"""
dg = datagrid_with_full_data
name_col = next(c for c in dg._columns if c.title == "name")
col_pos = dg._columns.index(name_col)
cell = dg.mk_body_cell(col_pos, 0, name_col, None, is_last=False)
expected = Div(
id=f"tcell_{dg._id}-{col_pos}-0",
cls=Contains("dt2-cell"),
style=Contains(f"width:{name_col.width}px"),
data_col=name_col.col_id,
)
assert matches(cell, expected)
def test_i_can_render_last_body_cell_has_dt2_last_cell_class(self, datagrid_with_full_data):
"""Test that the last cell in a row carries the dt2-last-cell class.
Why this element matters:
- dt2-last-cell: CSS hook that removes the right border on the final column
so the table edge looks clean; applied only to the last column to avoid
removing borders on intermediate cells
"""
dg = datagrid_with_full_data
last_col_pos = len(dg._columns) - 1
last_col_def = dg._columns[last_col_pos]
cell = dg.mk_body_cell(last_col_pos, 0, last_col_def, None, is_last=True)
assert matches(cell, Div(cls=Contains("dt2-cell", "dt2-last-cell")))
def test_i_can_render_body_cell_row_selection(self, datagrid_with_full_data):
"""Test that the RowSelection_ column renders a selection cell, not a data cell.
Why this element matters:
- dt2-row-selection: the DOM hook used by JS to toggle per-row checkboxes in
edition mode; it must not carry dt2-cell markup which would give it a data
cell appearance and break the selection column layout
"""
dg = datagrid_with_full_data
row_sel_col = dg._columns[0] # RowSelection_ is always inserted first
cell = dg.mk_body_cell(0, 0, row_sel_col, None, is_last=False)
assert matches(cell, Div(cls=Contains("dt2-row-selection")))
def test_i_cannot_render_body_cell_when_hidden(self, datagrid_with_full_data):
"""Test that mk_body_cell returns None for a column marked as not visible.
Why this matters:
- None return: the row renderer filters out None values so the hidden column
produces no DOM element; any non-None return would insert a phantom cell
and shift all subsequent cells one position to the right
"""
dg = datagrid_with_full_data
dg._state.columns[0].visible = False # hide "name" (first data column)
dg._init_columns()
hidden_col = next(c for c in dg._columns if c.title == "name")
col_pos = dg._columns.index(hidden_col)
cell = dg.mk_body_cell(col_pos, 0, hidden_col, None, is_last=False)
assert cell is None
# ------------------------------------------------------------------
# Body cell content
# ------------------------------------------------------------------
@pytest.mark.parametrize("col_title, expected_css_class, expected_value", [
("name", "dt2-cell-content-text", "Alice"),
("age", "dt2-cell-content-number", "25"),
("active", "dt2-cell-content-checkbox", None),
])
def test_i_can_render_body_cell_content_for_column_type(
self, datagrid_with_full_data, col_title, expected_css_class, expected_value):
"""Test that cell content carries the correct CSS class and value for each column type.
Why these elements matter:
- dt2-cell-content-text / number / checkbox: type-specific CSS classes that
control text alignment, font weight and boolean display; an incorrect class
makes numbers left-aligned or text right-aligned
- expected_value ('Alice', '25'): the actual DataFrame value must appear in the
rendered content so the cell is not empty or showing a wrong row
- Bool uses None for expected_value: the value is an icon, not a text string,
so only the wrapper class is verified
"""
dg = datagrid_with_full_data
col_def = next(c for c in dg._columns if c.title == col_title)
col_pos = dg._columns.index(col_def)
content = dg.mk_body_cell_content(col_pos, 0, col_def, None)
assert expected_css_class in str(content), (
f"Expected CSS class '{expected_css_class}' in cell content for column '{col_title}'"
)
if expected_value is not None:
assert expected_value in str(content), (
f"Expected value '{expected_value}' in cell content for column '{col_title}'"
)

View File

@@ -9,7 +9,7 @@ import pytest
from myfasthtml.controls.DataGrid import DataGrid
from myfasthtml.controls.DataGridFormattingEditor import DataGridFormattingEditor
from myfasthtml.controls.DataGridsManager import DataGridsManager
from myfasthtml.controls.datagrid_objects import DataGridColumnState, DataGridRowState
from myfasthtml.controls.datagrid_objects import DataGridColumnState, DataGridRowUiState
from myfasthtml.core.constants import ColumnType
from myfasthtml.core.formatting.dataclasses import FormatRule, Style
from myfasthtml.core.formatting.dsl.definition import FormattingDSL
@@ -38,8 +38,8 @@ def datagrid(manager):
# Add some rows
grid._state.rows = [
DataGridRowState(0),
DataGridRowState(1),
DataGridRowUiState(0),
DataGridRowUiState(1),
]
yield grid

View File

@@ -3,9 +3,11 @@ import shutil
import pytest
from myfasthtml.controls.DataGrid import DataGrid
from myfasthtml.controls.DataGridsManager import DataGridsManager
from myfasthtml.controls.TabsManager import TabsManager
from myfasthtml.controls.TreeView import TreeNode
from myfasthtml.core.data.DataServicesManager import DataServicesManager
from myfasthtml.core.instances import InstancesManager
from .conftest import root_instance
@@ -16,7 +18,7 @@ def cleanup_db():
@pytest.fixture
def datagrid_manager(root_instance):
def datagrids_manager(root_instance):
"""Create a DataGridsManager instance for testing."""
InstancesManager.reset()
TabsManager(root_instance) # just define it
@@ -26,7 +28,7 @@ def datagrid_manager(root_instance):
class TestDataGridsManagerBehaviour:
"""Tests for DataGridsManager behavior and logic."""
def test_i_can_create_new_grid_with_nothing_selected(self, datagrid_manager):
def test_i_can_create_new_grid_with_nothing_selected(self, datagrids_manager):
"""Test creating a new grid when no node is selected.
Verifies that:
@@ -35,10 +37,10 @@ class TestDataGridsManagerBehaviour:
- Node is selected and in edit mode
- Document definition is created
"""
result = datagrid_manager.new_grid()
datagrids_manager.handle_new_grid()
# Verify tree structure
tree = datagrid_manager._tree
tree = datagrids_manager._tree
assert len(tree._state.items) == 2, "Should have Untitled folder + Sheet1 node"
# Find the Untitled folder and Sheet1 node
@@ -55,13 +57,13 @@ class TestDataGridsManagerBehaviour:
assert tree._state.editing == sheet.id, "Sheet1 should be in edit mode"
# Verify document definition
assert len(datagrid_manager._state.elements) == 1, "Should have one document"
doc = datagrid_manager._state.elements[0]
assert len(datagrids_manager._state.elements) == 1, "Should have one document"
doc = datagrids_manager._state.elements[0]
assert doc.namespace == "Untitled"
assert doc.name == "Sheet1"
assert doc.type == "excel"
def test_i_can_create_new_grid_under_selected_folder(self, datagrid_manager):
def test_i_can_create_new_grid_under_selected_folder(self, datagrids_manager):
"""Test creating a new grid when a folder is selected.
Verifies that:
@@ -69,24 +71,24 @@ class TestDataGridsManagerBehaviour:
- Namespace matches folder name
"""
# Create a folder and select it
folder_id = datagrid_manager._tree.ensure_path("MyFolder")
datagrid_manager._tree._select_node(folder_id)
folder_id = datagrids_manager._tree.ensure_path("MyFolder")
datagrids_manager._tree._select_node(folder_id)
result = datagrid_manager.new_grid()
result = datagrids_manager.handle_new_grid()
# Verify the new grid is under MyFolder
tree = datagrid_manager._tree
tree = datagrids_manager._tree
nodes = list(tree._state.items.values())
sheet = [n for n in nodes if n.label == "Sheet1"][0]
assert sheet.parent == folder_id, "Sheet1 should be under MyFolder"
# Verify document definition
doc = datagrid_manager._state.elements[0]
doc = datagrids_manager._state.elements[0]
assert doc.namespace == "MyFolder"
assert doc.name == "Sheet1"
def test_i_can_create_new_grid_under_selected_leaf_parent(self, datagrid_manager):
def test_i_can_create_new_grid_under_selected_leaf_parent(self, datagrids_manager):
"""Test creating a new grid when a leaf node is selected.
Verifies that:
@@ -94,62 +96,62 @@ class TestDataGridsManagerBehaviour:
- Not under the leaf itself
"""
# Create a folder with a leaf
folder_id = datagrid_manager._tree.ensure_path("MyFolder")
folder_id = datagrids_manager._tree.ensure_path("MyFolder")
leaf = TreeNode(label="ExistingSheet", type="excel", parent=folder_id)
datagrid_manager._tree.add_node(leaf, parent_id=folder_id)
datagrids_manager._tree.add_node(leaf, parent_id=folder_id)
# Select the leaf
datagrid_manager._tree._select_node(leaf.id)
datagrids_manager._tree._select_node(leaf.id)
result = datagrid_manager.new_grid()
result = datagrids_manager.handle_new_grid()
# Verify the new grid is under MyFolder (not under ExistingSheet)
tree = datagrid_manager._tree
tree = datagrids_manager._tree
nodes = list(tree._state.items.values())
new_sheet = [n for n in nodes if n.label == "Sheet1"][0]
assert new_sheet.parent == folder_id, "Sheet1 should be under MyFolder (leaf's parent)"
assert new_sheet.parent != leaf.id, "Sheet1 should not be under the leaf"
def test_new_grid_generates_unique_sheet_names(self, datagrid_manager):
def test_new_grid_generates_unique_sheet_names(self, datagrids_manager):
"""Test that new_grid generates unique sequential sheet names.
Verifies Sheet1, Sheet2, Sheet3... generation.
"""
# Create first grid
datagrid_manager.new_grid()
assert datagrid_manager._state.elements[0].name == "Sheet1"
datagrids_manager.handle_new_grid()
assert datagrids_manager._state.elements[0].name == "Sheet1"
# Create second grid
datagrid_manager.new_grid()
assert datagrid_manager._state.elements[1].name == "Sheet2"
datagrids_manager.handle_new_grid()
assert datagrids_manager._state.elements[1].name == "Sheet2"
# Create third grid
datagrid_manager.new_grid()
assert datagrid_manager._state.elements[2].name == "Sheet3"
datagrids_manager.handle_new_grid()
assert datagrids_manager._state.elements[2].name == "Sheet3"
def test_new_grid_expands_parent_folder(self, datagrid_manager):
def test_new_grid_expands_parent_folder(self, datagrids_manager):
"""Test that creating a new grid automatically expands the parent folder.
Verifies parent is added to tree._state.opened.
"""
result = datagrid_manager.new_grid()
result = datagrids_manager.handle_new_grid()
tree = datagrid_manager._tree
tree = datagrids_manager._tree
nodes = list(tree._state.items.values())
untitled = [n for n in nodes if n.label == "Untitled"][0]
# Verify parent is expanded
assert untitled.id in tree._state.opened, "Parent folder should be expanded"
def test_new_grid_selects_and_edits_new_node(self, datagrid_manager):
def test_new_grid_selects_and_edits_new_node(self, datagrids_manager):
"""Test that new grid node is both selected and in edit mode.
Verifies _state.selected and _state.editing are set to new node.
"""
result = datagrid_manager.new_grid()
result = datagrids_manager.handle_new_grid()
tree = datagrid_manager._tree
tree = datagrids_manager._tree
nodes = list(tree._state.items.values())
sheet = [n for n in nodes if n.label == "Sheet1"][0]
@@ -159,17 +161,17 @@ class TestDataGridsManagerBehaviour:
# Verify edit mode
assert tree._state.editing == sheet.id, "New node should be in edit mode"
def test_new_grid_creates_document_definition(self, datagrid_manager):
def test_new_grid_creates_document_definition(self, datagrids_manager):
"""Test that new_grid creates a DocumentDefinition with correct fields.
Verifies document_id, namespace, name, type, tab_id, datagrid_id.
"""
result = datagrid_manager.new_grid()
result = datagrids_manager.handle_new_grid()
# Verify document was created
assert len(datagrid_manager._state.elements) == 1, "Should have one document"
assert len(datagrids_manager._state.elements) == 1, "Should have one document"
doc = datagrid_manager._state.elements[0]
doc = datagrids_manager._state.elements[0]
# Verify all fields
assert doc.document_id is not None, "Should have document_id"
@@ -180,33 +182,34 @@ class TestDataGridsManagerBehaviour:
assert doc.tab_id is not None, "Should have tab_id"
assert doc.datagrid_id is not None, "Should have datagrid_id"
def test_new_grid_creates_datagrid_and_registers(self, datagrid_manager):
def test_new_grid_creates_datagrid_and_registers(self, datagrids_manager):
"""Test that new_grid creates a DataGrid and registers it.
Verifies DataGrid exists and is in registry with namespace.name.
"""
result = datagrid_manager.new_grid()
result = datagrids_manager.handle_new_grid()
doc = datagrid_manager._state.elements[0]
doc = datagrids_manager._state.elements[0]
# Verify DataGrid is registered
tables = datagrid_manager._registry.get_all_tables()
assert "Untitled.Sheet1" in tables, "DataGrid should be registered as Untitled.Sheet1"
entries = datagrids_manager._registry.get_all_entries()
assert doc.datagrid_id in entries, "DataGrid should be registered by grid_id"
assert entries[doc.datagrid_id] == ("Untitled", "Sheet1"), "Registry entry should match namespace and name"
# Verify DataGrid exists in InstancesManager
from myfasthtml.core.instances import InstancesManager
datagrid = InstancesManager.get(datagrid_manager._session, doc.datagrid_id, None)
datagrid = InstancesManager.get(datagrids_manager._session, doc.datagrid_id, None)
assert datagrid is not None, "DataGrid instance should exist"
def test_new_grid_creates_tab_with_datagrid(self, datagrid_manager):
def test_new_grid_creates_tab_with_datagrid(self, datagrids_manager):
"""Test that new_grid creates a tab with correct label and content.
Verifies tab is created via TabsManager with DataGrid as content.
"""
result = datagrid_manager.new_grid()
result = datagrids_manager.handle_new_grid()
doc = datagrid_manager._state.elements[0]
tabs_manager = datagrid_manager._tabs_manager
doc = datagrids_manager._state.elements[0]
tabs_manager = datagrids_manager._tabs_manager
# Verify tab exists in TabsManager
assert doc.tab_id in tabs_manager._state.tabs, "Tab should exist in TabsManager"
@@ -215,47 +218,82 @@ class TestDataGridsManagerBehaviour:
tab_metadata = tabs_manager._state.tabs[doc.tab_id]
assert tab_metadata['label'] == "Sheet1", "Tab label should be Sheet1"
def test_generate_unique_sheet_name_with_no_children(self, datagrid_manager):
def test_i_can_access_data_service_after_new_grid(self, datagrids_manager):
"""Test that a DataService is accessible via datagrid_id after new_grid().
Why this matters:
- DataService is now created first and its id becomes the datagrid_id
- Verifies the DataService is properly registered in DataServicesManager
- Ensures the link between datagrid_id and DataService is established
"""
datagrids_manager.handle_new_grid()
doc = datagrids_manager._state.elements[0]
datagrid_id = doc.datagrid_id
dataservice_id = DataGrid.get_data_service_id_from_data_grid_id(datagrid_id)
dataservice_manager = InstancesManager.get_by_type(datagrids_manager._session, DataServicesManager)
service = dataservice_manager.get_service(dataservice_id)
assert service is not None, "DataService should be accessible via dataservice_id (taken from datagrid_id)"
def test_new_grid_data_service_has_correct_table_name(self, datagrids_manager):
"""Test that the DataService receives the correct table_name after new_grid().
Why this matters:
- table_name is now passed to create_service() before the DataGrid exists
- Verifies the namespace.name format is correctly built and assigned
"""
datagrids_manager.handle_new_grid()
doc = datagrids_manager._state.elements[0]
dataservice_manager = InstancesManager.get_by_type(datagrids_manager._session, DataServicesManager)
dataservice_id = DataGrid.get_data_service_id_from_data_grid_id(doc.datagrid_id)
data_service = dataservice_manager.get_service(dataservice_id)
assert data_service.table_name == "Untitled.Sheet1", "DataService table_name should be 'Untitled.Sheet1'"
def test_generate_unique_sheet_name_with_no_children(self, datagrids_manager):
"""Test _generate_unique_sheet_name on an empty folder.
Verifies it returns "Sheet1" when no children exist.
"""
folder_id = datagrid_manager._tree.ensure_path("EmptyFolder")
folder_id = datagrids_manager._tree.ensure_path("EmptyFolder")
name = datagrid_manager._generate_unique_sheet_name(folder_id)
name = datagrids_manager._generate_unique_sheet_name(folder_id)
assert name == "Sheet1", "Should generate Sheet1 for empty folder"
def test_generate_unique_sheet_name_with_existing_sheets(self, datagrid_manager):
def test_generate_unique_sheet_name_with_existing_sheets(self, datagrids_manager):
"""Test _generate_unique_sheet_name with existing sheets.
Verifies it generates the next sequential number.
"""
folder_id = datagrid_manager._tree.ensure_path("MyFolder")
folder_id = datagrids_manager._tree.ensure_path("MyFolder")
# Add Sheet1 and Sheet2 manually
sheet1 = TreeNode(label="Sheet1", type="excel", parent=folder_id)
sheet2 = TreeNode(label="Sheet2", type="excel", parent=folder_id)
datagrid_manager._tree.add_node(sheet1, parent_id=folder_id)
datagrid_manager._tree.add_node(sheet2, parent_id=folder_id)
datagrids_manager._tree.add_node(sheet1, parent_id=folder_id)
datagrids_manager._tree.add_node(sheet2, parent_id=folder_id)
name = datagrid_manager._generate_unique_sheet_name(folder_id)
name = datagrids_manager._generate_unique_sheet_name(folder_id)
assert name == "Sheet3", "Should generate Sheet3 when Sheet1 and Sheet2 exist"
def test_generate_unique_sheet_name_skips_gaps(self, datagrid_manager):
def test_generate_unique_sheet_name_skips_gaps(self, datagrids_manager):
"""Test _generate_unique_sheet_name fills gaps in sequence.
Verifies it generates Sheet2 when Sheet1 and Sheet3 exist (missing Sheet2).
"""
folder_id = datagrid_manager._tree.ensure_path("MyFolder")
folder_id = datagrids_manager._tree.ensure_path("MyFolder")
# Add Sheet1 and Sheet3 (skip Sheet2)
sheet1 = TreeNode(label="Sheet1", type="excel", parent=folder_id)
sheet3 = TreeNode(label="Sheet3", type="excel", parent=folder_id)
datagrid_manager._tree.add_node(sheet1, parent_id=folder_id)
datagrid_manager._tree.add_node(sheet3, parent_id=folder_id)
datagrids_manager._tree.add_node(sheet1, parent_id=folder_id)
datagrids_manager._tree.add_node(sheet3, parent_id=folder_id)
name = datagrid_manager._generate_unique_sheet_name(folder_id)
name = datagrids_manager._generate_unique_sheet_name(folder_id)
assert name == "Sheet2", "Should generate Sheet2 to fill the gap"

View File

View File

@@ -0,0 +1,44 @@
import shutil
import pytest
from dbengine.handlers import handlers
from myfasthtml.core.data.DataServicesManager import DataServicesManager
from myfasthtml.core.dbengine_utils import DataFrameHandler
from myfasthtml.core.dbmanager import DbManager
from myfasthtml.core.instances import SingleInstance, InstancesManager
@pytest.fixture(scope="session")
def session():
handlers.register_handler(DataFrameHandler())
return {
"user_info": {
"id": "test_tenant_id",
"email": "test@email.com",
"username": "test user",
"role": [],
}
}
@pytest.fixture
def parent(session):
instance = SingleInstance(session=session, _id="test_parent_id")
return instance
@pytest.fixture
def db_manager(parent):
shutil.rmtree("TestDb", ignore_errors=True)
db_manager_instance = DbManager(parent, root="TestDb", auto_register=True)
yield db_manager_instance
shutil.rmtree("TestDb", ignore_errors=True)
InstancesManager.reset()
@pytest.fixture
def dsm(parent, db_manager):
return DataServicesManager(parent, parent._session)

View File

@@ -0,0 +1,213 @@
"""Unit tests for DataService."""
import pandas as pd
import pytest
from myfasthtml.core.constants import ColumnType
from myfasthtml.core.data.ColumnDefinition import ColumnDefinition
class TestDataInitialisation:
"""Tests for the Data initialisation section of DataService."""
@pytest.fixture
def service(self, dsm):
return dsm.create_service("ns.tbl", save_state=False)
def test_i_can_load_a_dataframe(self, service):
"""load_dataframe() populates the store and column definitions."""
df = pd.DataFrame({"name": ["Alice", "Bob"], "age": [30, 25]})
service.load_dataframe(df)
assert service.get_store().ne_df is not None
assert service.get_store().ns_total_rows == 2
assert len(service.columns) == 2
def test_i_can_load_an_empty_dataframe(self, service):
"""load_dataframe() with empty DataFrame sets total_rows to 0."""
service.load_dataframe(pd.DataFrame())
assert service.get_store().ns_total_rows == 0
assert service.columns == []
def test_i_can_load_dataframe_without_reinitializing_columns(self, service):
"""load_dataframe(init_columns=False) preserves existing column definitions."""
df = pd.DataFrame({"a": [1]})
service.load_dataframe(df)
original_columns = list(service.columns)
df2 = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
service.load_dataframe(df2, init_columns=False)
assert service.columns == original_columns
def test_i_can_load_none_dataframe_without_error(self, service):
"""load_dataframe(None) is a no-op and does not raise.
Why this matters:
- Early return on None protects against uninitialized callers.
- ne_df must remain None (no side effects on the store).
"""
service.load_dataframe(None)
assert service.get_store().ne_df is None
def test_i_can_load_dataframe_with_column_name_normalization(self, service):
"""load_dataframe() normalizes column names to safe IDs via make_safe_id.
Why this matters:
- Columns with spaces or special characters must be accessible as safe IDs.
- make_safe_id lowercases and replaces non-safe characters with underscores.
"""
df = pd.DataFrame({"First Name": ["Alice"], "Last Name": ["Smith"]})
service.load_dataframe(df)
col_ids = [c.col_id for c in service.columns]
assert col_ids == ["first_name", "last_name"]
class TestMutations:
"""Tests for the Mutations section of DataService."""
@pytest.fixture
def service(self, dsm):
svc = dsm.create_service("ns.mutations", save_state=False)
svc.load_dataframe(pd.DataFrame({"value": [1, 2, 3]}))
return svc
def test_i_can_add_a_row(self, service):
"""add_row() appends a row with default values and updates the caches."""
service.add_row()
assert service.get_store().ns_total_rows == 4
assert len(service.get_store().ne_df) == 4
def test_i_can_add_a_row_with_custom_data(self, service):
"""add_row() with explicit data stores the provided values."""
service.add_row(row_data={"value": 99})
assert service.get_store().ne_df.iloc[-1]["value"] == 99
def test_i_can_set_data(self, service):
"""set_data() updates the cell in the DataFrame, fast-access cache, and row data."""
service.set_data("value", 1, 99)
assert service.get_store().ne_df.at[1, "value"] == 99
assert service.get_store().ns_fast_access["value"][1] == 99
assert service.get_store().ns_row_data[1]["value"] == 99
@pytest.mark.parametrize("col_type, expected_default", [
(ColumnType.Text, ""),
(ColumnType.Number, 0),
(ColumnType.Bool, False),
(ColumnType.Datetime, pd.NaT),
(ColumnType.Choice, ""),
(ColumnType.Enum, ""),
(ColumnType.RowSelection_, ""),
])
def test_i_can_add_column_with_correct_default_value(self, service, col_type, expected_default):
"""add_column() creates a DataFrame column with the type-appropriate default value.
Why these assertions matter:
- col_id in ne_df.columns: Confirms the column is materialized in the DataFrame.
- len(columns) == 2: Confirms the column is registered in the metadata.
- default value: Each type has a specific sentinel value; wrong defaults corrupt data.
- pd.isna() for Datetime: pd.NaT does not support equality comparison.
"""
col_def = ColumnDefinition(col_id="__new__", col_index=-1, title="New Col", type=col_type)
service.add_column(col_def)
assert col_def.col_id in service.get_store().ne_df.columns
assert len(service.columns) == 2
actual = service.get_store().ne_df[col_def.col_id].iloc[0]
if pd.isna(expected_default):
assert pd.isna(actual)
else:
assert actual == expected_default
@pytest.mark.parametrize("col_type", [ColumnType.Formula, ColumnType.RowIndex])
def test_i_can_add_virtual_column_without_dataframe_column(self, service, col_type):
"""add_column() with virtual types does not create a DataFrame column.
Why these assertions matter:
- col_id not in ne_df.columns: Virtual columns are computed, not stored in the DataFrame.
- col_index == -1: Sentinel value marking virtual columns.
- len(columns) == 2: Column is registered in the state metadata despite being virtual.
"""
col_def = ColumnDefinition(col_id="__new__", col_index=-1, title="Virtual", type=col_type)
service.add_column(col_def)
assert col_def.col_id not in service.get_store().ne_df.columns
assert col_def.col_index == -1
assert len(service.columns) == 2
def test_i_can_add_row_without_loaded_dataframe_without_error(self, dsm):
"""add_row() is a no-op and does not raise when no DataFrame is loaded."""
service = dsm.create_service("ns.nodf_row", save_state=False)
service.add_row()
assert service.get_store().ne_df is None
def test_i_can_set_data_without_loaded_dataframe_without_error(self, dsm):
"""set_data() is a no-op and does not raise when no DataFrame is loaded."""
service = dsm.create_service("ns.nodf_set", save_state=False)
service.set_data("x", 0, 42)
assert service.get_store().ne_df is None
class TestFormulaManagement:
"""Tests for the Formula management section of DataService."""
@pytest.fixture
def service(self, dsm):
svc = dsm.create_service("ns.formula", save_state=False)
svc.load_dataframe(pd.DataFrame({"a": [1, 2, 3]}))
return svc
def test_i_can_get_table_name(self, service):
"""table_name property returns the value set at creation."""
assert service.table_name == "ns.formula"
def test_i_can_update_table_name(self, service):
"""set_table_name() updates the table name."""
service.set_table_name("ns.new_name")
assert service.table_name == "ns.new_name"
def test_i_can_register_formula(self, service):
"""register_formula() registers a formula in the shared FormulaEngine.
Why these assertions matter:
- has_formula: Confirms the formula was registered in the engine's DAG.
- get_formula_text: Confirms the source expression is stored as-is.
"""
service.register_formula("computed", "{a} + 1")
engine = service.get_formula_engine()
assert engine.has_formula("ns.formula", "computed")
assert engine.get_formula_text("ns.formula", "computed") == "{a} + 1"
def test_i_can_remove_formula(self, service):
"""remove_formula() unregisters a formula from the FormulaEngine."""
service.register_formula("computed", "{a} + 1")
service.remove_formula("computed")
engine = service.get_formula_engine()
assert not engine.has_formula("ns.formula", "computed")
def test_i_cannot_register_invalid_formula(self, service):
"""register_formula() with invalid DSL syntax does not register the formula.
Why this matters:
- parse_formula() raises DSLSyntaxError when it cannot parse the expression.
- register_formula() catches the exception to protect the caller, but the
formula must remain absent from the engine — not silently removed.
"""
service.register_formula("computed", "invalid syntax without braces")
engine = service.get_formula_engine()
assert not engine.has_formula("ns.formula", "computed")

View File

@@ -0,0 +1,210 @@
"""Integration tests for DataService formula evaluation through DataServicesManager.
These tests exercise the full stack: DataServicesManager owns the FormulaEngine
and provides the registry_resolver that enables cross-table formula resolution.
Each test uses real DataService instances and real DataStore objects — no fakes.
"""
import pytest
import pandas as pd
from myfasthtml.core.constants import ColumnType
from myfasthtml.core.data.ColumnDefinition import ColumnDefinition
class TestIntraTableFormula:
"""Single-table formula evaluation through the DSM/DataService stack."""
@pytest.fixture
def service(self, dsm):
svc = dsm.create_service("ns.sales", save_state=False)
svc.load_dataframe(pd.DataFrame({"price": [10, 20, 30], "qty": [2, 3, 4]}))
return svc
def test_i_can_evaluate_formula_on_single_table(self, service):
"""register_formula() + ensure_ready() computes the column for all rows.
Why these assertions matter:
- ns_fast_access["total"]: ensure_ready() writes results to the cache used by rendering.
- All three rows: verifies the formula is applied to every row, not just the first.
"""
service.register_formula("total", "{price} * {qty}")
service.ensure_ready()
result = service.get_store().ns_fast_access["total"]
assert result[0] == 20
assert result[1] == 60
assert result[2] == 120
def test_i_can_reevaluate_formula_after_data_change(self, service):
"""set_data() marks dependent formula columns dirty; ensure_ready() recomputes them.
Why these assertions matter:
- result[0] updated: confirms the dirty flag propagated and row 0 was recomputed.
- result[1] unchanged: confirms only affected rows are recomputed (no unnecessary work).
"""
service.register_formula("total", "{price} * {qty}")
service.ensure_ready()
service.set_data("price", 0, 100)
service.ensure_ready()
result = service.get_store().ns_fast_access["total"]
assert result[0] == 200
assert result[1] == 60
class TestCrossTableFormula:
"""Cross-table formula resolution via DataServicesManager.registry_resolver.
Table names use namespace notation (e.g. "ns.products"). The DSL grammar
now supports multiple dots in TABLE_COL_REF; the transformer splits on the
last dot to separate the table name from the column name.
"""
@pytest.fixture
def orders_service(self, dsm):
svc = dsm.create_service("ns.orders", save_state=False)
svc.load_dataframe(pd.DataFrame({"qty": [2, 3]}))
return svc
@pytest.fixture
def products_service(self, dsm):
svc = dsm.create_service("ns.products", save_state=False)
svc.load_dataframe(pd.DataFrame({"price": [10, 20]}))
return svc
def test_i_can_evaluate_cross_table_formula(self, orders_service, products_service):
"""A formula in one service can reference a namespaced table from another service.
Why these assertions matter:
- result[0] and result[1]: confirms the registry_resolver resolved "ns.products"
correctly and combined its data with the orders data row by row.
"""
orders_service.register_formula("total", "{ns.products.price} * {qty}")
orders_service.ensure_ready()
result = orders_service.get_store().ns_fast_access["total"]
assert result[0] == 20
assert result[1] == 60
def test_i_cannot_resolve_cross_table_formula_for_unknown_table(self, orders_service):
"""A formula referencing an unregistered table resolves to None without raising.
Why this matters:
- result is None: confirms the engine degrades gracefully when the resolver
returns None, instead of raising or producing corrupt values.
"""
orders_service.register_formula("total", "{ns.unknown_table.price} * {qty}")
orders_service.ensure_ready()
result = orders_service.get_store().ns_fast_access["total"]
assert result[0] is None
assert result[1] is None
class TestCrossTableFormulaWhere:
"""Cross-table formula resolution using an explicit WHERE clause.
The WHERE clause scans the remote table for a row where remote_column == local_value,
enabling correct lookups regardless of row ordering between tables.
"""
@pytest.fixture
def orders_service(self, dsm):
svc = dsm.create_service("ns.orders", save_state=False)
svc.load_dataframe(pd.DataFrame({"product_id": [2, 1], "qty": [3, 5]}))
return svc
@pytest.fixture
def products_service(self, dsm):
svc = dsm.create_service("ns.products", save_state=False)
svc.load_dataframe(pd.DataFrame({"product_id": [1, 2], "price": [10, 20]}))
return svc
def test_i_can_lookup_value_with_where_clause_non_sequential(self, orders_service, products_service):
"""WHERE resolves the correct remote row even when tables are not aligned by position.
Why these assertions matter:
- result[0] == 60: order row 0 has product_id=2, products row 1 has price=20 → 20*3=60.
- result[1] == 50: order row 1 has product_id=1, products row 0 has price=10 → 10*5=50.
Row-index fallback would return 10*3=30 and 20*5=100 — both wrong.
"""
orders_service.register_formula(
"total",
"{ns.products.price where ns.products.product_id = product_id} * {qty}"
)
orders_service.ensure_ready()
result = orders_service.get_store().ns_fast_access["total"]
assert result[0] == 60
assert result[1] == 50
def test_i_can_lookup_returns_none_when_no_match(self, orders_service, products_service):
"""WHERE returns None when the local value has no matching row in the remote table.
Why this matters:
- result[0] is None: product_id=2 exists in products, but product_id=99 does not.
- No exception is raised: the engine must degrade gracefully on missing lookups.
"""
orders_service_no_match = orders_service
orders_service_no_match.get_store().ns_fast_access["product_id"][0] = 99
orders_service_no_match.register_formula(
"total",
"{ns.products.price where ns.products.product_id = product_id} * {qty}"
)
orders_service_no_match.ensure_ready()
result = orders_service_no_match.get_store().ns_fast_access["total"]
assert result[0] is None
class TestFormulaLifecycle:
"""End-to-end formula lifecycle: column creation, registration, and evaluation."""
@pytest.fixture
def service(self, dsm):
svc = dsm.create_service("ns.lifecycle", save_state=False)
svc.load_dataframe(pd.DataFrame({"a": [1, 2, 3]}))
return svc
def test_i_can_add_formula_column_and_evaluate(self, service):
"""add_column(Formula) + register_formula() + ensure_ready() produces computed values.
Why these assertions matter:
- col_id in ns_fast_access: ensure_ready() must write the formula column into the cache.
- Values [2, 4, 6]: validates the formula expression is correctly applied to all rows.
"""
col_def = ColumnDefinition(col_id="__new__", col_index=-1,
title="Doubled", type=ColumnType.Formula,
formula="{a} * 2")
service.add_column(col_def)
service.register_formula(col_def.col_id, col_def.formula)
service.ensure_ready()
result = service.get_store().ns_fast_access[col_def.col_id]
assert list(result) == [2, 4, 6]
def test_i_can_evaluate_formula_after_adding_row(self, service):
"""add_row() marks formula columns dirty; ensure_ready() computes the new row.
Why these assertions matter:
- len(result) == 4: confirms the new row was appended and the cache extended.
- result[3] == 20: confirms the formula was recalculated for the new row (a=10, * 2).
- result[0] == 2: confirms existing rows are not corrupted by the recalculation.
"""
col_def = ColumnDefinition(col_id="__new__", col_index=-1,
title="Doubled", type=ColumnType.Formula,
formula="{a} * 2")
service.add_column(col_def)
service.register_formula(col_def.col_id, col_def.formula)
service.ensure_ready()
service.add_row(row_data={"a": 10})
service.ensure_ready()
result = service.get_store().ns_fast_access[col_def.col_id]
assert len(result) == 4
assert result[0] == 2
assert result[3] == 20

View File

@@ -0,0 +1,121 @@
"""Unit tests for DataServicesManager."""
import pandas as pd
class TestDataServicesManagerServiceLifecycle:
def test_i_can_create_a_service(self, dsm):
"""create_service() returns a DataService accessible by grid_id."""
service = dsm.create_service("ns.tbl", save_state=False)
assert service is not None
assert service.get_id() is not None
assert dsm.get_service(service.get_id()) is service
def test_i_can_create_service_with_correct_table_name(self, dsm):
"""create_service() sets the table_name on the returned DataService.
create_service() calls service.set_table_name() internally.
This test verifies the side effect is applied before returning the service.
"""
service = dsm.create_service("ns.my_table", save_state=False)
assert service.table_name == "ns.my_table"
def test_i_can_create_service_forcing_the_id(self, dsm):
"""create_service() sets the table_name on the returned DataService.
create_service() calls service.set_table_name() internally.
This test verifies the side effect is applied before returning the service.
"""
service = dsm.create_service("ns.my_table", _id="grid_id", save_state=False)
assert service.get_id() == "grid_id"
def test_i_can_get_a_service_by_grid_id(self, dsm):
"""get_service() returns the correct service."""
svc1 = dsm.create_service("ns.t1", _id="g1", save_state=False)
svc2 = dsm.create_service("ns.t2", _id="g2", save_state=False)
assert dsm.get_service("g1") is svc1
assert dsm.get_service("g2") is svc2
def test_i_cannot_get_a_nonexistent_service(self, dsm):
"""get_service() returns None for unknown grid_id."""
assert dsm.get_service("does_not_exist") is None
def test_i_can_remove_a_service(self, dsm):
"""remove_service() unregisters the service."""
service = dsm.create_service("ns.rm", save_state=False)
dsm.remove_service(service.get_id())
assert dsm.get_service(service.get_id()) is None
def test_i_can_remove_a_nonexistent_service_without_error(self, dsm):
"""remove_service() on unknown grid_id does not raise."""
dsm.remove_service("ghost") # should not raise
def test_i_can_restore_a_service(self, dsm):
"""restore_service() creates and registers a service if not already present."""
service = dsm.restore_service("grid_restore")
assert service is not None
assert dsm.get_service("grid_restore") is service
assert service.get_id() == "grid_restore"
def test_i_can_restore_existing_service(self, dsm):
"""restore_service() returns the existing service when already registered."""
original = dsm.create_service("ns.e", _id="grid_exist", save_state=False)
restored = dsm.restore_service("grid_exist")
assert restored is original
class TestDataServicesManagerFormulaEngine:
def test_i_can_get_formula_engine(self, dsm):
"""get_formula_engine() returns the shared FormulaEngine instance."""
engine = dsm.get_formula_engine()
assert engine is not None
def test_i_can_verify_shared_formula_engine(self, dsm):
"""All services share the same FormulaEngine from DataServicesManager."""
svc1 = dsm.create_service("ns.fe1", save_state=False)
svc2 = dsm.create_service("ns.fe2", save_state=False)
assert svc1.get_formula_engine() is svc2.get_formula_engine()
assert svc1.get_formula_engine() is dsm.get_formula_engine()
def test_i_can_resolve_store_by_table_name(self, dsm):
"""FormulaEngine resolver finds the DataStore for a given table name."""
service = dsm.create_service("ns.resolver", save_state=False)
df = pd.DataFrame({"a": [1, 2]})
service.load_dataframe(df)
store = dsm._resolve_store_for_table("ns.resolver")
assert store is service.get_store()
def test_i_can_resolve_correct_store_among_multiple_services(self, dsm):
"""_resolve_store_for_table() identifies the right store when multiple services are registered.
The resolver iterates over all registered services and must return the store
whose service has a matching table_name, not another service's store.
"""
svc_a = dsm.create_service("ns.table_a", save_state=False)
svc_b = dsm.create_service("ns.table_b", save_state=False)
df = pd.DataFrame({"x": [10, 20]})
svc_a.load_dataframe(df)
svc_b.load_dataframe(df.copy())
store_a = dsm._resolve_store_for_table("ns.table_a")
store_b = dsm._resolve_store_for_table("ns.table_b")
assert store_a is svc_a.get_store()
assert store_b is svc_b.get_store()
assert store_a is not store_b
def test_i_cannot_resolve_unknown_table(self, dsm):
"""FormulaEngine resolver returns None for an unknown table name."""
result = dsm._resolve_store_for_table("unknown.table")
assert result is None

View File

@@ -2,9 +2,7 @@ import shutil
import pytest
from dbengine.handlers import handlers
from pandas import DataFrame
from myfasthtml.controls.DataGrid import DataGrid, DatagridConf
from myfasthtml.core.DataGridsRegistry import DataGridsRegistry, DATAGRIDS_REGISTRY_ENTRY_KEY
from myfasthtml.core.dbengine_utils import DataFrameHandler
from myfasthtml.core.dbmanager import DbManager
@@ -31,7 +29,6 @@ def session():
@pytest.fixture
def parent(session):
instance = SingleInstance(session=session, _id="test_parent_id")
instance.get_formula_engine = lambda: None
return instance
@@ -39,76 +36,60 @@ def parent(session):
def db_manager(parent):
shutil.rmtree("TestDb", ignore_errors=True)
db_manager_instance = DbManager(parent, root="TestDb", auto_register=True)
yield db_manager_instance
shutil.rmtree("TestDb", ignore_errors=True)
InstancesManager.reset()
@pytest.fixture
def dg(parent):
# the table must be created
data = {"name": ["john", "jane"], "id": [1, 2]}
df = DataFrame(data)
dgc = DatagridConf("namespace", "table_name")
datagrid = DataGrid(parent, conf=dgc, save_state=True)
datagrid.init_from_dataframe(df, init_state=True)
yield datagrid
datagrid.dispose()
@pytest.fixture
def dgr(parent, db_manager):
return DataGridsRegistry(parent)
def test_entry_is_created_at_startup(db_manager, dgr, ):
def test_i_can_create_registry_with_empty_state(db_manager, dgr):
"""Registry is initialised with an empty dict in DB."""
assert db_manager.exists_entry(DATAGRIDS_REGISTRY_ENTRY_KEY)
assert clean_db_object(db_manager.load(DATAGRIDS_REGISTRY_ENTRY_KEY)) == {}
def test_i_can_put_a_table_in_registry(dgr):
def test_i_can_put_and_retrieve_entries(dgr):
"""put() persists entries retrievable via get_all_entries()."""
dgr.put("namespace", "name", "datagrid_id")
dgr.put("namespace2", "name2", "datagrid_id2")
assert dgr.get_all_tables() == ["namespace.name", "namespace2.name2"]
entries = dgr.get_all_entries()
assert "datagrid_id" in entries
assert "datagrid_id2" in entries
assert entries["datagrid_id"] == ("namespace", "name")
assert entries["datagrid_id2"] == ("namespace2", "name2")
def test_i_can_columns_names_for_a_table(dgr, dg):
expected = ["__row_index__", "name", "id"] if dg.get_state().row_index else ["name", "id"]
namespace, name = dg.get_settings().namespace, dg.get_settings().name
dgr.put(namespace, name, dg.get_id())
table_full_name = f"{namespace}.{name}"
assert dgr.get_columns(table_full_name) == expected
def test_i_can_remove_an_entry(dgr):
"""remove() deletes the entry from the registry."""
dgr.put("ns", "tbl", "grid_1")
assert "grid_1" in dgr.get_all_entries()
dgr.remove("grid_1")
assert "grid_1" not in dgr.get_all_entries()
def test_i_can_get_columns_values(dgr, dg):
namespace, name = dg.get_settings().namespace, dg.get_settings().name
dgr.put(namespace, name, dg.get_id())
table_full_name = f"{namespace}.{name}"
assert dgr.get_column_values(table_full_name, "name") == ["john", "jane"]
def test_i_can_remove_nonexistent_entry_without_error(dgr):
"""remove() on a missing id does not raise."""
dgr.remove("does_not_exist") # should not raise
def test_i_can_get_row_count(dgr, dg):
namespace, name = dg.get_settings().namespace, dg.get_settings().name
dgr.put(namespace, name, dg.get_id())
table_full_name = f"{namespace}.{name}"
assert dgr.get_row_count(table_full_name) == 2
def test_i_can_put_multiple_entries_and_get_all(dgr):
"""get_all_entries() returns all registered grids."""
dgr.put("ns1", "t1", "id1")
dgr.put("ns2", "t2", "id2")
dgr.put("ns3", "t3", "id3")
entries = dgr.get_all_entries()
assert len(entries) == 3
def test_i_can_manage_when_table_name_does_not_exist(dgr):
assert dgr.get_columns("namespace.name") == []
assert dgr.get_row_count("namespace.name") == 0
def test_i_can_manage_when_column_does_not_exist(dgr, dg):
namespace, name = dg.get_settings().namespace, dg.get_settings().name
dgr.put(namespace, name, dg.get_id())
table_full_name = f"{namespace}.{name}"
assert len(dgr.get_columns(table_full_name)) > 0
assert dgr.get_column_values("namespace.name", "") == []
def test_i_can_get_empty_entries_when_registry_is_empty(dgr):
"""get_all_entries() returns empty dict when nothing registered."""
assert dgr.get_all_entries() == {}