679 lines
22 KiB
Python
679 lines
22 KiB
Python
import time
|
|
from datetime import datetime
|
|
|
|
import pytest
|
|
|
|
from myfasthtml.core.profiler import (
|
|
ProfilingManager,
|
|
ProfilingSpan,
|
|
CumulativeSpan,
|
|
ProfilingTrace,
|
|
_NullSpan,
|
|
)
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def fresh_profiler():
|
|
"""Provide a fresh ProfilingManager for each test."""
|
|
p = ProfilingManager(max_traces=10)
|
|
p.enabled = True
|
|
return p
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestProfilingModels — data model tests (ProfilingSpan, CumulativeSpan)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestProfilingModels:
|
|
|
|
def test_i_can_compute_avg_ms_when_no_iterations(self):
|
|
"""Test that avg_ms returns 0.0 on a fresh CumulativeSpan to avoid division by zero."""
|
|
cum = CumulativeSpan(name="empty")
|
|
assert cum.avg_ms == 0.0
|
|
assert cum.total_ms == 0.0
|
|
assert cum.count == 0
|
|
assert cum.min_ms == float('inf')
|
|
assert cum.max_ms == 0
|
|
|
|
@pytest.mark.parametrize("durations, expected_min, expected_max, expected_total, expected_avg", [
|
|
([10.0, 5.0, 8.0], 5.0, 10.0, 23.0, 23.0 / 3),
|
|
([1.0], 1.0, 1.0, 1.0, 1.0),
|
|
([3.0, 3.0, 3.0], 3.0, 3.0, 9.0, 3.0),
|
|
])
|
|
def test_i_can_aggregate_iterations_in_cumulative_span(
|
|
self, durations, expected_min, expected_max, expected_total, expected_avg
|
|
):
|
|
"""Test that CumulativeSpan correctly aggregates all metrics across iterations."""
|
|
cum = CumulativeSpan(name="probe")
|
|
for d in durations:
|
|
cum.record(d)
|
|
|
|
assert cum.count == len(durations)
|
|
assert cum.min_ms == expected_min
|
|
assert cum.max_ms == expected_max
|
|
assert cum.total_ms == expected_total
|
|
assert cum.avg_ms == pytest.approx(expected_avg)
|
|
|
|
def test_i_can_chain_set_calls_on_span(self, fresh_profiler):
|
|
"""Test that set() returns self to allow fluent chaining."""
|
|
p = fresh_profiler
|
|
with p.span("query") as s:
|
|
result = s.set("table", "orders").set("rows", 42)
|
|
|
|
assert result is s
|
|
assert s.data["table"] == "orders"
|
|
assert s.data["rows"] == 42
|
|
|
|
def test_i_can_access_span_data_after_context_exits(self, fresh_profiler):
|
|
"""Test that span data and duration persist after the with block has exited."""
|
|
p = fresh_profiler
|
|
with p.span("query") as s:
|
|
s.set("key", "value")
|
|
|
|
assert s.data["key"] == "value"
|
|
assert s.duration_ms > 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestSpan — profiler.span() context manager and decorator
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestSpan:
|
|
|
|
def test_i_can_create_a_span(self, fresh_profiler):
|
|
"""Test that a span is created with correct name and measured duration."""
|
|
p = fresh_profiler
|
|
with p.span("my_span") as s:
|
|
time.sleep(0.01)
|
|
|
|
assert isinstance(s, ProfilingSpan)
|
|
assert s.name == "my_span"
|
|
assert s.duration_ms >= 10
|
|
|
|
def test_i_can_nest_spans(self, fresh_profiler):
|
|
"""Test that a span created inside another becomes its child."""
|
|
p = fresh_profiler
|
|
with p.span("parent") as parent:
|
|
with p.span("child") as child:
|
|
pass
|
|
|
|
assert len(parent.children) == 1
|
|
assert parent.children[0] is child
|
|
assert child.name == "child"
|
|
|
|
def test_i_can_have_sibling_spans(self, fresh_profiler):
|
|
"""Test that consecutive spans under the same parent are recorded as siblings."""
|
|
p = fresh_profiler
|
|
with p.span("parent") as parent:
|
|
with p.span("child_a"):
|
|
pass
|
|
with p.span("child_b"):
|
|
pass
|
|
|
|
assert len(parent.children) == 2
|
|
assert parent.children[0].name == "child_a"
|
|
assert parent.children[1].name == "child_b"
|
|
|
|
def test_i_can_create_two_spans_with_same_name_under_same_parent(self, fresh_profiler):
|
|
"""Test that two normal spans with the same name under the same parent are two distinct entries.
|
|
|
|
This contrasts with cumulative spans, which merge same-name entries into one aggregated entry.
|
|
"""
|
|
p = fresh_profiler
|
|
with p.span("parent") as parent:
|
|
with p.span("work"):
|
|
pass
|
|
with p.span("work"):
|
|
pass
|
|
|
|
assert len(parent.children) == 2, "Normal spans with the same name must remain separate entries"
|
|
assert parent.children[0] is not parent.children[1]
|
|
|
|
def test_i_can_use_same_span_name_under_different_parents(self, fresh_profiler):
|
|
"""Test that spans with the same name under different parents are independent objects."""
|
|
p = fresh_profiler
|
|
with p.span("root"):
|
|
with p.span("parent_a") as parent_a:
|
|
with p.span("work"):
|
|
pass
|
|
with p.span("parent_b") as parent_b:
|
|
with p.span("work"):
|
|
pass
|
|
|
|
span_a = parent_a.children[0]
|
|
span_b = parent_b.children[0]
|
|
assert span_a is not span_b, "Same name under different parents must be separate objects"
|
|
|
|
def test_i_can_use_span_as_decorator(self, fresh_profiler):
|
|
"""Test that @span wraps a function and captures all positional arguments."""
|
|
p = fresh_profiler
|
|
|
|
@p.span("decorated")
|
|
def my_func(x, y):
|
|
return x + y
|
|
|
|
with p.span("root") as root:
|
|
result = my_func(1, 2)
|
|
|
|
assert result == 3
|
|
assert len(root.children) == 1
|
|
child = root.children[0]
|
|
assert child.name == "decorated"
|
|
assert child.data.get("x") == "1"
|
|
assert child.data.get("y") == "2"
|
|
|
|
def test_i_can_capture_all_args_with_span_decorator(self, fresh_profiler):
|
|
"""Test that all positional and keyword arguments are captured by the @span decorator."""
|
|
p = fresh_profiler
|
|
|
|
@p.span("compute")
|
|
def my_func(x, y, z=10):
|
|
return x + y + z
|
|
|
|
with p.span("root") as root:
|
|
my_func(1, 2, z=3)
|
|
|
|
child = root.children[0]
|
|
assert child.data.get("x") == "1"
|
|
assert child.data.get("y") == "2"
|
|
assert child.data.get("z") == "3"
|
|
|
|
def test_i_can_exclude_self_from_captured_args_with_span_decorator(self, fresh_profiler):
|
|
"""Test that 'self' is not included in the captured args of a decorated method."""
|
|
p = fresh_profiler
|
|
|
|
class MyClass:
|
|
@p.span("method")
|
|
def my_method(self, value):
|
|
return value
|
|
|
|
with p.span("root") as root:
|
|
MyClass().my_method(42)
|
|
|
|
child = root.children[0]
|
|
assert "self" not in child.data
|
|
assert child.data.get("value") == "42"
|
|
|
|
def test_i_can_use_span_decorator_without_parent(self, fresh_profiler):
|
|
"""Test that a decorated function runs correctly with no active parent span."""
|
|
p = fresh_profiler
|
|
|
|
@p.span("solo")
|
|
def my_func():
|
|
return 42
|
|
|
|
result = my_func()
|
|
assert result == 42
|
|
|
|
def test_i_can_attach_data_to_span_via_context_manager(self, fresh_profiler):
|
|
"""Test that metadata can be attached to a span inside the with block."""
|
|
p = fresh_profiler
|
|
with p.span("query") as s:
|
|
s.set("row_count", 42)
|
|
s.set("table", "users")
|
|
|
|
assert s.data["row_count"] == 42
|
|
assert s.data["table"] == "users"
|
|
|
|
def test_i_can_attach_data_via_current_span(self, fresh_profiler):
|
|
"""Test that current_span().set() attaches metadata to the active span from anywhere."""
|
|
p = fresh_profiler
|
|
|
|
@p.span("process")
|
|
def process():
|
|
p.current_span().set("result", "ok")
|
|
|
|
with p.span("root") as root:
|
|
process()
|
|
|
|
child = root.children[0]
|
|
assert child.data["result"] == "ok"
|
|
|
|
def test_i_can_pass_args_to_span_context_manager(self, fresh_profiler):
|
|
"""Test that metadata can be pre-attached to a span via the args parameter."""
|
|
p = fresh_profiler
|
|
with p.span("query", args={"table": "orders"}) as s:
|
|
pass
|
|
|
|
assert s.data["table"] == "orders"
|
|
|
|
def test_i_can_get_none_from_current_span_when_no_span_active(self, fresh_profiler):
|
|
"""Test that current_span() returns None when called outside any span context."""
|
|
p = fresh_profiler
|
|
assert p.current_span() is None
|
|
|
|
def test_i_can_get_innermost_span_via_current_span(self, fresh_profiler):
|
|
"""Test that current_span() returns the innermost active span in nested contexts."""
|
|
p = fresh_profiler
|
|
with p.span("outer"):
|
|
with p.span("inner") as inner:
|
|
assert p.current_span() is inner
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestCumulativeSpan — profiler.cumulative_span() context manager and decorator
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestCumulativeSpan:
|
|
|
|
def test_i_can_use_cumulative_span(self, fresh_profiler):
|
|
"""Test that repeated iterations are aggregated into a single child entry."""
|
|
p = fresh_profiler
|
|
with p.span("loop") as loop_span:
|
|
for _ in range(5):
|
|
with p.cumulative_span("item"):
|
|
time.sleep(0.001)
|
|
|
|
assert len(loop_span.children) == 1
|
|
cum = loop_span.children[0]
|
|
assert isinstance(cum, CumulativeSpan)
|
|
assert cum.count == 5
|
|
assert cum.total_ms >= 5
|
|
assert cum.min_ms <= cum.avg_ms <= cum.max_ms
|
|
|
|
def test_i_can_use_cumulative_span_as_decorator(self, fresh_profiler):
|
|
"""Test that @cumulative_span aggregates all decorated function calls."""
|
|
p = fresh_profiler
|
|
|
|
@p.cumulative_span("item")
|
|
def process_item(x):
|
|
return x * 2
|
|
|
|
with p.span("loop") as loop_span:
|
|
for i in range(3):
|
|
process_item(i)
|
|
|
|
assert len(loop_span.children) == 1
|
|
cum = loop_span.children[0]
|
|
assert cum.count == 3
|
|
|
|
def test_i_can_continue_using_the_cumulative_span(self, fresh_profiler):
|
|
"""Test that the same cumulative span accumulates across separate loop blocks."""
|
|
p = fresh_profiler
|
|
with p.span("parent") as parent:
|
|
for _ in range(3):
|
|
with p.cumulative_span("reads"):
|
|
pass
|
|
for _ in range(2):
|
|
with p.cumulative_span("reads"):
|
|
pass
|
|
|
|
assert len(parent.children) == 1
|
|
reads = parent.children[0]
|
|
assert reads.count == 5
|
|
|
|
def test_i_can_have_two_cumulative_spans_with_different_names(self, fresh_profiler):
|
|
"""Test that two cumulative spans with different names create separate entries in the parent."""
|
|
p = fresh_profiler
|
|
with p.span("parent") as parent:
|
|
for _ in range(3):
|
|
with p.cumulative_span("reads"):
|
|
pass
|
|
for _ in range(2):
|
|
with p.cumulative_span("writes"):
|
|
pass
|
|
|
|
assert len(parent.children) == 2
|
|
reads = next(c for c in parent.children if c.name == "reads")
|
|
writes = next(c for c in parent.children if c.name == "writes")
|
|
assert reads.count == 3
|
|
assert writes.count == 2
|
|
|
|
def test_i_can_use_same_cumulative_span_name_under_different_parents(self, fresh_profiler):
|
|
"""Test that cumulative spans with the same name under different parents are independent."""
|
|
p = fresh_profiler
|
|
with p.span("root"):
|
|
with p.span("parent_a") as parent_a:
|
|
for _ in range(3):
|
|
with p.cumulative_span("items"):
|
|
pass
|
|
with p.span("parent_b") as parent_b:
|
|
for _ in range(2):
|
|
with p.cumulative_span("items"):
|
|
pass
|
|
|
|
items_a = parent_a.children[0]
|
|
items_b = parent_b.children[0]
|
|
assert items_a is not items_b, "Same name under different parents must be separate objects"
|
|
assert items_a.count == 3
|
|
assert items_b.count == 2
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestCommandSpan — profiler.command_span() and trace recording
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestCommandSpan:
|
|
|
|
def test_i_can_record_a_trace_via_command_span(self, fresh_profiler):
|
|
"""Test that command_span creates a complete trace with root span and children."""
|
|
p = fresh_profiler
|
|
with p.command_span("NavigateCell", "", "abc-123", {"row": "5"}):
|
|
with p.span("callback"):
|
|
time.sleep(0.01)
|
|
|
|
assert len(p.traces) == 1
|
|
trace = p.traces[0]
|
|
assert isinstance(trace, ProfilingTrace)
|
|
assert trace.command_name == "NavigateCell"
|
|
assert trace.command_id == "abc-123"
|
|
assert trace.kwargs == {"row": "5"}
|
|
assert trace.total_duration_ms >= 10
|
|
assert len(trace.root_span.children) == 1
|
|
assert trace.root_span.children[0].name == "callback"
|
|
|
|
def test_i_cannot_record_trace_when_profiler_disabled(self, fresh_profiler):
|
|
"""Test that command_span is a no-op when the profiler is disabled."""
|
|
p = fresh_profiler
|
|
p.enabled = False
|
|
with p.command_span("cmd", "", "id", {}):
|
|
pass
|
|
|
|
assert len(p.traces) == 0
|
|
|
|
def test_i_can_record_up_to_max_traces(self, fresh_profiler):
|
|
"""Test that the trace buffer respects the max_traces limit (FIFO eviction)."""
|
|
p = fresh_profiler
|
|
for i in range(15):
|
|
with p.command_span(f"cmd_{i}", "", str(i), {}):
|
|
pass
|
|
|
|
assert len(p.traces) == 10, "Buffer should cap at max_traces=10"
|
|
|
|
def test_i_can_access_trace_timestamp(self, fresh_profiler):
|
|
"""Test that a recorded trace contains a valid datetime timestamp."""
|
|
p = fresh_profiler
|
|
before = datetime.now()
|
|
with p.command_span("MyCmd", "", "id-001", {}):
|
|
pass
|
|
after = datetime.now()
|
|
|
|
trace = p.traces[0]
|
|
assert before <= trace.timestamp <= after
|
|
|
|
def test_i_can_verify_kwargs_are_copied_in_command_span(self, fresh_profiler):
|
|
"""Test that mutating the original kwargs dict does not affect the recorded trace."""
|
|
p = fresh_profiler
|
|
kwargs = {"row": "5", "col": "2"}
|
|
with p.command_span("MyCmd", "", "id-001", kwargs):
|
|
pass
|
|
|
|
kwargs["row"] = "99"
|
|
trace = p.traces[0]
|
|
assert trace.kwargs["row"] == "5"
|
|
|
|
def test_i_can_record_command_description_in_trace(self, fresh_profiler):
|
|
"""Test that the command description passed to command_span is stored in the trace."""
|
|
p = fresh_profiler
|
|
with p.command_span("NavigateCell", "Navigate to adjacent cell", "abc-123", {}):
|
|
pass
|
|
|
|
trace = p.traces[0]
|
|
assert trace.command_description == "Navigate to adjacent cell"
|
|
|
|
def test_i_can_record_empty_description_in_trace(self, fresh_profiler):
|
|
"""Test that an empty description is stored as-is in the trace."""
|
|
p = fresh_profiler
|
|
with p.command_span("MyCmd", "", "id-001", {}):
|
|
pass
|
|
|
|
trace = p.traces[0]
|
|
assert trace.command_description == ""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestTraceAll — profiler.trace_all() class decorator
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestTraceAll:
|
|
|
|
def test_i_can_use_trace_all_on_class(self, fresh_profiler):
|
|
"""Test that trace_all wraps all non-dunder methods of a class."""
|
|
p = fresh_profiler
|
|
|
|
@p.trace_all
|
|
class MyClass:
|
|
def method_a(self):
|
|
return "a"
|
|
|
|
def method_b(self):
|
|
return "b"
|
|
|
|
def __repr__(self):
|
|
return "MyClass()"
|
|
|
|
obj = MyClass()
|
|
with p.span("root") as root:
|
|
obj.method_a()
|
|
obj.method_b()
|
|
|
|
assert len(root.children) == 2
|
|
assert root.children[0].name == "method_a"
|
|
assert root.children[1].name == "method_b"
|
|
|
|
def test_i_can_use_trace_all_with_exclude(self, fresh_profiler):
|
|
"""Test that excluded methods are not wrapped by trace_all."""
|
|
p = fresh_profiler
|
|
|
|
@p.trace_all(exclude=["method_b"])
|
|
class MyClass:
|
|
def method_a(self):
|
|
return "a"
|
|
|
|
def method_b(self):
|
|
return "b"
|
|
|
|
obj = MyClass()
|
|
with p.span("root") as root:
|
|
obj.method_a()
|
|
obj.method_b()
|
|
|
|
assert len(root.children) == 1
|
|
assert root.children[0].name == "method_a"
|
|
|
|
def test_i_can_confirm_trace_all_skips_dunder_methods(self, fresh_profiler):
|
|
"""Test that trace_all does not wrap dunder methods like __repr__."""
|
|
p = fresh_profiler
|
|
call_log = []
|
|
|
|
@p.trace_all
|
|
class MyClass:
|
|
def __repr__(self):
|
|
call_log.append("repr")
|
|
return "MyClass()"
|
|
|
|
def method_a(self):
|
|
return "a"
|
|
|
|
obj = MyClass()
|
|
with p.span("root") as root:
|
|
repr(obj)
|
|
obj.method_a()
|
|
|
|
child_names = [c.name for c in root.children]
|
|
assert "method_a" in child_names
|
|
assert "__repr__" not in child_names
|
|
assert "repr" in call_log
|
|
|
|
def test_i_can_use_trace_all_with_parentheses_and_no_exclude(self, fresh_profiler):
|
|
"""Test that @profiler.trace_all() with parentheses and no args behaves like @profiler.trace_all."""
|
|
p = fresh_profiler
|
|
|
|
@p.trace_all()
|
|
class MyClass:
|
|
def method_a(self):
|
|
return "a"
|
|
|
|
obj = MyClass()
|
|
with p.span("root") as root:
|
|
obj.method_a()
|
|
|
|
assert len(root.children) == 1
|
|
assert root.children[0].name == "method_a"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestTraceCalls — profiler.trace_calls() function decorator
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestTraceCalls:
|
|
|
|
def test_i_can_use_trace_calls_on_function(self, fresh_profiler):
|
|
"""Test that trace_calls traces all sub-calls as children of the decorated function span.
|
|
|
|
Verifies both direct children (helper_a, helper_b under main_func) and nested hierarchy
|
|
(grandchild under helper_a, not under main_func).
|
|
"""
|
|
p = fresh_profiler
|
|
|
|
def grandchild():
|
|
return 0
|
|
|
|
def helper_a():
|
|
grandchild()
|
|
return 1
|
|
|
|
def helper_b():
|
|
return 2
|
|
|
|
@p.trace_calls
|
|
def main_func():
|
|
helper_a()
|
|
helper_b()
|
|
return 42
|
|
|
|
with p.span("root") as root:
|
|
main_func()
|
|
|
|
assert len(root.children) == 1
|
|
main_span = root.children[0]
|
|
assert main_span.name == "main_func"
|
|
assert len(main_span.children) == 2, "main_func should have exactly 2 direct children"
|
|
child_names = [c.name for c in main_span.children]
|
|
assert "helper_a" in child_names
|
|
assert "helper_b" in child_names
|
|
|
|
helper_a_span = next(c for c in main_span.children if c.name == "helper_a")
|
|
assert len(helper_a_span.children) == 1, "grandchild must be nested under helper_a, not main_func"
|
|
assert helper_a_span.children[0].name == "grandchild"
|
|
|
|
def test_i_cannot_use_trace_calls_when_disabled(self, fresh_profiler):
|
|
"""Test that trace_calls creates no spans when the profiler is disabled at call time."""
|
|
p = fresh_profiler
|
|
|
|
@p.trace_calls
|
|
def main_func():
|
|
return 99
|
|
|
|
with p.span("root") as root:
|
|
p.enabled = False
|
|
main_func()
|
|
p.enabled = True
|
|
|
|
assert len(root.children) == 0, "trace_calls should not create spans when profiler is disabled"
|
|
|
|
def test_i_can_verify_trace_calls_captures_function_args(self, fresh_profiler):
|
|
"""Test that trace_calls captures the decorated function's arguments in the root span data."""
|
|
p = fresh_profiler
|
|
|
|
@p.trace_calls
|
|
def compute(x, y):
|
|
return x + y
|
|
|
|
with p.span("root") as root:
|
|
compute(3, 7)
|
|
|
|
main_span = root.children[0]
|
|
assert main_span.name == "compute"
|
|
assert main_span.data.get("x") == "3"
|
|
assert main_span.data.get("y") == "7"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TestProfilingManager — enable/disable, clear, overhead
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestProfilingManager:
|
|
|
|
def test_i_can_enable_disable_profiler(self, fresh_profiler):
|
|
"""Test that disabling the profiler makes span() return a no-op NullSpan."""
|
|
p = fresh_profiler
|
|
p.enabled = False
|
|
|
|
with p.span("ignored") as s:
|
|
pass
|
|
|
|
assert isinstance(s, _NullSpan)
|
|
|
|
def test_i_can_use_decorator_when_profiler_is_disabled(self, fresh_profiler):
|
|
"""Test that a @span decorated function still executes correctly when profiler is disabled."""
|
|
p = fresh_profiler
|
|
|
|
@p.span("my_span")
|
|
def my_func():
|
|
return "result"
|
|
|
|
p.enabled = False
|
|
result = my_func()
|
|
assert result == "result"
|
|
|
|
def test_i_can_toggle_profiler_at_runtime(self, fresh_profiler):
|
|
"""Test that spans are captured only while the profiler is enabled.
|
|
|
|
Three phases: enabled -> disabled -> re-enabled, verifying capture behavior at each step.
|
|
"""
|
|
p = fresh_profiler
|
|
|
|
@p.span("traced")
|
|
def my_func():
|
|
return 1
|
|
|
|
# Phase 1: enabled — span must be captured
|
|
with p.span("root") as root:
|
|
my_func()
|
|
assert len(root.children) == 1, "Span should be captured when profiler is enabled"
|
|
|
|
# Phase 2: disabled — p.span() returns NullSpan, nothing captured
|
|
p.enabled = False
|
|
with p.span("root_disabled") as root_disabled:
|
|
my_func()
|
|
assert isinstance(root_disabled, _NullSpan), "p.span() should return NullSpan when disabled"
|
|
|
|
# Phase 3: re-enabled — span must be captured again
|
|
p.enabled = True
|
|
with p.span("root_reenabled") as root_reenabled:
|
|
my_func()
|
|
assert len(root_reenabled.children) == 1, "Span should be captured again after re-enabling"
|
|
|
|
def test_i_can_clear_traces(self, fresh_profiler):
|
|
"""Test that clear() empties the trace buffer completely."""
|
|
p = fresh_profiler
|
|
with p.command_span("cmd", "", "uuid-1", {}):
|
|
pass
|
|
with p.command_span("cmd", "", "uuid-2", {}):
|
|
pass
|
|
|
|
assert len(p.traces) == 2
|
|
p.clear()
|
|
assert len(p.traces) == 0
|
|
|
|
def test_i_can_measure_overhead(self, fresh_profiler):
|
|
"""Test that overhead metrics are populated after spans are recorded."""
|
|
p = fresh_profiler
|
|
for _ in range(20):
|
|
with p.span("probe"):
|
|
pass
|
|
|
|
assert p.overhead_per_span_us >= 0
|
|
assert p.total_overhead_ms >= 0
|
|
|
|
def test_i_can_get_zero_overhead_when_no_samples(self, fresh_profiler):
|
|
"""Test that overhead_per_span_us returns 0.0 when no spans have been recorded."""
|
|
p = fresh_profiler
|
|
assert p.overhead_per_span_us == 0.0
|
|
|
|
def test_i_can_get_zero_total_overhead_when_buffer_empty(self, fresh_profiler):
|
|
"""Test that total_overhead_ms returns 0.0 when the trace buffer is empty."""
|
|
p = fresh_profiler
|
|
assert p.total_overhead_ms == 0.0
|