# Source code for stormlog.session

"""Shared session identity and lifecycle helpers."""

from __future__ import annotations

import os
import socket
import time
from dataclasses import asdict, dataclass, replace
from typing import Any, Iterable, Mapping, Protocol, TypeVar, cast
from uuid import NAMESPACE_URL, uuid4, uuid5

# Lifecycle status values a session summary may carry.
SESSION_STATUS_RUNNING = "running"
SESSION_STATUS_COMPLETED = "completed"
SESSION_STATUS_INTERRUPTED = "interrupted"
SESSION_STATUS_INCOMPLETE = "incomplete"

# Closed set of accepted statuses; normalize_session_status() rejects any
# value that is not a member.
SESSION_STATUSES = frozenset(
    {
        SESSION_STATUS_RUNNING,
        SESSION_STATUS_COMPLETED,
        SESSION_STATUS_INTERRUPTED,
        SESSION_STATUS_INCOMPLETE,
    }
)

# Rank per status consumed by _session_selection_sort_key(). Lower values
# sort first, so completed sessions come before interrupted, incomplete and
# running ones.
_SESSION_SELECTION_PRIORITY = {
    SESSION_STATUS_COMPLETED: 0,
    SESSION_STATUS_INTERRUPTED: 1,
    SESSION_STATUS_INCOMPLETE: 2,
    SESSION_STATUS_RUNNING: 3,
}
# Sentinel default for update_session_summary(ended_at_ns=...): lets that
# function tell "argument omitted" apart from an explicit None.
_UNCHANGED = object()


@dataclass(frozen=True)
class SessionSummary:
    """Immutable identity and lifecycle record for one capture session.

    The class performs no validation itself; the helper functions in this
    module (for example ``create_session_summary``) normalize values before
    constructing an instance.

    Attributes:
        session_id: Identifier of the session.
        status: Lifecycle status string.
        started_at_ns: Start timestamp in nanoseconds.
        ended_at_ns: End timestamp in nanoseconds, or ``None`` when no end
            has been recorded.
        host: Host name associated with the session.
        pid: Process id associated with the session.
        job_id: Optional job identifier; ``None`` by default.
        rank: Rank value; 0 by default.
        local_rank: Local rank value; 0 by default.
        world_size: World size value; 1 by default.
        source: Label naming the producer of the summary; defaults to
            ``"stormlog.session"``.
    """

    # Required fields (no defaults).
    session_id: str
    status: str
    started_at_ns: int
    ended_at_ns: int | None
    host: str
    pid: int

    # Optional fields.
    job_id: str | None = None
    rank: int = 0
    local_rank: int = 0
    world_size: int = 1
    source: str = "stormlog.session"
class _HasSummary(Protocol):
    """Structural type for any object exposing a ``summary`` attribute."""

    summary: SessionSummary


# Type variable for the selection helpers: either a SessionSummary itself or
# an object that carries one as ``.summary``. Using a TypeVar lets
# select_default_session() return the same type it was given.
_SessionLikeT = TypeVar("_SessionLikeT", bound=SessionSummary | _HasSummary)
def now_ns() -> int:
    """Return the current wall-clock time as integer nanoseconds.

    Returns:
        The value of ``time.time_ns()``.
    """
    timestamp_ns = time.time_ns()
    return timestamp_ns
def new_session_id() -> str:
    """Generate a fresh opaque session identifier.

    Returns:
        The string form of a newly generated random (version 4) UUID.
    """
    identifier = uuid4()
    return str(identifier)
def stable_legacy_session_id(*parts: object) -> str:
    """Derive a deterministic synthetic session id for legacy artifacts.

    Args:
        *parts: Values identifying the artifact. ``None`` entries are
            skipped; every other entry is converted with ``str()``.

    Returns:
        ``"legacy-"`` followed by the name-based (version 5) UUID of the
        remaining parts joined with ``"::"`` in the URL namespace. The same
        parts always yield the same id.
    """
    rendered = [str(part) for part in parts if part is not None]
    digest = uuid5(NAMESPACE_URL, "::".join(rendered))
    return "legacy-" + str(digest)
