"""Data layer for DataGrid: persistent store, persistent state and the service that mutates them."""

import logging
from typing import Optional

import numpy as np
import pandas as pd

from myfasthtml.core.constants import ColumnType, ROW_INDEX_ID
from myfasthtml.core.data.ColumnDefinition import ColumnDefinition
from myfasthtml.core.dbmanager import DbObject
from myfasthtml.core.instances import MultipleInstance
from myfasthtml.core.utils import make_safe_id, make_unique_safe_id

logger = logging.getLogger(__name__)

# Value written into every existing row when a column of the given type is
# added, and into new rows created without explicit data. Types missing from
# this mapping fall back to "" at the call sites.
_COLUMN_TYPE_DEFAULTS = {
    ColumnType.Number: 0,
    ColumnType.Text: "",
    ColumnType.Bool: False,
    ColumnType.Datetime: pd.NaT,
}


class DataStore(DbObject):
    """Persistent storage for a DataGrid's tabular data.

    Holds the DataFrame and its derived caches used for rendering and formula
    evaluation. Contains no business logic — all mutations are performed by
    DataService.

    Attributes:
        ne_df: The pandas DataFrame. Source of truth for non-formula columns.
        ns_fast_access: Dict mapping col_id to a numpy array. O(1) column
            lookup used by FormulaEngine and rendering.
        ns_row_data: List of row dicts built from ns_fast_access. Used by
            FormattingEngine for rule evaluation.
        ns_total_rows: Cached total row count after filtering.
    """

    def __init__(self, owner, save_state: bool = True):
        """Create an empty store named ``"<owner id>#store"``.

        Args:
            owner: Owning instance; its ``get_id()`` is used to build the name.
            save_state: Forwarded to DbObject.
        """
        with self.initializing():
            super().__init__(owner, name=f"{owner.get_id()}#store", save_state=save_state)
            self.ne_df = None
            self.ns_fast_access = None
            self.ns_row_data = None
            self.ns_total_rows = None


class DataServiceState(DbObject):
    """Persistent state for DataService.

    Stores the column definitions and the table name associated with the
    DataGrid. Persists across sessions via DbObject.

    Attributes:
        columns: Ordered list of column data definitions.
        table_name: Fully qualified table name used by FormulaEngine
            (format: "namespace.name" or "name").
    """

    def __init__(self, owner, save_state: bool = True):
        """Create an empty state named ``"#state"``.

        Args:
            owner: Owning instance.
            save_state: Forwarded to DbObject.
        """
        with self.initializing():
            super().__init__(owner, name="#state", save_state=save_state)
            self.columns: list[ColumnDefinition] = []
            self.table_name: str = ""


