Source code for stormlog.tui.commands

"""Asynchronous CLI command runner for the Textual TUI."""

from __future__ import annotations

import asyncio
from typing import Awaitable, Callable, Optional


class CLICommandRunner:
    """Runs shell commands asynchronously while streaming output.

    At most one command is tracked at a time. While a command is tracked,
    ``is_running`` is True and further ``run()`` calls raise ``RuntimeError``.
    """

    def __init__(self) -> None:
        # Subprocess currently being tracked, or None when idle.
        self._process: Optional[asyncio.subprocess.Process] = None
        # Tasks forwarding stdout/stderr lines of the tracked subprocess.
        self._io_tasks: list[asyncio.Task] = []

    @property
    def is_running(self) -> bool:
        """Return True while a command is being tracked by this runner."""
        return self._process is not None

    async def run(
        self,
        command: str,
        callback: Callable[[str, str], Awaitable[None]],
    ) -> int:
        """Run a command and stream stdout/stderr via callback.

        Args:
            command: Command line handed to the shell.
            callback: Awaited once per output line as ``callback(label, text)``
                where ``label`` is ``"stdout"`` or ``"stderr"`` and ``text`` is
                the line decoded as UTF-8 (undecodable bytes replaced) with
                trailing newlines removed.

        Returns:
            The exit code of the command.

        Raises:
            RuntimeError: If this runner is already tracking a command.
            asyncio.CancelledError: If the awaiting task is cancelled. The
                command is killed and the runner is reset before re-raising.
        """
        if self._process is not None:
            raise RuntimeError("Another command is already running.")

        process = await asyncio.create_subprocess_shell(
            command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        self._process = process

        async def pump(stream: Optional[asyncio.StreamReader], label: str) -> None:
            if stream is None:
                return
            while True:
                line = await stream.readline()
                if not line:  # EOF
                    break
                await callback(
                    label, line.decode("utf-8", errors="replace").rstrip("\n")
                )

        io_tasks = [
            asyncio.create_task(pump(process.stdout, "stdout")),
            asyncio.create_task(pump(process.stderr, "stderr")),
        ]
        self._io_tasks = io_tasks

        try:
            return_code = await process.wait()
            # Let the pumps drain whatever is still buffered. Errors raised by
            # the callback are collected here instead of propagating.
            await asyncio.gather(*io_tasks, return_exceptions=True)
            return return_code
        except asyncio.CancelledError:
            # The awaiting task was cancelled: do not leave an orphaned child
            # process or pump tasks behind.
            if process.returncode is None:
                try:
                    process.kill()
                except ProcessLookupError:
                    pass  # exited in the meantime
            for task in io_tasks:
                task.cancel()
            await asyncio.gather(*io_tasks, return_exceptions=True)
            raise
        finally:
            # Always return to the idle state, but never clobber the state of
            # a newer command started after cancel() already reset the runner.
            if self._process is process:
                self._cleanup()

    async def cancel(self) -> bool:
        """Attempt to terminate the running command.

        Sends a terminate request, waits up to 3 seconds for the process to
        exit, then kills it. Afterwards waits for the output pumps to finish.

        Returns:
            False if no command was being tracked, True otherwise.
        """
        process = self._process
        if process is None:
            return False
        # Local references: run() may reset the attributes while we await.
        io_tasks = self._io_tasks

        try:
            process.terminate()
        except ProcessLookupError:
            pass  # already exited

        try:
            await asyncio.wait_for(process.wait(), timeout=3)
        except asyncio.TimeoutError:
            try:
                process.kill()
            except ProcessLookupError:
                pass  # exited between the timeout and the kill
            await process.wait()

        await asyncio.gather(*io_tasks, return_exceptions=True)
        if self._process is process:
            self._cleanup()
        return True

    def _cleanup(self) -> None:
        """Reset the runner to the idle state."""
        self._process = None
        self._io_tasks = []