def create_session_summary(
    *,
    source: str,
    status: str = SESSION_STATUS_RUNNING,
    session_id: str | None = None,
    started_at_ns: int | None = None,
    ended_at_ns: int | None = None,
    host: str | None = None,
    pid: int | None = None,
    job_id: str | None = None,
    rank: int = 0,
    local_rank: int = 0,
    world_size: int = 1,
) -> SessionSummary:
    """Build a ``SessionSummary`` with defaults filled in and values coerced.

    Args:
        source: Label naming the producer of the summary.
        status: Lifecycle status; validated by ``normalize_session_status``.
        session_id: Identifier to use; a falsy value (``None`` or ``""``)
            is replaced by ``new_session_id()``.
        started_at_ns: Start timestamp; ``None`` means ``now_ns()``.
        ended_at_ns: End timestamp, or ``None`` to leave it unset.
        host: Host name; a falsy value is replaced by
            ``socket.gethostname()``.
        pid: Process id; ``None`` means ``os.getpid()``.
        job_id: Optional job identifier, stored as given.
        rank: Rank value, coerced with ``int()``.
        local_rank: Local rank value, coerced with ``int()``.
        world_size: World size, coerced with ``int()`` and clamped to at
            least 1.

    Returns:
        The populated, frozen summary.

    Raises:
        ValueError: If ``status`` is not a supported status string.
    """
    # Status is validated first so an invalid status fails before any
    # environment lookups happen.
    checked_status = normalize_session_status(status)

    if started_at_ns is None:
        start_ns = int(now_ns())
    else:
        start_ns = int(started_at_ns)

    host_name = host or socket.gethostname()

    if pid is None:
        process_id = int(os.getpid())
    else:
        process_id = int(pid)

    end_ns = int(ended_at_ns) if ended_at_ns is not None else None

    return SessionSummary(
        session_id=session_id or new_session_id(),
        status=checked_status,
        started_at_ns=start_ns,
        ended_at_ns=end_ns,
        host=host_name,
        pid=process_id,
        job_id=job_id,
        rank=int(rank),
        local_rank=int(local_rank),
        world_size=max(1, int(world_size)),
        source=source,
    )
def update_session_summary(
    summary: SessionSummary,
    *,
    status: str | None = None,
    ended_at_ns: int | None | object = _UNCHANGED,
    source: str | None = None,
) -> SessionSummary:
    """Return a copy of ``summary`` with the requested lifecycle changes.

    Args:
        summary: The summary to copy; it is never modified.
        status: New status, validated by ``normalize_session_status``.
            ``None`` keeps the current status.
        ended_at_ns: New end timestamp. Omit the argument to keep the
            current value; pass ``None`` explicitly to clear it; any other
            value is coerced with ``int()``.
        source: New source label. ``None`` keeps the current one.

    Returns:
        A new ``SessionSummary`` produced by ``dataclasses.replace``.

    Raises:
        ValueError: If ``status`` is given but not a supported status.
    """
    changes: dict[str, Any] = {}

    if status is not None:
        changes["status"] = normalize_session_status(status)

    # The sentinel distinguishes "leave untouched" from "set to None".
    if ended_at_ns is not _UNCHANGED:
        if ended_at_ns is None:
            changes["ended_at_ns"] = None
        else:
            changes["ended_at_ns"] = int(cast(int, ended_at_ns))

    if source is not None:
        changes["source"] = source

    return replace(summary, **changes)
def finalize_session_summary(
    summary: SessionSummary,
    *,
    ended_at_ns: int | None = None,
) -> SessionSummary:
    """Close out a session while keeping any non-running status intact.

    A summary whose status is still "running" becomes "completed"; any other
    status is carried over unchanged. The end timestamp is always set.

    Args:
        summary: The summary to finalize; it is never modified.
        ended_at_ns: End timestamp to record; ``None`` means ``now_ns()``.

    Returns:
        The finalized copy produced by ``update_session_summary``.
    """
    if summary.status == SESSION_STATUS_RUNNING:
        closing_status = SESSION_STATUS_COMPLETED
    else:
        closing_status = summary.status

    closing_time = ended_at_ns if ended_at_ns is not None else now_ns()

    return update_session_summary(
        summary,
        status=closing_status,
        ended_at_ns=closing_time,
    )
