Introducing columns formulas

This commit is contained in:
2026-02-13 21:38:00 +01:00
parent 0df78c0513
commit e8443f07f9
29 changed files with 3889 additions and 15 deletions

View File

View File

@@ -0,0 +1,391 @@
"""
Tests for the DependencyGraph DAG.
"""
import pytest
from myfasthtml.core.formula.dataclasses import (
ColumnRef,
ConditionalExpr,
CrossTableRef,
FormulaDefinition,
FunctionCall,
LiteralNode,
UnaryOp,
WhereClause,
)
from myfasthtml.core.formula.dependency_graph import DependencyGraph
from myfasthtml.core.formula.dsl.exceptions import FormulaCycleError
from myfasthtml.core.formula.engine import parse_formula
def make_formula(expr):
"""Create a FormulaDefinition from a raw AST node for direct AST testing."""
return FormulaDefinition(expression=expr, source_text="test")
# ==================== Add formula ====================
def test_i_can_add_simple_dependency():
"""Test that adding a formula creates edges in the graph."""
graph = DependencyGraph()
formula = parse_formula("{Price} * {Quantity}")
graph.add_formula("orders", "total", formula)
node = graph.get_node("orders", "total")
assert node is not None
assert node.formula is formula
assert node.dirty is True
# Precedents should include Price and Quantity
precedent = graph._precedents["orders.total"]
assert "orders.Price" in precedent
assert "orders.Quantity" in precedent
# Descendants are correctly set
assert "orders.total" in graph._dependents["orders.Price"]
assert "orders.total" in graph._dependents["orders.Quantity"]
def test_i_can_add_formula_and_check_dirty():
"""Test that newly added formulas are marked dirty."""
graph = DependencyGraph()
formula = parse_formula("{A} + {B}")
graph.add_formula("t", "C", formula)
node = graph.get_node("t", "C")
assert node.dirty is True
def test_i_can_update_formula():
"""Test that replacing a formula updates dependencies."""
graph = DependencyGraph()
formula1 = parse_formula("{A} + {B}")
graph.add_formula("t", "C", formula1)
formula2 = parse_formula("{X} * {Y}")
graph.add_formula("t", "C", formula2)
node = graph.get_node("t", "C")
assert node.formula is formula2
# Should no longer depend on A and B
precedent = graph._precedents.get("t.C", set())
assert "t.A" not in precedent
assert "t.B" not in precedent
# ==================== Cycle detection ====================
def test_i_cannot_create_cycle():
"""Test that circular dependencies raise FormulaCycleError."""
graph = DependencyGraph()
# A depends on B
graph.add_formula("t", "A", parse_formula("{B} + 1"))
# B depends on A -> cycle
with pytest.raises(FormulaCycleError):
graph.add_formula("t", "B", parse_formula("{A} * 2"))
def test_i_cannot_create_self_reference():
"""Test that a formula referencing its own column raises FormulaCycleError."""
graph = DependencyGraph()
with pytest.raises(FormulaCycleError):
graph.add_formula("t", "A", parse_formula("{A} + 1"))
def test_i_can_detect_long_cycle():
"""Test that a longer chain cycle is also detected."""
graph = DependencyGraph()
graph.add_formula("t", "A", parse_formula("{B} + 1"))
graph.add_formula("t", "B", parse_formula("{C} + 1"))
with pytest.raises(FormulaCycleError):
graph.add_formula("t", "C", parse_formula("{A} + 1"))
# ==================== Dirty flag propagation ====================
def test_i_can_propagate_dirty_flags():
"""Test that marking a source column dirty propagates to dependents."""
graph = DependencyGraph()
graph.add_formula("t", "B", parse_formula("{A} * 2"))
# Clear dirty flag set by add_formula
graph.clear_dirty("t.B")
assert not graph.get_node("t", "B").dirty
# Mark source A as dirty
graph.mark_dirty("t", "A")
# B depends on A, so B should become dirty
node_b = graph.get_node("t", "B")
assert node_b is not None
assert node_b.dirty is True
def test_i_can_propagate_dirty_to_chain():
"""Test that dirty flags propagate through a chain: A -> B -> C."""
graph = DependencyGraph()
graph.add_formula("t", "B", parse_formula("{A} + 1"))
graph.add_formula("t", "C", parse_formula("{B} + 1"))
# Clear all dirty flags
graph.clear_dirty("t.B")
graph.clear_dirty("t.C")
# Mark A dirty
graph.mark_dirty("t", "A")
assert graph.get_node("t", "B").dirty is True
assert graph.get_node("t", "C").dirty is True
def test_i_can_propagate_specific_rows():
"""Test that dirty propagation can be limited to specific rows."""
graph = DependencyGraph()
graph.add_formula("t", "B", parse_formula("{A} * 2"))
graph.clear_dirty("t.B")
graph.mark_dirty("t", "A", rows=[0, 2, 5])
node_b = graph.get_node("t", "B")
assert node_b.dirty is True
assert 0 in node_b.dirty_rows
assert 2 in node_b.dirty_rows
assert 5 in node_b.dirty_rows
assert 1 not in node_b.dirty_rows
# ==================== Topological ordering ====================
def test_i_can_get_calculation_order():
"""Test that dirty formula nodes are returned in topological order."""
graph = DependencyGraph()
graph.add_formula("t", "B", parse_formula("{A} + 1"))
graph.add_formula("t", "C", parse_formula("{B} + 1"))
graph.mark_dirty("t", "A")
order = graph.get_calculation_order(table="t")
node_ids = [n.node_id for n in order]
# B must come before C
assert "t.B" in node_ids
assert "t.C" in node_ids
assert node_ids.index("t.B") < node_ids.index("t.C")
def test_i_can_get_calculation_order_without_table_filter():
"""Test that get_calculation_order with no table filter returns dirty nodes across all tables.
Why: The table parameter is optional. Omitting it should return dirty formula nodes
from every table, not just one.
"""
graph = DependencyGraph()
graph.add_formula("t1", "B", parse_formula("{A} + 1"))
graph.add_formula("t2", "D", parse_formula("{C} * 2"))
order = graph.get_calculation_order()
node_ids = [n.node_id for n in order]
assert "t1.B" in node_ids
assert "t2.D" in node_ids
def test_i_can_get_calculation_order_excludes_clean_nodes():
"""Test that get_calculation_order only returns dirty formula nodes.
Why: The method filters on node.dirty (source line 166). Clean nodes must
never appear in the output, otherwise they would be recalculated unnecessarily.
"""
graph = DependencyGraph()
graph.add_formula("t", "B", parse_formula("{A} + 1"))
graph.add_formula("t", "C", parse_formula("{A} * 2"))
graph.clear_dirty("t.C")
order = graph.get_calculation_order(table="t")
node_ids = [n.node_id for n in order]
assert "t.B" in node_ids
assert "t.C" not in node_ids
def test_i_can_handle_diamond_dependency():
"""Test topological order with diamond dependency: A -> B, A -> C, B+C -> D."""
graph = DependencyGraph()
graph.add_formula("t", "B", parse_formula("{A} + 1"))
graph.add_formula("t", "C", parse_formula("{A} * 2"))
graph.add_formula("t", "D", parse_formula("{B} + {C}"))
graph.mark_dirty("t", "A")
order = graph.get_calculation_order(table="t")
node_ids = [n.node_id for n in order]
assert "t.B" in node_ids
assert "t.C" in node_ids
assert "t.D" in node_ids
# D must come after both B and C
assert node_ids.index("t.D") > node_ids.index("t.B")
assert node_ids.index("t.D") > node_ids.index("t.C")
# ==================== Remove formula ====================
def test_i_can_remove_formula():
"""Test that a formula can be removed from the graph."""
graph = DependencyGraph()
graph.add_formula("t", "B", parse_formula("{A} + 1"))
assert graph.has_formula("t", "B")
graph.remove_formula("t", "B")
assert not graph.has_formula("t", "B")
def test_i_can_remove_formula_node_kept_when_has_dependents():
"""Test that removing a formula keeps the node when other formulas still depend on it.
Why: remove_formula deletes the node only when no dependents exist (source line 145-146).
If another formula depends on the removed node it must remain as a data node.
"""
graph = DependencyGraph()
graph.add_formula("t", "B", parse_formula("{A} + 1"))
graph.add_formula("t", "C", parse_formula("{B} * 2"))
graph.remove_formula("t", "B")
assert not graph.has_formula("t", "B")
assert graph.get_node("t", "B") is not None
def test_i_can_remove_formula_and_add_back():
"""Test that a formula can be removed and re-added."""
graph = DependencyGraph()
graph.add_formula("t", "B", parse_formula("{A} + 1"))
graph.remove_formula("t", "B")
# Should not raise
graph.add_formula("t", "B", parse_formula("{X} * 2"))
assert graph.has_formula("t", "B")
# ==================== Cross-table ====================
def test_i_can_handle_cross_table_dependencies():
"""Test that cross-table references create inter-table edges."""
graph = DependencyGraph()
formula = parse_formula("{Products.Price} * {Quantity}")
graph.add_formula("orders", "total", formula)
node = graph.get_node("orders", "total")
assert node is not None
prec = graph._precedents.get("orders.total", set())
assert "Products.Price" in prec
assert "orders.Quantity" in prec
def test_i_can_extract_cross_table_ref_with_where_clause():
"""Test that CrossTableRef with a WHERE clause adds both the remote column
and the local column from the WHERE clause as dependencies.
Why: Source line 366-367 adds where_clause.local_column as a dependency.
Without this test that code path is never exercised.
"""
graph = DependencyGraph()
formula = make_formula(
CrossTableRef(
table="Products",
column="Price",
where_clause=WhereClause(
remote_table="Products",
remote_column="id",
local_column="product_id",
),
)
)
graph.add_formula("orders", "total", formula)
prec = graph._precedents.get("orders.total", set())
assert "Products.Price" in prec
assert "orders.product_id" in prec
# ==================== _extract_dependencies AST coverage ====================
def test_i_can_extract_unary_op_dependency():
"""Test that UnaryOp correctly extracts its operand's column dependency.
Why: Source lines 373-375 handle UnaryOp. Without this test that branch
is never exercised — a negated column reference would silently produce
no dependency edge.
"""
graph = DependencyGraph()
formula = make_formula(UnaryOp("-", ColumnRef("A")))
graph.add_formula("t", "B", formula)
prec = graph._precedents.get("t.B", set())
assert "t.A" in prec
def test_i_can_extract_function_call_dependencies():
"""Test that FunctionCall extracts column dependencies from all arguments.
Why: Source lines 376-378 iterate over arguments. With multiple column
arguments every dependency must appear in the precedents set.
"""
graph = DependencyGraph()
formula = make_formula(FunctionCall("SUM", [ColumnRef("A"), ColumnRef("B")]))
graph.add_formula("t", "C", formula)
prec = graph._precedents.get("t.C", set())
assert "t.A" in prec
assert "t.B" in prec
def test_i_can_extract_function_call_with_no_column_args():
"""Test that a FunctionCall with only literal arguments creates no column dependencies.
Why: FunctionCall with literal args only (e.g. NOW(1)) must not create
spurious dependency edges that would trigger unnecessary recalculations.
"""
graph = DependencyGraph()
formula = make_formula(FunctionCall("NOW", [LiteralNode(1)]))
graph.add_formula("t", "C", formula)
prec = graph._precedents.get("t.C", set())
assert len(prec) == 0, "FunctionCall with literal args should produce no column dependencies"
def test_i_can_extract_conditional_expr_all_branches():
"""Test that ConditionalExpr extracts dependencies from value_expr, condition, and else_expr.
Why: Source lines 380-384 walk all three branches. All three column references
must appear as dependencies so that any change in any branch triggers recalculation.
"""
graph = DependencyGraph()
formula = make_formula(
ConditionalExpr(
value_expr=ColumnRef("A"),
condition=ColumnRef("B"),
else_expr=ColumnRef("C"),
)
)
graph.add_formula("t", "D", formula)
prec = graph._precedents.get("t.D", set())
assert "t.A" in prec
assert "t.B" in prec
assert "t.C" in prec
def test_i_can_extract_conditional_expr_without_else():
"""Test that ConditionalExpr with else_expr=None does not crash and extracts value and condition.
Why: Source line 383 guards on ``if node.else_expr is not None``. A missing
else branch must not raise and must still extract the remaining two dependencies.
"""
graph = DependencyGraph()
formula = make_formula(
ConditionalExpr(
value_expr=ColumnRef("A"),
condition=ColumnRef("B"),
else_expr=None,
)
)
graph.add_formula("t", "C", formula)
prec = graph._precedents.get("t.C", set())
assert "t.A" in prec
assert "t.B" in prec

