stormlog.telemetry

Canonical telemetry event schema and legacy conversion helpers.

Functions

load_telemetry_events(path[, ...])

Load telemetry events from JSON and return the events of the selected session.

load_telemetry_sessions(path[, ...])

Load grouped telemetry sessions from JSON, JSONL, or sink directories.

project_telemetry_event(event)

Project telemetry objects or compatible mappings into the shared model.

project_telemetry_events(events)

Project existing telemetry events into backend-neutral records.

resolve_distributed_identity(*[, job_id, ...])

Normalize distributed identity fields from explicit, metadata, or env inputs.

telemetry_event_from_record(record[, ...])

Create a canonical telemetry event from v3, v2, or legacy records.

telemetry_event_to_dict(event)

Serialize a telemetry event to a plain dictionary.

validate_telemetry_record(record)

Validate a v2 or v3 telemetry record.

Classes

LoadedTelemetrySession(summary, events[, ...])

Grouped telemetry records and lifecycle metadata for one session.

TelemetryEvent

alias of TelemetryEventV3

TelemetryEventV2(schema_version, ...[, ...])

Legacy v2 telemetry event payload retained for backward-compatible writes/tests.

TelemetryEventV3(schema_version, session_id, ...)

Canonical telemetry event payload used by tracker exports and loaders.

class stormlog.telemetry.ProjectedTelemetryRecord(schema_version, record_id, timestamp_ns, observed_timestamp_ns, session_id, source_kind, event_type, stage, severity, severity_text, body, resource=<factory>, attributes=<factory>, correlation=<factory>)[source]

Bases: object

Small immutable event envelope shared by live and loaded telemetry.

Parameters:
  • schema_version (Literal[1])

  • record_id (str)

  • timestamp_ns (int)

  • observed_timestamp_ns (int)

  • session_id (str)

  • source_kind (str)

  • event_type (str)

  • stage (str | None)

  • severity (str | None)

  • severity_text (str | None)

  • body (str | None)

  • resource (Mapping[str, Any])

  • attributes (Mapping[str, Any])

  • correlation (Mapping[str, Any])

schema_version: Literal[1]
record_id: str
timestamp_ns: int
observed_timestamp_ns: int
session_id: str
source_kind: str
event_type: str
stage: str | None
severity: str | None
severity_text: str | None
body: str | None
resource: Mapping[str, Any]
attributes: Mapping[str, Any]
correlation: Mapping[str, Any]
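
The field list above can be mirrored with a standard-library dataclass. This is a minimal sketch, not the library's implementation: `EnvelopeSketch` is a hypothetical stand-in, the use of `frozen=True` is an assumption about how "immutable" is achieved, and all sample values are illustrative.

```python
from dataclasses import FrozenInstanceError, dataclass, field
from typing import Any, Literal, Mapping, Optional


@dataclass(frozen=True)
class EnvelopeSketch:
    """Hypothetical stand-in mirroring the documented field list."""

    schema_version: Literal[1]
    record_id: str
    timestamp_ns: int
    observed_timestamp_ns: int
    session_id: str
    source_kind: str
    event_type: str
    stage: Optional[str]
    severity: Optional[str]
    severity_text: Optional[str]
    body: Optional[str]
    # The three mapping fields are shown as <factory> in the signature;
    # here each one defaults to a fresh empty dict (an assumption).
    resource: Mapping[str, Any] = field(default_factory=dict)
    attributes: Mapping[str, Any] = field(default_factory=dict)
    correlation: Mapping[str, Any] = field(default_factory=dict)


record = EnvelopeSketch(
    schema_version=1,
    record_id="rec-0001",          # illustrative value
    timestamp_ns=1_700_000_000_000_000_000,
    observed_timestamp_ns=1_700_000_000_000_000_500,
    session_id="session-a",
    source_kind="loaded",          # illustrative value
    event_type="sample",           # illustrative value
    stage=None,
    severity=None,
    severity_text=None,
    body=None,
)

# A frozen dataclass rejects attribute assignment after construction.
try:
    record.event_type = "other"
    mutated = True
except FrozenInstanceError:
    mutated = False
```

The four `str | None` fields have no defaults in the documented signature, so the sketch requires them to be passed explicitly, even when they are `None`.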

class stormlog.telemetry.LoadedTelemetrySession(summary, events, sources_loaded=<factory>, warnings=<factory>)[source]

Bases: object

Grouped telemetry records and lifecycle metadata for one session.

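Parameters:
  • summary (SessionSummary)

  • events (list[TelemetryEventV3])

  • sources_loaded (list[str])

  • warnings (list[str])

summary: SessionSummary
events: list[TelemetryEventV3]
sources_loaded: list[str]
warnings: list[str]

telemetry_records()[source]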

Return backend-neutral projected records for this loaded session.

Return type:

list[ProjectedTelemetryRecord]

resources()[source]

Return unique observed resources for this loaded session.

Return type:

list[dict[str, Any]]

correlations()[source]

Return unique correlation contexts for this loaded session.

Return type:

list[dict[str, Any]]
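
As a rough illustration of what "unique" can mean for a list of dictionaries, the sketch below removes duplicates while preserving first-seen order. `unique_mappings_sketch` is a hypothetical helper, and both the JSON-based equality key and the `host`/`device_id` sample keys are assumptions; the page does not describe how `resources()` or `correlations()` compare entries.

```python
import json
from typing import Any, Dict, Iterable, List


def unique_mappings_sketch(items: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Drop duplicate dicts, keeping the first occurrence of each."""
    seen = set()
    unique: List[Dict[str, Any]] = []
    for item in items:
        # Dicts are unhashable, so a canonical JSON string is used as the key.
        # sort_keys makes the key independent of insertion order.
        key = json.dumps(item, sort_keys=True, default=str)
        if key not in seen:
            seen.add(key)
            unique.append(dict(item))
    return unique


observed = [
    {"host": "node-a", "device_id": 0},
    {"device_id": 0, "host": "node-a"},   # same content, different key order
    {"host": "node-a", "device_id": 1},
]
unique_resources = unique_mappings_sketch(observed)
```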

stormlog.telemetry.TelemetryEvent

alias of TelemetryEventV3

class stormlog.telemetry.TelemetryEventV2(schema_version, timestamp_ns, event_type, collector, sampling_interval_ms, pid, host, device_id, allocator_allocated_bytes, allocator_reserved_bytes, allocator_active_bytes, allocator_inactive_bytes, allocator_change_bytes, device_used_bytes, device_free_bytes, device_total_bytes, context, job_id=None, rank=0, local_rank=0, world_size=1, metadata=<factory>)[source]

Bases: object

Legacy v2 telemetry event payload retained for backward-compatible writes/tests.

Parameters:
  • schema_version (Literal[2])

  • timestamp_ns (int)

  • event_type (str)

  • collector (str)

  • sampling_interval_ms (int)

  • pid (int)

  • host (str)

  • device_id (int)

  • allocator_allocated_bytes (int)

  • allocator_reserved_bytes (int)

  • allocator_active_bytes (int | None)

  • allocator_inactive_bytes (int | None)

  • allocator_change_bytes (int)

  • device_used_bytes (int)

  • device_free_bytes (int | None)

  • device_total_bytes (int | None)

  • context (str | None)

  • job_id (str | None)

  • rank (int)

  • local_rank (int)

  • world_size (int)

  • metadata (dict[str, Any])

schema_version: Literal[2]
timestamp_ns: int
event_type: str
collector: str
sampling_interval_ms: int
pid: int
host: str
device_id: int
allocator_allocated_bytes: int
allocator_reserved_bytes: int
allocator_active_bytes: int | None
allocator_inactive_bytes: int | None
allocator_change_bytes: int
device_used_bytes: int
device_free_bytes: int | None
device_total_bytes: int | None
context: str | None
job_id: str | None = None
rank: int = 0
local_rank: int = 0
world_size: int = 1
metadata: dict[str, Any]

class stormlog.telemetry.TelemetryEventV3(schema_version, session_id, timestamp_ns, event_type, collector, sampling_interval_ms, pid, host, device_id, allocator_allocated_bytes, allocator_reserved_bytes, allocator_active_bytes, allocator_inactive_bytes, allocator_change_bytes, device_used_bytes, device_free_bytes, device_total_bytes, context, job_id=None, rank=0, local_rank=0, world_size=1, metadata=<factory>)[source]

Bases: object

Canonical telemetry event payload used by tracker exports and loaders.

Parameters:
  • schema_version (Literal[3])

  • session_id (str)

  • timestamp_ns (int)

  • event_type (str)

  • collector (str)

  • sampling_interval_ms (int)

  • pid (int)

  • host (str)

  • device_id (int)

  • allocator_allocated_bytes (int)

  • allocator_reserved_bytes (int)

  • allocator_active_bytes (int | None)

  • allocator_inactive_bytes (int | None)

  • allocator_change_bytes (int)

  • device_used_bytes (int)

  • device_free_bytes (int | None)

  • device_total_bytes (int | None)

  • context (str | None)

  • job_id (str | None)

  • rank (int)

  • local_rank (int)

  • world_size (int)

  • metadata (dict[str, Any])

schema_version: Literal[3]
session_id: str
timestamp_ns: int
event_type: str
collector: str
sampling_interval_ms: int
pid: int
host: str
device_id: int
allocator_allocated_bytes: int
allocator_reserved_bytes: int
allocator_active_bytes: int | None
allocator_inactive_bytes: int | None
allocator_change_bytes: int
device_used_bytes: int
device_free_bytes: int | None
device_total_bytes: int | None
context: str | None
job_id: str | None = None
rank: int = 0
local_rank: int = 0
world_size: int = 1
metadata: dict[str, Any]
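
Comparing the two documented field lists shows that the v3 payload differs from v2 only by the added `session_id` (and the `schema_version` literal). The sketch below checks this with plain tuples copied from the signatures on this page; `V2_FIELDS` and `V3_FIELDS` are names introduced for illustration only.

```python
# Field names copied from the TelemetryEventV2 signature on this page.
V2_FIELDS = (
    "schema_version", "timestamp_ns", "event_type", "collector",
    "sampling_interval_ms", "pid", "host", "device_id",
    "allocator_allocated_bytes", "allocator_reserved_bytes",
    "allocator_active_bytes", "allocator_inactive_bytes",
    "allocator_change_bytes", "device_used_bytes", "device_free_bytes",
    "device_total_bytes", "context", "job_id", "rank", "local_rank",
    "world_size", "metadata",
)

# The TelemetryEventV3 signature places session_id right after schema_version.
V3_FIELDS = V2_FIELDS[:1] + ("session_id",) + V2_FIELDS[1:]

only_in_v3 = [name for name in V3_FIELDS if name not in V2_FIELDS]
print(only_in_v3)  # ['session_id']
```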

stormlog.telemetry.project_telemetry_event(event)[source]

Project telemetry objects or compatible mappings into the shared model.

Parameters:

event (TelemetryEventV3 | Mapping[str, Any])

Return type:

ProjectedTelemetryRecord

stormlog.telemetry.project_telemetry_events(events)[source]

Project existing telemetry events into backend-neutral records.

Parameters:

events (Iterable[TelemetryEventV3 | Mapping[str, Any]])

Return type:

list[ProjectedTelemetryRecord]

stormlog.telemetry.load_telemetry_sessions(path, permissive_legacy=True, events_key=None)[source]

Load grouped telemetry sessions from JSON, JSONL, or sink directories.

Parameters:
  • path (str | Path)

  • permissive_legacy (bool)

  • events_key (str | None)

Return type:

list[LoadedTelemetrySession]
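
The grouping step can be pictured with a few lines of standard-library code. This is a sketch under stated assumptions, not the loader itself: `group_by_session_sketch` is a hypothetical helper, it handles only JSONL text whose records all carry a `session_id`, and it ignores `permissive_legacy`, `events_key`, and sink directories.

```python
import json
from collections import defaultdict
from typing import Any, Dict, Iterable, List


def group_by_session_sketch(lines: Iterable[str]) -> Dict[str, List[Dict[str, Any]]]:
    """Group JSONL records by their session_id, preserving input order."""
    sessions: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines between records
        record = json.loads(line)
        sessions[record["session_id"]].append(record)
    return dict(sessions)


jsonl_text = "\n".join([
    '{"session_id": "a", "timestamp_ns": 1}',
    '{"session_id": "b", "timestamp_ns": 2}',
    "",
    '{"session_id": "a", "timestamp_ns": 3}',
])
grouped = group_by_session_sketch(jsonl_text.splitlines())
```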

stormlog.telemetry.telemetry_event_from_record(record, permissive_legacy=True, default_collector='legacy.unknown', default_sampling_interval_ms=0, default_session_id=None)[source]

Create a canonical telemetry event from v3, v2, or legacy records.

Parameters:
  • record (Mapping[str, Any])

  • permissive_legacy (bool)

  • default_collector (str)

  • default_sampling_interval_ms (int)

  • default_session_id (str | None)

Return type:

TelemetryEventV3
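
Because the documented v2 and v3 field lists differ only by `session_id`, a v2-to-v3 conversion can be sketched as a copy plus two assignments. `upgrade_v2_record_sketch` and the `"unknown-session"` placeholder are hypothetical; how the library fills a missing session id, and how it treats older legacy records or `permissive_legacy`, is not described on this page.

```python
from typing import Any, Dict, Mapping, Optional


def upgrade_v2_record_sketch(
    record: Mapping[str, Any],
    default_session_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Copy a v2 mapping and add the v3-only session_id field."""
    if record.get("schema_version") != 2:
        raise ValueError("expected a record with schema_version == 2")
    upgraded = dict(record)  # leave the caller's mapping untouched
    upgraded["schema_version"] = 3
    # Assumption: fall back to a placeholder when no session id is supplied.
    upgraded["session_id"] = default_session_id or "unknown-session"
    return upgraded


v2_record = {"schema_version": 2, "timestamp_ns": 10, "event_type": "sample"}
v3_record = upgrade_v2_record_sketch(v2_record, default_session_id="session-a")
```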

stormlog.telemetry.telemetry_event_to_dict(event)[source]

Serialize a telemetry event to a plain dictionary.

Parameters:

event (TelemetryEventV3 | TelemetryEventV2)

Return type:

dict[str, Any]

stormlog.telemetry.validate_telemetry_record(record)[source]

Validate a v2 or v3 telemetry record.

Raises:

ValueError – if the record is invalid or partial.

Parameters:

record (Mapping[str, Any])

Return type:

None
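
The contract above (return `None` on success, raise `ValueError` for an invalid or partial record) can be illustrated with a stand-in validator. `validate_record_sketch` is hypothetical, and the required-key subset is an assumption chosen for brevity; the real function's checks are not listed on this page.

```python
from typing import Any, Mapping

# Assumption: a small subset of the documented fields, for illustration only.
REQUIRED_COMMON = ("timestamp_ns", "event_type", "collector", "pid", "host", "device_id")


def validate_record_sketch(record: Mapping[str, Any]) -> None:
    """Raise ValueError for an unsupported version or missing keys."""
    version = record.get("schema_version")
    if version not in (2, 3):
        raise ValueError(f"unsupported schema_version: {version!r}")
    # session_id exists only in the v3 field list.
    required = REQUIRED_COMMON + (("session_id",) if version == 3 else ())
    missing = [key for key in required if key not in record]
    if missing:
        raise ValueError(f"partial record, missing keys: {missing}")


complete = {
    "schema_version": 3, "session_id": "session-a", "timestamp_ns": 1,
    "event_type": "sample", "collector": "demo", "pid": 1,
    "host": "node-a", "device_id": 0,
}
partial = {"schema_version": 3, "timestamp_ns": 1}

result = validate_record_sketch(complete)  # returns None when nothing is wrong

try:
    validate_record_sketch(partial)
    partial_rejected = False
except ValueError:
    partial_rejected = True
```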

stormlog.telemetry.load_telemetry_events(path, permissive_legacy=True, events_key=None, session_id=None)[source]

Load telemetry events from JSON and return the events of the selected session.

Parameters:
  • path (str | Path)

  • permissive_legacy (bool)

  • events_key (str | None)

  • session_id (str | None)

Return type:

list[TelemetryEventV3]

stormlog.telemetry.resolve_distributed_identity(*, job_id=None, rank=None, local_rank=None, world_size=None, metadata=None, env=None)[source]

Normalize distributed identity fields from explicit, metadata, or env inputs.

Parameters:
  • job_id (Any)

  • rank (Any)

  • local_rank (Any)

  • world_size (Any)

  • metadata (Mapping[str, Any] | None)

  • env (Mapping[str, str] | None)

Return type:

dict[str, Any]
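
One plausible reading of "explicit, metadata, or env inputs" is a first-match lookup in that order. The sketch below assumes exactly that precedence; `resolve_identity_sketch` is a hypothetical stand-in, the `RANK`, `LOCAL_RANK`, and `WORLD_SIZE` variable names follow the `torchrun` convention and are an assumption here, `JOB_ID` is an invented placeholder, and the fallback values mirror the `rank=0`, `local_rank=0`, `world_size=1` defaults of the event classes.

```python
from typing import Any, Dict, Mapping, Optional


def _first_set(*candidates: Any) -> Any:
    """Return the first candidate that is not None, else None."""
    for value in candidates:
        if value is not None:
            return value
    return None


def resolve_identity_sketch(
    *,
    job_id: Any = None,
    rank: Any = None,
    local_rank: Any = None,
    world_size: Any = None,
    metadata: Optional[Mapping[str, Any]] = None,
    env: Optional[Mapping[str, str]] = None,
) -> Dict[str, Any]:
    """Assumed precedence: explicit argument, then metadata, then env, then default."""
    metadata = metadata or {}
    env = env or {}
    return {
        # JOB_ID is a hypothetical variable name used only in this sketch.
        "job_id": _first_set(job_id, metadata.get("job_id"), env.get("JOB_ID")),
        "rank": int(_first_set(rank, metadata.get("rank"), env.get("RANK"), 0)),
        "local_rank": int(
            _first_set(local_rank, metadata.get("local_rank"), env.get("LOCAL_RANK"), 0)
        ),
        "world_size": int(
            _first_set(world_size, metadata.get("world_size"), env.get("WORLD_SIZE"), 1)
        ),
    }


identity = resolve_identity_sketch(
    rank=3,
    metadata={"rank": 1, "world_size": 8},
    env={"RANK": "5", "LOCAL_RANK": "2"},
)
```

In this call the explicit `rank=3` wins over both other sources, `world_size` comes from `metadata`, `local_rank` is parsed from the environment string, and `job_id` stays `None` because no source provides it.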