# Source code for stormlog.derived_fields

"""Registry-driven derived-field layer for Stormlog telemetry.

Centralises the common investigation formulas (allocator gap, utilisation %,
fragmentation ratio, degraded-collector state) so that ``diagnose.py``,
``gap_analysis.py``, and future TUI surfaces all use a single, tested source
of truth instead of copy-pasting the same arithmetic.

Usage::

    from stormlog.derived_fields import compute_event_fields, enrich_event

    fields = compute_event_fields(event)
    # fields["allocator_gap_bytes"] always present
    # fields["utilization_ratio"] is None when device_total_bytes is unknown

    enriched = enrich_event(event)
    # {"allocator_allocated_bytes": ..., ..., "derived": {"allocator_gap_bytes": ..., ...}}
"""

from __future__ import annotations

import dataclasses
from collections.abc import Mapping as MappingABC
from typing import Any, Dict, Sequence, TypedDict, cast

from .collector_health import COLLECTOR_HEALTH_DEGRADED

# ---------------------------------------------------------------------------
# Public contract
# ---------------------------------------------------------------------------


class DerivedFields(TypedDict, total=False):
    """Computed fields derived from a single telemetry event.

    :func:`compute_event_fields` populates every key below. A ratio that
    cannot be computed from the available raw counters is encoded as
    ``None``; the key itself is never omitted.

    * ``allocator_gap_bytes`` - reserved bytes not currently allocated,
      clamped to zero.
    * ``utilization_ratio`` - allocated bytes divided by
      ``device_total_bytes``; ``None`` when the device total is missing or
      not positive.
    * ``fragmentation_ratio`` - gap bytes divided by reserved bytes;
      ``None`` when there are no reserved bytes to divide by.
    * ``is_degraded_collector`` - whether the collector name matches a
      known degraded / fallback marker.
    """

    allocator_gap_bytes: int
    utilization_ratio: None | float
    fragmentation_ratio: None | float
    is_degraded_collector: bool


class SessionDerivedFields(TypedDict, total=False):
    """Rollups computed over an ordered sequence of telemetry events.

    Produced by :func:`compute_session_fields`.
    """

    # Highest per-event utilization ratio, or None if no event yielded one.
    peak_utilization_ratio: None | float
    # Mean of the per-event fragmentation ratios that could be computed.
    avg_fragmentation_ratio: None | float
    # True unless the final event's ``event_type`` is ``"stop"``.
    is_session_interrupted: bool


# ---------------------------------------------------------------------------
# Degraded-collector registry
# ---------------------------------------------------------------------------

# Collector strings that indicate the collector is operating in a degraded /
# fallback mode. Cross-referenced against ``collector_health.py`` status
# constants and the known collector name fragments used in ``telemetry.py``.
# Every fragment is lower-cased here because ``_collector_is_degraded``
# lower-cases the collector name before matching; an upper- or mixed-case
# fragment would otherwise never match.
_DEGRADED_COLLECTOR_SUBSTRINGS: frozenset[str] = frozenset(
    fragment.lower()
    for fragment in (
        "fallback",
        COLLECTOR_HEALTH_DEGRADED,  # canonical constant from collector_health.py
        "unavailable",
        "partial",
        "legacy.unknown",
    )
)


def _collector_is_degraded(collector: None | str) -> bool:
    """Return True when the collector name signals a degraded / fallback mode.

    Matching is a case-insensitive substring test against
    ``_DEGRADED_COLLECTOR_SUBSTRINGS``; ``None`` and ``""`` count as healthy.
    """
    if not collector:
        return False
    lower = collector.lower()
    return any(fragment in lower for fragment in _DEGRADED_COLLECTOR_SUBSTRINGS)


# ---------------------------------------------------------------------------
# Per-field compute helpers (the "registry")
# ---------------------------------------------------------------------------


def _compute_allocator_gap_bytes(
    allocator_allocated_bytes: int,
    allocator_reserved_bytes: int,
) -> int:
    """Reserved memory that the allocator holds but has not actively allocated.

    Clamped to zero - negative values indicate a stale or inconsistent
    snapshot rather than a meaningful metric.
    """
    return max(0, allocator_reserved_bytes - allocator_allocated_bytes)


def _compute_utilization_ratio(
    allocator_allocated_bytes: int,
    device_total_bytes: None | int,
) -> None | float:
    """Fraction of device capacity currently allocated.

    Returns ``None`` when ``device_total_bytes`` is unknown (``None``) or not
    positive. The result is not clamped, so it lies within 0.0 - 1.0 only
    when the allocated count does not exceed the device total.
    """
    if device_total_bytes is None or device_total_bytes <= 0:
        return None
    return allocator_allocated_bytes / device_total_bytes


def _compute_fragmentation_ratio(
    allocator_allocated_bytes: int,
    allocator_reserved_bytes: int,
) -> None | float:
    """Fraction of reserved memory that is fragmented (unused gap / reserved).

    Returns ``None`` when ``allocator_reserved_bytes`` is not positive. Zero
    would divide by zero. A negative reserved count (an inconsistent
    snapshot) would otherwise yield a meaningless ``-0.0``, which callers
    that average the non-``None`` ratios would count as a real sample.
    """
    if allocator_reserved_bytes <= 0:
        return None
    gap = _compute_allocator_gap_bytes(
        allocator_allocated_bytes, allocator_reserved_bytes
    )
    return gap / allocator_reserved_bytes