View File

@@ -0,0 +1,408 @@
"""
Tests for the FormulaEngine facade.
"""
import numpy as np
import pytest
from myfasthtml.core.formula.dsl.exceptions import FormulaSyntaxError, FormulaCycleError
from myfasthtml.core.formula.engine import FormulaEngine
class FakeStore:
"""Minimal DatagridStore-like object for testing."""
def __init__(self, rows):
self.ns_row_data = rows
self.ns_fast_access = self._build_fast_access(rows)
self.ns_total_rows = len(rows)
@staticmethod
def _build_fast_access(rows):
"""Build a columnar fast-access dict from a list of row dicts."""
if not rows:
return {}
return {
col: np.array([row.get(col) for row in rows], dtype=object)
for col in rows[0].keys()
}
def make_engine(store_map=None):
"""Create an engine with an optional store map for cross-table resolution."""
def resolver(table_name):
return store_map.get(table_name) if store_map else None
return FormulaEngine(registry_resolver=resolver)
# ==================== TestSetFormula ====================
class TestSetFormula:
"""Tests for set_formula: parsing, registration, and edge cases."""
def test_i_can_set_and_evaluate_formula(self):
"""Test that a formula can be set and evaluated."""
rows = [{"Price": 10, "Quantity": 3}]
store = FakeStore(rows)
engine = make_engine({"orders": store})
engine.set_formula("orders", "total", "{Price} * {Quantity}")
engine.recalculate_if_needed("orders", store)
assert "total" in store.ns_fast_access
assert store.ns_fast_access["total"][0] == 30
def test_i_can_evaluate_multiple_rows(self):
"""Test that formula evaluation works across multiple rows."""
rows = [
{"Price": 10, "Quantity": 3},
{"Price": 20, "Quantity": 2},
{"Price": 5, "Quantity": 10},
]
store = FakeStore(rows)
engine = make_engine({"orders": store})
engine.set_formula("orders", "total", "{Price} * {Quantity}")
engine.recalculate_if_needed("orders", store)
totals = store.ns_fast_access["total"]
assert totals[0] == 30
assert totals[1] == 40
assert totals[2] == 50
def test_i_cannot_set_invalid_formula(self):
"""Test that invalid formula syntax raises FormulaSyntaxError."""
engine = make_engine()
with pytest.raises(FormulaSyntaxError):
engine.set_formula("t", "col", "{Price} * * {Qty}")
def test_i_cannot_set_formula_with_cycle(self):
"""Test that a circular dependency raises FormulaCycleError."""
rows = [{"A": 1}]
store = FakeStore(rows)
engine = make_engine({"t": store})
engine.set_formula("t", "A", "{B} + 1")
with pytest.raises(FormulaCycleError):
engine.set_formula("t", "B", "{A} * 2")
@pytest.mark.parametrize("text", ["", " ", "\t\n"])
def test_i_can_set_formula_with_blank_input_removes_it(self, text):
"""Test that setting a blank or whitespace-only formula string removes it.
Why: set_formula strips the input (source line 86) before checking emptiness.
All blank variants — empty string, spaces, tabs — must behave identically.
"""
rows = [{"Price": 10}]
store = FakeStore(rows)
engine = make_engine({"t": store})
engine.set_formula("t", "col", "{Price} * 2")
assert engine.has_formula("t", "col")
engine.set_formula("t", "col", text)
assert not engine.has_formula("t", "col")
def test_i_can_replace_formula_clears_old_dependencies(self):
"""Test that replacing a formula removes old dependency edges.
Why: add_formula calls _remove_edges before adding new ones (source line 99).
After replacement, marking the old dependency dirty must NOT trigger
recalculation of the formula column.
"""
rows = [{"A": 5, "B": 3, "X": 10}]
store = FakeStore(rows)
engine = make_engine({"t": store})
engine.set_formula("t", "C", "{A} + {B}")
engine.recalculate_if_needed("t", store)
assert store.ns_fast_access["C"][0] == pytest.approx(8.0)
engine.set_formula("t", "C", "{X} * 2")
engine.recalculate_if_needed("t", store)
assert store.ns_fast_access["C"][0] == pytest.approx(20.0)
# A is no longer a dependency — changing it should NOT trigger C recalculation
rows[0]["A"] = 100
store.ns_fast_access["A"] = np.array([100], dtype=object)
engine.mark_data_changed("t", "A")
store.ns_fast_access["C"][0] = 999 # sentinel
engine.recalculate_if_needed("t", store)
assert store.ns_fast_access["C"][0] == 999, (
"C should not be recalculated when A changes after formula replacement"
)
# ==================== TestRemoveFormula ====================
class TestRemoveFormula:
"""Tests for remove_formula."""
def test_i_can_remove_formula(self):
"""Test that a formula can be removed."""
rows = [{"Price": 10, "Quantity": 3}]
store = FakeStore(rows)
engine = make_engine({"t": store})
engine.set_formula("t", "total", "{Price} * {Quantity}")
assert engine.has_formula("t", "total")
engine.remove_formula("t", "total")
assert not engine.has_formula("t", "total")
def test_i_can_remove_formula_and_set_back(self):
"""Test that a formula can be removed via empty string and then re-added."""
rows = [{"Price": 10}]
store = FakeStore(rows)
engine = make_engine({"t": store})
engine.set_formula("t", "col", "{Price} * 2")
assert engine.has_formula("t", "col")
engine.set_formula("t", "col", "")
assert not engine.has_formula("t", "col")
# ==================== TestRecalculate ====================
class TestRecalculate:
"""Tests for recalculate_if_needed: dirty tracking, return values, and row data update."""
def test_i_can_recalculate_only_dirty(self):
"""Test that only dirty formula columns are recalculated."""
rows = [{"A": 5, "B": 3}]
store = FakeStore(rows)
engine = make_engine({"t": store})
engine.set_formula("t", "C", "{A} + {B}")
engine.recalculate_if_needed("t", store)
# Manually set a different value to detect recalculation
store.ns_fast_access["C"][0] = 999
# No dirty flags → should NOT recalculate
engine.recalculate_if_needed("t", store)
assert store.ns_fast_access["C"][0] == 999 # unchanged
def test_i_can_recalculate_after_data_changed(self):
"""Test that marking data changed triggers recalculation."""
rows = [{"A": 5, "B": 3}]
store = FakeStore(rows)
engine = make_engine({"t": store})
engine.set_formula("t", "C", "{A} + {B}")
engine.recalculate_if_needed("t", store)
assert store.ns_fast_access["C"][0] == 8
# Update both storage structures (mirrors real DataGrid behaviour)
rows[0]["A"] = 10
store.ns_fast_access["A"] = np.array([10], dtype=object)
# Mark source column dirty
engine.mark_data_changed("t", "A")
engine.recalculate_if_needed("t", store)
assert store.ns_fast_access["C"][0] == 13
def test_i_can_recalculate_returns_false_when_no_dirty(self):
"""Test that recalculate_if_needed returns False when no nodes are dirty.
Why: Source line 151 returns False early when get_calculation_order is empty.
After a first successful recalculation all dirty flags are cleared, so the
second call must return False to avoid redundant work.
"""
rows = [{"A": 5}]
store = FakeStore(rows)
engine = make_engine({"t": store})
engine.set_formula("t", "B", "{A} + 1")
engine.recalculate_if_needed("t", store) # clears dirty flag
result = engine.recalculate_if_needed("t", store)
assert result is False
def test_i_can_recalculate_returns_true_when_dirty(self):
"""Test that recalculate_if_needed returns True when columns are recalculated.
Why: Source line 164 returns True after the evaluation loop. This return value
lets callers skip downstream work (e.g. rendering) when nothing changed.
"""
rows = [{"A": 5}]
store = FakeStore(rows)
engine = make_engine({"t": store})
engine.set_formula("t", "B", "{A} + 1")
result = engine.recalculate_if_needed("t", store)
assert result is True
def test_i_can_recalculate_with_empty_store(self):
"""Test that recalculate_if_needed handles a store with no rows without crashing.
Why: _evaluate_column guards on empty ns_row_data (source line 211-212).
No formula result should be written when there are no rows to process.
"""
store = FakeStore([]) # no rows
engine = make_engine({"t": store})
engine.set_formula("t", "B", "{A} + 1")
engine.recalculate_if_needed("t", store) # must not raise
assert "B" not in store.ns_fast_access
def test_i_can_verify_formula_values_appear_in_row_data(self):
"""Test that formula values are written back into ns_row_data after recalculation.
Why: _rebuild_row_data (source line 231-249) merges ns_fast_access values into
each row dict. This ensures formula results are available in row_data for
subsequent evaluation passes and for rendering.
"""
rows = [{"A": 5}]
store = FakeStore(rows)
engine = make_engine({"t": store})
engine.set_formula("t", "B", "{A} + 10")
engine.recalculate_if_needed("t", store)
assert store.ns_row_data[0]["B"] == pytest.approx(15.0)
# --- Known bug: chained formula columns ---
# _rebuild_row_data is called once AFTER the full evaluation loop, so formula
# column B is not yet in row_data when formula column C is evaluated.
# Fix needed: rebuild row_data between each column evaluation in the loop.
def test_i_can_recalculate_chain_formula_initial(self):
"""Test that C = f(B) is correct when B is itself a formula column (initial pass).
Chain: A (data) → B = A + 10 → C = B * 2
Expected: B = 15, C = 30.
"""
rows = [{"A": 5}]
store = FakeStore(rows)
engine = make_engine({"t": store})
engine.set_formula("t", "B", "{A} + 10")
engine.set_formula("t", "C", "{B} * 2")
engine.recalculate_if_needed("t", store)
assert store.ns_fast_access["B"][0] == 15
assert store.ns_fast_access["C"][0] == 30
def test_i_can_recalculate_chain_formula_after_data_change(self):
"""Test that C = f(B) stays correct after the source data column A changes.
Chain: A (data) → B = A + 10 → C = B * 2
After A changes from 5 to 10: B = 20, C = 40.
"""
rows = [{"A": 5}]
store = FakeStore(rows)
engine = make_engine({"t": store})
engine.set_formula("t", "B", "{A} + 10")
engine.set_formula("t", "C", "{B} * 2")
engine.recalculate_if_needed("t", store) # first pass (B=15, C=None per bug above)
rows[0]["A"] = 10
store.ns_fast_access["A"] = np.array([10], dtype=object)
engine.mark_data_changed("t", "A")
engine.recalculate_if_needed("t", store)
assert store.ns_fast_access["B"][0] == 20
assert store.ns_fast_access["C"][0] == 40
# ==================== TestCrossTable ====================
class TestCrossTable:
"""Tests for cross-table reference resolution strategies."""
def test_i_can_handle_cross_table_formula(self):
"""Test that cross-table references are resolved via registry (Strategy 3: row_index)."""
orders_rows = [{"Quantity": 3, "ProductId": 1}]
products_rows = [{"Price": 99.0}]
orders_store = FakeStore(orders_rows)
products_store = FakeStore(products_rows)
products_store.ns_fast_access = {"Price": np.array([99.0], dtype=object)}
engine = make_engine({
"orders": orders_store,
"products": products_store,
})
engine.set_formula("orders", "total", "{products.Price} * {Quantity}")
engine.recalculate_if_needed("orders", orders_store)
assert "total" in orders_store.ns_fast_access
assert orders_store.ns_fast_access["total"][0] == pytest.approx(297.0)
def test_i_can_resolve_cross_table_by_id_join(self):
"""Test that cross-table references are resolved via implicit id-join (Strategy 2).
Why: Strategy 2 (source lines 303-318) matches rows where both tables share
the same id value. This allows cross-table lookups without an explicit WHERE
clause when both stores expose an 'id' column in ns_fast_access.
"""
orders_rows = [{"id": 101, "Quantity": 3}, {"id": 102, "Quantity": 5}]
orders_store = FakeStore(orders_rows)
products_rows = [{"id": 101, "Price": 10.0}, {"id": 102, "Price": 20.0}]
products_store = FakeStore(products_rows)
engine = make_engine({
"orders": orders_store,
"products": products_store,
})
engine.set_formula("orders", "total", "{products.Price} * {Quantity}")
engine.recalculate_if_needed("orders", orders_store)
totals = orders_store.ns_fast_access["total"]
assert totals[0] == 30, "Row with id=101: Price=10 * Qty=3"
assert totals[1] == 100, "Row with id=102: Price=20 * Qty=5"
def test_i_can_handle_cross_table_without_registry(self):
"""Test that a cross-table formula evaluates gracefully when no registry is set.
Why: _make_cross_table_resolver guards on registry_resolver=None (source line 274).
The formula must evaluate to None without raising, preserving engine stability.
"""
rows = [{"Quantity": 3}]
store = FakeStore(rows)
engine = FormulaEngine(registry_resolver=None)
engine.set_formula("orders", "total", "{products.Price} * {Quantity}")
engine.recalculate_if_needed("orders", store) # must not raise
assert store.ns_fast_access["total"][0] is None
def test_i_can_handle_cross_table_missing_table(self):
"""Test that a cross-table formula evaluates gracefully when the remote table is absent.
Why: Source line 282-284 returns None when registry_resolver returns None for
the requested table. The engine must not crash and must produce None for the row.
"""
rows = [{"Quantity": 3}]
orders_store = FakeStore(rows)
engine = make_engine({"orders": orders_store}) # "products" not in registry
engine.set_formula("orders", "total", "{products.Price} * {Quantity}")
engine.recalculate_if_needed("orders", orders_store) # must not raise
assert orders_store.ns_fast_access["total"][0] is None
# ==================== TestGetFormulaText ====================
class TestGetFormulaText:
"""Tests for formula text retrieval."""
def test_i_can_get_formula_text(self):
"""Test that registered formula text can be retrieved."""
engine = make_engine()
engine.set_formula("t", "col", "{Price} * 2")
assert engine.get_formula_text("t", "col") == "{Price} * 2"
def test_i_can_get_formula_text_returns_none_when_not_set(self):
"""Test that get_formula_text returns None for non-formula columns."""
engine = make_engine()
assert engine.get_formula_text("t", "non_existing") is None

