feat(gateway): live-stream /update output + forward interactive prompts

Adds real-time output streaming and interactive prompt forwarding for
the gateway /update command, so users on Telegram/Discord/etc see the
full update progress and can respond to prompts (stash restore, config
migration) without needing terminal access.

Changes:

hermes_cli/main.py:
- Add --gateway flag to 'hermes update' argparse
- Add _gateway_prompt() file-based IPC function that writes
  .update_prompt.json and polls for .update_response
- Modify _restore_stashed_changes() to accept optional input_fn
  parameter for gateway mode prompt forwarding
- cmd_update() uses _gateway_prompt when --gateway is set, enabling
  interactive stash restore and config migration prompts

gateway/run.py:
- _handle_update_command: spawn with --gateway flag and
  PYTHONUNBUFFERED=1 for real-time output flushing
- Store session_key in .update_pending.json for cross-restart
  session matching
- Add _update_prompt_pending dict to track sessions awaiting
  update prompt responses
- Replace _watch_for_update_completion with _watch_update_progress:
  streams output chunks every ~4s, detects .update_prompt.json and
  forwards prompts to the user, handles completion/failure/timeout
- Add update prompt interception in _handle_message: when a prompt
  is pending, the user's next message is written to .update_response
  instead of being processed normally
- Preserve _send_update_notification as legacy fallback for
  post-restart cases where adapter isn't available yet

File-based IPC protocol:
- .update_prompt.json: written by update process with prompt text,
  default value, and unique ID
- .update_response: written by gateway with user's answer
- .update_output.txt: existing, now streamed in real-time
- .update_exit_code: existing completion marker

Tests: 16 new tests covering _gateway_prompt IPC, output streaming,
prompt detection/forwarding, message interception, and cleanup.
This commit is contained in:
Teknium
2026-04-04 23:27:03 -07:00
parent 0fd3de2674
commit 3d698ba4e1
4 changed files with 775 additions and 17 deletions

View File