# ---------------------------------------------------------------------------
# Shared accessor
# ---------------------------------------------------------------------------


def _event_get(event: Any, key: str, default: Any = None) -> Any:
    """Uniform attribute access for dataclasses and mapping inputs.

    Mappings are read with ``.get(key, default)``; any other object is read
    with ``getattr(event, key, default)``.
    """
    if isinstance(event, MappingABC):
        return event.get(key, default)
    return getattr(event, key, default)


# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def compute_event_fields(event: Any) -> DerivedFields:
    """Compute row-scoped derived fields from a single telemetry event.

    ``event`` may be a ``TelemetryEventV2``, ``TelemetryEventV3``, or any
    object / mapping that exposes the standard allocator counter attributes.

    Every :class:`DerivedFields` key is present in the returned dict. Ratios
    that need counters which may be absent (e.g. ``device_total_bytes``) are
    returned as ``None`` rather than being omitted.

    Missing or falsy allocator counters are treated as ``0``. All counters,
    including ``device_total_bytes`` when it is not ``None``, are converted
    with ``int()`` so that every counter is handled the same way.

    Args:
        event: A telemetry event object or mapping.

    Returns:
        A :class:`DerivedFields` dict.

    Raises:
        ValueError, TypeError: If a present counter value cannot be
            converted by ``int()``.
    """
    allocated: int = int(_event_get(event, "allocator_allocated_bytes", 0) or 0)
    reserved: int = int(_event_get(event, "allocator_reserved_bytes", 0) or 0)

    # Coerce the device total like the allocator counters so that, for
    # example, a numeric string does not fail the ``<= 0`` comparison in
    # ``_compute_utilization_ratio``. ``None`` is kept to mean "unknown".
    raw_device_total = _event_get(event, "device_total_bytes")
    device_total: None | int = (
        None if raw_device_total is None else int(raw_device_total)
    )
    collector: None | str = _event_get(event, "collector")

    result: DerivedFields = {
        "allocator_gap_bytes": _compute_allocator_gap_bytes(allocated, reserved),
        "utilization_ratio": _compute_utilization_ratio(allocated, device_total),
        "fragmentation_ratio": _compute_fragmentation_ratio(allocated, reserved),
        "is_degraded_collector": _collector_is_degraded(collector),
    }
    return result


def compute_session_fields(events: Sequence[Any]) -> SessionDerivedFields:
    """Roll up per-event derived fields across an ordered session.

    Args:
        events: Ordered sequence of telemetry event objects or mappings.

    Returns:
        A :class:`SessionDerivedFields` dict with:

        * ``peak_utilization_ratio`` - maximum per-event utilization ratio,
          or ``None`` when no event yielded one (e.g. ``device_total_bytes``
          was unavailable in every event).
        * ``avg_fragmentation_ratio`` - arithmetic mean of the per-event
          fragmentation ratios that could be computed; ``None`` if none
          could.
        * ``is_session_interrupted`` - ``True`` unless the final event's
          ``event_type`` is ``"stop"``. An empty ``events`` also counts as
          interrupted.
    """
    per_event = [compute_event_fields(item) for item in events]

    utilizations = [
        ratio
        for ratio in (fields.get("utilization_ratio") for fields in per_event)
        if ratio is not None
    ]
    fragmentations = [
        ratio
        for ratio in (fields.get("fragmentation_ratio") for fields in per_event)
        if ratio is not None
    ]

    peak: None | float = max(utilizations) if utilizations else None
    if fragmentations:
        mean_fragmentation: None | float = sum(fragmentations) / len(fragmentations)
    else:
        mean_fragmentation = None

    # With no events there is no terminating "stop", so the session is
    # reported as interrupted.
    final_event_type = _event_get(events[-1], "event_type", "") if events else ""

    return {
        "peak_utilization_ratio": peak,
        "avg_fragmentation_ratio": mean_fragmentation,
        "is_session_interrupted": final_event_type != "stop",
    }


def enrich_event(event: Any) -> Dict[str, Any]:
    """Return the raw event fields plus a nested ``"derived"`` dict.

    The raw fields are copied into a new top-level dict. The output of
    :func:`compute_event_fields` is placed under the ``"derived"`` key, which
    keeps computed values separate from the raw counters. If the raw event
    already has a field named ``"derived"``, that value is overwritten.

    The raw fields come from ``dict(event)`` for mappings,
    :func:`dataclasses.asdict` for dataclasses, and ``vars(event)`` for any
    other object.

    Args:
        event: A telemetry event object or mapping.

    Returns:
        ``{"schema_version": ..., ..., "derived": {"allocator_gap_bytes": ..., ...}}``
    """
    if isinstance(event, MappingABC):
        raw: Dict[str, Any] = dict(event)
    elif dataclasses.is_dataclass(event):
        raw = dataclasses.asdict(cast(Any, event))
    else:
        raw = dict(vars(event))
    raw["derived"] = dict(compute_event_fields(event))
    return raw


# Public names exported by ``from stormlog.derived_fields import *``.
__all__ = [
    # TypedDict result contracts
    "DerivedFields",
    "SessionDerivedFields",
    # Computation entry points
    "compute_event_fields",
    "compute_session_fields",
    "enrich_event",
]