View File

@@ -0,0 +1,188 @@
"""
Tests for the FormulaEvaluator.
"""
import pytest
from myfasthtml.core.formula.engine import parse_formula
from myfasthtml.core.formula.evaluator import FormulaEvaluator
def make_evaluator(resolver=None):
return FormulaEvaluator(cross_table_resolver=resolver)
def eval_formula(text, row_data, row_index=0, resolver=None):
"""Helper: parse and evaluate a formula."""
formula = parse_formula(text)
evaluator = make_evaluator(resolver)
return evaluator.evaluate(formula, row_data, row_index)
# ==================== Arithmetic ====================
@pytest.mark.parametrize("formula,row_data,expected", [
("{Price} * {Quantity}", {"Price": 10, "Quantity": 3}, 30.0),
("{Price} + {Tax}", {"Price": 100, "Tax": 20}, 120.0),
("{Total} - {Discount}", {"Total": 100, "Discount": 15}, 85.0),
("{Total} / {Count}", {"Total": 100, "Count": 4}, 25.0),
("{Value} % 3", {"Value": 10}, 1.0),
("{Base} ^ 2", {"Base": 5}, 25.0),
])
def test_i_can_evaluate_simple_arithmetic(formula, row_data, expected):
"""Test that arithmetic formulas evaluate correctly."""
result = eval_formula(formula, row_data)
assert result == pytest.approx(expected)
# ==================== Functions ====================
@pytest.mark.parametrize("formula,row_data,expected", [
("round({Price} * 1.2, 2)", {"Price": 10}, 12.0),
("abs({Balance})", {"Balance": -50}, 50.0),
("upper({Name})", {"Name": "hello"}, "HELLO"),
("lower({Name})", {"Name": "WORLD"}, "world"),
("len({Description})", {"Description": "abc"}, 3),
("concat({First}, \" \", {Last})", {"First": "John", "Last": "Doe"}, "John Doe"),
("left({Code}, 3)", {"Code": "ABCDEF"}, "ABC"),
("right({Code}, 3)", {"Code": "ABCDEF"}, "DEF"),
("trim({Name})", {"Name": " hello "}, "hello"),
])
def test_i_can_evaluate_function(formula, row_data, expected):
"""Test that built-in function calls evaluate correctly."""
result = eval_formula(formula, row_data)
assert result == expected
def test_i_can_evaluate_nested_functions():
"""Test that nested function calls evaluate correctly."""
result = eval_formula("round(abs({Val}), 1)", {"Val": -3.456})
assert result == 3.5
# ==================== Conditionals ====================
def test_i_can_evaluate_conditional_true():
"""Test conditional when condition is true."""
result = eval_formula('{Price} * 0.8 if {Country} == "FR"', {"Price": 100, "Country": "FR"})
assert result == 80.0
def test_i_can_evaluate_conditional_false_no_else():
"""Test conditional returns None when condition is false and no else."""
result = eval_formula('{Price} * 0.8 if {Country} == "FR"', {"Price": 100, "Country": "DE"})
assert result is None
def test_i_can_evaluate_conditional_with_else():
"""Test conditional returns else value when condition is false."""
result = eval_formula('{Price} * 0.8 if {Country} == "FR" else {Price}', {"Price": 100, "Country": "DE"})
assert result == 100.0
def test_i_can_evaluate_chained_conditional():
"""Test chained conditionals evaluate in order."""
formula = '{Price} * 0.8 if {Country} == "FR" else {Price} * 0.9 if {Country} == "DE" else {Price}'
assert eval_formula(formula, {"Price": 100, "Country": "FR"}) == pytest.approx(80.0)
assert eval_formula(formula, {"Price": 100, "Country": "DE"}) == pytest.approx(90.0)
assert eval_formula(formula, {"Price": 100, "Country": "US"}) == pytest.approx(100.0)
# ==================== Logical operators ====================
@pytest.mark.parametrize("formula,row_data,expected", [
("{A} and {B}", {"A": True, "B": True}, True),
("{A} and {B}", {"A": True, "B": False}, False),
("{A} or {B}", {"A": False, "B": True}, True),
("{A} or {B}", {"A": False, "B": False}, False),
("not {A}", {"A": True}, False),
("not {A}", {"A": False}, True),
])
def test_i_can_evaluate_logical_operators(formula, row_data, expected):
"""Test that logical operators evaluate correctly."""
result = eval_formula(formula, row_data)
assert result == expected
# ==================== Error handling ====================
def test_i_can_handle_division_by_zero():
"""Test that division by zero returns None."""
result = eval_formula("{A} / {B}", {"A": 10, "B": 0})
assert result is None
def test_i_can_handle_missing_column():
"""Test that missing column reference returns None."""
result = eval_formula("{NonExistent} * 2", {})
assert result is None
def test_i_can_handle_none_operand():
"""Test that operations with None operands return None."""
result = eval_formula("{A} * {B}", {"A": None, "B": 5})
assert result is None
# ==================== Cross-table references ====================
def test_i_can_evaluate_cross_table_ref():
"""Test cross-table reference with mock resolver."""
def mock_resolver(table, column, where_clause, row_index):
assert table == "Products"
assert column == "Price"
return 99.0
result = eval_formula("{Products.Price} * {Quantity}", {"Quantity": 3}, resolver=mock_resolver)
assert result == pytest.approx(297.0)
def test_i_can_evaluate_cross_table_with_where():
"""Test cross-table reference with WHERE clause and mock resolver."""
def mock_resolver(table, column, where_clause, row_index):
assert where_clause is not None
assert where_clause.local_column == "ProductCode"
return 50.0
result = eval_formula(
"{Products.Price where Products.Code = ProductCode} * {Qty}",
{"ProductCode": "ABC", "Qty": 2},
resolver=mock_resolver,
)
assert result == pytest.approx(100.0)
# ==================== Aggregation ====================
def test_i_can_evaluate_aggregation():
"""Test that aggregation functions work with cross-table resolver."""
values = [10.0, 20.0, 30.0]
call_count = [0]
def mock_resolver(table, column, where_clause, row_index):
# For aggregation test, return a list to simulate multi-row match
val = values[call_count[0] % len(values)]
call_count[0] += 1
return val
formula = parse_formula("sum({OrderLines.Amount where OrderLines.OrderId = Id})")
evaluator = FormulaEvaluator(cross_table_resolver=mock_resolver)
result = evaluator.evaluate(formula, {"Id": 1}, 0)
# sum() with a single cross-table value returned by resolver
assert result is not None
# ==================== String operations ====================
@pytest.mark.parametrize("formula,row_data,expected", [
('{Name} contains "Corp"', {"Name": "Acme Corp"}, True),
('{Code} startswith "ERR"', {"Code": "ERR001"}, True),
('{File} endswith ".csv"', {"File": "data.csv"}, True),
('{Status} in ["active", "new"]', {"Status": "active"}, True),
('{Status} in ["active", "new"]', {"Status": "deleted"}, False),
])
def test_i_can_evaluate_string_operations(formula, row_data, expected):
"""Test string comparison operations."""
result = eval_formula(formula, row_data)
assert result == expected