def normalize_session_status(value: object) -> str:
    """Check a session status and return its canonical form.

    Args:
        value: Candidate status. Must be a string with non-whitespace
            content.

    Returns:
        The status stripped of surrounding whitespace and lower-cased.

    Raises:
        ValueError: If ``value`` is not a non-empty string, or if the
            normalized form is not a member of ``SESSION_STATUSES``.
    """
    if isinstance(value, str):
        trimmed = value.strip()
        if trimmed:
            canonical = trimmed.lower()
            if canonical in SESSION_STATUSES:
                return canonical
            # The message reports the caller's original, un-normalized value.
            raise ValueError(f"Unsupported session status: {value}")
    raise ValueError("session status must be a non-empty string")
def session_summary_to_dict(summary: SessionSummary) -> dict[str, Any]:
    """Convert a session summary into a plain dictionary.

    Args:
        summary: The summary to serialize.

    Returns:
        A new dict keyed by field name, as produced by
        ``dataclasses.asdict``.
    """
    serialized: dict[str, Any] = asdict(summary)
    return serialized
def session_summary_from_dict(payload: Mapping[str, Any]) -> SessionSummary:
    """Rebuild a ``SessionSummary`` from a mapping.

    Args:
        payload: Mapping with the summary fields. ``session_id``,
            ``source``, ``status``, ``started_at_ns`` and ``host`` are
            required. ``ended_at_ns`` and ``job_id`` default to ``None``,
            ``pid`` to -1, ``rank`` and ``local_rank`` to 0, and
            ``world_size`` to 1 (values below 1 are clamped to 1).

    Returns:
        The reconstructed summary.

    Raises:
        ValueError: If ``payload`` is not a mapping, a required text field
            is missing or blank, ``started_at_ns`` is absent, or the status
            is unsupported.
    """
    if not isinstance(payload, Mapping):
        raise ValueError("session summary payload must be an object")

    # Checks run in a fixed order so the first problem found is the one
    # reported.
    session_id = payload.get("session_id")
    if not (isinstance(session_id, str) and session_id.strip()):
        raise ValueError("session_id must be a non-empty string")

    source = payload.get("source")
    if not (isinstance(source, str) and source.strip()):
        raise ValueError("source must be a non-empty string")

    status = normalize_session_status(payload.get("status"))

    started_value = payload.get("started_at_ns")
    if started_value is None:
        raise ValueError("started_at_ns must be present")
    started_at_ns = int(started_value)

    ended_value = payload.get("ended_at_ns")
    ended_at_ns = int(ended_value) if ended_value is not None else None

    host = payload.get("host")
    if not (isinstance(host, str) and host.strip()):
        raise ValueError("host must be a non-empty string")

    return SessionSummary(
        session_id=session_id,
        status=status,
        started_at_ns=started_at_ns,
        ended_at_ns=ended_at_ns,
        host=host,
        pid=int(payload.get("pid", -1)),
        job_id=payload.get("job_id"),
        rank=int(payload.get("rank", 0)),
        local_rank=int(payload.get("local_rank", 0)),
        world_size=max(1, int(payload.get("world_size", 1))),
        source=source,
    )
def infer_session_summary_from_events(
    *,
    session_id: str,
    events: Iterable[Any],
    source: str,
    fallback_status: str = SESSION_STATUS_INCOMPLETE,
) -> SessionSummary:
    """Reconstruct a session summary from a collection of events.

    With no events, the summary is built from ``fallback_status`` and the
    defaults of ``create_session_summary``. Otherwise the events are ordered
    by ``timestamp_ns`` and identity fields are read from the earliest one.
    If any event has ``event_type == "stop"``, the session is marked
    completed and ends at the latest stop event's timestamp.

    Args:
        session_id: Identifier to assign to the summary.
        events: Event objects. Each must expose ``timestamp_ns``. ``host``,
            ``pid``, ``job_id``, ``rank``, ``local_rank``, ``world_size``
            and ``event_type`` are read when present.
        source: Label naming the producer of the summary.
        fallback_status: Status used when no stop event is found.

    Returns:
        The inferred summary.

    Raises:
        ValueError: If ``fallback_status`` is not a supported status.
    """
    collected = list(events)
    if not collected:
        return create_session_summary(
            source=source,
            status=fallback_status,
            session_id=session_id,
        )

    chronological = sorted(collected, key=lambda item: int(item.timestamp_ns))
    earliest = chronological[0]

    status = normalize_session_status(fallback_status)
    ended_at_ns: int | None = None

    stop_markers = [
        item for item in chronological if getattr(item, "event_type", "") == "stop"
    ]
    if stop_markers:
        status = SESSION_STATUS_COMPLETED
        # The list is time-ordered, so the last stop marker is the latest.
        ended_at_ns = int(stop_markers[-1].timestamp_ns)

    return create_session_summary(
        source=source,
        status=status,
        session_id=session_id,
        started_at_ns=int(earliest.timestamp_ns),
        ended_at_ns=ended_at_ns,
        host=str(getattr(earliest, "host", "unknown")),
        pid=int(getattr(earliest, "pid", -1)),
        job_id=getattr(earliest, "job_id", None),
        rank=int(getattr(earliest, "rank", 0)),
        local_rank=int(getattr(earliest, "local_rank", 0)),
        world_size=max(1, int(getattr(earliest, "world_size", 1))),
    )
