"""Unit tests for ``myfasthtml.core.profiler``.

Covers the data models (``ProfilingSpan``, ``CumulativeSpan``), the
``span`` / ``cumulative_span`` / ``command_span`` context managers and
decorators, the ``trace_all`` / ``trace_calls`` decorators, and the
enable/disable, clear and overhead features of ``ProfilingManager``.
"""

import time
from datetime import datetime

import pytest

from myfasthtml.core.profiler import (
    ProfilingManager,
    ProfilingSpan,
    CumulativeSpan,
    ProfilingTrace,
    _NullSpan,
)


@pytest.fixture(autouse=True)
def fresh_profiler():
    """Provide a fresh ProfilingManager for each test.

    The manager is created with ``max_traces=10`` and is enabled before
    being handed to the test.
    """
    p = ProfilingManager(max_traces=10)
    p.enabled = True
    return p


# ---------------------------------------------------------------------------
# TestProfilingModels — data model tests (ProfilingSpan, CumulativeSpan)
# ---------------------------------------------------------------------------

class TestProfilingModels:
    """Tests for the profiling data models."""

    def test_i_can_compute_avg_ms_when_no_iterations(self):
        """Test that avg_ms returns 0.0 on a fresh CumulativeSpan to avoid division by zero."""
        cum = CumulativeSpan(name="empty")

        assert cum.avg_ms == 0.0
        assert cum.total_ms == 0.0
        assert cum.count == 0
        assert cum.min_ms == float('inf')
        assert cum.max_ms == 0

    @pytest.mark.parametrize("durations, expected_min, expected_max, expected_total, expected_avg", [
        ([10.0, 5.0, 8.0], 5.0, 10.0, 23.0, 23.0 / 3),
        ([1.0], 1.0, 1.0, 1.0, 1.0),
        ([3.0, 3.0, 3.0], 3.0, 3.0, 9.0, 3.0),
    ])
    def test_i_can_aggregate_iterations_in_cumulative_span(
        self, durations, expected_min, expected_max, expected_total, expected_avg
    ):
        """Test that CumulativeSpan correctly aggregates all metrics across iterations."""
        cum = CumulativeSpan(name="probe")
        for d in durations:
            cum.record(d)

        assert cum.count == len(durations)
        assert cum.min_ms == expected_min
        assert cum.max_ms == expected_max
        assert cum.total_ms == expected_total
        assert cum.avg_ms == pytest.approx(expected_avg)

    def test_i_can_chain_set_calls_on_span(self, fresh_profiler):
        """Test that set() returns self to allow fluent chaining."""
        p = fresh_profiler

        with p.span("query") as s:
            result = s.set("table", "orders").set("rows", 42)

        assert result is s
        assert s.data["table"] == "orders"
        assert s.data["rows"] == 42

    def test_i_can_access_span_data_after_context_exits(self, fresh_profiler):
        """Test that span data and duration persist after the with block has exited."""
        p = fresh_profiler

        with p.span("query") as s:
            s.set("key", "value")

        assert s.data["key"] == "value"
        assert s.duration_ms > 0


# ---------------------------------------------------------------------------
# TestSpan — profiler.span() context manager and decorator
# ---------------------------------------------------------------------------