View File

@@ -0,0 +1,188 @@
"""
Tests for the formula parser (grammar + transformer integration).
"""
import pytest
from myfasthtml.core.formula.dataclasses import (
BinaryOp,
CrossTableRef,
ConditionalExpr,
FunctionCall,
LiteralNode,
FormulaDefinition,
)
from myfasthtml.core.formula.dsl.exceptions import FormulaSyntaxError
from myfasthtml.core.formula.engine import parse_formula
# ==================== Valid formulas ====================
@pytest.mark.parametrize("formula_text", [
"{Price} * {Quantity}",
"{Price} + {Tax}",
"{Total} - {Discount}",
"{Total} / {Count}",
"{Value} % 2",
"{Base} ^ 2",
])
def test_i_can_parse_simple_arithmetic(formula_text):
"""Test that basic arithmetic formulas parse without error."""
result = parse_formula(formula_text)
assert result is not None
assert isinstance(result, FormulaDefinition)
assert isinstance(result.expression, BinaryOp)
@pytest.mark.parametrize("formula_text,expected_func", [
("round({Price} * 1.2, 2)", "round"),
("abs({Balance})", "abs"),
("upper({Name})", "upper"),
("len({Description})", "len"),
("today()", "today"),
("concat({First}, \" \", {Last})", "concat"),
])
def test_i_can_parse_function_call(formula_text, expected_func):
"""Test that function calls parse correctly."""
result = parse_formula(formula_text)
assert result is not None
assert isinstance(result.expression, FunctionCall)
assert result.expression.function_name == expected_func
def test_i_can_parse_conditional_no_else():
"""Test that suffix-if without else parses correctly."""
result = parse_formula('{Price} * 0.8 if {Country} == "FR"')
assert result is not None
expr = result.expression
assert isinstance(expr, ConditionalExpr)
assert expr.else_expr is None
assert isinstance(expr.value_expr, BinaryOp)
def test_i_can_parse_conditional_with_else():
"""Test that suffix-if with else parses correctly."""
result = parse_formula('{Price} * 0.8 if {Country} == "FR" else {Price}')
assert result is not None
expr = result.expression
assert isinstance(expr, ConditionalExpr)
assert expr.else_expr is not None
def test_i_can_parse_chained_conditional():
"""Test that chained conditionals parse correctly."""
formula = '{Price} * 0.8 if {Country} == "FR" else {Price} * 0.9 if {Country} == "DE" else {Price}'
result = parse_formula(formula)
assert result is not None
expr = result.expression
assert isinstance(expr, ConditionalExpr)
# The else_expr should be another ConditionalExpr
assert isinstance(expr.else_expr, ConditionalExpr)
def test_i_can_parse_cross_table_ref():
"""Test that cross-table references parse correctly."""
result = parse_formula("{Products.Price} * {Quantity}")
assert result is not None
expr = result.expression
assert isinstance(expr, BinaryOp)
assert isinstance(expr.left, CrossTableRef)
assert expr.left.table == "Products"
assert expr.left.column == "Price"
def test_i_can_parse_cross_table_with_where():
"""Test that cross-table references with WHERE clause parse correctly."""
result = parse_formula("{Products.Price where Products.Code = ProductCode} * {Quantity}")
assert result is not None
expr = result.expression
assert isinstance(expr, BinaryOp)
cross_ref = expr.left
assert isinstance(cross_ref, CrossTableRef)
assert cross_ref.where_clause is not None
assert cross_ref.where_clause.remote_table == "Products"
assert cross_ref.where_clause.remote_column == "Code"
assert cross_ref.where_clause.local_column == "ProductCode"
@pytest.mark.parametrize("formula_text,expected_op", [
("{A} and {B}", "and"),
("{A} or {B}", "or"),
("not {A}", "not"),
])
def test_i_can_parse_logical_operators(formula_text, expected_op):
"""Test that logical operators parse correctly."""
result = parse_formula(formula_text)
assert result is not None
if expected_op == "not":
from myfasthtml.core.formula.dataclasses import UnaryOp
assert isinstance(result.expression, UnaryOp)
else:
assert isinstance(result.expression, BinaryOp)
assert result.expression.operator == expected_op
@pytest.mark.parametrize("formula_text", [
"42",
"3.14",
'"hello"',
"true",
"false",
])
def test_i_can_parse_literals(formula_text):
"""Test that literal values parse correctly."""
result = parse_formula(formula_text)
assert result is not None
assert isinstance(result.expression, LiteralNode)
def test_i_can_parse_aggregation():
"""Test that aggregation with cross-table WHERE parses correctly."""
result = parse_formula("sum({OrderLines.Amount where OrderLines.OrderId = Id})")
assert result is not None
expr = result.expression
assert isinstance(expr, FunctionCall)
assert expr.function_name == "sum"
assert len(expr.arguments) == 1
arg = expr.arguments[0]
assert isinstance(arg, CrossTableRef)
assert arg.where_clause is not None
def test_i_can_parse_empty_formula():
"""Test that empty formula returns None."""
result = parse_formula("")
assert result is None
def test_i_can_parse_whitespace_formula():
"""Test that whitespace-only formula returns None."""
result = parse_formula(" ")
assert result is None
def test_i_can_parse_nested_functions():
"""Test that nested function calls parse correctly."""
result = parse_formula("round(avg({Q1}, {Q2}, {Q3}), 1)")
assert result is not None
expr = result.expression
assert isinstance(expr, FunctionCall)
assert expr.function_name == "round"
# ==================== Invalid formulas ====================
@pytest.mark.parametrize("formula_text", [
"{Price} * * {Quantity}", # double operator
"round(", # unclosed paren
"123 + + 456", # double operator
])
def test_i_cannot_parse_invalid_syntax(formula_text):
"""Test that invalid syntax raises FormulaSyntaxError."""
with pytest.raises(FormulaSyntaxError):
parse_formula(formula_text)
def test_i_cannot_parse_unclosed_brace():
"""Test that an unclosed brace raises FormulaSyntaxError."""
with pytest.raises(FormulaSyntaxError):
parse_formula("{Price")