Added DataServicesManager and DataService
This commit is contained in:
380
src/myfasthtml/core/data/DataService.py
Normal file
380
src/myfasthtml/core/data/DataService.py
Normal file
@@ -0,0 +1,380 @@
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
|
||||
from myfasthtml.core.constants import ColumnType, ROW_INDEX_ID
|
||||
from myfasthtml.core.data.ColumnDefinition import ColumnDefinition
|
||||
from myfasthtml.core.dbmanager import DbObject
|
||||
from myfasthtml.core.instances import MultipleInstance
|
||||
from myfasthtml.core.utils import make_safe_id, make_unique_safe_id
|
||||
|
||||
logger = logging.getLogger(__name__)


# Placeholder value written into a freshly created cell, keyed by column type.
# Consumers look this table up with ``.get(col_type, "")``, so any column type
# that is not listed here falls back to the empty string.
_COLUMN_TYPE_DEFAULTS = {
    ColumnType.Number: 0,
    ColumnType.Text: "",
    ColumnType.Bool: False,
    ColumnType.Datetime: pd.NaT,  # pandas' "missing timestamp" marker
}
|
||||
|
||||
|
||||
class DataStore(DbObject):
    """Storage object holding the tabular data behind one DataGrid.

    The store keeps the DataFrame together with caches derived from it. It
    is a passive container: it defines no methods of its own besides the
    constructor, and every mutation is carried out by DataService.

    Attributes:
        ne_df: The pandas DataFrame; source of truth for non-formula columns.
        ns_fast_access: Dict mapping a col_id to a numpy array, used for
            direct per-column lookup (FormulaEngine, rendering).
        ns_row_data: List of per-row dicts, used by FormattingEngine for
            rule evaluation.
        ns_total_rows: Cached row count.
    """

    def __init__(self, owner, save_state: bool = True):
        # The whole setup, base-class constructor included, runs inside the
        # initializing() context provided by DbObject.
        with self.initializing():
            store_name = f"{owner.get_id()}#store"
            super().__init__(owner, name=store_name, save_state=save_state)

            # Nothing is loaded yet; DataService.load_dataframe() fills these.
            self.ne_df = None
            self.ns_fast_access = None
            self.ns_row_data = None
            self.ns_total_rows = None
|
||||
|
||||
|
||||
class DataServiceState(DbObject):
    """Persistent state owned by a DataService.

    Keeps the column definitions and the table name of the associated
    DataGrid. Persistence is delegated to DbObject.

    Attributes:
        columns: Ordered list of column definitions.
        table_name: Fully qualified table name used by FormulaEngine
            (format: "namespace.name" or "name").
    """

    def __init__(self, owner, save_state: bool = True):
        # NOTE(review): DataStore prefixes its DbObject name with the owner's
        # id, whereas this class uses the bare "#state". Confirm that DbObject
        # scopes names per owner, otherwise several services could share one
        # state entry.
        with self.initializing():
            super().__init__(owner, name="#state", save_state=save_state)

            # Start empty; DataService populates both fields later on.
            self.columns: list[ColumnDefinition] = []
            self.table_name: str = ""