def sort_session_summaries(summaries: Iterable[SessionSummary]) -> list[SessionSummary]:
    """Order session summaries by selection priority, then recency.

    Args:
        summaries: Summaries to order; the input iterable is not modified.

    Returns:
        A new list sorted with ``_session_selection_sort_key``.
    """
    ranked = list(summaries)
    ranked.sort(key=_session_selection_sort_key)
    return ranked
def _session_selection_sort_key(summary: SessionSummary) -> tuple[int, int, int, str]:
    """Build the sort key used to rank sessions for default selection.

    Ascending order of the key puts first, in turn: the status with the
    lowest entry in ``_SESSION_SELECTION_PRIORITY`` (unknown statuses rank
    99), the most recent end time (the start time is used when
    ``ended_at_ns`` is ``None`` or 0), the most recent start time, and
    finally the smallest ``session_id`` as a tie-breaker.
    """
    priority = _SESSION_SELECTION_PRIORITY.get(summary.status, 99)
    latest_ns = int(summary.ended_at_ns or summary.started_at_ns)
    # Timestamps are negated so that newer sessions sort first.
    return (priority, -latest_ns, -summary.started_at_ns, summary.session_id)


def _session_summary_for_selection(
    session: SessionSummary | _HasSummary,
) -> SessionSummary:
    """Return the summary for ``session``, unwrapping ``.summary`` if needed."""
    if not isinstance(session, SessionSummary):
        return session.summary
    return session
def select_default_session(
    sessions: Iterable[_SessionLikeT],
) -> _SessionLikeT | None:
    """Choose the default session by lifecycle priority and recency.

    Args:
        sessions: Session summaries, or objects exposing one as
            ``.summary``.

    Returns:
        The element that ranks first under ``_session_selection_sort_key``,
        or ``None`` when ``sessions`` is empty.
    """
    candidates = list(sessions)
    if not candidates:
        return None

    def ranking(candidate: _SessionLikeT) -> tuple[int, int, int, str]:
        return _session_selection_sort_key(_session_summary_for_selection(candidate))

    # Sorting the local copy in place leaves the caller's iterable untouched.
    candidates.sort(key=ranking)
    return candidates[0]
def select_default_loaded_session(
    sessions: Iterable[Any],
) -> Any | None:
    """Choose the default loaded session, favoring ones that have events.

    Args:
        sessions: Loaded-session objects. Each must expose ``events``
            (supporting ``len()``) and be acceptable to
            ``select_default_session``.

    Returns:
        The pick of ``select_default_session`` among sessions with at least
        one event, or among all sessions when none has events. ``None`` is
        returned when ``sessions`` is empty.
    """
    loaded_sessions = list(sessions)
    if not loaded_sessions:
        return None

    with_events = [item for item in loaded_sessions if len(item.events) > 0]
    # Fall back to the full list only when every session is empty.
    return select_default_session(with_events or loaded_sessions)
# Public API of this module.
__all__ = [
    "SESSION_STATUS_COMPLETED",
    "SESSION_STATUS_INCOMPLETE",
    "SESSION_STATUS_INTERRUPTED",
    "SESSION_STATUS_RUNNING",
    "SESSION_STATUSES",
    "SessionSummary",
    "create_session_summary",
    "finalize_session_summary",
    "infer_session_summary_from_events",
    "new_session_id",
    "normalize_session_status",
    "now_ns",
    "select_default_loaded_session",
    "select_default_session",
    "session_summary_from_dict",
    "session_summary_to_dict",
    "sort_session_summaries",
    "stable_legacy_session_id",
    "update_session_summary",
]