@@ -517,6 +517,10 @@ class GatewayRunner:
# Key: Platform enum, Value: {"config": platform_config, "attempts": int, "next_retry": float}
self._failed_platforms: Dict[Platform, Dict[str, Any]] = {}
# Track pending /update prompt responses per session.
# Key: session_key, Value: True when a prompt is waiting for user input.
self._update_prompt_pending: Dict[str, bool] = {}
# Persistent Honcho managers keyed by gateway session key.
# This preserves write_frequency="session" semantics across short-lived
# per-message AIAgent instances.
@@ -1737,6 +1741,26 @@ class GatewayRunner:
self.pairing_store._record_rate_limit(platform_name, source.user_id)
return None
# Intercept messages that are responses to a pending /update prompt.
# The update process (detached) wrote .update_prompt.json; the watcher
# forwarded it to the user; now the user's reply goes back via
# .update_response so the update process can continue.
_quick_key = self._session_key_for_source(source)
_update_prompts = getattr(self, "_update_prompt_pending", {})
if _update_prompts.get(_quick_key):
response_path = _hermes_home / ".update_response"
response_text = (event.text or "").strip()
if response_text:
try:
tmp = response_path.with_suffix(".tmp")
tmp.write_text(response_text)
tmp.replace(response_path)
except OSError as e:
logger.warning("Failed to write update response: %s", e)
return f"✗ Failed to send response to update process: {e}"
_update_prompts.pop(_quick_key, None)
return f"✓ Sent `{response_text}` to the update process."
# PRIORITY handling when an agent is already running for this session.
# Default behavior is to interrupt immediately so user text/stop messages
# are handled with minimal latency.
@@ -1744,7 +1768,6 @@ class GatewayRunner:
# Special case: Telegram/photo bursts often arrive as multiple near-
# simultaneous updates. Do NOT interrupt for photo-only follow-ups here;
# let the adapter-level batching/queueing logic absorb them.
_quick_key = self._session_key_for_source(source)
# Staleness eviction: if an entry has been in _running_agents for
# longer than the agent timeout, it's a leaked lock from a hung or
@@ -4964,10 +4987,12 @@ class GatewayRunner:
pending_path = _hermes_home / ".update_pending.json"
output_path = _hermes_home / ".update_output.txt"
exit_code_path = _hermes_home / ".update_exit_code"
session_key = self._session_key_for_source(event.source)
pending = {
"platform": event.source.platform.value,
"chat_id": event.source.chat_id,
"user_id": event.source.user_id,
"session_key": session_key,
"timestamp": datetime.now().isoformat(),
}
_tmp_pending = pending_path.with_suffix(".tmp")
@@ -4975,12 +5000,18 @@ class GatewayRunner:
_tmp_pending.replace(pending_path)
exit_code_path.unlink(missing_ok=True)
# Spawn `hermes update` detached so it survives gateway restart.
# Spawn `hermes update --gateway` detached so it survives gateway restart.
# --gateway enables file-based IPC for interactive prompts (stash
# restore, config migration) so the gateway can forward them to the
# user instead of silently skipping them.
# Use setsid for portable session detach (works under system services
# where systemd-run --user fails due to missing D-Bus session).
# PYTHONUNBUFFERED ensures output is flushed line-by-line so the
# gateway can stream it to the messenger in near-real-time.
hermes_cmd_str = " ".join(shlex.quote(part) for part in hermes_cmd)
update_cmd = (
f"{hermes_cmd_str} update > {shlex.quote(str(output_path))} 2>&1; "
f"PYTHONUNBUFFERED=1 {hermes_cmd_str} update --gateway"
f" > {shlex.quote(str(output_path))} 2>&1; "
f"status=$?; printf '%s' \"$status\" > {shlex.quote(str(exit_code_path))}"
)
try:
@@ -5007,7 +5038,7 @@ class GatewayRunner:
return f"✗ Failed to start update: {e}"
self._schedule_update_notification_watch()
return "⚕ Starting Hermes update… I'll notify you when it's done."
return "⚕ Starting Hermes update… I'll stream progress here."
def _schedule_update_notification_watch(self) -> None:
"""Ensure a background task is watching for update completion."""
@@ -5017,39 +5048,195 @@ class GatewayRunner:
try:
self._update_notification_task = asyncio.create_task(
self._watch_for_update_completion()
self._watch_update_progress()
)
except RuntimeError:
logger.debug("Skipping update notification watcher: no running event loop")
async def _watch_for_update_completion(
async def _watch_update_progress(
self,
poll_interval: float = 2.0,
stream_interval: float = 4.0,
timeout: float = 1800.0,
) -> None:
"""Wait for ``hermes update`` to finish, then send its notification."""
"""Watch ``hermes update --gateway``, streaming output + forwarding prompts.
Polls ``.update_output.txt`` for new content and sends chunks to the
user periodically. Detects ``.update_prompt.json`` (written by the
update process when it needs user input) and forwards the prompt to
the messenger. The user's next message is intercepted by
``_handle_message`` and written to ``.update_response``.
"""
import json
import re as _re
pending_path = _hermes_home / ".update_pending.json"
claimed_path = _hermes_home / ".update_pending.claimed.json"
output_path = _hermes_home / ".update_output.txt"
exit_code_path = _hermes_home / ".update_exit_code"
prompt_path = _hermes_home / ".update_prompt.json"
loop = asyncio.get_running_loop()
deadline = loop.time() + timeout
while (pending_path.exists() or claimed_path.exists()) and loop.time() < deadline:
if exit_code_path.exists():
# Resolve the adapter and chat_id for sending messages
adapter = None
chat_id = None
session_key = None
for path in (claimed_path, pending_path):
if path.exists():
try:
pending = json.loads(path.read_text())
platform_str = pending.get("platform")
chat_id = pending.get("chat_id")
session_key = pending.get("session_key")
if platform_str and chat_id:
platform = Platform(platform_str)
adapter = self.adapters.get(platform)
# Fallback session key if not stored (old pending files)
if not session_key:
session_key = f"{platform_str}:{chat_id}"
break
except Exception:
pass
if not adapter or not chat_id:
logger.warning("Update watcher: cannot resolve adapter/chat_id, falling back to completion-only")
# Fall back to old behavior: wait for exit code and send final notification
while (pending_path.exists() or claimed_path.exists()) and loop.time() < deadline:
if exit_code_path.exists():
await self._send_update_notification()
return
await asyncio.sleep(poll_interval)
if (pending_path.exists() or claimed_path.exists()) and not exit_code_path.exists():
exit_code_path.write_text("124")
await self._send_update_notification()
return
def _strip_ansi(text: str) -> str:
return _re.sub(r'\x1b\[[0-9;]*[A-Za-z]', '', text)
bytes_sent = 0
last_stream_time = loop.time()
buffer = ""
async def _flush_buffer() -> None:
"""Send buffered output to the user."""
nonlocal buffer, last_stream_time
if not buffer.strip():
buffer = ""
return
# Chunk to fit message limits (Telegram: 4096, others: generous)
clean = _strip_ansi(buffer).strip()
buffer = ""
last_stream_time = loop.time()
if not clean:
return
# Split into chunks if too long
max_chunk = 3500
chunks = [clean[i:i + max_chunk] for i in range(0, len(clean), max_chunk)]
for chunk in chunks:
try:
await adapter.send(chat_id, f"```\n{chunk}\n```")
except Exception as e:
logger.debug("Update stream send failed: %s", e)
while loop.time() < deadline:
# Check for completion
if exit_code_path.exists():
# Read any remaining output
if output_path.exists():
try:
content = output_path.read_text()
if len(content) > bytes_sent:
buffer += content[bytes_sent:]
bytes_sent = len(content)
except OSError:
pass
await _flush_buffer()
# Send final status
try:
exit_code_raw = exit_code_path.read_text().strip() or "1"
exit_code = int(exit_code_raw)
if exit_code == 0:
await adapter.send(chat_id, "✅ Hermes update finished.")
else:
await adapter.send(chat_id, "❌ Hermes update failed (exit code {}).".format(exit_code))
logger.info("Update finished (exit=%s), notified %s", exit_code, session_key)
except Exception as e:
logger.warning("Update final notification failed: %s", e)
# Cleanup
for p in (pending_path, claimed_path, output_path,
exit_code_path, prompt_path):
p.unlink(missing_ok=True)
(_hermes_home / ".update_response").unlink(missing_ok=True)
self._update_prompt_pending.pop(session_key, None)
return
# Check for new output
if output_path.exists():
try:
content = output_path.read_text()
if len(content) > bytes_sent:
buffer += content[bytes_sent:]
bytes_sent = len(content)
except OSError:
pass
# Flush buffer periodically
if buffer.strip() and (loop.time() - last_stream_time) >= stream_interval:
await _flush_buffer()
# Check for prompts
if prompt_path.exists() and session_key:
try:
prompt_data = json.loads(prompt_path.read_text())
prompt_text = prompt_data.get("prompt", "")
default = prompt_data.get("default", "")
if prompt_text:
# Flush any buffered output first so the user sees
# context before the prompt
await _flush_buffer()
default_hint = f" (default: {default})" if default else ""
await adapter.send(
chat_id,
f"⚕ **Update needs your input:**\n\n"
f"{prompt_text}{default_hint}\n\n"
f"_Reply with your answer, or wait 5 min for default._"
)
self._update_prompt_pending[session_key] = True
logger.info("Forwarded update prompt to %s: %s", session_key, prompt_text[:80])
except (json.JSONDecodeError, OSError) as e:
logger.debug("Failed to read update prompt: %s", e)
await asyncio.sleep(poll_interval)
if (pending_path.exists() or claimed_path.exists()) and not exit_code_path.exists():
logger.warning("Update watcher timed out waiting for completion marker")
# Timeout
if not exit_code_path.exists():
logger.warning("Update watcher timed out after %.0fs", timeout)
exit_code_path.write_text("124")
await self._send_update_notification()
await _flush_buffer()
try:
await adapter.send(chat_id, "❌ Hermes update timed out after 30 minutes.")
except Exception:
pass
for p in (pending_path, claimed_path, output_path,
exit_code_path, prompt_path):
p.unlink(missing_ok=True)
(_hermes_home / ".update_response").unlink(missing_ok=True)
self._update_prompt_pending.pop(session_key, None)
async def _send_update_notification(self) -> bool:
"""If an update finished, notify the user.
Returns False when the update is still running so a caller can retry
later. Returns True after a definitive send/skip decision.
This is the legacy notification path used when the streaming watcher
cannot resolve the adapter (e.g. after a gateway restart where the
platform hasn't reconnected yet).
"""
import json
import re as _re