class TestSpan:
    """Tests for profiler.span() used as a context manager and as a decorator."""

    def test_i_can_create_a_span(self, fresh_profiler):
        """Test that a span is created with correct name and measured duration."""
        p = fresh_profiler

        with p.span("my_span") as s:
            time.sleep(0.01)

        assert isinstance(s, ProfilingSpan)
        assert s.name == "my_span"
        assert s.duration_ms >= 10

    def test_i_can_nest_spans(self, fresh_profiler):
        """Test that a span created inside another becomes its child."""
        p = fresh_profiler

        with p.span("parent") as parent:
            with p.span("child") as child:
                pass

        assert len(parent.children) == 1
        assert parent.children[0] is child
        assert child.name == "child"

    def test_i_can_have_sibling_spans(self, fresh_profiler):
        """Test that consecutive spans under the same parent are recorded as siblings."""
        p = fresh_profiler

        with p.span("parent") as parent:
            with p.span("child_a"):
                pass
            with p.span("child_b"):
                pass

        assert len(parent.children) == 2
        assert parent.children[0].name == "child_a"
        assert parent.children[1].name == "child_b"

    def test_i_can_create_two_spans_with_same_name_under_same_parent(self, fresh_profiler):
        """Test that two normal spans with the same name under the same parent are two distinct entries.

        This contrasts with cumulative spans, which merge same-name entries into one
        aggregated entry.
        """
        p = fresh_profiler

        with p.span("parent") as parent:
            with p.span("work"):
                pass
            with p.span("work"):
                pass

        assert len(parent.children) == 2, "Normal spans with the same name must remain separate entries"
        assert parent.children[0] is not parent.children[1]

    def test_i_can_use_same_span_name_under_different_parents(self, fresh_profiler):
        """Test that spans with the same name under different parents are independent objects."""
        p = fresh_profiler

        with p.span("root"):
            with p.span("parent_a") as parent_a:
                with p.span("work"):
                    pass
            with p.span("parent_b") as parent_b:
                with p.span("work"):
                    pass

        span_a = parent_a.children[0]
        span_b = parent_b.children[0]
        assert span_a is not span_b, "Same name under different parents must be separate objects"

    def test_i_can_use_span_as_decorator(self, fresh_profiler):
        """Test that @span wraps a function and captures all positional arguments."""
        p = fresh_profiler

        @p.span("decorated")
        def my_func(x, y):
            return x + y

        with p.span("root") as root:
            result = my_func(1, 2)

        assert result == 3
        assert len(root.children) == 1
        child = root.children[0]
        assert child.name == "decorated"
        # Captured argument values are compared against their string form.
        assert child.data.get("x") == "1"
        assert child.data.get("y") == "2"

    def test_i_can_capture_all_args_with_span_decorator(self, fresh_profiler):
        """Test that all positional and keyword arguments are captured by the @span decorator."""
        p = fresh_profiler

        @p.span("compute")
        def my_func(x, y, z=10):
            return x + y + z

        with p.span("root") as root:
            my_func(1, 2, z=3)

        child = root.children[0]
        assert child.data.get("x") == "1"
        assert child.data.get("y") == "2"
        assert child.data.get("z") == "3"

    def test_i_can_exclude_self_from_captured_args_with_span_decorator(self, fresh_profiler):
        """Test that 'self' is not included in the captured args of a decorated method."""
        p = fresh_profiler

        class MyClass:
            @p.span("method")
            def my_method(self, value):
                return value

        with p.span("root") as root:
            MyClass().my_method(42)

        child = root.children[0]
        assert "self" not in child.data
        assert child.data.get("value") == "42"

    def test_i_can_use_span_decorator_without_parent(self, fresh_profiler):
        """Test that a decorated function runs correctly with no active parent span."""
        p = fresh_profiler

        @p.span("solo")
        def my_func():
            return 42

        result = my_func()

        assert result == 42

    def test_i_can_attach_data_to_span_via_context_manager(self, fresh_profiler):
        """Test that metadata can be attached to a span inside the with block."""
        p = fresh_profiler

        with p.span("query") as s:
            s.set("row_count", 42)
            s.set("table", "users")

        assert s.data["row_count"] == 42
        assert s.data["table"] == "users"

    def test_i_can_attach_data_via_current_span(self, fresh_profiler):
        """Test that current_span().set() attaches metadata to the active span from anywhere."""
        p = fresh_profiler

        @p.span("process")
        def process():
            p.current_span().set("result", "ok")

        with p.span("root") as root:
            process()

        child = root.children[0]
        assert child.data["result"] == "ok"

    def test_i_can_pass_args_to_span_context_manager(self, fresh_profiler):
        """Test that metadata can be pre-attached to a span via the args parameter."""
        p = fresh_profiler

        with p.span("query", args={"table": "orders"}) as s:
            pass

        assert s.data["table"] == "orders"

    def test_i_can_get_none_from_current_span_when_no_span_active(self, fresh_profiler):
        """Test that current_span() returns None when called outside any span context."""
        p = fresh_profiler

        assert p.current_span() is None

    def test_i_can_get_innermost_span_via_current_span(self, fresh_profiler):
        """Test that current_span() returns the innermost active span in nested contexts."""
        p = fresh_profiler

        with p.span("outer"):
            with p.span("inner") as inner:
                # Must be checked while both spans are still open.
                assert p.current_span() is inner


# ---------------------------------------------------------------------------
# TestCumulativeSpan — profiler.cumulative_span() context manager and decorator
# ---------------------------------------------------------------------------