class DataService(MultipleInstance):
    """Data companion to DataGrid.

    Owns the DataStore and the list of ColumnDefinition objects for one
    DataGrid. All data mutations go through this class. Holds a reference to
    DataServicesManager to access the shared FormulaEngine.

    This class can exist and operate independently of any rendering component.

    Attributes:
        _state: Persistent state (columns, table_name).
        _store: Persistent storage (DataFrame, caches).
    """

    def __init__(self, parent, _id: Optional[str] = None, save_state: bool = True):
        """Create the service together with its state and store.

        Args:
            parent: Parent instance; must expose ``get_formula_engine()``.
            _id: Optional explicit instance id.
            save_state: Forwarded to both DataServiceState and DataStore.
        """
        super().__init__(parent, _id=_id)
        self._state = DataServiceState(self, save_state=save_state)
        self._store = DataStore(self, save_state=save_state)
        # Rebuild the derived caches when the store already holds a DataFrame.
        self._init_store()

    def save_state(self):
        """Persist the column definitions and table name."""
        self._state.save()

    @property
    def columns(self) -> list[ColumnDefinition]:
        """Return the list of column definitions."""
        return self._state.columns

    @property
    def table_name(self) -> str:
        """Return the fully qualified table name used by FormulaEngine."""
        return self._state.table_name

    def set_table_name(self, table_name: str) -> None:
        """Update the table name (e.g. after a rename)."""
        self._state.table_name = table_name

    def get_store(self) -> DataStore:
        """Return the underlying DataStore."""
        return self._store

    def get_formula_engine(self):
        """Return the shared FormulaEngine from DataServicesManager."""
        return self._parent.get_formula_engine()

    # ------------------------------------------------------------------
    # Data initialisation
    # ------------------------------------------------------------------

    def load_dataframe(self, df: pd.DataFrame, init_columns: bool = True) -> None:
        """Load a DataFrame into the store and initialise caches.

        The DataFrame is stored by reference and its column labels are
        replaced in place by their safe IDs. The original labels are kept as
        the column titles.

        Args:
            df: Source DataFrame. Column names are normalised to safe IDs.
                ``None`` is a no-op.
            init_columns: When True, build ColumnDefinition list from the
                DataFrame columns and register any existing formula columns
                with the FormulaEngine.
        """
        if df is None:
            return

        # Capture the human-readable headers before they are overwritten by
        # their safe IDs; otherwise every title would equal its col_id.
        original_titles = [str(label) for label in df.columns]

        df.columns = df.columns.map(make_safe_id)
        self._store.ne_df = df

        if init_columns:
            self._state.columns = self._build_column_definitions(df, titles=original_titles)
            self._state.save()

        self._init_store()

    # ------------------------------------------------------------------
    # Mutations
    # ------------------------------------------------------------------

    def add_column(self, col_def: ColumnDefinition) -> None:
        """Add a new column to the DataGrid data layer.

        Assigns a unique safe col_id from the title. For Formula and RowIndex
        columns, no DataFrame column is created. For all other types, a
        column with a type-appropriate default value is added to the
        DataFrame.

        Args:
            col_def: Column definition. col_id will be set by this method.
        """
        col_def.col_id = make_unique_safe_id(
            col_def.title, [c.col_id for c in self._state.columns]
        )

        if col_def.type == ColumnType.Formula:
            # Virtual column: nothing is stored in the DataFrame or caches.
            col_def.col_index = -1
            self._state.columns.append(col_def)
            self._state.save()
            return

        if col_def.type == ColumnType.RowIndex:
            # Virtual column backed by the DataFrame index.
            col_def.col_index = -1
            self._state.columns.append(col_def)
            if self._store.ne_df is not None:
                self._store.ns_fast_access[col_def.col_id] = (
                    self._store.ne_df.index.to_numpy()
                )
            self._state.save()
            self._store.save()
            return

        default_value = _COLUMN_TYPE_DEFAULTS.get(col_def.type, "")
        # The new column is appended last, so its position is the current width.
        col_def.col_index = (
            len(self._store.ne_df.columns) if self._store.ne_df is not None else 0
        )
        self._state.columns.append(col_def)

        if self._store.ne_df is not None:
            self._store.ne_df[col_def.col_id] = default_value
            self._store.ns_fast_access[col_def.col_id] = (
                self._store.ne_df[col_def.col_id].to_numpy()
            )
            for row_dict in self._store.ns_row_data:
                row_dict[col_def.col_id] = default_value

        self._state.save()
        self._store.save()
        self._mark_changed(col_def.col_id)

    def add_row(self, row_data: Optional[dict] = None) -> None:
        """Append a new row with incremental cache updates.

        Creates default values for all non-virtual columns when row_data is
        not provided. Marks formula columns dirty so ensure_ready() will
        recalculate them on the next render.

        Only the keys present in ``row_data`` are appended to the fast-access
        arrays and to the cached row dict; the row-index array is always kept
        in step with the DataFrame.

        Args:
            row_data: Optional dict of {col_id: value}. Defaults to
                type-appropriate values for each column.
        """
        df = self._store.ne_df
        if df is None:
            return

        new_index = len(df)

        if row_data is None:
            row_data = {}
            for col in self._state.columns:
                if col.type not in (ColumnType.Formula, ColumnType.RowSelection_):
                    value = (
                        new_index
                        if col.type == ColumnType.RowIndex
                        else _COLUMN_TYPE_DEFAULTS.get(col.type, "")
                    )
                    row_data[col.col_id] = value

        df.loc[new_index] = row_data

        fast_access = self._store.ns_fast_access
        for col_id, value in row_data.items():
            if col_id in fast_access:
                fast_access[col_id] = np.append(fast_access[col_id], value)
            else:
                fast_access[col_id] = np.array([value])

        # _build_fast_access always creates a ROW_INDEX_ID entry. Extend it
        # here when row_data did not already cover it, so it does not end up
        # shorter than ns_total_rows.
        if ROW_INDEX_ID not in row_data and ROW_INDEX_ID in fast_access:
            fast_access[ROW_INDEX_ID] = np.append(fast_access[ROW_INDEX_ID], new_index)

        # Copy so later edits to the caller's dict do not leak into the cache.
        self._store.ns_row_data.append(row_data.copy())
        self._store.ns_total_rows = len(df)
        self._store.save()
        self._mark_all_formula_columns_dirty()

    def set_data(self, col_id: str, row_index: int, value) -> None:
        """Update a single cell value.

        Updates the DataFrame, fast-access cache, and row data dict, then
        marks dependent formula columns dirty.

        Args:
            col_id: Column identifier.
            row_index: Zero-based row index.
            value: New cell value.
        """
        if self._store.ne_df is None:
            return

        self._store.ne_df.at[row_index, col_id] = value

        # NOTE(review): the cached arrays come from Series.to_numpy(); confirm
        # they are writable under the pandas version/configuration in use.
        if self._store.ns_fast_access and col_id in self._store.ns_fast_access:
            self._store.ns_fast_access[col_id][row_index] = value

        if self._store.ns_row_data and row_index < len(self._store.ns_row_data):
            self._store.ns_row_data[row_index][col_id] = value

        self._store.save()
        self._mark_changed(col_id, rows=[row_index])

    # ------------------------------------------------------------------
    # Formula management
    # ------------------------------------------------------------------

    def register_formula(self, col_id: str, formula_text: str) -> None:
        """Register or update a formula for a column with the FormulaEngine.

        Failures raised by the engine are logged as warnings and swallowed,
        so a broken formula does not abort the caller.

        Args:
            col_id: Column identifier.
            formula_text: DSL formula expression.
        """
        engine = self.get_formula_engine()
        if engine is None:
            return
        try:
            engine.set_formula(self._state.table_name, col_id, formula_text)
        except Exception as e:
            logger.warning(
                "Failed to register formula for %s.%s: %s",
                self._state.table_name,
                col_id,
                e,
            )

    def remove_formula(self, col_id: str) -> None:
        """Remove a formula for a column from the FormulaEngine.

        Args:
            col_id: Column identifier.
        """
        engine = self.get_formula_engine()
        if engine is None:
            return
        engine.remove_formula(self._state.table_name, col_id)

    def ensure_ready(self) -> None:
        """Recalculate dirty formula columns before rendering.

        Called by DataGrid.mk_body_content_page() to ensure formula columns
        are up-to-date. No-op when no columns are dirty.
        """
        engine = self.get_formula_engine()
        if engine is None:
            return
        engine.recalculate_if_needed(self._state.table_name, self._store)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _init_store(self):
        """Rebuild every derived cache from the DataFrame, then re-register formulas.

        Does nothing when the store holds no DataFrame.
        """
        df = self._store.ne_df
        if df is None:
            return
        self._store.ns_fast_access = self._build_fast_access(df)
        self._store.ns_row_data = df.to_dict(orient="records")
        self._store.ns_total_rows = len(df)
        self._store.save()
        self._register_existing_formulas()

    def _build_column_definitions(
        self, df: pd.DataFrame, titles: Optional[list] = None
    ) -> list[ColumnDefinition]:
        """Build ColumnDefinition objects from DataFrame columns.

        Args:
            df: Source DataFrame with normalised column names.
            titles: Optional display titles, one per DataFrame column in
                column order. Defaults to the DataFrame's own column labels.

        Returns:
            Ordered list of ColumnDefinition objects.

        Raises:
            ValueError: If ``titles`` does not have one entry per column.
        """
        if titles is None:
            titles = list(df.columns)
        elif len(titles) != len(df.columns):
            raise ValueError(
                f"Expected {len(df.columns)} titles, got {len(titles)}"
            )

        definitions = []
        for col_index, (label, title) in enumerate(zip(df.columns, titles)):
            safe_id = make_safe_id(label)
            definitions.append(
                ColumnDefinition(
                    col_id=safe_id,
                    col_index=col_index,
                    title=title,
                    type=self._infer_column_type(df[safe_id].dtype),
                )
            )
        return definitions

    @staticmethod
    def _infer_column_type(dtype) -> ColumnType:
        """Infer ColumnType from a pandas dtype.

        Integer and float dtypes map to Number, bool to Bool, any datetime64
        dtype to Datetime, and everything else to Text.
        """
        if pd.api.types.is_integer_dtype(dtype):
            return ColumnType.Number
        if pd.api.types.is_float_dtype(dtype):
            return ColumnType.Number
        if pd.api.types.is_bool_dtype(dtype):
            return ColumnType.Bool
        if pd.api.types.is_datetime64_any_dtype(dtype):
            return ColumnType.Datetime
        return ColumnType.Text

    @staticmethod
    def _build_fast_access(df: pd.DataFrame) -> dict:
        """Build ns_fast_access from a DataFrame.

        Args:
            df: Source DataFrame.

        Returns:
            Dict mapping col_id to numpy array, plus ROW_INDEX_ID.
        """
        result = {col: df[col].to_numpy() for col in df.columns}
        result[ROW_INDEX_ID] = df.index.to_numpy()
        return result

    def _register_existing_formulas(self) -> None:
        """Re-register all formula columns with the FormulaEngine."""
        engine = self.get_formula_engine()
        if engine is None:
            return
        for col_def in self._state.columns:
            if col_def.formula:
                self.register_formula(col_def.col_id, col_def.formula)

    def _mark_changed(self, col_id: str, rows: Optional[list[int]] = None) -> None:
        """Notify FormulaEngine that a column's data has changed.

        Args:
            col_id: Changed column identifier.
            rows: Optional list of changed row indices. None means all rows.
        """
        engine = self.get_formula_engine()
        if engine is None:
            return
        engine.mark_data_changed(self._state.table_name, col_id, rows)

    def _mark_all_formula_columns_dirty(self) -> None:
        """Mark all formula columns dirty after a structural change (e.g. add_row)."""
        engine = self.get_formula_engine()
        if engine is None:
            return
        table = self._state.table_name
        for col in self._state.columns:
            if col.type == ColumnType.Formula and col.formula:
                engine.mark_data_changed(table, col.col_id)