mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-10 04:08:28 +08:00
Compare commits
2 Commits
salvage/pr
...
extend-hoo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f24346c011 | ||
|
|
299b9dba67 |
@@ -1218,6 +1218,23 @@ def main(
|
||||
# List available distributions
|
||||
python batch_runner.py --list_distributions
|
||||
"""
|
||||
# Cross-process hook delivery: start the forwarder if a dashboard
|
||||
# is reachable. No-op when no dashboard is running or when
|
||||
# ``HERMES_HOOK_FORWARDER=0`` is set. Currently the batch runner
|
||||
# emits no hooks directly (only the gateway and TUI do today), but
|
||||
# wiring here future-proofs batch-spawned agent runs that may emit
|
||||
# hooks via tools. Forwarder workers spawned by ``multiprocessing.Pool``
|
||||
# below would each need their own; that's deferred to a follow-up.
|
||||
# See gateway/hook_forwarder.py + DESIGN-cross-process-hooks.md.
|
||||
try:
|
||||
from gateway import hook_forwarder
|
||||
from gateway.hooks import get_default_registry
|
||||
hook_forwarder.start_if_dashboard_available(
|
||||
get_default_registry(), src="batch"
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Handle list distributions
|
||||
if list_distributions:
|
||||
from toolset_distributions import print_distribution_info
|
||||
|
||||
16
cli.py
16
cli.py
@@ -12515,6 +12515,22 @@ class HermesCLI:
|
||||
|
||||
def run(self):
|
||||
"""Run the interactive CLI loop with persistent input at bottom."""
|
||||
# Cross-process hook delivery: start the forwarder if a
|
||||
# dashboard is reachable. No-op when no dashboard is running
|
||||
# or when ``HERMES_HOOK_FORWARDER=0`` is set. Currently the
|
||||
# CLI process emits no hooks directly (only the gateway and
|
||||
# TUI do today), but wiring here future-proofs CLI-spawned
|
||||
# agent runs that may emit hooks via tools. See
|
||||
# gateway/hook_forwarder.py + DESIGN-cross-process-hooks.md.
|
||||
try:
|
||||
from gateway import hook_forwarder
|
||||
from gateway.hooks import get_default_registry
|
||||
hook_forwarder.start_if_dashboard_available(
|
||||
get_default_registry(), src="cli"
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Detect light/dark terminal mode now (before pt grabs the tty).
|
||||
# Caches the result so subsequent _hex_to_ansi / style calls
|
||||
# don't risk re-querying mid-render.
|
||||
|
||||
450
gateway/hook_forwarder.py
Normal file
450
gateway/hook_forwarder.py
Normal file
@@ -0,0 +1,450 @@
|
||||
"""Cross-process hook forwarder.
|
||||
|
||||
Subscribes to a fixed set of namespaces on a process-local
|
||||
:class:`gateway.hooks.HookRegistry` and POSTs each fired event to the
|
||||
dashboard's ``/api/hooks/ingest`` endpoint. The dashboard republishes
|
||||
the event on its own default registry so plugins running in the
|
||||
dashboard process see events that originated in the gateway, TUI,
|
||||
subagent, or batch-runner processes.
|
||||
|
||||
Design constraints (see ``DESIGN-cross-process-hooks.md``):
|
||||
|
||||
* Never blocks the publisher. Handler enqueues onto a bounded queue
|
||||
and returns immediately; a daemon worker thread does the HTTP POSTs.
|
||||
* Bounded queue drops *oldest* on overflow. Observability events are
|
||||
best-effort; recency beats history when the dashboard is slow.
|
||||
* Loop prevention. Events whose context carries ``_forwarded=True``
|
||||
are skipped — those came from the ingest endpoint and must not be
|
||||
shipped back.
|
||||
* No-op when no dashboard is available. Discovery file absent ⇒ no
|
||||
registration, no thread. Probe re-checks every 30s so a dashboard
|
||||
that starts later auto-attaches.
|
||||
* ``HERMES_HOOK_FORWARDER=0`` short-circuits everything for paranoid
|
||||
security postures, or for tests that don't want the daemon thread.
|
||||
|
||||
The forwarder is wired in by long-lived non-dashboard processes
|
||||
(gateway, TUI, subagents, batch runners) via
|
||||
:func:`start_if_dashboard_available`.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
from queue import Empty, Full, Queue
|
||||
from typing import Any, Callable, Optional
|
||||
|
||||
from hermes_cli.config import get_hermes_home
|
||||
|
||||
|
||||
_log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Namespaces the forwarder ships to the dashboard. Picked to cover every
|
||||
# event type the registry emits today. Adding a new namespace is a
|
||||
# one-line append here. Wildcards use the existing registry semantics
|
||||
# (``<namespace>:*`` matches every ``<namespace>:<anything>`` event).
|
||||
_FORWARDED_NAMESPACES = (
|
||||
"tui:*",
|
||||
"agent:*",
|
||||
"session:*",
|
||||
"command:*",
|
||||
"gateway:*",
|
||||
)
|
||||
|
||||
# Bounded queue size per source process. At peak (~70 events/sec, see the
|
||||
# design doc's "Performance" section) this is ~14 seconds of backlog
|
||||
# before drop-oldest kicks in. Generous for a purely-observability feed.
|
||||
_QUEUE_MAX = 1024
|
||||
|
||||
# Probe cadence. When the dashboard isn't running we re-check the
|
||||
# discovery file every 30s so a delayed ``hermes dashboard`` startup
|
||||
# eventually attaches. Cheap (one stat + one health GET) so this can
|
||||
# run forever without overhead.
|
||||
_PROBE_INTERVAL_S = 30.0
|
||||
|
||||
# HTTP timeouts. Connection is loopback in the common case so anything
|
||||
# beyond 2s probably means the dashboard is wedged; better to drop the
|
||||
# frame than queue up retries that won't help.
|
||||
_HTTP_TIMEOUT_S = 2.0
|
||||
|
||||
# Error-logging cadence. POST failures get logged once per minute, not
|
||||
# per failure, so a downed dashboard doesn't spam ``agent.log``.
|
||||
_ERROR_LOG_INTERVAL_S = 60.0
|
||||
|
||||
|
||||
def _dashboard_discovery_path() -> Path:
|
||||
"""Return the path the dashboard writes its discovery JSON to.
|
||||
|
||||
Always under ``$HERMES_HOME``; no ``/tmp`` fallback. Config
|
||||
consistency across processes is a precondition, not something the
|
||||
forwarder patches over.
|
||||
"""
|
||||
return get_hermes_home() / "dashboard.json"
|
||||
|
||||
|
||||
def _read_discovery_file() -> Optional[dict]:
|
||||
"""Read and parse ``dashboard.json``.
|
||||
|
||||
Returns ``None`` when the file is absent, unreadable, malformed, or
|
||||
missing required keys. Never raises — callers treat a ``None``
|
||||
result as "no dashboard available" and re-probe on the next cycle.
|
||||
"""
|
||||
path = _dashboard_discovery_path()
|
||||
try:
|
||||
if not path.exists():
|
||||
return None
|
||||
data = json.loads(path.read_text(encoding="utf-8"))
|
||||
except (OSError, json.JSONDecodeError):
|
||||
return None
|
||||
if not isinstance(data, dict):
|
||||
return None
|
||||
if "url" not in data or "hooks_ingest_token" not in data:
|
||||
return None
|
||||
return data
|
||||
|
||||
|
||||
def _disabled_by_env() -> bool:
|
||||
"""``HERMES_HOOK_FORWARDER=0`` (or ``false``/``no``) short-circuits."""
|
||||
val = os.environ.get("HERMES_HOOK_FORWARDER", "").strip().lower()
|
||||
return val in ("0", "false", "no", "off")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# HookForwarder — per-process singleton-ish
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class _HookForwarder:
|
||||
"""The actual forwarder.
|
||||
|
||||
Tracks the registry it's attached to, the unregister callables (so
|
||||
:meth:`stop` can clean up), and the worker thread state.
|
||||
|
||||
Multiple instances are technically supported but in practice each
|
||||
process holds at most one — see :func:`start_if_dashboard_available`
|
||||
and the module-level ``_active`` reference.
|
||||
"""
|
||||
|
||||
def __init__(self, src: str) -> None:
|
||||
self.src = src
|
||||
self._queue: Queue[dict] = Queue(maxsize=_QUEUE_MAX)
|
||||
self._unregisters: list[Callable[[], None]] = []
|
||||
self._stop = threading.Event()
|
||||
self._worker: Optional[threading.Thread] = None
|
||||
# Discovery state — refreshed by the worker thread before each
|
||||
# POST so a dashboard restart (with a new token) auto-recovers.
|
||||
self._discovery: Optional[dict] = None
|
||||
self._discovery_lock = threading.Lock()
|
||||
# Error-log rate limit state.
|
||||
self._last_error_log_at: float = 0.0
|
||||
self._error_count_since_log: int = 0
|
||||
|
||||
# -- registry side ---------------------------------------------------
|
||||
|
||||
def _handler(self, event_type: str, context: dict) -> None:
|
||||
"""Sync hook handler — enqueues the event for the worker thread."""
|
||||
# Loop prevention: events forwarded *into* this process must not
|
||||
# be shipped back. The dashboard's ingest endpoint stamps every
|
||||
# republished context with ``_forwarded=True``.
|
||||
if context.get("_forwarded") is True:
|
||||
return
|
||||
try:
|
||||
self._queue.put_nowait(
|
||||
{"event_type": event_type, "context": context, "src": self.src}
|
||||
)
|
||||
except Full:
|
||||
# Drop oldest, enqueue newest. Observability is best-effort.
|
||||
try:
|
||||
self._queue.get_nowait()
|
||||
except Empty:
|
||||
pass
|
||||
try:
|
||||
self._queue.put_nowait(
|
||||
{"event_type": event_type, "context": context, "src": self.src}
|
||||
)
|
||||
except Full:
|
||||
# Pathological — worker isn't draining at all. Give up
|
||||
# silently; next event will overwrite this slot.
|
||||
pass
|
||||
|
||||
def register(self, registry: "object") -> None:
|
||||
"""Register the handler for every forwarded namespace."""
|
||||
for pattern in _FORWARDED_NAMESPACES:
|
||||
unreg = registry.register( # type: ignore[union-attr]
|
||||
pattern,
|
||||
self._handler,
|
||||
name=f"hook_forwarder({pattern}→dashboard)",
|
||||
)
|
||||
self._unregisters.append(unreg)
|
||||
|
||||
def unregister(self) -> None:
|
||||
"""Remove every handler the forwarder installed. Idempotent."""
|
||||
while self._unregisters:
|
||||
try:
|
||||
self._unregisters.pop()()
|
||||
except Exception: # pragma: no cover — defensive
|
||||
pass
|
||||
|
||||
# -- worker thread ---------------------------------------------------
|
||||
|
||||
def start_worker(self) -> None:
|
||||
"""Spawn the daemon worker thread that drains the queue."""
|
||||
if self._worker is not None and self._worker.is_alive():
|
||||
return
|
||||
self._stop.clear()
|
||||
self._worker = threading.Thread(
|
||||
target=self._worker_loop,
|
||||
name=f"hook-forwarder-{self.src}",
|
||||
daemon=True,
|
||||
)
|
||||
self._worker.start()
|
||||
|
||||
def stop_worker(self, *, join_timeout: float = 1.0) -> None:
|
||||
"""Signal the worker to exit and wait briefly for it to do so.
|
||||
|
||||
Idempotent. Safe to call from any thread. Daemon threads die
|
||||
with the process anyway; the join is only there so tests don't
|
||||
race on lingering threads between cases.
|
||||
"""
|
||||
self._stop.set()
|
||||
if self._worker is not None:
|
||||
try:
|
||||
self._worker.join(timeout=join_timeout)
|
||||
except RuntimeError:
|
||||
pass
|
||||
|
||||
def _worker_loop(self) -> None:
|
||||
"""Drain the queue, POSTing each frame, until ``_stop`` is set."""
|
||||
# httpx is imported lazily so processes that never produce events
|
||||
# don't pay the import cost. In practice the gateway and TUI
|
||||
# both have httpx loaded by the time we get here, but defer
|
||||
# anyway to be polite.
|
||||
import httpx
|
||||
|
||||
next_probe_at: float = 0.0
|
||||
|
||||
with httpx.Client(timeout=_HTTP_TIMEOUT_S) as client:
|
||||
while not self._stop.is_set():
|
||||
# Probe the discovery file periodically so a dashboard
|
||||
# that comes up later (or restarts with a new token)
|
||||
# auto-attaches.
|
||||
now = time.monotonic()
|
||||
if now >= next_probe_at:
|
||||
next_probe_at = now + _PROBE_INTERVAL_S
|
||||
self._refresh_discovery(client)
|
||||
|
||||
# Block on the queue. Short timeout so we re-check the
|
||||
# stop flag and probe interval often enough.
|
||||
try:
|
||||
frame = self._queue.get(timeout=1.0)
|
||||
except Empty:
|
||||
continue
|
||||
|
||||
with self._discovery_lock:
|
||||
discovery = self._discovery
|
||||
|
||||
if discovery is None:
|
||||
# No dashboard available right now. Drop the frame;
|
||||
# the next probe will re-acquire discovery, and any
|
||||
# events fired between now and then are lost (the
|
||||
# design accepts best-effort delivery).
|
||||
continue
|
||||
|
||||
self._post_frame(client, discovery, frame)
|
||||
|
||||
def _refresh_discovery(self, client: "Any") -> None: # client: httpx.Client
|
||||
"""Reload the discovery file and probe the dashboard's health.
|
||||
|
||||
Updates ``self._discovery`` to the new value (or ``None`` if the
|
||||
dashboard is unreachable). Called from the worker thread.
|
||||
"""
|
||||
data = _read_discovery_file()
|
||||
if data is None:
|
||||
with self._discovery_lock:
|
||||
self._discovery = None
|
||||
return
|
||||
|
||||
url = data["url"].rstrip("/")
|
||||
try:
|
||||
resp = client.get(f"{url}/api/hooks/health", timeout=_HTTP_TIMEOUT_S)
|
||||
if resp.status_code != 200:
|
||||
with self._discovery_lock:
|
||||
self._discovery = None
|
||||
return
|
||||
except Exception:
|
||||
with self._discovery_lock:
|
||||
self._discovery = None
|
||||
return
|
||||
|
||||
with self._discovery_lock:
|
||||
self._discovery = data
|
||||
|
||||
def _post_frame(
|
||||
self,
|
||||
client: "Any", # httpx.Client
|
||||
discovery: dict,
|
||||
frame: dict,
|
||||
) -> None:
|
||||
"""POST one frame to ``/api/hooks/ingest``.
|
||||
|
||||
Errors are logged at most once per minute, regardless of how
|
||||
many failures accumulate. A 401 invalidates the cached
|
||||
discovery so the next probe re-reads the token (which may have
|
||||
rotated on a dashboard restart).
|
||||
"""
|
||||
url = discovery["url"].rstrip("/") + "/api/hooks/ingest"
|
||||
token = discovery["hooks_ingest_token"]
|
||||
# Filter the context: ``_forwarded`` etc. are added by the
|
||||
# dashboard on republish. Source-side contexts are passed as-is,
|
||||
# but they should never carry ``_forwarded=True`` (that's how
|
||||
# loop prevention works above).
|
||||
try:
|
||||
resp = client.post(
|
||||
url,
|
||||
json={
|
||||
"event_type": frame["event_type"],
|
||||
"context": frame["context"],
|
||||
"src": frame["src"],
|
||||
},
|
||||
headers={"Authorization": f"Bearer {token}"},
|
||||
timeout=_HTTP_TIMEOUT_S,
|
||||
)
|
||||
except Exception as exc:
|
||||
self._log_error(f"POST {url} failed: {exc}")
|
||||
return
|
||||
|
||||
if resp.status_code == 401:
|
||||
# Token rotated (dashboard restarted with a new one).
|
||||
# Invalidate the cached discovery; the next probe will
|
||||
# re-read the file and pick up the new token.
|
||||
self._log_error(
|
||||
"POST /api/hooks/ingest returned 401 — token rotated; "
|
||||
"invalidating discovery cache"
|
||||
)
|
||||
with self._discovery_lock:
|
||||
self._discovery = None
|
||||
return
|
||||
if resp.status_code >= 400:
|
||||
self._log_error(
|
||||
f"POST /api/hooks/ingest returned {resp.status_code}: "
|
||||
f"{resp.text[:200]!r}"
|
||||
)
|
||||
|
||||
def _log_error(self, message: str) -> None:
|
||||
"""Rate-limited error logging — once per minute per process."""
|
||||
self._error_count_since_log += 1
|
||||
now = time.monotonic()
|
||||
if now - self._last_error_log_at < _ERROR_LOG_INTERVAL_S:
|
||||
return
|
||||
suppressed = self._error_count_since_log - 1
|
||||
suffix = f" ({suppressed} similar errors suppressed)" if suppressed else ""
|
||||
_log.warning("[hook_forwarder] %s%s", message, suffix)
|
||||
self._last_error_log_at = now
|
||||
self._error_count_since_log = 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Module-level singleton
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Per-process forwarder. Multiple ``start_if_dashboard_available`` calls
|
||||
# in the same process are idempotent (they re-use this instance).
|
||||
_active: Optional[_HookForwarder] = None
|
||||
_active_lock = threading.Lock()
|
||||
|
||||
|
||||
def start_if_dashboard_available(
|
||||
registry: "object",
|
||||
*,
|
||||
src: str = "unknown",
|
||||
) -> Optional[_HookForwarder]:
|
||||
"""Wire the forwarder into ``registry`` if a dashboard is reachable.
|
||||
|
||||
Idempotent: repeated calls in the same process are no-ops. Returns
|
||||
the active forwarder if one is running (the same instance on every
|
||||
subsequent call within the process), or ``None`` if the forwarder
|
||||
was suppressed (no dashboard, or ``HERMES_HOOK_FORWARDER=0``).
|
||||
|
||||
The ``src`` argument is a short tag identifying the source process
|
||||
(``"gateway"``, ``"tui"``, ``"subagent"``, ``"batch"``). It's
|
||||
included in the wire frame for the dashboard's diagnostic logging
|
||||
and the republished context's ``_forwarded_from`` field, so a
|
||||
subscriber can tell which process originated each event.
|
||||
|
||||
Args:
|
||||
registry: The :class:`gateway.hooks.HookRegistry` whose events
|
||||
should be forwarded. Normally
|
||||
:func:`gateway.hooks.get_default_registry`.
|
||||
src: Source-process tag. See above.
|
||||
|
||||
Returns:
|
||||
The active forwarder, or ``None`` if forwarding is suppressed.
|
||||
"""
|
||||
global _active
|
||||
|
||||
if _disabled_by_env():
|
||||
_log.debug(
|
||||
"[hook_forwarder] HERMES_HOOK_FORWARDER=0 — skipping start"
|
||||
)
|
||||
return None
|
||||
|
||||
discovery = _read_discovery_file()
|
||||
if discovery is None:
|
||||
_log.debug(
|
||||
"[hook_forwarder] no dashboard.json — forwarder not started"
|
||||
)
|
||||
return None
|
||||
|
||||
with _active_lock:
|
||||
if _active is not None:
|
||||
# Already started in this process; nothing to do.
|
||||
return _active
|
||||
|
||||
fwd = _HookForwarder(src=src)
|
||||
try:
|
||||
fwd.register(registry)
|
||||
except Exception as e:
|
||||
_log.warning(
|
||||
"[hook_forwarder] failed to register handlers: %s", e
|
||||
)
|
||||
return None
|
||||
|
||||
fwd.start_worker()
|
||||
_active = fwd
|
||||
_log.info(
|
||||
"[hook_forwarder] started for src=%s, forwarding %d namespaces",
|
||||
src,
|
||||
len(_FORWARDED_NAMESPACES),
|
||||
)
|
||||
return fwd
|
||||
|
||||
|
||||
def stop() -> None:
|
||||
"""Tear down the active forwarder. Idempotent.
|
||||
|
||||
Primarily for tests; production processes just rely on the daemon
|
||||
thread dying with the process.
|
||||
"""
|
||||
global _active
|
||||
with _active_lock:
|
||||
if _active is None:
|
||||
return
|
||||
_active.unregister()
|
||||
_active.stop_worker()
|
||||
_active = None
|
||||
|
||||
|
||||
def _reset_for_tests() -> None:
|
||||
"""Test helper — clears active state without preserving its
|
||||
side-effects (registered handlers etc.)."""
|
||||
stop()
|
||||
|
||||
|
||||
def is_active() -> bool:
|
||||
"""Return whether a forwarder is currently registered in this process."""
|
||||
return _active is not None
|
||||
248
gateway/hooks.py
248
gateway/hooks.py
@@ -2,19 +2,40 @@
|
||||
Event Hook System
|
||||
|
||||
A lightweight event-driven system that fires handlers at key lifecycle points.
|
||||
Hooks are discovered from ~/.hermes/hooks/ directories, each containing:
|
||||
- HOOK.yaml (metadata: name, description, events list)
|
||||
- handler.py (Python handler with async def handle(event_type, context))
|
||||
|
||||
Events:
|
||||
- gateway:startup -- Gateway process starts
|
||||
- session:start -- New session created (first message of a new session)
|
||||
- session:end -- Session ends (user ran /new or /reset)
|
||||
- session:reset -- Session reset completed (new session entry created)
|
||||
- agent:start -- Agent begins processing a message
|
||||
- agent:step -- Each turn in the tool-calling loop
|
||||
- agent:end -- Agent finishes processing
|
||||
- command:* -- Any slash command executed (wildcard match)
|
||||
There are two ways to register a handler:
|
||||
|
||||
1. **File-system discovery** — drop a directory into ``~/.hermes/hooks/``
|
||||
containing ``HOOK.yaml`` (metadata: name, description, events list) and
|
||||
``handler.py`` (with ``def handle(event_type, context)``, sync or async).
|
||||
These are loaded by :meth:`HookRegistry.discover_and_load` at gateway
|
||||
startup.
|
||||
|
||||
2. **Programmatic registration** — call :meth:`HookRegistry.register` from
|
||||
inside the process. Useful for plugins that ship their own bundled hooks
|
||||
without expecting the user to maintain a ``~/.hermes/hooks/`` entry. Pairs
|
||||
with :func:`get_default_registry` so plugins don't have to hold a registry
|
||||
reference threaded through every call site.
|
||||
|
||||
Events fired today:
|
||||
|
||||
- ``gateway:startup`` — Gateway process starts
|
||||
- ``session:start`` — New session created (first message of a new session)
|
||||
- ``session:end`` — Session ends (user ran /new or /reset)
|
||||
- ``session:reset`` — Session reset completed (new session entry created)
|
||||
- ``agent:start`` — Agent begins processing a message
|
||||
- ``agent:step`` — Each turn in the tool-calling loop
|
||||
- ``agent:end`` — Agent finishes processing
|
||||
- ``command:*`` — Any slash command executed (wildcard match)
|
||||
- ``tui:<sub-event>`` — Any TUI gateway dispatch event mirrored to the
|
||||
bus (``tui:tool.start``, ``tui:message.delta``,
|
||||
``tui:reasoning.available``, etc.). Subscribe
|
||||
with the full name for one event, or
|
||||
``tui:*`` for all of them.
|
||||
|
||||
Wildcards match one colon-separated namespace level: a handler registered for
|
||||
``foo:*`` fires for every ``foo:<anything>`` event, but not for
|
||||
``bar:something``.
|
||||
|
||||
Errors in hooks are caught and logged but never block the main pipeline.
|
||||
"""
|
||||
@@ -32,6 +53,12 @@ from hermes_cli.config import get_hermes_home
|
||||
HOOKS_DIR = get_hermes_home() / "hooks"
|
||||
|
||||
|
||||
# Tracks handler functions we've already warned about for emit_sync's
|
||||
# async-without-loop case, so each bad combination only logs once per
|
||||
# process instead of flooding stderr on every event.
|
||||
_ASYNC_NO_LOOP_WARNED: "set[int]" = set()
|
||||
|
||||
|
||||
class HookRegistry:
|
||||
"""
|
||||
Discovers, loads, and fires event hooks.
|
||||
@@ -52,6 +79,60 @@ class HookRegistry:
|
||||
"""Return metadata about all loaded hooks."""
|
||||
return list(self._loaded_hooks)
|
||||
|
||||
def register(
|
||||
self,
|
||||
event_type: str,
|
||||
handler: Callable,
|
||||
*,
|
||||
name: Optional[str] = None,
|
||||
) -> Callable[[], None]:
|
||||
"""Programmatically register a handler for ``event_type``.
|
||||
|
||||
Intended for in-process plugins, tests, and built-in hooks. Pairs with
|
||||
the file-system discovery path (HOOK.yaml + handler.py) — both share
|
||||
the same dispatch and wildcard rules.
|
||||
|
||||
The handler signature matches discovered hooks: ``handle(event_type,
|
||||
context)`` where ``handler`` may be sync or async.
|
||||
|
||||
Returns a callable that, when invoked, removes this specific handler
|
||||
registration from the registry. Other handlers for the same event are
|
||||
unaffected.
|
||||
|
||||
Args:
|
||||
event_type: Event identifier such as ``agent:start`` or
|
||||
``tui:tool.start``. May also be a wildcard like ``command:*``.
|
||||
handler: Function or coroutine function to invoke when the
|
||||
event fires.
|
||||
name: Optional friendly name recorded alongside the
|
||||
registration metadata for listing/debugging. Defaults to the
|
||||
handler's ``__name__``.
|
||||
|
||||
Returns:
|
||||
A no-arg callable that unregisters this handler when called.
|
||||
"""
|
||||
self._handlers.setdefault(event_type, []).append(handler)
|
||||
|
||||
meta = {
|
||||
"name": name or getattr(handler, "__name__", "<anonymous>"),
|
||||
"description": "(registered programmatically)",
|
||||
"events": [event_type],
|
||||
"path": "<programmatic>",
|
||||
}
|
||||
self._loaded_hooks.append(meta)
|
||||
|
||||
def _unregister() -> None:
|
||||
try:
|
||||
self._handlers.get(event_type, []).remove(handler)
|
||||
except ValueError:
|
||||
pass
|
||||
try:
|
||||
self._loaded_hooks.remove(meta)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
return _unregister
|
||||
|
||||
def _register_builtin_hooks(self) -> None:
|
||||
"""Register built-in hooks that are always active.
|
||||
|
||||
@@ -208,3 +289,146 @@ class HookRegistry:
|
||||
except Exception as e:
|
||||
print(f"[hooks] Error in handler for '{event_type}': {e}", flush=True)
|
||||
return results
|
||||
|
||||
def emit_sync(
|
||||
self,
|
||||
event_type: str,
|
||||
context: Optional[Dict[str, Any]] = None,
|
||||
) -> None:
|
||||
"""Fire handlers from a synchronous caller.
|
||||
|
||||
Companion to :meth:`emit` for hot-path callers that cannot await — most
|
||||
notably ``tui_gateway/server.py:_emit``, which serves both async dispatch
|
||||
paths and sync callback paths and must remain ``def`` (not ``async
|
||||
def``).
|
||||
|
||||
Behavior:
|
||||
|
||||
- Sync handlers run immediately, in registration order. Exceptions are
|
||||
caught and logged so a buggy handler can't break the host pipeline.
|
||||
- Async handlers (coroutine functions) are scheduled via
|
||||
``asyncio.ensure_future`` if a running event loop is available in the
|
||||
current thread. If no loop is running, the handler is **skipped** and
|
||||
a one-time warning is logged per handler — async handlers in a
|
||||
purely sync process don't have a way to make forward progress.
|
||||
|
||||
Like :meth:`emit`, never raises and never blocks waiting on async
|
||||
handlers — fire-and-forget for the async case.
|
||||
|
||||
Args:
|
||||
event_type: The event identifier (e.g. ``tui:tool.start``).
|
||||
context: Optional dict with event-specific data.
|
||||
"""
|
||||
if context is None:
|
||||
context = {}
|
||||
|
||||
for fn in self._resolve_handlers(event_type):
|
||||
try:
|
||||
result = fn(event_type, context)
|
||||
except Exception as e:
|
||||
print(f"[hooks] Error in handler for '{event_type}': {e}", flush=True)
|
||||
continue
|
||||
|
||||
if not asyncio.iscoroutine(result):
|
||||
continue
|
||||
|
||||
# Coroutine returned — needs a loop to make progress.
|
||||
try:
|
||||
loop = asyncio.get_running_loop()
|
||||
except RuntimeError:
|
||||
# No running loop in this thread.
|
||||
handler_id = id(fn)
|
||||
if handler_id not in _ASYNC_NO_LOOP_WARNED:
|
||||
_ASYNC_NO_LOOP_WARNED.add(handler_id)
|
||||
handler_name = getattr(fn, "__name__", "<anonymous>")
|
||||
print(
|
||||
f"[hooks] Skipping async handler {handler_name!r} for "
|
||||
f"'{event_type}' — emit_sync called with no running "
|
||||
f"event loop. Subsequent skips for this handler are "
|
||||
f"silent.",
|
||||
flush=True,
|
||||
)
|
||||
# Close the coroutine to suppress "coroutine was never
|
||||
# awaited" RuntimeWarning noise.
|
||||
try:
|
||||
result.close()
|
||||
except Exception:
|
||||
pass
|
||||
continue
|
||||
|
||||
try:
|
||||
# ensure_future schedules the coroutine on the loop and
|
||||
# returns immediately. Exceptions inside the coroutine
|
||||
# surface via the task's done callback (or asyncio's
|
||||
# default exception handler) — we don't await here.
|
||||
task = asyncio.ensure_future(result, loop=loop)
|
||||
task.add_done_callback(_log_task_exception)
|
||||
except Exception as e:
|
||||
print(
|
||||
f"[hooks] Failed to schedule async handler for "
|
||||
f"'{event_type}': {e}",
|
||||
flush=True,
|
||||
)
|
||||
|
||||
|
||||
def _log_task_exception(task: "asyncio.Task[Any]") -> None:
|
||||
"""Surface exceptions from scheduled async hook handlers.
|
||||
|
||||
Without this callback, an exception inside a fire-and-forget handler
|
||||
coroutine becomes "Task exception was never retrieved" noise from
|
||||
asyncio's default exception handler at GC time. Logging it explicitly
|
||||
keeps the failure mode visible and consistent with the sync path.
|
||||
"""
|
||||
if task.cancelled():
|
||||
return
|
||||
exc = task.exception()
|
||||
if exc is not None:
|
||||
print(f"[hooks] Async handler raised: {exc}", flush=True)
|
||||
|
||||
|
||||
# ── Module-level default registry ──────────────────────────────────
|
||||
#
|
||||
# Plugins and in-process callers (TUI gateway's ``_emit`` etc.) need a
|
||||
# stable place to find "the" registry without threading a reference
|
||||
# through every API. The gateway process installs its own
|
||||
# ``self.hooks`` instance as the default during startup so file-system
|
||||
# discovery and built-in hooks share state with programmatic
|
||||
# registrations. Other processes (TUI) lazily get their own default on
|
||||
# first access and run ``discover_and_load()`` themselves.
|
||||
|
||||
_default_registry: Optional["HookRegistry"] = None
|
||||
|
||||
|
||||
def get_default_registry() -> "HookRegistry":
|
||||
"""Return the process-wide default :class:`HookRegistry`.
|
||||
|
||||
Lazily creates one (without auto-running discovery) on first call. Callers
|
||||
that need file-system hook discovery should invoke
|
||||
:meth:`HookRegistry.discover_and_load` themselves after first access — the
|
||||
gateway already does this for the registry it installs as the default.
|
||||
"""
|
||||
global _default_registry
|
||||
if _default_registry is None:
|
||||
_default_registry = HookRegistry()
|
||||
return _default_registry
|
||||
|
||||
|
||||
def install_as_default(registry: "HookRegistry") -> None:
|
||||
"""Install ``registry`` as the process-wide default.
|
||||
|
||||
Intended for the gateway and other long-lived hosts that want their own
|
||||
:class:`HookRegistry` instance to be visible to in-process plugins through
|
||||
:func:`get_default_registry`. Idempotent — installing the same registry
|
||||
twice is a no-op; installing a different registry replaces the previous
|
||||
default.
|
||||
"""
|
||||
global _default_registry
|
||||
_default_registry = registry
|
||||
|
||||
|
||||
def _reset_default_registry_for_tests() -> None:
|
||||
"""Test helper — clears the cached default so each test starts fresh."""
|
||||
global _default_registry
|
||||
_default_registry = None
|
||||
_ASYNC_NO_LOOP_WARNED.clear()
|
||||
|
||||
|
||||
@@ -1862,8 +1862,23 @@ class GatewayRunner:
|
||||
self.pairing_store = PairingStore()
|
||||
|
||||
# Event hook system
|
||||
from gateway.hooks import HookRegistry
|
||||
from gateway.hooks import HookRegistry, install_as_default
|
||||
self.hooks = HookRegistry()
|
||||
# Expose this registry as the process-wide default so in-process
|
||||
# plugins (and any other component that uses ``get_default_registry()``)
|
||||
# share state with file-system-discovered hooks loaded into ``self.hooks``.
|
||||
install_as_default(self.hooks)
|
||||
# Cross-process delivery: start the hook forwarder if a
|
||||
# dashboard is reachable. No-op when the dashboard isn't
|
||||
# running, or when ``HERMES_HOOK_FORWARDER=0`` is set. See
|
||||
# gateway/hook_forwarder.py + DESIGN-cross-process-hooks.md.
|
||||
try:
|
||||
from gateway import hook_forwarder
|
||||
hook_forwarder.start_if_dashboard_available(self.hooks, src="gateway")
|
||||
except Exception as e: # pragma: no cover — defensive
|
||||
# Forwarder failure must never break gateway startup. Log
|
||||
# and move on; dashboard plugins just won't see gateway events.
|
||||
print(f"[gateway] hook forwarder start failed: {e}", flush=True)
|
||||
|
||||
# Per-chat voice reply mode: "off" | "voice_only" | "all"
|
||||
self._voice_mode: Dict[str, str] = self._load_voice_modes()
|
||||
|
||||
@@ -41,6 +41,12 @@ _GATE_PUBLIC_PREFIXES: tuple[str, ...] = (
|
||||
"/ds-assets/",
|
||||
"/fonts/",
|
||||
"/fonts-terminal/",
|
||||
# Cross-process hook delivery — bearer-token authenticated independently
|
||||
# of the OAuth gate (see hermes_cli/hook_ingest.py + DESIGN-cross-
|
||||
# process-hooks.md). Forwarders running outside any logged-in browser
|
||||
# session need to reach these endpoints; the bearer token is the
|
||||
# security boundary.
|
||||
"/api/hooks/",
|
||||
)
|
||||
|
||||
|
||||
|
||||
282
hermes_cli/hook_ingest.py
Normal file
282
hermes_cli/hook_ingest.py
Normal file
@@ -0,0 +1,282 @@
|
||||
"""Hook registry cross-process delivery — dashboard-side endpoints.
|
||||
|
||||
The companion to ``gateway/hook_forwarder.py``. Provides:
|
||||
|
||||
* ``write_dashboard_discovery_file()`` / ``remove_dashboard_discovery_file()`` —
|
||||
drops/cleans ``$HERMES_HOME/dashboard.json`` with the bound dashboard URL and
|
||||
a freshly-generated bearer token. Long-lived source processes (gateway,
|
||||
TUI, subagents) read this file on startup to find the dashboard and
|
||||
authenticate ingest POSTs.
|
||||
|
||||
* :func:`build_hook_router` — returns a FastAPI router with two endpoints:
|
||||
|
||||
- ``GET /api/hooks/health`` — unauthenticated reachability probe. The
|
||||
forwarder GETs this before each round of POSTs so a downed dashboard
|
||||
fails fast without spamming the ingest endpoint with retries.
|
||||
|
||||
- ``POST /api/hooks/ingest`` — accepts ``{event_type, context, src}``
|
||||
frames from forwarders and republishes them via
|
||||
``get_default_registry().emit_sync(event_type, context)``. Bearer
|
||||
token from ``dashboard.json:hooks_ingest_token`` is the security
|
||||
boundary, independent of the dashboard's session token / OAuth
|
||||
gate. Loop-prevention: the republished context is stamped with
|
||||
``_forwarded=True`` and ``_forwarded_from=<src>`` so source-side
|
||||
forwarders skip these events if they ever round-trip back.
|
||||
|
||||
See ``DESIGN-cross-process-hooks.md`` for the full design rationale.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import hmac
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import secrets
|
||||
import stat
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from fastapi import APIRouter, HTTPException, Request
|
||||
|
||||
from hermes_cli.config import get_hermes_home
|
||||
|
||||
|
||||
_log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Discovery file
|
||||
# ---------------------------------------------------------------------------
|
||||
#
|
||||
# The dashboard writes this file on startup and removes it on shutdown.
|
||||
# Forwarders running in non-dashboard processes read it to discover the
|
||||
# dashboard URL + bearer token. Lives under HERMES_HOME so it's
|
||||
# profile-aware; no /tmp fallback.
|
||||
|
||||
_DISCOVERY_FILENAME = "dashboard.json"
|
||||
|
||||
# Token used by forwarders to authenticate POST /api/hooks/ingest.
|
||||
# Generated fresh on each dashboard startup; lives only in memory + the
|
||||
# 0600-mode discovery file. Stored at module level so the ingest
|
||||
# endpoint can compare against it without re-reading the file on every
|
||||
# request.
|
||||
_HOOKS_INGEST_TOKEN: str = ""
|
||||
|
||||
|
||||
def _discovery_path() -> Path:
|
||||
"""Return the absolute path of ``$HERMES_HOME/dashboard.json``."""
|
||||
return get_hermes_home() / _DISCOVERY_FILENAME
|
||||
|
||||
|
||||
def write_dashboard_discovery_file(host: str, port: int) -> str:
|
||||
"""Generate a fresh hooks-ingest token and write the discovery file.
|
||||
|
||||
The discovery file is written atomically (write to ``.tmp``, rename
|
||||
to final path) so a forwarder that reads concurrently never sees a
|
||||
partially-written file. The file mode is ``0600`` — owner-only —
|
||||
so a same-host non-root user can't read the token without already
|
||||
having compromised the dashboard user's account.
|
||||
|
||||
Idempotent in the trivial sense: calling twice in the same process
|
||||
rotates the token and overwrites the file. Not thread-safe; should
|
||||
only be called from the dashboard's startup path.
|
||||
|
||||
Args:
|
||||
host: The host the dashboard bound to (``127.0.0.1``, ``0.0.0.0``,
|
||||
a specific interface, …). Written verbatim into the discovery
|
||||
file so forwarders dial the right address.
|
||||
port: The port the dashboard bound to.
|
||||
|
||||
Returns:
|
||||
The freshly-generated bearer token (also stored module-locally
|
||||
so :func:`_hook_ingest_auth_ok` can validate POSTs).
|
||||
"""
|
||||
global _HOOKS_INGEST_TOKEN
|
||||
token = secrets.token_urlsafe(32)
|
||||
_HOOKS_INGEST_TOKEN = token
|
||||
|
||||
payload = {
|
||||
"url": f"http://{host}:{port}",
|
||||
"hooks_ingest_token": token,
|
||||
"pid": os.getpid(),
|
||||
"started_at": datetime.now(timezone.utc).isoformat(),
|
||||
}
|
||||
|
||||
path = _discovery_path()
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
tmp_path = path.with_suffix(".json.tmp")
|
||||
try:
|
||||
tmp_path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
|
||||
# 0600 — owner read+write, nothing for group or other. Same
|
||||
# posture as ~/.hermes/auth.json.
|
||||
os.chmod(tmp_path, stat.S_IRUSR | stat.S_IWUSR)
|
||||
tmp_path.replace(path)
|
||||
except OSError as exc:
|
||||
_log.warning(
|
||||
"[hooks-ingest] failed to write dashboard discovery file at %s: %s",
|
||||
path,
|
||||
exc,
|
||||
)
|
||||
try:
|
||||
tmp_path.unlink(missing_ok=True)
|
||||
except OSError:
|
||||
pass
|
||||
raise
|
||||
|
||||
_log.debug("[hooks-ingest] wrote %s", path)
|
||||
return token
|
||||
|
||||
|
||||
def remove_dashboard_discovery_file() -> None:
|
||||
"""Delete the discovery file. Idempotent; safe to call from atexit."""
|
||||
global _HOOKS_INGEST_TOKEN
|
||||
try:
|
||||
_discovery_path().unlink(missing_ok=True)
|
||||
except OSError:
|
||||
pass
|
||||
_HOOKS_INGEST_TOKEN = ""
|
||||
_log.debug("[hooks-ingest] removed discovery file")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Auth helper
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _hook_ingest_auth_ok(request: Request) -> bool:
|
||||
"""True if the request carries a valid hooks-ingest bearer token.
|
||||
|
||||
The token is independent of the dashboard session token / OAuth
|
||||
cookie; only forwarders that have read ``dashboard.json`` know it.
|
||||
|
||||
Constant-time comparison via :func:`hmac.compare_digest`.
|
||||
"""
|
||||
if not _HOOKS_INGEST_TOKEN:
|
||||
# No token yet — discovery file not written; refuse everything.
|
||||
return False
|
||||
auth = request.headers.get("authorization", "")
|
||||
expected = f"Bearer {_HOOKS_INGEST_TOKEN}"
|
||||
return hmac.compare_digest(auth.encode(), expected.encode())
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# FastAPI router
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def build_hook_router() -> APIRouter:
|
||||
"""Return a FastAPI router with ``/health`` and ``/ingest`` endpoints.
|
||||
|
||||
Caller mounts at ``/api/hooks`` to get the documented endpoints
|
||||
(``GET /api/hooks/health`` and ``POST /api/hooks/ingest``).
|
||||
"""
|
||||
router = APIRouter()
|
||||
|
||||
@router.get("/health")
|
||||
async def hook_health() -> dict:
|
||||
"""Unauthenticated reachability probe for the forwarder."""
|
||||
return {"ok": True}
|
||||
|
||||
@router.post("/ingest")
|
||||
async def hook_ingest(request: Request) -> dict:
|
||||
"""Republish a forwarded event on the dashboard's local registry.
|
||||
|
||||
Wire shape (POST body)::
|
||||
|
||||
{
|
||||
"event_type": "agent:start",
|
||||
"context": {"platform": "telegram", "user_id": "u-1", ...},
|
||||
"src": "gateway"
|
||||
}
|
||||
|
||||
The context is republished with ``_forwarded=True`` and
|
||||
``_forwarded_from=<src>`` stamped on it so source-side
|
||||
forwarders skip the event if it ever round-trips back (closing
|
||||
the loop).
|
||||
|
||||
Returns ``{"ok": True}`` on success, raises 401 on missing/bad
|
||||
token, 400 on malformed body.
|
||||
|
||||
``emit_sync`` is the right entry point: handlers run
|
||||
synchronously on the request thread (fast, push-to-queue style
|
||||
for typical plugin handlers); async handlers are scheduled on
|
||||
the running event loop via the existing ``asyncio.ensure_future``
|
||||
path inside ``emit_sync``.
|
||||
"""
|
||||
if not _hook_ingest_auth_ok(request):
|
||||
raise HTTPException(status_code=401, detail="Unauthorized")
|
||||
|
||||
try:
|
||||
body = await request.json()
|
||||
except Exception as exc:
|
||||
raise HTTPException(
|
||||
status_code=400, detail=f"invalid JSON body: {exc}"
|
||||
) from exc
|
||||
|
||||
if not isinstance(body, dict):
|
||||
raise HTTPException(status_code=400, detail="body must be a JSON object")
|
||||
|
||||
event_type = body.get("event_type")
|
||||
if not isinstance(event_type, str) or not event_type:
|
||||
raise HTTPException(
|
||||
status_code=400, detail="missing or empty 'event_type'"
|
||||
)
|
||||
|
||||
context = body.get("context") or {}
|
||||
if not isinstance(context, dict):
|
||||
raise HTTPException(
|
||||
status_code=400, detail="'context' must be a JSON object"
|
||||
)
|
||||
|
||||
src = body.get("src", "?")
|
||||
if not isinstance(src, str):
|
||||
src = "?"
|
||||
|
||||
# Stamp the context with forwarding metadata so:
|
||||
# 1. Source-side forwarders skip this event if it ever round-trips
|
||||
# back to them (loop prevention).
|
||||
# 2. Subscribers can filter forwarded-vs-original on
|
||||
# ``context["_forwarded"]`` (useful for hooks that fire in
|
||||
# every process and don't want 2x firing).
|
||||
context = {**context, "_forwarded": True, "_forwarded_from": src}
|
||||
|
||||
# Lazy import to avoid a hard dependency on gateway.hooks at
|
||||
# module-load time (web_server can be imported in contexts
|
||||
# where gateway.hooks isn't ready).
|
||||
from gateway.hooks import get_default_registry
|
||||
|
||||
try:
|
||||
get_default_registry().emit_sync(event_type, context)
|
||||
except Exception as exc:
|
||||
# emit_sync swallows handler exceptions internally — if we
|
||||
# see one here it's a registry-level bug. Log it but still
|
||||
# return 200 so the forwarder doesn't retry-storm.
|
||||
_log.warning(
|
||||
"[hooks-ingest] emit_sync raised for %s: %s", event_type, exc
|
||||
)
|
||||
|
||||
return {"ok": True}
|
||||
|
||||
return router
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Test helper
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _reset_for_tests() -> None:
|
||||
"""Clear the cached token + remove any leftover discovery file."""
|
||||
global _HOOKS_INGEST_TOKEN
|
||||
_HOOKS_INGEST_TOKEN = ""
|
||||
remove_dashboard_discovery_file()
|
||||
|
||||
|
||||
def get_current_token_for_tests() -> str:
|
||||
"""Read the currently-active hooks-ingest token.
|
||||
|
||||
Test-only accessor; do not call from production code.
|
||||
"""
|
||||
return _HOOKS_INGEST_TOKEN
|
||||
@@ -120,6 +120,12 @@ _PUBLIC_API_PATHS: frozenset = frozenset({
|
||||
"/api/model/info",
|
||||
"/api/dashboard/themes",
|
||||
"/api/dashboard/plugins",
|
||||
# Cross-process hook delivery — these have their own auth model
|
||||
# (bearer token from dashboard.json, generated fresh each dashboard
|
||||
# startup). The session-token check below is bypassed; auth is
|
||||
# handled inside the route handlers themselves.
|
||||
"/api/hooks/health",
|
||||
"/api/hooks/ingest",
|
||||
})
|
||||
|
||||
|
||||
@@ -4815,6 +4821,13 @@ def _mount_plugin_api_routes():
|
||||
# Mount plugin API routes before the SPA catch-all.
|
||||
_mount_plugin_api_routes()
|
||||
|
||||
# Mount the cross-process hook ingest router (POST /api/hooks/ingest +
|
||||
# GET /api/hooks/health). These endpoints have their own bearer-token
|
||||
# auth (independent of _SESSION_TOKEN / the OAuth gate) — see
|
||||
# hermes_cli/hook_ingest.py and DESIGN-cross-process-hooks.md.
|
||||
from hermes_cli.hook_ingest import build_hook_router as _build_hook_router # noqa: E402
|
||||
app.include_router(_build_hook_router(), prefix="/api/hooks")
|
||||
|
||||
# Mount the dashboard auth routes (/login, /auth/*, /api/auth/*) before the
|
||||
# SPA catch-all so /{full_path:path} doesn't swallow them. These are
|
||||
# always mounted — the gate middleware decides whether to enforce auth,
|
||||
@@ -4910,6 +4923,29 @@ def start_server(
|
||||
app.state.bound_host = host
|
||||
app.state.bound_port = port
|
||||
|
||||
# Cross-process hook delivery: drop the discovery file so source
|
||||
# processes (gateway, TUI, subagents) can find this dashboard and
|
||||
# authenticate POSTs to /api/hooks/ingest. The file is removed via
|
||||
# atexit so a clean shutdown stops orphan forwarders from trying to
|
||||
# POST to a port that's now closed.
|
||||
import atexit
|
||||
from hermes_cli.hook_ingest import (
|
||||
remove_dashboard_discovery_file,
|
||||
write_dashboard_discovery_file,
|
||||
)
|
||||
try:
|
||||
write_dashboard_discovery_file(host, port)
|
||||
atexit.register(remove_dashboard_discovery_file)
|
||||
except OSError as exc:
|
||||
# Non-fatal — the dashboard still works without cross-process
|
||||
# hook delivery. Warn so the operator knows the orb (or any
|
||||
# other event-driven plugin) won't see gateway/TUI events.
|
||||
_log.warning(
|
||||
"Failed to write hook discovery file: %s. Cross-process hook "
|
||||
"delivery to dashboard plugins will be unavailable.",
|
||||
exc,
|
||||
)
|
||||
|
||||
if open_browser:
|
||||
import webbrowser
|
||||
|
||||
|
||||
405
tests/gateway/test_hook_forwarder.py
Normal file
405
tests/gateway/test_hook_forwarder.py
Normal file
@@ -0,0 +1,405 @@
|
||||
"""Unit tests for ``gateway/hook_forwarder.py``.
|
||||
|
||||
Covers:
|
||||
|
||||
* No-op behavior when discovery file absent / env var disables
|
||||
* Idempotent ``start_if_dashboard_available`` (repeat calls re-use same forwarder)
|
||||
* Handler registration on every forwarded namespace
|
||||
* Loop prevention via ``_forwarded`` context flag
|
||||
* Queue overflow drops oldest, enqueues newest
|
||||
* ``stop()`` and ``_reset_for_tests`` are clean and idempotent
|
||||
* Error-log rate limiting
|
||||
|
||||
The HTTP-POST path is exercised in the integration test
|
||||
(``test_hook_forwarder_integration.py``) where a real FastAPI app
|
||||
stands in for the dashboard. Unit tests here focus on the
|
||||
registry-side behaviors that don't need an HTTP server.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from gateway import hook_forwarder
|
||||
from gateway.hooks import HookRegistry
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _clean_forwarder():
|
||||
"""Reset forwarder state + env between tests so they're independent."""
|
||||
hook_forwarder._reset_for_tests()
|
||||
# Make sure HERMES_HOOK_FORWARDER isn't sticky from a previous test.
|
||||
saved = os.environ.pop("HERMES_HOOK_FORWARDER", None)
|
||||
yield
|
||||
hook_forwarder._reset_for_tests()
|
||||
if saved is not None:
|
||||
os.environ["HERMES_HOOK_FORWARDER"] = saved
|
||||
|
||||
|
||||
def _write_discovery(tmp_path: Path, *, url: str = "http://127.0.0.1:9119") -> Path:
|
||||
"""Write a valid dashboard.json into tmp_path and return its path."""
|
||||
discovery = tmp_path / "dashboard.json"
|
||||
discovery.write_text(
|
||||
json.dumps(
|
||||
{
|
||||
"url": url,
|
||||
"hooks_ingest_token": "test-token-abc",
|
||||
"pid": 99999,
|
||||
"started_at": "2026-05-29T12:00:00Z",
|
||||
}
|
||||
)
|
||||
)
|
||||
return discovery
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# No-op paths
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestStartIfDashboardAvailable:
|
||||
def test_no_dashboard_json_returns_none(self, tmp_path, monkeypatch):
|
||||
"""Missing discovery file ⇒ forwarder doesn't start."""
|
||||
monkeypatch.setattr(hook_forwarder, "get_hermes_home", lambda: tmp_path)
|
||||
reg = HookRegistry()
|
||||
|
||||
result = hook_forwarder.start_if_dashboard_available(reg, src="gateway")
|
||||
|
||||
assert result is None
|
||||
assert not hook_forwarder.is_active()
|
||||
# No handlers registered on the registry.
|
||||
assert reg._handlers == {}
|
||||
|
||||
def test_env_disabled_returns_none_even_with_dashboard(
|
||||
self, tmp_path, monkeypatch
|
||||
):
|
||||
"""``HERMES_HOOK_FORWARDER=0`` short-circuits even when discovery
|
||||
is otherwise valid."""
|
||||
monkeypatch.setattr(hook_forwarder, "get_hermes_home", lambda: tmp_path)
|
||||
_write_discovery(tmp_path)
|
||||
monkeypatch.setenv("HERMES_HOOK_FORWARDER", "0")
|
||||
reg = HookRegistry()
|
||||
|
||||
result = hook_forwarder.start_if_dashboard_available(reg, src="gateway")
|
||||
|
||||
assert result is None
|
||||
assert reg._handlers == {}
|
||||
|
||||
@pytest.mark.parametrize("value", ["0", "false", "FALSE", "No", "off"])
|
||||
def test_env_disabled_accepts_common_falsy_values(
|
||||
self, tmp_path, monkeypatch, value
|
||||
):
|
||||
monkeypatch.setattr(hook_forwarder, "get_hermes_home", lambda: tmp_path)
|
||||
_write_discovery(tmp_path)
|
||||
monkeypatch.setenv("HERMES_HOOK_FORWARDER", value)
|
||||
reg = HookRegistry()
|
||||
|
||||
assert hook_forwarder.start_if_dashboard_available(reg, src="gateway") is None
|
||||
|
||||
def test_malformed_discovery_json_returns_none(self, tmp_path, monkeypatch):
|
||||
"""Garbage in ``dashboard.json`` ⇒ forwarder doesn't start."""
|
||||
monkeypatch.setattr(hook_forwarder, "get_hermes_home", lambda: tmp_path)
|
||||
(tmp_path / "dashboard.json").write_text("{not json")
|
||||
reg = HookRegistry()
|
||||
|
||||
assert hook_forwarder.start_if_dashboard_available(reg, src="gateway") is None
|
||||
|
||||
def test_discovery_missing_required_keys_returns_none(
|
||||
self, tmp_path, monkeypatch
|
||||
):
|
||||
"""``dashboard.json`` without ``url`` or ``hooks_ingest_token`` ⇒
|
||||
no forwarder."""
|
||||
monkeypatch.setattr(hook_forwarder, "get_hermes_home", lambda: tmp_path)
|
||||
(tmp_path / "dashboard.json").write_text(
|
||||
json.dumps({"pid": 123}) # missing url + token
|
||||
)
|
||||
reg = HookRegistry()
|
||||
|
||||
assert hook_forwarder.start_if_dashboard_available(reg, src="gateway") is None
|
||||
|
||||
def test_repeated_calls_are_idempotent(self, tmp_path, monkeypatch):
|
||||
"""Two ``start_if_dashboard_available`` in the same process re-use
|
||||
the same forwarder instance and don't double-register handlers."""
|
||||
monkeypatch.setattr(hook_forwarder, "get_hermes_home", lambda: tmp_path)
|
||||
_write_discovery(tmp_path)
|
||||
reg = HookRegistry()
|
||||
|
||||
first = hook_forwarder.start_if_dashboard_available(reg, src="gateway")
|
||||
try:
|
||||
second = hook_forwarder.start_if_dashboard_available(reg, src="tui")
|
||||
|
||||
assert first is not None
|
||||
assert second is first # same instance
|
||||
# And handlers registered exactly once per namespace.
|
||||
for pattern in hook_forwarder._FORWARDED_NAMESPACES:
|
||||
assert len(reg._handlers[pattern]) == 1
|
||||
finally:
|
||||
hook_forwarder.stop()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Registration coverage
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestRegistration:
|
||||
def test_registers_every_forwarded_namespace(self, tmp_path, monkeypatch):
|
||||
"""Forwarder subscribes to all five canonical namespaces."""
|
||||
monkeypatch.setattr(hook_forwarder, "get_hermes_home", lambda: tmp_path)
|
||||
_write_discovery(tmp_path)
|
||||
reg = HookRegistry()
|
||||
|
||||
try:
|
||||
hook_forwarder.start_if_dashboard_available(reg, src="gateway")
|
||||
for pattern in (
|
||||
"tui:*",
|
||||
"agent:*",
|
||||
"session:*",
|
||||
"command:*",
|
||||
"gateway:*",
|
||||
):
|
||||
assert pattern in reg._handlers
|
||||
assert len(reg._handlers[pattern]) == 1
|
||||
finally:
|
||||
hook_forwarder.stop()
|
||||
|
||||
def test_stop_unregisters_all_handlers(self, tmp_path, monkeypatch):
|
||||
"""``stop()`` removes every handler the forwarder installed."""
|
||||
monkeypatch.setattr(hook_forwarder, "get_hermes_home", lambda: tmp_path)
|
||||
_write_discovery(tmp_path)
|
||||
reg = HookRegistry()
|
||||
|
||||
hook_forwarder.start_if_dashboard_available(reg, src="gateway")
|
||||
for pattern in hook_forwarder._FORWARDED_NAMESPACES:
|
||||
assert pattern in reg._handlers
|
||||
|
||||
hook_forwarder.stop()
|
||||
for pattern in hook_forwarder._FORWARDED_NAMESPACES:
|
||||
assert reg._handlers.get(pattern, []) == []
|
||||
|
||||
def test_stop_is_idempotent(self, tmp_path, monkeypatch):
|
||||
"""Calling ``stop()`` twice doesn't raise."""
|
||||
monkeypatch.setattr(hook_forwarder, "get_hermes_home", lambda: tmp_path)
|
||||
_write_discovery(tmp_path)
|
||||
reg = HookRegistry()
|
||||
|
||||
hook_forwarder.start_if_dashboard_available(reg, src="gateway")
|
||||
hook_forwarder.stop()
|
||||
hook_forwarder.stop() # second call is no-op
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Handler behavior — loop prevention + queue management
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestHandlerBehavior:
|
||||
"""Tests the handler in isolation — skipping the worker thread by
|
||||
constructing the ``_HookForwarder`` directly so we can inspect the
|
||||
queue without racing on a daemon thread."""
|
||||
|
||||
def test_handler_enqueues_event(self):
|
||||
fwd = hook_forwarder._HookForwarder(src="gateway")
|
||||
fwd._handler("agent:start", {"platform": "telegram", "user_id": "u-1"})
|
||||
|
||||
assert fwd._queue.qsize() == 1
|
||||
frame = fwd._queue.get_nowait()
|
||||
assert frame == {
|
||||
"event_type": "agent:start",
|
||||
"context": {"platform": "telegram", "user_id": "u-1"},
|
||||
"src": "gateway",
|
||||
}
|
||||
|
||||
def test_handler_skips_forwarded_events(self):
|
||||
"""The ``_forwarded=True`` flag closes the source ↔ dashboard loop."""
|
||||
fwd = hook_forwarder._HookForwarder(src="gateway")
|
||||
|
||||
# This came from the ingest endpoint; must not be shipped back.
|
||||
fwd._handler(
|
||||
"agent:start",
|
||||
{"platform": "telegram", "_forwarded": True, "_forwarded_from": "gateway"},
|
||||
)
|
||||
|
||||
assert fwd._queue.qsize() == 0
|
||||
|
||||
def test_handler_does_not_skip_explicit_false_forwarded(self):
|
||||
"""Only the literal ``True`` triggers loop prevention; a ``False``
|
||||
or absent flag is fine."""
|
||||
fwd = hook_forwarder._HookForwarder(src="gateway")
|
||||
|
||||
fwd._handler("agent:start", {"_forwarded": False})
|
||||
fwd._handler("agent:start", {"_forwarded": None})
|
||||
|
||||
assert fwd._queue.qsize() == 2
|
||||
|
||||
def test_handler_drops_oldest_on_queue_full(self):
|
||||
"""At ``_QUEUE_MAX`` capacity, oldest frame is evicted to make
|
||||
room for newest."""
|
||||
fwd = hook_forwarder._HookForwarder(src="gateway")
|
||||
# Replace the queue with a tiny one so we can exercise overflow
|
||||
# without filling 1024 slots.
|
||||
from queue import Queue as _Queue
|
||||
|
||||
fwd._queue = _Queue(maxsize=3)
|
||||
|
||||
for i in range(5):
|
||||
fwd._handler("agent:step", {"iteration": i})
|
||||
|
||||
# Only the last 3 should survive (iterations 2, 3, 4).
|
||||
iterations = []
|
||||
while not fwd._queue.empty():
|
||||
iterations.append(fwd._queue.get_nowait()["context"]["iteration"])
|
||||
assert iterations == [2, 3, 4]
|
||||
|
||||
def test_handler_never_raises_on_pathological_queue(self):
|
||||
"""Even if the queue is wedged (both put_nowait calls fail), the
|
||||
handler returns cleanly — never propagates to the publisher."""
|
||||
fwd = hook_forwarder._HookForwarder(src="gateway")
|
||||
from queue import Queue as _Queue, Full as _Full
|
||||
|
||||
# Build a queue stub whose put_nowait always raises Full.
|
||||
class _StubbedFull(_Queue):
|
||||
def __init__(self):
|
||||
super().__init__(maxsize=1)
|
||||
|
||||
def put_nowait(self, item):
|
||||
raise _Full()
|
||||
|
||||
def get_nowait(self):
|
||||
from queue import Empty
|
||||
raise Empty()
|
||||
|
||||
fwd._queue = _StubbedFull() # type: ignore[assignment]
|
||||
|
||||
# Should not raise.
|
||||
fwd._handler("agent:step", {"iteration": 1})
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Discovery refresh / probe behavior — exercised without starting the worker
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestDiscoveryRefresh:
|
||||
def test_refresh_picks_up_new_token(self, tmp_path, monkeypatch):
|
||||
"""A dashboard restart writes a new token; the next probe must
|
||||
pick it up."""
|
||||
monkeypatch.setattr(hook_forwarder, "get_hermes_home", lambda: tmp_path)
|
||||
discovery_path = tmp_path / "dashboard.json"
|
||||
discovery_path.write_text(
|
||||
json.dumps({"url": "http://127.0.0.1:9119", "hooks_ingest_token": "first"})
|
||||
)
|
||||
|
||||
fwd = hook_forwarder._HookForwarder(src="gateway")
|
||||
|
||||
# Stub the http client's get() to always return 200 — the
|
||||
# discovery file is what we want to test here, not the probe.
|
||||
class _Stub200:
|
||||
status_code = 200
|
||||
|
||||
class _StubClient:
|
||||
def get(self, *a, **kw):
|
||||
return _Stub200()
|
||||
|
||||
fwd._refresh_discovery(_StubClient())
|
||||
assert fwd._discovery is not None
|
||||
assert fwd._discovery["hooks_ingest_token"] == "first"
|
||||
|
||||
# Now the dashboard "restarts" with a new token.
|
||||
discovery_path.write_text(
|
||||
json.dumps({"url": "http://127.0.0.1:9119", "hooks_ingest_token": "second"})
|
||||
)
|
||||
fwd._refresh_discovery(_StubClient())
|
||||
assert fwd._discovery["hooks_ingest_token"] == "second"
|
||||
|
||||
def test_refresh_invalidates_on_probe_failure(self, tmp_path, monkeypatch):
|
||||
"""If the health probe fails, discovery is cleared even though
|
||||
the file still exists."""
|
||||
monkeypatch.setattr(hook_forwarder, "get_hermes_home", lambda: tmp_path)
|
||||
_write_discovery(tmp_path)
|
||||
|
||||
fwd = hook_forwarder._HookForwarder(src="gateway")
|
||||
# First probe succeeds.
|
||||
class _Stub200:
|
||||
status_code = 200
|
||||
|
||||
class _OkClient:
|
||||
def get(self, *a, **kw):
|
||||
return _Stub200()
|
||||
|
||||
fwd._refresh_discovery(_OkClient())
|
||||
assert fwd._discovery is not None
|
||||
|
||||
# Now make the probe fail — connection refused, dashboard gone.
|
||||
class _FailingClient:
|
||||
def get(self, *a, **kw):
|
||||
raise ConnectionError("refused")
|
||||
|
||||
fwd._refresh_discovery(_FailingClient())
|
||||
assert fwd._discovery is None
|
||||
|
||||
def test_refresh_invalidates_on_non_200_response(self, tmp_path, monkeypatch):
|
||||
monkeypatch.setattr(hook_forwarder, "get_hermes_home", lambda: tmp_path)
|
||||
_write_discovery(tmp_path)
|
||||
|
||||
fwd = hook_forwarder._HookForwarder(src="gateway")
|
||||
|
||||
class _Stub503:
|
||||
status_code = 503
|
||||
|
||||
class _BadClient:
|
||||
def get(self, *a, **kw):
|
||||
return _Stub503()
|
||||
|
||||
fwd._refresh_discovery(_BadClient())
|
||||
assert fwd._discovery is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Error-log rate limiting
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestErrorRateLimit:
|
||||
def test_first_error_logs_immediately(self, caplog):
|
||||
fwd = hook_forwarder._HookForwarder(src="gateway")
|
||||
caplog.set_level("WARNING", logger="gateway.hook_forwarder")
|
||||
fwd._log_error("first error")
|
||||
|
||||
assert any("first error" in r.message for r in caplog.records)
|
||||
|
||||
def test_subsequent_errors_suppressed_until_window_elapses(self, caplog):
|
||||
fwd = hook_forwarder._HookForwarder(src="gateway")
|
||||
caplog.set_level("WARNING", logger="gateway.hook_forwarder")
|
||||
|
||||
fwd._log_error("first error")
|
||||
before = len(caplog.records)
|
||||
|
||||
# Many follow-up errors within the same minute — all suppressed.
|
||||
for _ in range(10):
|
||||
fwd._log_error("nth error")
|
||||
|
||||
assert len(caplog.records) == before
|
||||
|
||||
def test_suppression_count_surfaces_on_next_log(self, caplog, monkeypatch):
|
||||
fwd = hook_forwarder._HookForwarder(src="gateway")
|
||||
caplog.set_level("WARNING", logger="gateway.hook_forwarder")
|
||||
|
||||
fwd._log_error("first error")
|
||||
for _ in range(5):
|
||||
fwd._log_error("suppressed")
|
||||
|
||||
# Simulate the rate-limit window elapsing.
|
||||
fwd._last_error_log_at = time.monotonic() - hook_forwarder._ERROR_LOG_INTERVAL_S - 1
|
||||
fwd._log_error("after window")
|
||||
|
||||
# Last record should include the "5 similar errors suppressed" suffix.
|
||||
last = caplog.records[-1].message
|
||||
assert "after window" in last
|
||||
assert "5 similar errors suppressed" in last
|
||||
@@ -316,3 +316,256 @@ class TestEmitCollect:
|
||||
await reg.emit_collect("agent:start") # no context arg
|
||||
|
||||
assert captured == [("agent:start", {})]
|
||||
|
||||
|
||||
class TestRegister:
|
||||
"""Tests for the programmatic ``HookRegistry.register`` API."""
|
||||
|
||||
def test_registers_handler(self):
|
||||
reg = HookRegistry()
|
||||
calls: list = []
|
||||
|
||||
def handler(event_type, context):
|
||||
calls.append((event_type, context))
|
||||
|
||||
reg.register("agent:start", handler)
|
||||
|
||||
assert "agent:start" in reg._handlers
|
||||
assert reg._handlers["agent:start"] == [handler]
|
||||
|
||||
def test_records_metadata_in_loaded_hooks(self):
|
||||
reg = HookRegistry()
|
||||
|
||||
def my_handler(_e, _c):
|
||||
return None
|
||||
|
||||
reg.register("tui:tool.start", my_handler)
|
||||
|
||||
assert len(reg.loaded_hooks) == 1
|
||||
meta = reg.loaded_hooks[0]
|
||||
assert meta["name"] == "my_handler"
|
||||
assert meta["events"] == ["tui:tool.start"]
|
||||
assert meta["path"] == "<programmatic>"
|
||||
|
||||
def test_custom_name_override(self):
|
||||
reg = HookRegistry()
|
||||
|
||||
reg.register("agent:end", lambda _e, _c: None, name="orb-collector")
|
||||
|
||||
assert reg.loaded_hooks[0]["name"] == "orb-collector"
|
||||
|
||||
def test_returns_working_unregister(self):
|
||||
reg = HookRegistry()
|
||||
|
||||
def handler(_e, _c):
|
||||
return None
|
||||
|
||||
unregister = reg.register("agent:start", handler)
|
||||
|
||||
assert handler in reg._handlers["agent:start"]
|
||||
assert len(reg.loaded_hooks) == 1
|
||||
|
||||
unregister()
|
||||
|
||||
assert handler not in reg._handlers["agent:start"]
|
||||
assert len(reg.loaded_hooks) == 0
|
||||
|
||||
def test_unregister_is_idempotent(self):
|
||||
reg = HookRegistry()
|
||||
unregister = reg.register("agent:start", lambda _e, _c: None)
|
||||
unregister()
|
||||
# Second call should not raise.
|
||||
unregister()
|
||||
|
||||
def test_multiple_handlers_same_event(self):
|
||||
reg = HookRegistry()
|
||||
calls: list = []
|
||||
|
||||
def h1(_e, _c):
|
||||
calls.append("h1")
|
||||
|
||||
def h2(_e, _c):
|
||||
calls.append("h2")
|
||||
|
||||
reg.register("agent:start", h1)
|
||||
reg.register("agent:start", h2)
|
||||
|
||||
assert reg._handlers["agent:start"] == [h1, h2]
|
||||
assert len(reg.loaded_hooks) == 2
|
||||
|
||||
def test_unregister_does_not_affect_other_handlers(self):
|
||||
reg = HookRegistry()
|
||||
|
||||
def h1(_e, _c):
|
||||
return None
|
||||
|
||||
def h2(_e, _c):
|
||||
return None
|
||||
|
||||
unreg1 = reg.register("agent:start", h1)
|
||||
reg.register("agent:start", h2)
|
||||
|
||||
unreg1()
|
||||
|
||||
assert h1 not in reg._handlers["agent:start"]
|
||||
assert h2 in reg._handlers["agent:start"]
|
||||
|
||||
|
||||
class TestEmitSync:
|
||||
"""Tests for the synchronous emit path used from hot non-async callers."""
|
||||
|
||||
def test_fires_sync_handler(self):
|
||||
reg = HookRegistry()
|
||||
calls: list = []
|
||||
|
||||
reg.register(
|
||||
"tui:tool.start",
|
||||
lambda e, c: calls.append((e, c)),
|
||||
)
|
||||
|
||||
reg.emit_sync("tui:tool.start", {"session_id": "s1", "payload": {"name": "foo"}})
|
||||
|
||||
assert calls == [("tui:tool.start", {"session_id": "s1", "payload": {"name": "foo"}})]
|
||||
|
||||
def test_default_context_when_none(self):
|
||||
reg = HookRegistry()
|
||||
seen: list = []
|
||||
reg.register("evt:x", lambda _e, c: seen.append(c))
|
||||
|
||||
reg.emit_sync("evt:x") # no context arg
|
||||
|
||||
assert seen == [{}]
|
||||
|
||||
def test_sync_handler_exception_isolated(self):
|
||||
reg = HookRegistry()
|
||||
calls: list = []
|
||||
|
||||
def bad(_e, _c):
|
||||
raise RuntimeError("boom")
|
||||
|
||||
def good(_e, _c):
|
||||
calls.append("good")
|
||||
|
||||
reg.register("evt:x", bad)
|
||||
reg.register("evt:x", good)
|
||||
|
||||
# Must not raise; second handler still fires.
|
||||
reg.emit_sync("evt:x", {})
|
||||
|
||||
assert calls == ["good"]
|
||||
|
||||
def test_wildcard_matching(self):
|
||||
reg = HookRegistry()
|
||||
calls: list = []
|
||||
|
||||
reg.register("tui:*", lambda e, _c: calls.append(e))
|
||||
reg.register("tui:tool.start", lambda e, _c: calls.append(f"exact:{e}"))
|
||||
|
||||
reg.emit_sync("tui:tool.start", {})
|
||||
|
||||
# Exact match first, then wildcard.
|
||||
assert calls == ["exact:tui:tool.start", "tui:tool.start"]
|
||||
|
||||
def test_no_handlers_does_not_raise(self):
|
||||
reg = HookRegistry()
|
||||
# Just shouldn't blow up.
|
||||
reg.emit_sync("nobody:listening", {"foo": "bar"})
|
||||
|
||||
def test_async_handler_skipped_with_no_loop(self, capsys):
|
||||
from gateway.hooks import _reset_default_registry_for_tests
|
||||
|
||||
_reset_default_registry_for_tests()
|
||||
reg = HookRegistry()
|
||||
marker: list = []
|
||||
|
||||
async def async_handler(_e, _c):
|
||||
marker.append("ran")
|
||||
|
||||
reg.register("evt:x", async_handler, name="async_handler_unique")
|
||||
|
||||
# First emit logs a warning and skips.
|
||||
reg.emit_sync("evt:x", {})
|
||||
captured = capsys.readouterr()
|
||||
# The warning uses the handler's __name__ for diagnostic clarity.
|
||||
assert "async_handler" in captured.out
|
||||
assert "Skipping async handler" in captured.out
|
||||
assert marker == [] # async handler never ran
|
||||
|
||||
# Second emit is silent (warning suppressed).
|
||||
reg.emit_sync("evt:x", {})
|
||||
captured = capsys.readouterr()
|
||||
assert captured.out == ""
|
||||
assert marker == []
|
||||
|
||||
def test_async_handler_scheduled_when_loop_running(self):
|
||||
import asyncio as _asyncio
|
||||
|
||||
reg = HookRegistry()
|
||||
marker: list = []
|
||||
|
||||
async def async_handler(_e, _c):
|
||||
marker.append("ran")
|
||||
|
||||
reg.register("evt:x", async_handler)
|
||||
|
||||
async def driver():
|
||||
reg.emit_sync("evt:x", {})
|
||||
# Yield to the loop so the scheduled task can run.
|
||||
await _asyncio.sleep(0)
|
||||
await _asyncio.sleep(0)
|
||||
|
||||
_asyncio.run(driver())
|
||||
|
||||
assert marker == ["ran"]
|
||||
|
||||
|
||||
class TestDefaultRegistry:
|
||||
"""Tests for the module-level default-registry singleton."""
|
||||
|
||||
def test_get_default_returns_same_instance(self):
|
||||
from gateway.hooks import (
|
||||
_reset_default_registry_for_tests,
|
||||
get_default_registry,
|
||||
)
|
||||
|
||||
_reset_default_registry_for_tests()
|
||||
|
||||
first = get_default_registry()
|
||||
second = get_default_registry()
|
||||
|
||||
assert first is second
|
||||
|
||||
def test_install_as_default_replaces(self):
|
||||
from gateway.hooks import (
|
||||
_reset_default_registry_for_tests,
|
||||
get_default_registry,
|
||||
install_as_default,
|
||||
)
|
||||
|
||||
_reset_default_registry_for_tests()
|
||||
|
||||
custom = HookRegistry()
|
||||
install_as_default(custom)
|
||||
|
||||
assert get_default_registry() is custom
|
||||
|
||||
def test_install_then_get_picks_up_handlers(self):
|
||||
from gateway.hooks import (
|
||||
_reset_default_registry_for_tests,
|
||||
get_default_registry,
|
||||
install_as_default,
|
||||
)
|
||||
|
||||
_reset_default_registry_for_tests()
|
||||
|
||||
custom = HookRegistry()
|
||||
install_as_default(custom)
|
||||
|
||||
calls: list = []
|
||||
get_default_registry().register("agent:x", lambda _e, _c: calls.append("hit"))
|
||||
|
||||
# Same handler is visible on the installed instance.
|
||||
custom.emit_sync("agent:x", {})
|
||||
|
||||
assert calls == ["hit"]
|
||||
|
||||
|
||||
406
tests/hermes_cli/test_hook_ingest_endpoint.py
Normal file
406
tests/hermes_cli/test_hook_ingest_endpoint.py
Normal file
@@ -0,0 +1,406 @@
|
||||
"""Unit tests for ``hermes_cli/hook_ingest.py``.
|
||||
|
||||
Covers:
|
||||
|
||||
* Discovery file write/read/remove lifecycle
|
||||
* File permissions (0600) and atomic replace semantics
|
||||
* Auth gating on the ingest endpoint (401 without token, 200 with)
|
||||
* Body validation (400 on bad shape, missing keys, non-dict context)
|
||||
* Forwarded events are stamped with ``_forwarded=True`` and
|
||||
``_forwarded_from=<src>`` and republished via ``emit_sync``
|
||||
* Health endpoint returns 200 unconditionally and is unauthenticated
|
||||
* `--insecure` mode does not disable the ingest endpoint (the bearer
|
||||
token is the security boundary regardless of bind address)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import stat
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from fastapi import FastAPI
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
from gateway.hooks import (
|
||||
HookRegistry,
|
||||
_reset_default_registry_for_tests,
|
||||
get_default_registry,
|
||||
install_as_default,
|
||||
)
|
||||
from hermes_cli import hook_ingest
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _isolate(tmp_path, monkeypatch):
|
||||
"""Each test gets a fresh hermes home + fresh registry singleton +
|
||||
fresh ingest-token state. Without this the module-level
|
||||
``_HOOKS_INGEST_TOKEN`` from one test would leak into the next."""
|
||||
monkeypatch.setattr(hook_ingest, "get_hermes_home", lambda: tmp_path)
|
||||
hook_ingest._reset_for_tests()
|
||||
_reset_default_registry_for_tests()
|
||||
yield
|
||||
hook_ingest._reset_for_tests()
|
||||
_reset_default_registry_for_tests()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def fresh_registry():
|
||||
"""Install a clean ``HookRegistry`` as the process default and return it."""
|
||||
reg = HookRegistry()
|
||||
install_as_default(reg)
|
||||
return reg
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def hook_app(fresh_registry):
|
||||
"""A FastAPI app with the hook router mounted at /api/hooks."""
|
||||
app = FastAPI()
|
||||
app.include_router(hook_ingest.build_hook_router(), prefix="/api/hooks")
|
||||
return app
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client(hook_app):
|
||||
return TestClient(hook_app)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Discovery file lifecycle
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestDiscoveryFile:
|
||||
def test_write_creates_file_with_expected_shape(self, tmp_path):
|
||||
token = hook_ingest.write_dashboard_discovery_file("127.0.0.1", 9119)
|
||||
|
||||
path = tmp_path / "dashboard.json"
|
||||
assert path.exists()
|
||||
|
||||
data = json.loads(path.read_text())
|
||||
assert data["url"] == "http://127.0.0.1:9119"
|
||||
assert data["hooks_ingest_token"] == token
|
||||
assert isinstance(data["pid"], int)
|
||||
assert "started_at" in data
|
||||
# ISO-8601 timestamp.
|
||||
assert "T" in data["started_at"]
|
||||
|
||||
def test_write_returns_fresh_token_each_call(self, tmp_path):
|
||||
first = hook_ingest.write_dashboard_discovery_file("127.0.0.1", 9119)
|
||||
second = hook_ingest.write_dashboard_discovery_file("127.0.0.1", 9119)
|
||||
|
||||
assert first != second # 32-byte urlsafe tokens collide ~never
|
||||
|
||||
def test_write_records_non_loopback_bind(self, tmp_path):
|
||||
"""``--insecure`` mode binds non-loopback; the discovery file
|
||||
must reflect that so forwarders dial the right address."""
|
||||
hook_ingest.write_dashboard_discovery_file("0.0.0.0", 9119)
|
||||
data = json.loads((tmp_path / "dashboard.json").read_text())
|
||||
assert data["url"] == "http://0.0.0.0:9119"
|
||||
|
||||
def test_write_creates_parent_dir_if_missing(self, monkeypatch, tmp_path):
|
||||
"""``$HERMES_HOME`` might not exist yet on first startup."""
|
||||
nested = tmp_path / "new" / "hermes"
|
||||
monkeypatch.setattr(hook_ingest, "get_hermes_home", lambda: nested)
|
||||
|
||||
hook_ingest.write_dashboard_discovery_file("127.0.0.1", 9119)
|
||||
|
||||
assert (nested / "dashboard.json").exists()
|
||||
|
||||
def test_file_mode_is_0600(self, tmp_path):
|
||||
"""Discovery file holds a bearer token in cleartext; must be
|
||||
owner-only readable."""
|
||||
hook_ingest.write_dashboard_discovery_file("127.0.0.1", 9119)
|
||||
path = tmp_path / "dashboard.json"
|
||||
mode = stat.S_IMODE(path.stat().st_mode)
|
||||
# 0o600 = read+write for owner, nothing for group/other.
|
||||
assert mode == 0o600, f"expected 0600, got {oct(mode)}"
|
||||
|
||||
def test_remove_deletes_file(self, tmp_path):
|
||||
hook_ingest.write_dashboard_discovery_file("127.0.0.1", 9119)
|
||||
assert (tmp_path / "dashboard.json").exists()
|
||||
|
||||
hook_ingest.remove_dashboard_discovery_file()
|
||||
assert not (tmp_path / "dashboard.json").exists()
|
||||
|
||||
def test_remove_is_idempotent(self):
|
||||
# Never written.
|
||||
hook_ingest.remove_dashboard_discovery_file()
|
||||
# And again — must not raise.
|
||||
hook_ingest.remove_dashboard_discovery_file()
|
||||
|
||||
def test_remove_clears_in_memory_token(self):
|
||||
"""Once removed, no one can authenticate to ingest anymore."""
|
||||
hook_ingest.write_dashboard_discovery_file("127.0.0.1", 9119)
|
||||
assert hook_ingest.get_current_token_for_tests() != ""
|
||||
|
||||
hook_ingest.remove_dashboard_discovery_file()
|
||||
assert hook_ingest.get_current_token_for_tests() == ""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# /health endpoint
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestHealthEndpoint:
|
||||
def test_health_returns_200_unauthenticated(self, client):
|
||||
r = client.get("/api/hooks/health")
|
||||
assert r.status_code == 200
|
||||
assert r.json() == {"ok": True}
|
||||
|
||||
def test_health_returns_200_with_no_token_set(self, client):
|
||||
"""Health probe must succeed even before any token has been
|
||||
written (forwarder probes before discovery is established)."""
|
||||
# No write_dashboard_discovery_file call — token stays "".
|
||||
r = client.get("/api/hooks/health")
|
||||
assert r.status_code == 200
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# /ingest endpoint — auth
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestIngestAuth:
|
||||
def test_ingest_401_without_token(self, client):
|
||||
token = hook_ingest.write_dashboard_discovery_file("127.0.0.1", 9119)
|
||||
del token # noqa: F841 — we intentionally don't send it.
|
||||
|
||||
r = client.post(
|
||||
"/api/hooks/ingest",
|
||||
json={"event_type": "agent:start", "context": {}, "src": "gateway"},
|
||||
)
|
||||
assert r.status_code == 401
|
||||
|
||||
def test_ingest_401_with_wrong_token(self, client):
|
||||
hook_ingest.write_dashboard_discovery_file("127.0.0.1", 9119)
|
||||
|
||||
r = client.post(
|
||||
"/api/hooks/ingest",
|
||||
json={"event_type": "agent:start", "context": {}, "src": "gateway"},
|
||||
headers={"Authorization": "Bearer wrong"},
|
||||
)
|
||||
assert r.status_code == 401
|
||||
|
||||
def test_ingest_401_when_no_token_set(self, client):
|
||||
"""Discovery file never written ⇒ ingest refuses everything."""
|
||||
r = client.post(
|
||||
"/api/hooks/ingest",
|
||||
json={"event_type": "agent:start", "context": {}, "src": "gateway"},
|
||||
headers={"Authorization": "Bearer something"},
|
||||
)
|
||||
assert r.status_code == 401
|
||||
|
||||
def test_ingest_200_with_valid_token(self, client):
|
||||
token = hook_ingest.write_dashboard_discovery_file("127.0.0.1", 9119)
|
||||
|
||||
r = client.post(
|
||||
"/api/hooks/ingest",
|
||||
json={"event_type": "agent:start", "context": {}, "src": "gateway"},
|
||||
headers={"Authorization": f"Bearer {token}"},
|
||||
)
|
||||
assert r.status_code == 200
|
||||
assert r.json() == {"ok": True}
|
||||
|
||||
def test_ingest_works_for_non_loopback_bind(self, client):
|
||||
"""``--insecure`` doesn't disable ingest. The bearer token IS
|
||||
the security boundary regardless of bind address (see
|
||||
DESIGN-cross-process-hooks.md "Bind-address independence")."""
|
||||
token = hook_ingest.write_dashboard_discovery_file("0.0.0.0", 9119)
|
||||
|
||||
r = client.post(
|
||||
"/api/hooks/ingest",
|
||||
json={"event_type": "agent:start", "context": {}, "src": "gateway"},
|
||||
headers={"Authorization": f"Bearer {token}"},
|
||||
)
|
||||
assert r.status_code == 200
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# /ingest endpoint — body validation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestIngestBodyValidation:
|
||||
@pytest.fixture
|
||||
def auth_headers(self):
|
||||
token = hook_ingest.write_dashboard_discovery_file("127.0.0.1", 9119)
|
||||
return {"Authorization": f"Bearer {token}"}
|
||||
|
||||
def test_400_on_non_json_body(self, client, auth_headers):
|
||||
# FastAPI itself produces 422 for parse failures, but we explicitly
|
||||
# request JSON parsing so the response should be 400 with our
|
||||
# message — actually, request.json() on bad JSON raises and we
|
||||
# convert it to 400.
|
||||
r = client.post(
|
||||
"/api/hooks/ingest",
|
||||
content=b"not json",
|
||||
headers={**auth_headers, "Content-Type": "application/json"},
|
||||
)
|
||||
assert r.status_code == 400
|
||||
|
||||
def test_400_on_non_dict_body(self, client, auth_headers):
|
||||
r = client.post(
|
||||
"/api/hooks/ingest",
|
||||
json=["this", "is", "an", "array"],
|
||||
headers=auth_headers,
|
||||
)
|
||||
assert r.status_code == 400
|
||||
assert "object" in r.json()["detail"]
|
||||
|
||||
def test_400_on_missing_event_type(self, client, auth_headers):
|
||||
r = client.post(
|
||||
"/api/hooks/ingest",
|
||||
json={"context": {}, "src": "gateway"},
|
||||
headers=auth_headers,
|
||||
)
|
||||
assert r.status_code == 400
|
||||
assert "event_type" in r.json()["detail"]
|
||||
|
||||
def test_400_on_empty_event_type(self, client, auth_headers):
|
||||
r = client.post(
|
||||
"/api/hooks/ingest",
|
||||
json={"event_type": "", "context": {}, "src": "gateway"},
|
||||
headers=auth_headers,
|
||||
)
|
||||
assert r.status_code == 400
|
||||
|
||||
def test_400_on_non_dict_context(self, client, auth_headers):
|
||||
r = client.post(
|
||||
"/api/hooks/ingest",
|
||||
json={"event_type": "agent:start", "context": "string", "src": "gateway"},
|
||||
headers=auth_headers,
|
||||
)
|
||||
assert r.status_code == 400
|
||||
|
||||
def test_200_with_missing_optional_fields(self, client, auth_headers):
|
||||
"""src and context are optional; default to "?" and {}."""
|
||||
r = client.post(
|
||||
"/api/hooks/ingest",
|
||||
json={"event_type": "agent:start"},
|
||||
headers=auth_headers,
|
||||
)
|
||||
assert r.status_code == 200
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# /ingest endpoint — republish behavior
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestIngestRepublish:
|
||||
def test_republishes_via_emit_sync(self, client, fresh_registry):
|
||||
captured: list = []
|
||||
fresh_registry.register("agent:start", lambda e, c: captured.append((e, c)))
|
||||
|
||||
token = hook_ingest.write_dashboard_discovery_file("127.0.0.1", 9119)
|
||||
|
||||
r = client.post(
|
||||
"/api/hooks/ingest",
|
||||
json={
|
||||
"event_type": "agent:start",
|
||||
"context": {"platform": "telegram", "user_id": "u-1"},
|
||||
"src": "gateway",
|
||||
},
|
||||
headers={"Authorization": f"Bearer {token}"},
|
||||
)
|
||||
assert r.status_code == 200
|
||||
assert len(captured) == 1
|
||||
|
||||
event, ctx = captured[0]
|
||||
assert event == "agent:start"
|
||||
# Original context keys preserved.
|
||||
assert ctx["platform"] == "telegram"
|
||||
assert ctx["user_id"] == "u-1"
|
||||
# Forwarding metadata stamped.
|
||||
assert ctx["_forwarded"] is True
|
||||
assert ctx["_forwarded_from"] == "gateway"
|
||||
|
||||
def test_forwarded_stamp_overrides_caller_provided_value(
|
||||
self, client, fresh_registry
|
||||
):
|
||||
"""If a malicious/buggy caller tries to set _forwarded=False to
|
||||
smuggle the event past loop prevention, the endpoint overrides
|
||||
it. This isn't a security boundary (the auth gate is) but a
|
||||
defensive sanity check."""
|
||||
captured: list = []
|
||||
fresh_registry.register("agent:start", lambda e, c: captured.append(c))
|
||||
|
||||
token = hook_ingest.write_dashboard_discovery_file("127.0.0.1", 9119)
|
||||
client.post(
|
||||
"/api/hooks/ingest",
|
||||
json={
|
||||
"event_type": "agent:start",
|
||||
"context": {"_forwarded": False},
|
||||
"src": "gateway",
|
||||
},
|
||||
headers={"Authorization": f"Bearer {token}"},
|
||||
)
|
||||
|
||||
assert captured[0]["_forwarded"] is True
|
||||
|
||||
def test_wildcard_handlers_see_forwarded_events(self, client, fresh_registry):
|
||||
"""A subscriber to ``tui:*`` sees forwarded ``tui:tool.start`` events."""
|
||||
captured: list = []
|
||||
fresh_registry.register("tui:*", lambda e, c: captured.append(e))
|
||||
|
||||
token = hook_ingest.write_dashboard_discovery_file("127.0.0.1", 9119)
|
||||
|
||||
client.post(
|
||||
"/api/hooks/ingest",
|
||||
json={
|
||||
"event_type": "tui:tool.start",
|
||||
"context": {"session_id": "s-1", "payload": {"name": "search_files"}},
|
||||
"src": "tui",
|
||||
},
|
||||
headers={"Authorization": f"Bearer {token}"},
|
||||
)
|
||||
|
||||
assert captured == ["tui:tool.start"]
|
||||
|
||||
def test_src_defaults_to_question_mark(self, client, fresh_registry):
|
||||
captured: list = []
|
||||
fresh_registry.register("agent:start", lambda e, c: captured.append(c))
|
||||
|
||||
token = hook_ingest.write_dashboard_discovery_file("127.0.0.1", 9119)
|
||||
|
||||
client.post(
|
||||
"/api/hooks/ingest",
|
||||
json={"event_type": "agent:start", "context": {}},
|
||||
headers={"Authorization": f"Bearer {token}"},
|
||||
)
|
||||
|
||||
assert captured[0]["_forwarded_from"] == "?"
|
||||
|
||||
def test_non_string_src_is_normalized(self, client, fresh_registry):
|
||||
"""If something weird sends src=123, we don't propagate the bad type."""
|
||||
captured: list = []
|
||||
fresh_registry.register("agent:start", lambda e, c: captured.append(c))
|
||||
|
||||
token = hook_ingest.write_dashboard_discovery_file("127.0.0.1", 9119)
|
||||
|
||||
r = client.post(
|
||||
"/api/hooks/ingest",
|
||||
json={"event_type": "agent:start", "context": {}, "src": 123},
|
||||
headers={"Authorization": f"Bearer {token}"},
|
||||
)
|
||||
assert r.status_code == 200
|
||||
assert captured[0]["_forwarded_from"] == "?"
|
||||
|
||||
def test_handler_exception_does_not_break_ingest(self, client, fresh_registry):
|
||||
"""A buggy subscriber raising in emit_sync must not 500 the
|
||||
ingest endpoint — emit_sync swallows handler exceptions, but
|
||||
the route also has a top-level try/except defensive layer."""
|
||||
fresh_registry.register("agent:end", lambda _e, _c: 1 / 0)
|
||||
|
||||
token = hook_ingest.write_dashboard_discovery_file("127.0.0.1", 9119)
|
||||
|
||||
r = client.post(
|
||||
"/api/hooks/ingest",
|
||||
json={"event_type": "agent:end", "context": {}, "src": "gateway"},
|
||||
headers={"Authorization": f"Bearer {token}"},
|
||||
)
|
||||
assert r.status_code == 200
|
||||
319
tests/test_hook_forwarder_integration.py
Normal file
319
tests/test_hook_forwarder_integration.py
Normal file
@@ -0,0 +1,319 @@
|
||||
"""End-to-end integration test for cross-process hook delivery.
|
||||
|
||||
Wires together (in the SAME process to avoid subprocess complexity, but
|
||||
end-to-end through real HTTP + a real FastAPI app):
|
||||
|
||||
Source HookRegistry ──forwarder──HTTP──> /api/hooks/ingest ──> Dashboard HookRegistry
|
||||
│
|
||||
└─> handler fires
|
||||
|
||||
This validates the wire-format contract between the forwarder and the
|
||||
ingest endpoint: anything the forwarder produces must be accepted and
|
||||
republished correctly by the ingest endpoint.
|
||||
|
||||
The test runs uvicorn in a daemon thread so we have a real OS socket
|
||||
forwarders can POST to. ``dashboard.json`` is written into a temp
|
||||
$HERMES_HOME so the forwarder finds the test server instead of any
|
||||
real running dashboard on the dev box.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Callable
|
||||
|
||||
import pytest
|
||||
import uvicorn
|
||||
from fastapi import FastAPI
|
||||
|
||||
from gateway import hook_forwarder
|
||||
from gateway.hooks import (
|
||||
HookRegistry,
|
||||
_reset_default_registry_for_tests,
|
||||
install_as_default,
|
||||
)
|
||||
from hermes_cli import hook_ingest
|
||||
|
||||
|
||||
def _free_port() -> int:
|
||||
"""Find an unused TCP port on loopback by binding port 0."""
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||
s.bind(("127.0.0.1", 0))
|
||||
return s.getsockname()[1]
|
||||
|
||||
|
||||
def _wait_until(predicate: Callable[[], bool], *, timeout: float = 5.0) -> bool:
|
||||
"""Poll a predicate until it returns True or the timeout elapses.
|
||||
|
||||
Cheaper than ``time.sleep`` calls littered through the test body;
|
||||
keeps test runtime down when the system is fast and bounded when
|
||||
it's slow.
|
||||
"""
|
||||
deadline = time.monotonic() + timeout
|
||||
while time.monotonic() < deadline:
|
||||
if predicate():
|
||||
return True
|
||||
time.sleep(0.02)
|
||||
return False
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def hermes_home(tmp_path, monkeypatch):
|
||||
"""Each test gets a fresh $HERMES_HOME so ``dashboard.json`` writes
|
||||
don't bleed across tests (or into the real dev box's home)."""
|
||||
monkeypatch.setattr(hook_ingest, "get_hermes_home", lambda: tmp_path)
|
||||
monkeypatch.setattr(hook_forwarder, "get_hermes_home", lambda: tmp_path)
|
||||
hook_ingest._reset_for_tests()
|
||||
hook_forwarder._reset_for_tests()
|
||||
yield tmp_path
|
||||
hook_ingest._reset_for_tests()
|
||||
hook_forwarder._reset_for_tests()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def dashboard_server(hermes_home):
|
||||
"""Start a real uvicorn server with the hook router mounted, write a
|
||||
matching ``dashboard.json``, and tear it all down on test exit."""
|
||||
# Build the dashboard's FastAPI app. Just the hook router — no need
|
||||
# to load the full web_server which would pull in the SPA build,
|
||||
# auth providers, etc.
|
||||
app = FastAPI()
|
||||
app.include_router(hook_ingest.build_hook_router(), prefix="/api/hooks")
|
||||
|
||||
# Install a fresh default registry on the (test) dashboard side.
|
||||
# This is what the ingest endpoint republishes events onto.
|
||||
_reset_default_registry_for_tests()
|
||||
dashboard_reg = HookRegistry()
|
||||
install_as_default(dashboard_reg)
|
||||
|
||||
port = _free_port()
|
||||
config = uvicorn.Config(app, host="127.0.0.1", port=port, log_level="warning")
|
||||
server = uvicorn.Server(config)
|
||||
|
||||
server_thread = threading.Thread(target=server.run, daemon=True)
|
||||
server_thread.start()
|
||||
|
||||
# Wait for the server to start accepting connections.
|
||||
if not _wait_until(lambda: server.started, timeout=5.0):
|
||||
raise RuntimeError("uvicorn failed to start within 5s")
|
||||
|
||||
# Drop the discovery file pointing at our test server.
|
||||
hook_ingest.write_dashboard_discovery_file("127.0.0.1", port)
|
||||
|
||||
yield {"app": app, "port": port, "registry": dashboard_reg}
|
||||
|
||||
# Teardown: gracefully shut down the server.
|
||||
server.should_exit = True
|
||||
server_thread.join(timeout=5.0)
|
||||
_reset_default_registry_for_tests()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# End-to-end: source → forwarder → HTTP → ingest → dashboard registry
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_end_to_end_event_delivery(dashboard_server, hermes_home):
|
||||
"""A single fired event in the source registry reaches a subscriber
|
||||
on the dashboard registry via real HTTP."""
|
||||
captured: list = []
|
||||
dashboard_server["registry"].register(
|
||||
"agent:start", lambda e, c: captured.append((e, c))
|
||||
)
|
||||
|
||||
# Source-side registry — simulates the gateway process.
|
||||
source_reg = HookRegistry()
|
||||
hook_forwarder.start_if_dashboard_available(source_reg, src="gateway")
|
||||
|
||||
try:
|
||||
source_reg.emit_sync(
|
||||
"agent:start",
|
||||
{"platform": "telegram", "user_id": "u-1", "session_id": "s-1"},
|
||||
)
|
||||
|
||||
# Wait for the daemon worker thread to flush the queue + POST.
|
||||
assert _wait_until(lambda: len(captured) >= 1, timeout=10.0), (
|
||||
"Event never reached the dashboard subscriber"
|
||||
)
|
||||
|
||||
event, ctx = captured[0]
|
||||
assert event == "agent:start"
|
||||
# Original context survived the round trip.
|
||||
assert ctx["platform"] == "telegram"
|
||||
assert ctx["user_id"] == "u-1"
|
||||
assert ctx["session_id"] == "s-1"
|
||||
# And the ingest endpoint stamped forwarding metadata.
|
||||
assert ctx["_forwarded"] is True
|
||||
assert ctx["_forwarded_from"] == "gateway"
|
||||
finally:
|
||||
hook_forwarder.stop()
|
||||
|
||||
|
||||
def test_multiple_namespaces_round_trip(dashboard_server, hermes_home):
|
||||
"""The forwarder covers every namespace the design promises."""
|
||||
captured: dict = {ns: [] for ns in ("tui:*", "agent:*", "session:*", "command:*")}
|
||||
for ns in captured:
|
||||
# Closure over ns — bind it as a default arg to avoid late binding.
|
||||
dashboard_server["registry"].register(
|
||||
ns, lambda e, _c, _ns=ns: captured[_ns].append(e)
|
||||
)
|
||||
|
||||
source_reg = HookRegistry()
|
||||
hook_forwarder.start_if_dashboard_available(source_reg, src="gateway")
|
||||
|
||||
try:
|
||||
# One event per namespace.
|
||||
source_reg.emit_sync("tui:tool.start", {"session_id": "s", "payload": {}})
|
||||
source_reg.emit_sync("agent:start", {"user_id": "u"})
|
||||
source_reg.emit_sync("session:reset", {"session_key": "k"})
|
||||
source_reg.emit_sync("command:reset", {"command": "reset"})
|
||||
|
||||
assert _wait_until(
|
||||
lambda: all(len(v) >= 1 for v in captured.values()),
|
||||
timeout=10.0,
|
||||
), f"Some events missing: {captured}"
|
||||
|
||||
assert captured["tui:*"] == ["tui:tool.start"]
|
||||
assert captured["agent:*"] == ["agent:start"]
|
||||
assert captured["session:*"] == ["session:reset"]
|
||||
assert captured["command:*"] == ["command:reset"]
|
||||
finally:
|
||||
hook_forwarder.stop()
|
||||
|
||||
|
||||
def test_loop_prevention_forwarded_events_not_reshipped(dashboard_server, hermes_home):
|
||||
"""An event whose context already has ``_forwarded=True`` is NOT
|
||||
shipped by the source-side forwarder.
|
||||
|
||||
This is what closes the loop: dashboard republishes an event into
|
||||
its own registry with ``_forwarded=True``, and if a forwarder were
|
||||
also running in that process (it isn't, by design — but defense in
|
||||
depth) it would skip the event instead of round-tripping it back.
|
||||
"""
|
||||
# Counter on the dashboard side — bumps every time the ingest
|
||||
# endpoint fires the agent:start handler.
|
||||
ingest_hits: list = []
|
||||
dashboard_server["registry"].register(
|
||||
"agent:start", lambda _e, _c: ingest_hits.append(1)
|
||||
)
|
||||
|
||||
source_reg = HookRegistry()
|
||||
hook_forwarder.start_if_dashboard_available(source_reg, src="gateway")
|
||||
|
||||
try:
|
||||
# Fire an event that's already marked as forwarded — simulates
|
||||
# an event that came back to the source process somehow. The
|
||||
# forwarder must skip it (no POST to /ingest).
|
||||
source_reg.emit_sync(
|
||||
"agent:start",
|
||||
{
|
||||
"platform": "telegram",
|
||||
"_forwarded": True,
|
||||
"_forwarded_from": "dashboard-echo",
|
||||
},
|
||||
)
|
||||
|
||||
# Wait long enough that if a POST were going to happen, it
|
||||
# would have. Then verify NO ingest hits occurred.
|
||||
time.sleep(0.3)
|
||||
assert ingest_hits == [], (
|
||||
f"Forwarder shipped a _forwarded=True event: {ingest_hits}"
|
||||
)
|
||||
finally:
|
||||
hook_forwarder.stop()
|
||||
|
||||
|
||||
def test_forwarder_recovers_from_dashboard_restart(dashboard_server, hermes_home):
|
||||
"""If the dashboard rotates its token (restart), the forwarder's
|
||||
next probe re-reads ``dashboard.json`` and picks up the new token.
|
||||
|
||||
Unit-tested in detail in ``test_hook_forwarder.py::TestDiscoveryRefresh``.
|
||||
Here we pin the integration: after a token rotation + probe, events
|
||||
still land at the dashboard.
|
||||
"""
|
||||
captured: list = []
|
||||
dashboard_server["registry"].register(
|
||||
"agent:start", lambda _e, c: captured.append(c)
|
||||
)
|
||||
|
||||
source_reg = HookRegistry()
|
||||
hook_forwarder.start_if_dashboard_available(source_reg, src="gateway")
|
||||
|
||||
try:
|
||||
# First event lands cleanly with the original token.
|
||||
source_reg.emit_sync("agent:start", {"seq": 1})
|
||||
assert _wait_until(lambda: len(captured) >= 1, timeout=5.0)
|
||||
assert captured[0]["seq"] == 1
|
||||
|
||||
# Simulate dashboard restart: token rotates in dashboard.json
|
||||
# AND in the ingest endpoint's module state (we're sharing the
|
||||
# same hook_ingest module so write_dashboard_discovery_file
|
||||
# updates both).
|
||||
port = dashboard_server["port"]
|
||||
hook_ingest.write_dashboard_discovery_file("127.0.0.1", port)
|
||||
|
||||
# Force a discovery refresh directly — simulates what the
|
||||
# forwarder's worker does on its 30s probe cycle. Using the
|
||||
# active forwarder's HTTP client this way avoids a 30s
|
||||
# real-time wait.
|
||||
import httpx
|
||||
with httpx.Client(timeout=2.0) as client:
|
||||
fwd = hook_forwarder._active
|
||||
assert fwd is not None
|
||||
fwd._refresh_discovery(client)
|
||||
|
||||
source_reg.emit_sync("agent:start", {"seq": 2})
|
||||
assert _wait_until(lambda: len(captured) >= 2, timeout=10.0)
|
||||
assert captured[1]["seq"] == 2
|
||||
finally:
|
||||
hook_forwarder.stop()
|
||||
|
||||
|
||||
def test_no_dashboard_available_silent_noop(hermes_home):
|
||||
"""When no dashboard is reachable (no discovery file), the forwarder
|
||||
is a complete no-op — no thread spawned, no handlers registered."""
|
||||
# Don't start the dashboard fixture — there's nothing to forward to.
|
||||
source_reg = HookRegistry()
|
||||
result = hook_forwarder.start_if_dashboard_available(source_reg, src="gateway")
|
||||
|
||||
assert result is None
|
||||
assert source_reg._handlers == {}
|
||||
# And firing an event doesn't cause anything to happen.
|
||||
source_reg.emit_sync("agent:start", {}) # must not raise
|
||||
|
||||
|
||||
def test_no_dashboard_then_dashboard_starts_later(hermes_home):
|
||||
"""``start_if_dashboard_available`` is a one-shot check at call time.
|
||||
|
||||
If no dashboard is running when the source process starts, the
|
||||
forwarder doesn't start. Documented behavior: source-process
|
||||
consumers must call ``start_if_dashboard_available`` once at startup;
|
||||
if a dashboard appears later, only newly-started processes pick it
|
||||
up. (A long-running process with no forwarder will never get one
|
||||
retroactively.)
|
||||
|
||||
This test pins that contract so we notice if we accidentally add
|
||||
retry behavior — that would be a behavior change worth discussing,
|
||||
not a sneak in.
|
||||
"""
|
||||
source_reg = HookRegistry()
|
||||
# First call: no dashboard yet.
|
||||
result = hook_forwarder.start_if_dashboard_available(source_reg, src="gateway")
|
||||
assert result is None
|
||||
|
||||
# Dashboard appears.
|
||||
port = _free_port()
|
||||
hook_ingest.write_dashboard_discovery_file("127.0.0.1", port)
|
||||
|
||||
# Calling again does start the forwarder now. (The contract says
|
||||
# the wire-up sites only call once at startup, but the function
|
||||
# itself supports retries — useful for tests, and so callers can
|
||||
# call it from a post-config hook if they want.)
|
||||
result = hook_forwarder.start_if_dashboard_available(source_reg, src="gateway")
|
||||
assert result is not None
|
||||
|
||||
hook_forwarder.stop()
|
||||
185
tests/test_tui_gateway_hook_bridge.py
Normal file
185
tests/test_tui_gateway_hook_bridge.py
Normal file
@@ -0,0 +1,185 @@
|
||||
"""Tests for the TUI gateway → ``gateway.hooks`` bridge.
|
||||
|
||||
Every call into ``tui_gateway.server._emit`` should mirror the event onto the
|
||||
process-wide ``HookRegistry`` under the ``tui:`` namespace so in-process
|
||||
plugins can subscribe via :func:`gateway.hooks.get_default_registry`.
|
||||
|
||||
The mirror runs as a side-effect after ``write_json`` and is wrapped in a
|
||||
broad try/except so a buggy subscriber can never break the main JSON-RPC
|
||||
dispatch path.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from gateway.hooks import (
|
||||
HookRegistry,
|
||||
_reset_default_registry_for_tests,
|
||||
get_default_registry,
|
||||
install_as_default,
|
||||
)
|
||||
from tui_gateway import server
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _reset_registry_and_module_cache():
|
||||
"""Reset the default registry and the TUI module-level cache before each test.
|
||||
|
||||
Without this the cache from a previous test (or a previous run within the
|
||||
same process) would shadow our fresh install_as_default call and the
|
||||
mirrored event would land on the wrong registry.
|
||||
"""
|
||||
_reset_default_registry_for_tests()
|
||||
# Force the deferred-import cache in the TUI module to re-resolve.
|
||||
server._hook_registry = None
|
||||
# Also reset the forwarder-start sentinel so each test's first emit
|
||||
# re-evaluates "is a dashboard reachable?" against the fixture's
|
||||
# state instead of remembering a previous test's outcome.
|
||||
server._forwarder_started = False
|
||||
yield
|
||||
_reset_default_registry_for_tests()
|
||||
server._hook_registry = None
|
||||
server._forwarder_started = False
|
||||
|
||||
|
||||
class _StubTransport:
|
||||
"""Captures write_json calls so the test doesn't actually touch stdout."""
|
||||
|
||||
def __init__(self):
|
||||
self.written: list[dict] = []
|
||||
|
||||
def write(self, obj):
|
||||
self.written.append(obj)
|
||||
return True
|
||||
|
||||
|
||||
def test_emit_mirrors_to_default_registry():
|
||||
transport = _StubTransport()
|
||||
captured: list = []
|
||||
reg = HookRegistry()
|
||||
install_as_default(reg)
|
||||
reg.register("tui:tool.start", lambda e, c: captured.append((e, c)))
|
||||
|
||||
with patch.object(server, "_stdio_transport", transport):
|
||||
server._emit("tool.start", "sid-123", {"name": "search_files"})
|
||||
|
||||
# The JSON-RPC event was written as before.
|
||||
assert len(transport.written) == 1
|
||||
assert transport.written[0]["params"]["type"] == "tool.start"
|
||||
|
||||
# The hook bus saw a tui:-prefixed mirror.
|
||||
assert captured == [
|
||||
(
|
||||
"tui:tool.start",
|
||||
{"session_id": "sid-123", "payload": {"name": "search_files"}},
|
||||
)
|
||||
]
|
||||
|
||||
|
||||
def test_emit_with_no_payload_yields_empty_payload_dict():
|
||||
transport = _StubTransport()
|
||||
captured: list = []
|
||||
reg = HookRegistry()
|
||||
install_as_default(reg)
|
||||
reg.register("tui:session.info", lambda _e, c: captured.append(c))
|
||||
|
||||
with patch.object(server, "_stdio_transport", transport):
|
||||
server._emit("session.info", "sid-1", None)
|
||||
|
||||
assert captured == [{"session_id": "sid-1", "payload": {}}]
|
||||
|
||||
|
||||
def test_emit_subscriber_exception_does_not_break_dispatch():
|
||||
transport = _StubTransport()
|
||||
reg = HookRegistry()
|
||||
install_as_default(reg)
|
||||
|
||||
def broken(_e, _c):
|
||||
raise RuntimeError("subscriber blew up")
|
||||
|
||||
reg.register("tui:tool.start", broken)
|
||||
|
||||
# If _publish_tui_hook propagated, this would raise.
|
||||
with patch.object(server, "_stdio_transport", transport):
|
||||
server._emit("tool.start", "sid-1", {"name": "x"})
|
||||
|
||||
# JSON-RPC event still landed on stdout — host pipeline intact.
|
||||
assert len(transport.written) == 1
|
||||
assert transport.written[0]["params"]["type"] == "tool.start"
|
||||
|
||||
|
||||
def test_wildcard_subscriber_sees_all_tui_events():
|
||||
transport = _StubTransport()
|
||||
seen_types: list = []
|
||||
reg = HookRegistry()
|
||||
install_as_default(reg)
|
||||
reg.register("tui:*", lambda e, _c: seen_types.append(e))
|
||||
|
||||
with patch.object(server, "_stdio_transport", transport):
|
||||
server._emit("tool.start", "s", {})
|
||||
server._emit("message.delta", "s", {"text": "hi"})
|
||||
server._emit("session.info", "s", {})
|
||||
|
||||
assert seen_types == [
|
||||
"tui:tool.start",
|
||||
"tui:message.delta",
|
||||
"tui:session.info",
|
||||
]
|
||||
|
||||
|
||||
def test_emit_does_not_blow_up_when_no_subscribers():
|
||||
transport = _StubTransport()
|
||||
# No registry installed beyond the lazy default — and no handlers.
|
||||
with patch.object(server, "_stdio_transport", transport):
|
||||
server._emit("tool.start", "s", {"name": "x"})
|
||||
|
||||
# Dispatch worked, no subscribers fired (default registry is empty).
|
||||
assert len(transport.written) == 1
|
||||
|
||||
|
||||
def test_hook_registry_resolved_lazily_via_get_default_registry():
|
||||
"""The TUI module caches whatever ``get_default_registry`` returns at first
|
||||
use. Re-set the default before the first ``_emit`` and confirm the cache
|
||||
picks up the new instance, not a stale or never-installed one."""
|
||||
transport = _StubTransport()
|
||||
captured: list = []
|
||||
custom = HookRegistry()
|
||||
install_as_default(custom)
|
||||
custom.register("tui:tool.start", lambda _e, c: captured.append(c))
|
||||
|
||||
# Sanity: server._hook_registry starts unset thanks to the fixture.
|
||||
assert server._hook_registry is None
|
||||
|
||||
with patch.object(server, "_stdio_transport", transport):
|
||||
server._emit("tool.start", "s", {"x": 1})
|
||||
|
||||
# The cache now points at the installed registry.
|
||||
assert server._hook_registry is custom
|
||||
assert captured == [{"session_id": "s", "payload": {"x": 1}}]
|
||||
|
||||
# Subsequent emits keep using the cached reference even if the default
|
||||
# is swapped — the contract is "resolve once, cache thereafter."
|
||||
new_reg = HookRegistry()
|
||||
install_as_default(new_reg)
|
||||
second: list = []
|
||||
new_reg.register("tui:tool.start", lambda _e, c: second.append(c))
|
||||
custom.register("tui:tool.start", lambda _e, c: captured.append({"second": c}))
|
||||
|
||||
with patch.object(server, "_stdio_transport", transport):
|
||||
server._emit("tool.start", "s", {"x": 2})
|
||||
|
||||
# The cached registry (``custom``) saw the new event, the freshly-installed
|
||||
# ``new_reg`` did not.
|
||||
assert second == []
|
||||
# ``custom`` has two handlers registered now (the original lambda still
|
||||
# fires on every event, plus the second one that wraps payload in
|
||||
# ``{"second": ...}``). Both fire on the second ``_emit`` call.
|
||||
assert captured == [
|
||||
{"session_id": "s", "payload": {"x": 1}},
|
||||
{"session_id": "s", "payload": {"x": 2}},
|
||||
{"second": {"session_id": "s", "payload": {"x": 2}}},
|
||||
]
|
||||
assert get_default_registry() is new_reg
|
||||
@@ -388,6 +388,53 @@ def _emit(event: str, sid: str, payload: dict | None = None):
|
||||
if payload is not None:
|
||||
params["payload"] = payload
|
||||
write_json({"jsonrpc": "2.0", "method": "event", "params": params})
|
||||
_publish_tui_hook(event, sid, payload)
|
||||
|
||||
|
||||
def _publish_tui_hook(event: str, sid: str, payload: dict | None) -> None:
|
||||
"""Mirror a TUI gateway dispatch event onto the hook bus.
|
||||
|
||||
Every call from ``_emit`` produces a ``tui:<event>`` hook event so plugins
|
||||
can observe TUI activity (tool starts, message deltas, etc.) without
|
||||
forking ``_emit`` itself. Wrapped in a broad try/except — a bus subscriber
|
||||
bug must never break the main JSON-RPC dispatch path that already wrote
|
||||
the event to stdout above.
|
||||
|
||||
The import is deferred to first call to keep TUI cold-start cheap; the
|
||||
gateway hook module isn't otherwise needed in the TUI process.
|
||||
|
||||
On first call we also opportunistically start the cross-process hook
|
||||
forwarder so dashboard plugins see TUI events. No-op when no dashboard
|
||||
is running.
|
||||
"""
|
||||
try:
|
||||
global _hook_registry, _forwarder_started # noqa: PLW0603
|
||||
if _hook_registry is None:
|
||||
from gateway.hooks import get_default_registry # local import
|
||||
_hook_registry = get_default_registry()
|
||||
if not _forwarder_started:
|
||||
_forwarder_started = True
|
||||
try:
|
||||
from gateway import hook_forwarder
|
||||
hook_forwarder.start_if_dashboard_available(
|
||||
_hook_registry, src="tui"
|
||||
)
|
||||
except Exception:
|
||||
# Forwarder failure must never break TUI dispatch.
|
||||
pass
|
||||
_hook_registry.emit_sync( # type: ignore[union-attr]
|
||||
f"tui:{event}",
|
||||
{"session_id": sid, "payload": payload or {}},
|
||||
)
|
||||
except Exception:
|
||||
# Never propagate. Hook bus is best-effort.
|
||||
pass
|
||||
|
||||
|
||||
# Lazily-resolved on first ``_emit`` call. Cached in module scope so the
|
||||
# steady-state cost of mirroring is a dict lookup, not an import.
|
||||
_hook_registry: "object | None" = None
|
||||
_forwarder_started: bool = False
|
||||
|
||||
|
||||
def _status_update(sid: str, kind: str, text: str | None = None):
|
||||
|
||||
@@ -82,10 +82,99 @@ async def handle(event_type: str, context: dict):
|
||||
| `agent:step` | Each iteration of the tool-calling loop | `platform`, `user_id`, `session_id`, `iteration`, `tool_names` |
|
||||
| `agent:end` | Agent finishes processing | `platform`, `user_id`, `session_id`, `message`, `response` |
|
||||
| `command:*` | Any slash command executed | `platform`, `user_id`, `command`, `args` |
|
||||
| `tui:<sub-event>` | TUI dispatch event mirrored to the bus (see below) | `session_id`, `payload` (raw TUI payload) |
|
||||
|
||||
#### Wildcard Matching
|
||||
|
||||
Handlers registered for `command:*` fire for any `command:` event (`command:model`, `command:reset`, etc.). Monitor all slash commands with a single subscription.
|
||||
Handlers registered for `command:*` fire for any `command:<name>` event (`command:model`, `command:reset`, etc.) — useful for monitoring all slash commands with a single subscription. The same pattern works for `tui:*` to receive every TUI dispatch event.
|
||||
|
||||
Wildcards match **one** colon-separated namespace level: `foo:*` matches every `foo:<anything>` event, but does not cross another colon (a handler for `agent:*` does not fire for an unrelated `gateway:startup`).
|
||||
|
||||
#### `tui:*` events
|
||||
|
||||
The TUI gateway (`hermes --tui` or the embedded dashboard PTY) mirrors every JSON-RPC event it sends to the front-end onto the hook bus under the `tui:` namespace. Each handler receives `context = {"session_id": str, "payload": dict}` — the `payload` matches the JSON-RPC event payload the TUI wrote to stdout for that frame.
|
||||
|
||||
Common sub-events:
|
||||
|
||||
| Sub-event | When it fires |
|
||||
|-----------|--------------|
|
||||
| `tui:tool.start` | A tool call begins |
|
||||
| `tui:tool.progress` | A long-running tool reports progress |
|
||||
| `tui:tool.complete` | A tool call finishes |
|
||||
| `tui:message.start` / `message.delta` / `message.complete` | Assistant message lifecycle |
|
||||
| `tui:reasoning.available` | Reasoning content for the latest turn |
|
||||
| `tui:thinking.delta` | Streamed thinking text |
|
||||
| `tui:session.info` | Session metadata changed (model, tools, etc.) |
|
||||
| `tui:status.update` | Inline status line update |
|
||||
| `tui:error` | Error frame |
|
||||
|
||||
This list isn't exhaustive — anything ``_emit`` writes ends up on the bus. The full set is whatever `tui_gateway/server.py:_emit` happens to send today; payload shapes are documented alongside the front-end consumers in `ui-tui/src/`.
|
||||
|
||||
Subscribers fire **synchronously** inside the TUI's hot dispatch path, so handlers should be cheap (push to a queue, set an `asyncio.Event`, etc.) and never block. Exceptions inside a handler are caught and logged — they never break TUI dispatch.
|
||||
|
||||
### Programmatic registration
|
||||
|
||||
Discovery from `~/.hermes/hooks/` is the common path, but plugins, tests, and built-in code can also register handlers in-process:
|
||||
|
||||
```python
|
||||
from gateway.hooks import get_default_registry
|
||||
|
||||
def on_tool_start(event_type, context):
|
||||
name = context["payload"].get("name", "?")
|
||||
print(f"[my-plugin] {event_type} -> {name}")
|
||||
|
||||
unregister = get_default_registry().register("tui:tool.start", on_tool_start)
|
||||
# ... later, to clean up:
|
||||
unregister()
|
||||
```
|
||||
|
||||
`HookRegistry.register(event_type, handler, *, name=None)` accepts the same handler signature as discovered hooks (`handle(event_type, context)`, sync or async) and returns a no-arg callable that removes that specific handler. Other handlers on the same event are unaffected.
|
||||
|
||||
For callers that cannot `await` — the TUI's `_emit` is a good example — fire events via the synchronous path:
|
||||
|
||||
```python
|
||||
registry.emit_sync("tui:tool.start", {"session_id": "s1", "payload": {...}})
|
||||
```
|
||||
|
||||
`emit_sync` runs sync handlers immediately; async handlers are scheduled on the running event loop when one is available, or skipped with a one-time per-handler warning when there isn't.
|
||||
|
||||
### Per-process registries
|
||||
|
||||
Each Hermes process (gateway, TUI, dashboard, batch-runner, CLI) has its **own** `HookRegistry`. Hooks dropped into `~/.hermes/hooks/` are discovered independently in each process. A handler registered programmatically in the gateway's registry is **not** visible to the TUI's registry and vice-versa.
|
||||
|
||||
For programmatic handlers (especially dashboard plugins) that want to observe events from other processes, Hermes ships a built-in **cross-process forwarder** that bridges the gateway/TUI/CLI/batch-runner processes to the dashboard process. See "Cross-process delivery" below.
|
||||
|
||||
For file-system hooks (`HOOK.yaml` + `handler.py`), the simplest pattern is still: drop the hook directory, every process picks it up on startup. With cross-process delivery enabled, this means a file-system hook subscribed to e.g. `agent:end` will fire **twice** for each event — once in the gateway process (where the event originates) and once in the dashboard process (where it's republished by the forwarder). Filter on `context.get("_forwarded")` if you only want one or the other.
|
||||
|
||||
### Cross-process delivery
|
||||
|
||||
In a typical `hermes dashboard` + `hermes start` deployment, the dashboard and the gateway are separate processes. Events fired in the gateway (`agent:*`, `session:*`, `command:*`) and the TUI sidecar (`tui:*`) need to reach dashboard plugins (e.g. the orb visualization) that subscribe via `get_default_registry().register(...)` in the dashboard process.
|
||||
|
||||
Hermes handles this automatically. The dashboard writes a discovery file at `~/.hermes/dashboard.json` on startup containing its bound URL and a freshly-generated bearer token (0600 mode, owner-readable only). Non-dashboard processes (gateway, TUI, CLI, batch-runner) read this file on startup, register a built-in forwarder for the canonical namespaces (`tui:*`, `agent:*`, `session:*`, `command:*`, `gateway:*`), and POST every fired event to the dashboard's `POST /api/hooks/ingest` endpoint. The dashboard republishes each forwarded event on its own default registry via `emit_sync`, with `context["_forwarded"] = True` and `context["_forwarded_from"] = "<src>"` added so:
|
||||
|
||||
1. Source-side forwarders skip the event if it ever round-trips back (loop prevention).
|
||||
2. Handlers can filter on `_forwarded` if they want to dedupe between the source-process firing and the dashboard-republish.
|
||||
|
||||
**Behavior:**
|
||||
|
||||
- **Zero-config** when both processes are running. No configuration needed; the discovery file appears and disappears with the dashboard.
|
||||
- **Bind-address independent.** The forwarder works regardless of whether the dashboard binds loopback (`127.0.0.1`) or a public interface (`--insecure`). The bearer token in `dashboard.json` is the security boundary, not the bind address.
|
||||
- **No-op when no dashboard is running.** The forwarder doesn't register handlers; events fire in their source process and stay there.
|
||||
- **Best-effort.** A bounded queue (1024 frames per source process) drops oldest on overflow. POSTs use a 2-second timeout. Recency beats history when the dashboard is slow.
|
||||
- **Non-blocking.** The forwarder's handler enqueues onto a queue and returns immediately. A daemon thread does the HTTP POSTs. The hot publisher path is never blocked on network I/O.
|
||||
|
||||
**Disabling:** set `HERMES_HOOK_FORWARDER=0` (or `false`/`no`/`off`) in the source process's environment. Useful for paranoid security postures or for debugging hook double-fires.
|
||||
|
||||
**Failure modes:**
|
||||
|
||||
| Scenario | Behavior |
|
||||
|---|---|
|
||||
| Dashboard not running | `dashboard.json` absent; forwarder no-op until next probe (30s). |
|
||||
| Dashboard restarts (new token) | Forwarder's next probe re-reads the file and picks up the new token. |
|
||||
| Dashboard up but slow | Forwarder queue overflows and drops oldest. |
|
||||
| Forwarder POST timeout | Logged once per minute, not per failure. Queue drains as POSTs return. |
|
||||
| `--insecure` mode | Forwarder works identically to loopback mode. Plaintext-over-LAN; same exposure as the rest of `--insecure`. |
|
||||
|
||||
|
||||
### Examples
|
||||
|
||||
|
||||
Reference in New Issue
Block a user