class TestCumulativeSpan:
    """Tests for profiler.cumulative_span() used as a context manager and as a decorator."""

    def test_i_can_use_cumulative_span(self, fresh_profiler):
        """Test that repeated iterations are aggregated into a single child entry."""
        p = fresh_profiler

        with p.span("loop") as loop_span:
            for _ in range(5):
                with p.cumulative_span("item"):
                    time.sleep(0.001)

        assert len(loop_span.children) == 1
        cum = loop_span.children[0]
        assert isinstance(cum, CumulativeSpan)
        assert cum.count == 5
        assert cum.total_ms >= 5
        assert cum.min_ms <= cum.avg_ms <= cum.max_ms

    def test_i_can_use_cumulative_span_as_decorator(self, fresh_profiler):
        """Test that @cumulative_span aggregates all decorated function calls."""
        p = fresh_profiler

        @p.cumulative_span("item")
        def process_item(x):
            return x * 2

        with p.span("loop") as loop_span:
            for i in range(3):
                process_item(i)

        assert len(loop_span.children) == 1
        cum = loop_span.children[0]
        assert cum.count == 3

    def test_i_can_continue_using_the_cumulative_span(self, fresh_profiler):
        """Test that the same cumulative span accumulates across separate loop blocks."""
        p = fresh_profiler

        with p.span("parent") as parent:
            for _ in range(3):
                with p.cumulative_span("reads"):
                    pass
            for _ in range(2):
                with p.cumulative_span("reads"):
                    pass

        assert len(parent.children) == 1
        reads = parent.children[0]
        assert reads.count == 5

    def test_i_can_have_two_cumulative_spans_with_different_names(self, fresh_profiler):
        """Test that two cumulative spans with different names create separate entries in the parent."""
        p = fresh_profiler

        with p.span("parent") as parent:
            for _ in range(3):
                with p.cumulative_span("reads"):
                    pass
            for _ in range(2):
                with p.cumulative_span("writes"):
                    pass

        assert len(parent.children) == 2
        reads = next(c for c in parent.children if c.name == "reads")
        writes = next(c for c in parent.children if c.name == "writes")
        assert reads.count == 3
        assert writes.count == 2

    def test_i_can_use_same_cumulative_span_name_under_different_parents(self, fresh_profiler):
        """Test that cumulative spans with the same name under different parents are independent."""
        p = fresh_profiler

        with p.span("root"):
            with p.span("parent_a") as parent_a:
                for _ in range(3):
                    with p.cumulative_span("items"):
                        pass
            with p.span("parent_b") as parent_b:
                for _ in range(2):
                    with p.cumulative_span("items"):
                        pass

        items_a = parent_a.children[0]
        items_b = parent_b.children[0]
        assert items_a is not items_b, "Same name under different parents must be separate objects"
        assert items_a.count == 3
        assert items_b.count == 2


# ---------------------------------------------------------------------------
# TestCommandSpan — profiler.command_span() and trace recording
# ---------------------------------------------------------------------------

class TestCommandSpan:
    """Tests for profiler.command_span() and the trace buffer it fills."""

    def test_i_can_record_a_trace_via_command_span(self, fresh_profiler):
        """Test that command_span creates a complete trace with root span and children."""
        p = fresh_profiler

        with p.command_span("NavigateCell", "abc-123", {"row": "5"}):
            with p.span("callback"):
                time.sleep(0.01)

        assert len(p.traces) == 1
        trace = p.traces[0]
        assert isinstance(trace, ProfilingTrace)
        assert trace.command_name == "NavigateCell"
        assert trace.command_id == "abc-123"
        assert trace.kwargs == {"row": "5"}
        assert trace.total_duration_ms >= 10
        assert len(trace.root_span.children) == 1
        assert trace.root_span.children[0].name == "callback"

    def test_i_cannot_record_trace_when_profiler_disabled(self, fresh_profiler):
        """Test that command_span is a no-op when the profiler is disabled."""
        p = fresh_profiler
        p.enabled = False

        with p.command_span("cmd", "id", {}):
            pass

        assert len(p.traces) == 0

    def test_i_can_record_up_to_max_traces(self, fresh_profiler):
        """Test that the trace buffer respects the max_traces limit (FIFO eviction)."""
        p = fresh_profiler

        for i in range(15):
            with p.command_span(f"cmd_{i}", str(i), {}):
                pass

        assert len(p.traces) == 10, "Buffer should cap at max_traces=10"

        # Checking the length alone would also pass if the newest traces were
        # the ones dropped. Compare as sets so the check does not depend on
        # the order in which p.traces exposes its entries.
        retained = {trace.command_name for trace in p.traces}
        expected = {f"cmd_{i}" for i in range(5, 15)}
        assert retained == expected, "Oldest traces must be evicted first (FIFO)"

    def test_i_can_access_trace_timestamp(self, fresh_profiler):
        """Test that a recorded trace contains a valid datetime timestamp."""
        p = fresh_profiler

        before = datetime.now()
        with p.command_span("MyCmd", "id-001", {}):
            pass
        after = datetime.now()

        trace = p.traces[0]
        assert before <= trace.timestamp <= after

    def test_i_can_verify_kwargs_are_copied_in_command_span(self, fresh_profiler):
        """Test that mutating the original kwargs dict does not affect the recorded trace."""
        p = fresh_profiler
        kwargs = {"row": "5", "col": "2"}

        with p.command_span("MyCmd", "id-001", kwargs):
            pass
        kwargs["row"] = "99"

        trace = p.traces[0]
        assert trace.kwargs["row"] == "5"


