import functools import inspect import logging import sys import time from collections import deque from contextvars import ContextVar from dataclasses import dataclass, field from datetime import datetime from typing import Optional from uuid import uuid4 logger = logging.getLogger("Profiler") # --------------------------------------------------------------------------- # No-op sentinel — used when profiler is disabled # --------------------------------------------------------------------------- class _NullSpan: """No-op span returned when profiler is disabled.""" def set(self, key: str, value) -> '_NullSpan': return self def __enter__(self) -> '_NullSpan': return self def __exit__(self, *args): pass def __call__(self, fn): return fn # --------------------------------------------------------------------------- # Data model # --------------------------------------------------------------------------- @dataclass class CumulativeSpan: """Aggregated span for loops — one entry regardless of iteration count. Attributes: name: Span name. count: Number of iterations recorded. total_ms: Cumulative duration in milliseconds. min_ms: Shortest recorded iteration in milliseconds. max_ms: Longest recorded iteration in milliseconds. """ name: str count: int = 0 total_ms: float = 0.0 min_ms: float = float('inf') max_ms: float = 0.0 @property def avg_ms(self) -> float: """Average duration per iteration in milliseconds.""" return self.total_ms / self.count if self.count > 0 else 0.0 def record(self, duration_ms: float): """Record one iteration. Args: duration_ms: Duration of this iteration in milliseconds. """ self.count += 1 self.total_ms += duration_ms if duration_ms < self.min_ms: self.min_ms = duration_ms if duration_ms > self.max_ms: self.max_ms = duration_ms @dataclass class ProfilingSpan: """A named timing segment. Attributes: name: Span name. data: Arbitrary metadata attached via span.set(). children: Nested spans and cumulative spans. duration_ms: Duration of this span in milliseconds (set on finish). """ name: str data: dict = field(default_factory=dict) children: list = field(default_factory=list) _start: float = field(default_factory=time.perf_counter, repr=False) duration_ms: float = field(default=0.0) def set(self, key: str, value) -> 'ProfilingSpan': """Attach metadata to this span. Args: key: Metadata key. value: Metadata value. Returns: Self, for chaining. """ self.data[key] = value return self def finish(self): """Stop timing and record the duration.""" self.duration_ms = (time.perf_counter() - self._start) * 1000 @dataclass class ProfilingTrace: """One complete command execution, from route handler to response. Attributes: command_name: Name of the executed command. command_description: Human-readable description of the command. command_id: UUID of the command. kwargs: Arguments received from the client. timestamp: When the command was received. root_span: Top-level span wrapping the full execution. total_duration_ms: Total server-side duration in milliseconds. """ command_name: str command_description: str command_id: str kwargs: dict timestamp: datetime root_span: Optional[ProfilingSpan] = None total_duration_ms: float = 0.0 trace_id: str = field(default_factory=lambda: str(uuid4())) # --------------------------------------------------------------------------- # Context managers / decorators # --------------------------------------------------------------------------- class _ActiveSpan: """Context manager and decorator for a single named span. When used as a context manager, returns the ProfilingSpan so callers can attach metadata via ``span.set()``. When used as a decorator, captures function arguments automatically. """ def __init__(self, manager: 'ProfilingManager', name: str, args: dict = None): self._manager = manager self._name = name self._args = args self._span: Optional[ProfilingSpan] = None self._token = None def __enter__(self) -> ProfilingSpan: if not self._manager.enabled: return _NullSpan() overhead_start = time.perf_counter() self._span = ProfilingSpan(name=self._name) if self._args: self._span.data.update(self._args) self._token = self._manager.push_span(self._span) self._manager.record_overhead(time.perf_counter() - overhead_start) return self._span def __exit__(self, *args): if self._span is not None: overhead_start = time.perf_counter() self._manager.pop_span(self._span, self._token) self._manager.record_overhead(time.perf_counter() - overhead_start) def __call__(self, fn): """Use as decorator — enabled check deferred to call time.""" manager = self._manager name = self._name @functools.wraps(fn) def wrapper(*args, **kwargs): if not manager.enabled: return fn(*args, **kwargs) captured = manager.capture_args(fn, args, kwargs) with _ActiveSpan(manager, name, captured): return fn(*args, **kwargs) return wrapper class _CumulativeActiveSpan: """Context manager and decorator for cumulative spans. Finds or creates a CumulativeSpan in the current parent and records each iteration without adding a new child entry. """ def __init__(self, manager: 'ProfilingManager', name: str): self._manager = manager self._name = name self._cumulative_span: Optional[CumulativeSpan] = None self._iter_start: float = 0.0 def _get_or_create(self) -> CumulativeSpan: parent = self._manager.current_span() if isinstance(parent, ProfilingSpan): for child in parent.children: if isinstance(child, CumulativeSpan) and child.name == self._name: return child cumulative_span = CumulativeSpan(name=self._name) parent.children.append(cumulative_span) return cumulative_span return CumulativeSpan(name=self._name) def __enter__(self) -> CumulativeSpan: if not self._manager.enabled: return _NullSpan() self._cumulative_span = self._get_or_create() self._iter_start = time.perf_counter() return self._cumulative_span def __exit__(self, *args): if self._cumulative_span is not None: duration_ms = (time.perf_counter() - self._iter_start) * 1000 self._cumulative_span.record(duration_ms) def __call__(self, fn): manager = self._manager name = self._name @functools.wraps(fn) def wrapper(*args, **kwargs): with _CumulativeActiveSpan(manager, name): return fn(*args, **kwargs) return wrapper class _CommandSpan: """Context manager that creates both a ProfilingTrace and its root span. Used exclusively by the route handler to wrap the full command execution. """ def __init__(self, manager: 'ProfilingManager', command_name: str, command_description: str, command_id: str, kwargs: dict): self._manager = manager self._command_name = command_name self._command_description = command_description self._command_id = command_id self._kwargs = kwargs self._trace: Optional[ProfilingTrace] = None self._span: Optional[ProfilingSpan] = None self._token = None def __enter__(self) -> ProfilingSpan: self._trace = ProfilingTrace( command_name=self._command_name, command_description=self._command_description, command_id=self._command_id, kwargs=dict(self._kwargs) if self._kwargs else {}, timestamp=datetime.now(), ) self._span = ProfilingSpan(name=self._command_name) self._token = self._manager.push_span(self._span) return self._span def __exit__(self, *args): self._manager.pop_span(self._span, self._token) self._trace.root_span = self._span self._trace.total_duration_ms = self._span.duration_ms self._manager.add_trace(self._trace) # --------------------------------------------------------------------------- # Manager # --------------------------------------------------------------------------- class ProfilingManager: """Global in-memory profiling manager. All probe mechanisms check ``enabled`` at call time, so the profiler can be toggled without restarting the server. Use the module-level ``profiler`` singleton rather than instantiating this class directly. """ def __init__(self, max_traces: int = None): from myfasthtml.core.constants import PROFILER_MAX_TRACES self.enabled: bool = False self._traces: deque = deque(maxlen=max_traces or PROFILER_MAX_TRACES) self._current_span: ContextVar[Optional[ProfilingSpan]] = ContextVar( 'profiler_current_span', default=None ) self._overhead_samples: list = [] # --- Span lifecycle --- def push_span(self, span: ProfilingSpan) -> object: """Register a span as the current context and attach it to the parent. Args: span: The span to activate. Returns: A reset token to pass to pop_span(). """ parent = self._current_span.get() if isinstance(parent, ProfilingSpan): parent.children.append(span) return self._current_span.set(span) def pop_span(self, span: ProfilingSpan, token: object) -> None: """Finish a span and restore the previous context. Args: span: The span to finish. token: The reset token returned by push_span(). """ span.finish() self._current_span.reset(token) def add_trace(self, trace: ProfilingTrace) -> None: """Add a completed trace to the buffer. Args: trace: The trace to store. """ self._traces.appendleft(trace) def record_overhead(self, duration_s: float) -> None: """Record a span boundary overhead sample. Args: duration_s: Duration in seconds of the profiler's own housekeeping. """ self._overhead_samples.append(duration_s * 1e6) if len(self._overhead_samples) > 1000: self._overhead_samples = self._overhead_samples[-1000:] @staticmethod def capture_args(fn, args, kwargs) -> dict: """Capture function arguments as a truncated string dict. Args: fn: The function whose signature is inspected. args: Positional arguments passed to the function. kwargs: Keyword arguments passed to the function. Returns: Dict of parameter names to string-truncated values. """ try: sig = inspect.signature(fn) bound = sig.bind(*args, **kwargs) bound.apply_defaults() params = dict(bound.arguments) params.pop('self', None) params.pop('cls', None) return {k: str(v)[:200] for k, v in params.items()} except Exception: return {} # --- Public interface --- @property def traces(self) -> list[ProfilingTrace]: """All recorded traces, most recent first.""" return list(self._traces) def current_span(self) -> Optional[ProfilingSpan]: """Return the active span in the current async context. Can be called from anywhere within a span to attach metadata:: profiler.current_span().set("row_count", len(rows)) """ return self._current_span.get() def clear(self): """Empty the trace buffer.""" self._traces.clear() logger.debug("Profiler traces cleared.") # --- Probe mechanisms --- def span(self, name: str, args: dict = None) -> _ActiveSpan: """Context manager and decorator for a named span. The enabled check is deferred to call/enter time, so this can be used as a static decorator without concern for startup order. Args: name: Span name. args: Optional metadata to attach immediately. Returns: An object usable as a context manager or decorator. """ return _ActiveSpan(self, name, args) def cumulative_span(self, name: str) -> _CumulativeActiveSpan: """Context manager and decorator for loop spans. Aggregates all iterations into a single entry (count, total, min, max, avg). Args: name: Span name. Returns: An object usable as a context manager or decorator. """ return _CumulativeActiveSpan(self, name) def command_span(self, command_name: str, command_description: str, command_id: str, kwargs: dict) -> '_CommandSpan | _NullSpan': """Context manager for the route handler. Creates a ProfilingTrace and its root span together. When the context exits, the trace is added to the buffer. Args: command_name: Human-readable command name. command_description: Human-readable description of the command. command_id: UUID string of the command. kwargs: Client arguments received by the route handler. Returns: An object usable as a context manager. """ if not self.enabled: return _NullSpan() return _CommandSpan(self, command_name, command_description, command_id, kwargs) def trace_all(self, cls=None, *, exclude: list = None): """Class decorator — statically wraps all non-dunder methods with spans. Wrapping happens at class definition time; the enabled check is deferred to call time via _ActiveSpan.__call__. Args: cls: The class to instrument (when used without parentheses). exclude: List of method names to skip. Usage:: @profiler.trace_all class MyClass: ... @profiler.trace_all(exclude=["render"]) class MyClass: ... """ _exclude = set(exclude or []) def decorator(klass): for attr_name, method in inspect.getmembers(klass, predicate=inspect.isfunction): if attr_name in _exclude: continue if attr_name.startswith('__') and attr_name.endswith('__'): continue setattr(klass, attr_name, _ActiveSpan(self, attr_name)(method)) return klass if cls is not None: return decorator(cls) return decorator def trace_calls(self, fn): """Function decorator — traces all sub-calls via sys.setprofile(). Use for exploration when the bottleneck location is unknown. sys.setprofile() is scoped to this function's execution only; the global profiler is restored on exit. The root span for ``fn`` itself is created before setprofile is activated so that profiler internals are not captured as children. Args: fn: The function to instrument. """ manager = self @functools.wraps(fn) def wrapper(*args, **kwargs): if not manager.enabled: return fn(*args, **kwargs) call_stack: list[tuple[ProfilingSpan, object]] = [] # Skip the first call event (fn itself — already represented by root_span) skip_first = [True] def _profile(frame, event, arg): if event == 'call': if skip_first[0]: skip_first[0] = False return span = ProfilingSpan(name=frame.f_code.co_name) token = manager.push_span(span) call_stack.append((span, token)) elif event in ('return', 'exception'): if call_stack: span, token = call_stack.pop() manager.pop_span(span, token) # Build root span BEFORE activating setprofile so that profiler # internals (capture_args, ProfilingSpan.__init__, etc.) are not # captured as children. captured = manager.capture_args(fn, args, kwargs) root_span = ProfilingSpan(name=fn.__name__) root_span.data.update(captured) root_token = manager.push_span(root_span) old_profile = sys.getprofile() sys.setprofile(_profile) try: result = fn(*args, **kwargs) finally: sys.setprofile(old_profile) manager.pop_span(root_span, root_token) return result return wrapper # --- Overhead measurement --- @property def overhead_per_span_us(self) -> float: """Average overhead per span boundary in microseconds.""" if not self._overhead_samples: return 0.0 return sum(self._overhead_samples) / len(self._overhead_samples) @property def total_overhead_ms(self) -> float: """Estimated total overhead across all recorded traces.""" total_spans = sum( self._count_spans(t.root_span) for t in self._traces if t.root_span ) return (total_spans * self.overhead_per_span_us * 2) / 1000 def _count_spans(self, span: ProfilingSpan) -> int: if span is None: return 0 count = 1 for child in span.children: if isinstance(child, ProfilingSpan): count += self._count_spans(child) return count # --------------------------------------------------------------------------- # Singleton # --------------------------------------------------------------------------- profiler = ProfilingManager()