|
||||
|
||||
|
||||
class DataService(MultipleInstance):
    """Data companion to DataGrid.

    Owns the DataStore and the list of ColumnDefinition objects for one
    DataGrid. All data mutations go through this class. The shared
    FormulaEngine is obtained from the parent (see get_formula_engine()).

    This class can exist and operate independently of any rendering component.

    NOTE(review): add_row() and set_data() use the row position as the
    DataFrame index label, i.e. they assume the loaded DataFrame carries a
    default 0..n-1 index. load_dataframe() does not enforce this -- confirm
    against callers.

    Attributes:
        _state: Persistent state (columns, table_name).
        _store: Persistent storage (DataFrame, caches).
    """

    def __init__(self, parent, _id: Optional[str] = None, save_state: bool = True):
        super().__init__(parent, _id=_id)
        self._state = DataServiceState(self, save_state=save_state)
        self._store = DataStore(self, save_state=save_state)

    @property
    def columns(self) -> list[ColumnDefinition]:
        """Return the list of column definitions."""
        return self._state.columns

    @property
    def table_name(self) -> str:
        """Return the fully qualified table name used by FormulaEngine."""
        return self._state.table_name

    def set_table_name(self, table_name: str) -> None:
        """Update the table name (e.g. after a rename).

        Only the in-memory state is updated; this method does not call
        save() on the state.
        """
        self._state.table_name = table_name

    def get_store(self) -> DataStore:
        """Return the underlying DataStore."""
        return self._store

    def get_formula_engine(self):
        """Return the FormulaEngine exposed by the parent object."""
        return self._parent.get_formula_engine()

    # ------------------------------------------------------------------
    # Data initialisation
    # ------------------------------------------------------------------

    def load_dataframe(self, df: pd.DataFrame, init_columns: bool = True) -> None:
        """Load a DataFrame into the store and initialise caches.

        The column labels of ``df`` are normalised to safe IDs *in place*,
        so the caller's DataFrame object is modified, and that same object
        becomes the store's DataFrame. Formula columns present in the state
        are (re-)registered with the FormulaEngine at the end of every load.

        Args:
            df: Source DataFrame. A value of None makes this a no-op.
            init_columns: When True, rebuild the ColumnDefinition list from
                the DataFrame columns and save the state. When False, the
                existing column definitions are kept.
        """
        if df is None:
            return

        df.columns = df.columns.map(make_safe_id)
        self._store.ne_df = df

        if init_columns:
            self._state.columns = self._build_column_definitions(df)
            self._state.save()

        self._store.ns_fast_access = self._build_fast_access(df)
        self._store.ns_row_data = df.to_dict(orient="records")
        self._store.ns_total_rows = len(df)
        self._store.save()

        self._register_existing_formulas()

    # ------------------------------------------------------------------
    # Mutations
    # ------------------------------------------------------------------

    def add_column(self, col_def: ColumnDefinition) -> None:
        """Add a new column to the DataGrid data layer.

        Assigns a unique safe col_id derived from the title. Formula and
        RowIndex columns are virtual: no DataFrame column is created and
        their col_index is -1. For every other type a DataFrame column
        filled with a type-appropriate default value is added, and the
        FormulaEngine is notified of the change.

        Args:
            col_def: Column definition. col_id and col_index are set by this
                method.
        """
        col_def.col_id = make_unique_safe_id(
            col_def.title, [c.col_id for c in self._state.columns]
        )
        store = self._store

        if col_def.type == ColumnType.Formula:
            # Formula columns hold no data of their own.
            col_def.col_index = -1
            self._state.columns.append(col_def)
            self._state.save()
            return

        # The branches below write into the caches; make sure they exist even
        # if the DataFrame was set without going through load_dataframe().
        self._ensure_caches()

        if col_def.type == ColumnType.RowIndex:
            col_def.col_index = -1
            self._state.columns.append(col_def)
            if store.ne_df is not None:
                # A RowIndex column simply mirrors the DataFrame index.
                store.ns_fast_access[col_def.col_id] = store.ne_df.index.to_numpy()
            self._state.save()
            store.save()
            return

        default_value = _COLUMN_TYPE_DEFAULTS.get(col_def.type, "")
        # The new column is appended at the end of the DataFrame.
        col_def.col_index = len(store.ne_df.columns) if store.ne_df is not None else 0
        self._state.columns.append(col_def)

        if store.ne_df is not None:
            store.ne_df[col_def.col_id] = default_value
            store.ns_fast_access[col_def.col_id] = store.ne_df[col_def.col_id].to_numpy()
            for row_dict in store.ns_row_data:
                row_dict[col_def.col_id] = default_value

        self._state.save()
        store.save()
        self._mark_changed(col_def.col_id)

    def add_row(self, row_data: Optional[dict] = None) -> None:
        """Append a new row with incremental cache updates.

        When row_data is not provided, default values are created for every
        column except Formula and RowSelection_ columns. After the row is
        stored, every cache is extended by exactly one entry so that the
        fast-access arrays, the row dicts and the DataFrame stay aligned.
        All formula columns are then marked dirty so ensure_ready() will
        recalculate them.

        Args:
            row_data: Optional dict of {col_id: value}. DataFrame columns
                missing from the dict receive whatever pandas stores for
                them (NaN). Does nothing when no DataFrame is loaded.
        """
        store = self._store
        df = store.ne_df
        if df is None:
            return

        self._ensure_caches()

        # Position of the new row; also used as its index label.
        new_index = len(df)
        if row_data is None:
            row_data = self._default_row(new_index)

        # Setting with enlargement: pandas aligns the dict on the DataFrame
        # columns, so keys that are not DataFrame columns are ignored here.
        df.loc[new_index] = row_data

        # Complete the row with the values pandas actually stored for the
        # columns the caller did not provide, so no cache is left short.
        row_record = dict(row_data)
        for col_id in df.columns:
            if col_id not in row_record:
                row_record[col_id] = df.at[new_index, col_id]

        # Arrays mirroring the DataFrame index (ROW_INDEX_ID and RowIndex
        # columns) must grow too, even when absent from row_data.
        cache_values = dict(row_record)
        index_ids = [ROW_INDEX_ID]
        index_ids.extend(
            c.col_id for c in self._state.columns if c.type == ColumnType.RowIndex
        )
        fast_access = store.ns_fast_access
        for col_id in index_ids:
            if col_id in fast_access:
                cache_values.setdefault(col_id, new_index)

        for col_id, value in cache_values.items():
            if col_id in fast_access:
                fast_access[col_id] = np.append(fast_access[col_id], value)
            else:
                fast_access[col_id] = np.array([value])

        store.ns_row_data.append(row_record)
        store.ns_total_rows = len(df)
        store.save()

        self._mark_all_formula_columns_dirty()

    def set_data(self, col_id: str, row_index: int, value) -> None:
        """Update a single cell value.

        Updates the DataFrame, then the fast-access cache and the row dict
        when they exist and contain the cell, and finally notifies the
        FormulaEngine that this cell changed.

        Args:
            col_id: Column identifier.
            row_index: Zero-based row index.
            value: New cell value.
        """
        store = self._store
        if store.ne_df is None:
            return

        store.ne_df.at[row_index, col_id] = value

        if store.ns_fast_access and col_id in store.ns_fast_access:
            store.ns_fast_access[col_id][row_index] = value

        if store.ns_row_data and row_index < len(store.ns_row_data):
            store.ns_row_data[row_index][col_id] = value

        store.save()
        self._mark_changed(col_id, rows=[row_index])

    # ------------------------------------------------------------------
    # Formula management
    # ------------------------------------------------------------------

    def register_formula(self, col_id: str, formula_text: str) -> None:
        """Register or update a formula for a column with the FormulaEngine.

        Registration is best effort: any error raised by the engine is
        logged as a warning and not propagated. No-op without an engine.

        Args:
            col_id: Column identifier.
            formula_text: DSL formula expression.
        """
        engine = self.get_formula_engine()
        if engine is None:
            return
        try:
            engine.set_formula(self._state.table_name, col_id, formula_text)
        except Exception as e:
            # Deliberately broad: a broken formula must not abort the caller
            # (e.g. a DataFrame load that re-registers every formula).
            logger.warning("Failed to register formula for %s.%s: %s",
                           self._state.table_name, col_id, e)

    def remove_formula(self, col_id: str) -> None:
        """Remove a formula for a column from the FormulaEngine.

        Unlike register_formula(), errors raised by the engine propagate.

        Args:
            col_id: Column identifier.
        """
        engine = self.get_formula_engine()
        if engine is None:
            return
        engine.remove_formula(self._state.table_name, col_id)

    def ensure_ready(self) -> None:
        """Ask the FormulaEngine to recalculate this table if needed.

        Intended to be called before rendering so formula columns are
        up-to-date. No-op when no engine is available.
        """
        engine = self.get_formula_engine()
        if engine is None:
            return
        engine.recalculate_if_needed(self._state.table_name, self._store)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _ensure_caches(self) -> None:
        """Build any missing cache from the DataFrame.

        add_column() and add_row() write into the caches; this guarantees
        they exist whenever a DataFrame is present. Caches that are already
        populated are left untouched. No-op when no DataFrame is loaded.
        """
        store = self._store
        df = store.ne_df
        if df is None:
            return
        if store.ns_fast_access is None:
            store.ns_fast_access = self._build_fast_access(df)
        if store.ns_row_data is None:
            store.ns_row_data = df.to_dict(orient="records")
        if store.ns_total_rows is None:
            store.ns_total_rows = len(df)

    def _default_row(self, row_index: int) -> dict:
        """Return default cell values for a new row at ``row_index``.

        Formula and RowSelection_ columns are skipped; RowIndex columns get
        the row index itself; other columns get their type default.
        """
        row = {}
        for col in self._state.columns:
            if col.type in (ColumnType.Formula, ColumnType.RowSelection_):
                continue
            if col.type == ColumnType.RowIndex:
                row[col.col_id] = row_index
            else:
                row[col.col_id] = _COLUMN_TYPE_DEFAULTS.get(col.type, "")
        return row

    def _build_column_definitions(self, df: pd.DataFrame) -> list[ColumnDefinition]:
        """Build ColumnDefinition objects from DataFrame columns.

        Args:
            df: Source DataFrame with normalised column names.

        Returns:
            Ordered list of ColumnDefinition objects, one per column.
        """
        # Pair each label with its dtype positionally instead of looking the
        # column up again by a re-normalised name: this cannot raise KeyError
        # and also works when two columns share the same label.
        return [
            ColumnDefinition(
                col_id=make_safe_id(col_name),
                col_index=col_index,
                title=col_name,
                type=self._infer_column_type(dtype),
            )
            for col_index, (col_name, dtype) in enumerate(zip(df.columns, df.dtypes))
        ]

    @staticmethod
    def _infer_column_type(dtype) -> ColumnType:
        """Infer ColumnType from a pandas dtype.

        Integer and float dtypes map to Number, bool to Bool, any
        datetime64 dtype to Datetime; everything else is treated as Text.
        """
        if pd.api.types.is_integer_dtype(dtype):
            return ColumnType.Number
        if pd.api.types.is_float_dtype(dtype):
            return ColumnType.Number
        if pd.api.types.is_bool_dtype(dtype):
            return ColumnType.Bool
        if pd.api.types.is_datetime64_any_dtype(dtype):
            return ColumnType.Datetime
        return ColumnType.Text

    @staticmethod
    def _build_fast_access(df: pd.DataFrame) -> dict:
        """Build ns_fast_access from a DataFrame.

        Args:
            df: Source DataFrame.

        Returns:
            Dict mapping each column label to a numpy array of its values,
            plus ROW_INDEX_ID mapped to the DataFrame index.
        """
        result = {col: df[col].to_numpy() for col in df.columns}
        result[ROW_INDEX_ID] = df.index.to_numpy()
        return result

    def _register_existing_formulas(self) -> None:
        """Re-register all formula columns with the FormulaEngine."""
        engine = self.get_formula_engine()
        if engine is None:
            return
        for col_def in self._state.columns:
            if col_def.formula:
                self.register_formula(col_def.col_id, col_def.formula)

    def _mark_changed(self, col_id: str, rows: Optional[list[int]] = None) -> None:
        """Notify FormulaEngine that a column's data has changed.

        Args:
            col_id: Changed column identifier.
            rows: Optional list of changed row indices. None means all rows.
        """
        engine = self.get_formula_engine()
        if engine is None:
            return
        engine.mark_data_changed(self._state.table_name, col_id, rows)

    def _mark_all_formula_columns_dirty(self) -> None:
        """Mark all formula columns dirty after a structural change (e.g. add_row)."""
        engine = self.get_formula_engine()
        if engine is None:
            return
        table = self._state.table_name
        for col in self._state.columns:
            if col.type == ColumnType.Formula and col.formula:
                engine.mark_data_changed(table, col.col_id)
|
||||
Reference in New Issue
Block a user