# ---------------------------------------------------------------------------
# TestTraceAll — profiler.trace_all() class decorator
# ---------------------------------------------------------------------------

class TestTraceAll:
    """Tests for the profiler.trace_all() class decorator."""

    def test_i_can_use_trace_all_on_class(self, fresh_profiler):
        """Test that trace_all wraps all non-dunder methods of a class."""
        p = fresh_profiler

        @p.trace_all
        class MyClass:
            def method_a(self):
                return "a"

            def method_b(self):
                return "b"

            def __repr__(self):
                return "MyClass()"

        obj = MyClass()
        with p.span("root") as root:
            obj.method_a()
            obj.method_b()

        assert len(root.children) == 2
        assert root.children[0].name == "method_a"
        assert root.children[1].name == "method_b"

    def test_i_can_use_trace_all_with_exclude(self, fresh_profiler):
        """Test that excluded methods are not wrapped by trace_all."""
        p = fresh_profiler

        @p.trace_all(exclude=["method_b"])
        class MyClass:
            def method_a(self):
                return "a"

            def method_b(self):
                return "b"

        obj = MyClass()
        with p.span("root") as root:
            obj.method_a()
            obj.method_b()

        assert len(root.children) == 1
        assert root.children[0].name == "method_a"

    def test_i_can_confirm_trace_all_skips_dunder_methods(self, fresh_profiler):
        """Test that trace_all does not wrap dunder methods like __repr__."""
        p = fresh_profiler
        call_log = []

        @p.trace_all
        class MyClass:
            def __repr__(self):
                call_log.append("repr")
                return "MyClass()"

            def method_a(self):
                return "a"

        obj = MyClass()
        with p.span("root") as root:
            repr(obj)
            obj.method_a()

        child_names = [c.name for c in root.children]
        assert "method_a" in child_names
        assert "__repr__" not in child_names
        # __repr__ must still have run, it just must not have produced a span.
        assert "repr" in call_log

    def test_i_can_use_trace_all_with_parentheses_and_no_exclude(self, fresh_profiler):
        """Test that @profiler.trace_all() with parentheses and no args behaves like @profiler.trace_all."""
        p = fresh_profiler

        @p.trace_all()
        class MyClass:
            def method_a(self):
                return "a"

        obj = MyClass()
        with p.span("root") as root:
            obj.method_a()

        assert len(root.children) == 1
        assert root.children[0].name == "method_a"


# ---------------------------------------------------------------------------
# TestTraceCalls — profiler.trace_calls() function decorator
# ---------------------------------------------------------------------------

class TestTraceCalls:
    """Tests for the profiler.trace_calls() function decorator."""

    def test_i_can_use_trace_calls_on_function(self, fresh_profiler):
        """Test that trace_calls traces all sub-calls as children of the decorated function span.

        Verifies both direct children (helper_a, helper_b under main_func) and nested
        hierarchy (grandchild under helper_a, not under main_func).
        """
        p = fresh_profiler

        def grandchild():
            return 0

        def helper_a():
            grandchild()
            return 1

        def helper_b():
            return 2

        @p.trace_calls
        def main_func():
            helper_a()
            helper_b()
            return 42

        with p.span("root") as root:
            main_func()

        assert len(root.children) == 1
        main_span = root.children[0]
        assert main_span.name == "main_func"
        assert len(main_span.children) == 2, "main_func should have exactly 2 direct children"
        child_names = [c.name for c in main_span.children]
        assert "helper_a" in child_names
        assert "helper_b" in child_names
        helper_a_span = next(c for c in main_span.children if c.name == "helper_a")
        assert len(helper_a_span.children) == 1, "grandchild must be nested under helper_a, not main_func"
        assert helper_a_span.children[0].name == "grandchild"

    def test_i_cannot_use_trace_calls_when_disabled(self, fresh_profiler):
        """Test that trace_calls creates no spans when the profiler is disabled at call time."""
        p = fresh_profiler

        @p.trace_calls
        def main_func():
            return 99

        with p.span("root") as root:
            p.enabled = False
            try:
                main_func()
            finally:
                # Re-enable even if the call raises, so the enclosing root
                # span is closed under the same state it was opened in.
                p.enabled = True

        assert len(root.children) == 0, "trace_calls should not create spans when profiler is disabled"

    def test_i_can_verify_trace_calls_captures_function_args(self, fresh_profiler):
        """Test that trace_calls captures the decorated function's arguments in the root span data."""
        p = fresh_profiler

        @p.trace_calls
        def compute(x, y):
            return x + y

        with p.span("root") as root:
            compute(3, 7)

        main_span = root.children[0]
        assert main_span.name == "compute"
        assert main_span.data.get("x") == "3"
        assert main_span.data.get("y") == "7"


# ---------------------------------------------------------------------------
# TestProfilingManager — enable/disable, clear, overhead
# ---------------------------------------------------------------------------

class TestProfilingManager:
    """Tests for ProfilingManager state: enable/disable, clear, overhead metrics."""

    def test_i_can_enable_disable_profiler(self, fresh_profiler):
        """Test that disabling the profiler makes span() return a no-op NullSpan."""
        p = fresh_profiler
        p.enabled = False

        with p.span("ignored") as s:
            pass

        assert isinstance(s, _NullSpan)

    def test_i_can_use_decorator_when_profiler_is_disabled(self, fresh_profiler):
        """Test that a @span decorated function still executes correctly when profiler is disabled."""
        p = fresh_profiler

        @p.span("my_span")
        def my_func():
            return "result"

        p.enabled = False
        result = my_func()

        assert result == "result"

    def test_i_can_toggle_profiler_at_runtime(self, fresh_profiler):
        """Test that spans are captured only while the profiler is enabled.

        Three phases: enabled -> disabled -> re-enabled, verifying capture behavior
        at each step.
        """
        p = fresh_profiler

        @p.span("traced")
        def my_func():
            return 1

        # Phase 1: enabled — span must be captured
        with p.span("root") as root:
            my_func()
        assert len(root.children) == 1, "Span should be captured when profiler is enabled"

        # Phase 2: disabled — p.span() returns NullSpan, nothing captured
        p.enabled = False
        with p.span("root_disabled") as root_disabled:
            my_func()
        assert isinstance(root_disabled, _NullSpan), "p.span() should return NullSpan when disabled"

        # Phase 3: re-enabled — span must be captured again
        p.enabled = True
        with p.span("root_reenabled") as root_reenabled:
            my_func()
        assert len(root_reenabled.children) == 1, "Span should be captured again after re-enabling"

    def test_i_can_clear_traces(self, fresh_profiler):
        """Test that clear() empties the trace buffer completely."""
        p = fresh_profiler

        with p.command_span("cmd", "uuid-1", {}):
            pass
        with p.command_span("cmd", "uuid-2", {}):
            pass
        assert len(p.traces) == 2

        p.clear()

        assert len(p.traces) == 0

    def test_i_can_measure_overhead(self, fresh_profiler):
        """Test that overhead metrics are populated after spans are recorded."""
        p = fresh_profiler

        for _ in range(20):
            with p.span("probe"):
                pass

        assert p.overhead_per_span_us >= 0
        assert p.total_overhead_ms >= 0

    def test_i_can_get_zero_overhead_when_no_samples(self, fresh_profiler):
        """Test that overhead_per_span_us returns 0.0 when no spans have been recorded."""
        p = fresh_profiler

        assert p.overhead_per_span_us == 0.0

    def test_i_can_get_zero_total_overhead_when_buffer_empty(self, fresh_profiler):
        """Test that total_overhead_ms returns 0.0 when the trace buffer is empty."""
        p = fresh_profiler

        assert p.total_overhead_ms == 0.0