Compare commits

...

2 Commits

Author SHA1 Message Date
Ben
f24346c011 feat(hooks): cross-process delivery via built-in forwarder + dashboard ingest endpoint
Without this, the hook registry extensions are only half useful: dashboard
plugins can subscribe to events via get_default_registry().register(...), but
they only ever see events fired in the dashboard process itself.  In a
typical hermes dashboard + hermes start deployment that means agent:*,
session:*, command:* (fired by the gateway) and tui:* (fired by the TUI
sidecar) are invisible — exactly the events most dashboard plugins want.

This commit closes the gap with a built-in cross-process bridge.  Source
processes (gateway, TUI, CLI, batch-runner) auto-discover the dashboard
via ~/.hermes/dashboard.json and forward every fired event via HTTP POST
to /api/hooks/ingest, which republishes them on the dashboard's default
registry.  Zero-config, bind-address-independent, best-effort, never
blocks the publisher.
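
As an illustration of the wire format described above, here is a minimal sketch that builds (but does not send) one ingest request. It uses the standard library's urllib rather than the httpx client the forwarder actually uses, and the helper name `build_ingest_request` and the base URL in the usage line are hypothetical; only the `/api/hooks/ingest` path, the bearer header, and the `event_type` / `context` / `src` body keys come from this commit.

```python
import json
import urllib.request


def build_ingest_request(base_url, token, event_type, context, src):
    """Build the POST a source process would send for one fired event.

    Hypothetical helper for illustration only; the real forwarder sends
    the same JSON body through an httpx.Client.
    """
    body = json.dumps(
        {"event_type": event_type, "context": context, "src": src}
    ).encode("utf-8")
    return urllib.request.Request(
        base_url.rstrip("/") + "/api/hooks/ingest",
        data=body,
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


# Usage: the URL and token would normally come from dashboard.json.
req = build_ingest_request(
    "http://127.0.0.1:8000/", "tok", "agent:start", {"session_id": "s1"}, "gateway"
)
```

Stripping the trailing slash before appending the path mirrors what the forwarder does with the discovered URL.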

## New components

### gateway/hook_forwarder.py (~450 LOC)

Source-side shipper. start_if_dashboard_available(registry, src=...) is the
entry point; non-dashboard processes call it once at startup. Behavior:

- Reads $HERMES_HOME/dashboard.json to find the dashboard URL + bearer
  token.  No-op when the file is absent or HERMES_HOOK_FORWARDER=0.
- Registers a sync handler on every canonical namespace (tui:*, agent:*,
  session:*, command:*, gateway:*) that enqueues fired events onto a
  bounded Queue (1024 max; drops oldest on overflow).
- Daemon worker thread drains the queue and POSTs each frame via httpx
  (2s timeout, keep-alive).  Loop prevention: events whose context
  carries _forwarded=True are skipped — those came from the ingest
  endpoint and must not be reshipped.
- Periodic probe (30s) re-reads the discovery file so a dashboard that
  restarts (new token) auto-recovers.  401 responses also invalidate the
  cached token so the next probe picks up the new one.
- Error logging is rate-limited to once per minute per process — a downed
  dashboard can't spam agent.log.
- Idempotent module singleton: repeated calls in the same process re-use
  the same forwarder instance.
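
The drop-oldest behavior from the list above can be sketched in isolation. This is a simplified stand-alone version of the pattern, not the forwarder's handler itself; the function name `enqueue_drop_oldest` and the tiny queue size are chosen only for the demonstration.

```python
from queue import Empty, Full, Queue


def enqueue_drop_oldest(q: Queue, item) -> None:
    """Put item on a bounded queue without blocking, evicting the oldest entry if full."""
    try:
        q.put_nowait(item)
    except Full:
        try:
            q.get_nowait()  # make room by discarding the oldest entry
        except Empty:
            pass  # a consumer drained the queue in the meantime
        try:
            q.put_nowait(item)
        except Full:
            pass  # another producer took the free slot; drop this item


# Usage: a queue of size 3 keeps only the three most recent items.
q = Queue(maxsize=3)
for n in range(5):
    enqueue_drop_oldest(q, n)
remaining = [q.get_nowait() for _ in range(q.qsize())]
```

Because every call is non-blocking, the publisher's cost is bounded no matter how slow the consumer is, which is the property the forwarder relies on.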

### hermes_cli/hook_ingest.py (~280 LOC)

Dashboard-side receiver. Provides:

- write_dashboard_discovery_file(host, port): atomically writes
  $HERMES_HOME/dashboard.json with 0600 mode and a freshly-generated
  hooks_ingest_token.  Called from web_server.py:start_server().
- remove_dashboard_discovery_file(): atexit hook clears the file on
  clean shutdown so orphan forwarders don't keep POSTing to a closed port.
- build_hook_router(): returns a FastAPI router with two routes:
  - GET /api/hooks/health — unauthenticated reachability probe
  - POST /api/hooks/ingest — bearer-token authenticated; republishes
    via get_default_registry().emit_sync(event_type, context).
    Stamps context['_forwarded'] = True and ['_forwarded_from'] = src
    so source-side forwarders skip the event if it round-trips back.
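
The discovery-file write described above can be approximated with a temp-file-plus-rename pattern. This is a sketch under stated assumptions, not the code in hook_ingest.py: the function name `write_discovery_file`, the `home` parameter, and the `http://host:port` URL format are hypothetical, while the `url` and `hooks_ingest_token` keys are the ones the forwarder requires.

```python
import json
import os
import secrets
import tempfile
from pathlib import Path


def write_discovery_file(home: Path, host: str, port: int) -> dict:
    """Atomically write dashboard.json under `home` with a fresh token."""
    home.mkdir(parents=True, exist_ok=True)
    data = {
        "url": f"http://{host}:{port}",  # assumed URL shape
        "hooks_ingest_token": secrets.token_urlsafe(32),
    }
    # mkstemp creates the file readable and writable by the owner only.
    fd, tmp = tempfile.mkstemp(dir=str(home), prefix="dashboard.", suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            json.dump(data, fh)
        os.chmod(tmp, 0o600)
        # os.replace swaps the file in one step, so readers never see a
        # half-written dashboard.json.
        os.replace(tmp, home / "dashboard.json")
    except BaseException:
        try:
            os.unlink(tmp)
        except OSError:
            pass
        raise
    return data
```

Writing to a sibling temp file in the same directory keeps the rename on one filesystem, which is what makes the replace atomic.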

The bearer token is independent of _SESSION_TOKEN / the OAuth gate.
It's the security boundary regardless of bind address — works
identically in --insecure mode (token-on-disk vs network bind are
orthogonal concerns; same trust model as ~/.hermes/auth.json).
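
A rough sketch of the two checks the ingest route performs, kept free of FastAPI so it runs on its own. The function names are hypothetical, and two details are assumptions rather than facts from this commit: the constant-time comparison via `hmac.compare_digest`, and the fallback of any missing or non-string `src` to `'?'`.

```python
import hmac
from typing import Any, Optional


def is_authorized(auth_header: Optional[str], expected_token: Optional[str]) -> bool:
    """Return True only for a well-formed bearer header carrying the expected token."""
    if not auth_header or not expected_token:
        return False  # no header, or no token has been generated yet
    scheme, _, presented = auth_header.partition(" ")
    if scheme.lower() != "bearer":
        return False
    # Constant-time comparison (an assumption about the real endpoint).
    return hmac.compare_digest(presented.encode("utf-8"), expected_token.encode("utf-8"))


def stamp_forwarded(context: dict, src: Any) -> dict:
    """Return a copy of context marked as forwarded, overriding caller-supplied flags."""
    stamped = dict(context)
    stamped["_forwarded"] = True
    # Assumed normalization: anything that is not a non-empty string becomes "?".
    stamped["_forwarded_from"] = src if isinstance(src, str) and src else "?"
    return stamped
```

Stamping after copying means a caller cannot smuggle in `_forwarded=False` to defeat loop prevention.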

## Wire-up

- gateway/run.py — Gateway.__init__ calls start_if_dashboard_available(
  self.hooks, src='gateway') after install_as_default(self.hooks).
- tui_gateway/server.py — first _emit() call lazily starts the forwarder
  with src='tui' alongside the existing default-registry resolution.
- hermes_cli/web_server.py:start_server — writes the discovery file
  after binding (so forwarders find the right port) and registers an
  atexit cleanup.  Mounts build_hook_router() at /api/hooks/.
- hermes_cli/dashboard_auth/middleware.py — adds /api/hooks/ to the
  OAuth gate's public prefixes so the bearer-token auth model isn't
  preempted by the cookie auth gate.
- hermes_cli/web_server.py:_PUBLIC_API_PATHS — adds the two ingest
  routes so the session-token middleware doesn't preempt them either.
- cli.py:HermesCLI.run + batch_runner.py:main — future-proof
  start_if_dashboard_available calls so CLI/batch processes that may
  fire hooks in the future (via tools, etc.) participate too.

## Tests

- tests/gateway/test_hook_forwarder.py — 24 unit tests covering no-op
  paths (no dashboard, env disabled, malformed discovery), registration
  on every forwarded namespace, handler behavior (enqueue, loop
  prevention, queue overflow drops oldest, never raises),
  discovery-refresh semantics (token rotation, probe failure
  invalidation), and rate-limited error logging.

- tests/hermes_cli/test_hook_ingest_endpoint.py — 27 unit tests covering
  the discovery file lifecycle (0600 mode, atomic write, parent dir
  creation), token rotation per write, idempotent removal, in-memory
  token cleared on remove, /health unauthenticated, /ingest 401 paths
  (no token / wrong token / no-token-set), /ingest 200 paths (including
  --insecure mode parity), body validation (400 on non-JSON, non-dict
  body, missing/empty event_type, non-dict context), republish
  semantics (_forwarded stamp overrides caller-provided value, wildcards
  see forwarded events, src defaults to '?', non-string src normalized),
  and handler-exception isolation.

- tests/test_hook_forwarder_integration.py — 6 end-to-end integration
  tests with a real uvicorn server (free port + dashboard.json + the
  actual hook router mounted): single-event delivery, multi-namespace
  delivery, loop prevention via _forwarded=True, recovery from dashboard
  restart (token rotation + probe), silent no-op when no dashboard,
  one-shot semantics of start_if_dashboard_available.

Test count: 57 new tests in three new files.  Full
./scripts/run_tests.sh tests/gateway/ tests/hermes_cli/
tests/test_tui_gateway_* tests/test_hook_forwarder_integration.py run:
11,850 tests passed, 0 failed (546 files, 132s on 24 workers).

## Documentation

website/docs/user-guide/features/hooks.md gets a 'Cross-process delivery'
section covering:

- The architecture (discovery file + bearer token + ingest endpoint).
- Behavior guarantees (zero-config, bind-address-independent, no-op when
  no dashboard, best-effort with bounded queue, non-blocking).
- HERMES_HOOK_FORWARDER=0 disable knob.
- Failure-mode table.

Plus a note that file-system hooks fire twice with forwarding enabled
(once in the source process, once on the dashboard republish), with the
_forwarded flag as the dedup signal.
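
A handler that should run only once per event can use that flag directly. The wrapper below is a minimal sketch; `skip_forwarded` is a hypothetical helper name, and the only contract it relies on is the `_forwarded=True` stamp described in this commit.

```python
def skip_forwarded(inner):
    """Wrap a hook handler so it ignores events republished by the dashboard."""

    def handler(event_type, context):
        if context.get("_forwarded") is True:
            return  # already handled in the process that fired the event
        inner(event_type, context)

    return handler


# Usage: the original event is handled, the republished copy is skipped.
calls = []
handle = skip_forwarded(lambda event_type, context: calls.append(event_type))
handle("agent:start", {"session_id": "s1"})
handle("agent:start", {"session_id": "s1", "_forwarded": True})
```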

## Out of scope (follow-up)

- Worker processes spawned by batch_runner's multiprocessing.Pool don't
  yet get their own forwarder — would need Pool(initializer=...) wire-up.
  Master process is covered.
- Subagent-process forwarders for processes spawned outside the gateway —
  delegate_task today runs subagents in-process, so this isn't needed
  for current workflows.
- WebSocket transport for high-volume scenarios — HTTP POST handles peak
  ~70 events/sec easily on loopback; revisit if measurement shows
  backpressure.
- Cross-host delivery (gateway on box A, dashboard on box B).
2026-05-29 13:15:36 +10:00
Ben
299b9dba67 feat(hooks): extend HookRegistry with programmatic registration, sync emit, and tui:* mirror
Adds three small extensions to the existing event hook system so plugins can
observe agent activity without introducing a parallel pub/sub bus (cf. closed
PR #34195 which duplicated this surface).

Changes to gateway/hooks.py:

- HookRegistry.register(event_type, handler, *, name=None) — programmatic
  registration that pairs with file-system discovery from ~/.hermes/hooks/.
  Returns a no-arg callable that deregisters that specific handler. Other
  handlers on the same event are unaffected.

- HookRegistry.emit_sync(event_type, context) — companion to the async
  emit() for hot-path callers that cannot await. Sync handlers run
  immediately; async handlers are scheduled on the current running event
  loop (if any) via asyncio.ensure_future, or skipped with a one-time
  per-handler warning when no loop is available. Like emit(), it never
  raises and a buggy subscriber can't break the host pipeline.

- get_default_registry() / install_as_default(registry) — module-level
  default registry singleton so plugins and in-process callers can find
  'the' registry without threading a reference through every API. The
  gateway installs its own self.hooks as the default during startup.
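
To make the register and emit_sync semantics above concrete, here is a toy stand-in registry. It is not the HookRegistry from gateway/hooks.py: the class name `MiniRegistry` is hypothetical, the resolution order (exact matches before the namespace wildcard) is an assumption, and the one-time warning for skipped async handlers is omitted for brevity.

```python
import asyncio
from typing import Any, Callable, Dict, List, Optional


class MiniRegistry:
    """Toy model of the registration and sync-emit behavior; illustration only."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Callable]] = {}

    def register(self, event_type: str, handler: Callable) -> Callable[[], None]:
        self._handlers.setdefault(event_type, []).append(handler)

        def _unregister() -> None:
            try:
                self._handlers.get(event_type, []).remove(handler)
            except ValueError:
                pass  # already removed; calling twice is harmless

        return _unregister

    def _resolve(self, event_type: str) -> List[Callable]:
        # Exact matches, then the "<namespace>:*" wildcard (assumed order).
        namespace = event_type.split(":", 1)[0]
        exact = self._handlers.get(event_type, [])
        wildcard = self._handlers.get(f"{namespace}:*", [])
        return list(exact) + list(wildcard)

    def emit_sync(self, event_type: str, context: Optional[Dict[str, Any]] = None) -> None:
        context = {} if context is None else context
        for fn in self._resolve(event_type):
            try:
                result = fn(event_type, context)
            except Exception as exc:
                print(f"[hooks] Error in handler for '{event_type}': {exc}")
                continue
            if not asyncio.iscoroutine(result):
                continue
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                result.close()  # no loop: skip, and avoid a "never awaited" warning
                continue
            asyncio.ensure_future(result, loop=loop)


# Usage: subscribe with a wildcard, emit once, then deregister.
seen = []
registry = MiniRegistry()
unregister = registry.register("tui:*", lambda event_type, ctx: seen.append(event_type))
registry.emit_sync("tui:tool.start", {"session_id": "s1"})
unregister()
registry.emit_sync("tui:tool.start", {"session_id": "s1"})
```

Returning a closure from `register` lets a plugin remove exactly the handler it added without needing a handle to the handler list.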

Changes to tui_gateway/server.py:

- _emit() now mirrors every JSON-RPC event onto the default registry as
  a 'tui:<sub-event>' hook event with context = {session_id, payload}.
  The mirror runs as a side-effect after write_json and is wrapped in a
  broad try/except so a subscriber bug can never break TUI dispatch. The
  gateway.hooks module is imported lazily on first _emit call to keep
  TUI cold-start cheap.
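
The mirror-as-side-effect pattern can be sketched as follows. Everything here is illustrative: `make_emit`, its parameters, and the shape of the frame passed to `write_json` are hypothetical, and the real `_emit` in tui_gateway/server.py may differ. The sketch shows only the three properties named above: the write happens first, the registry is resolved lazily and cached, and subscriber errors are swallowed.

```python
from typing import Any, Callable, Dict


def make_emit(write_json: Callable[[dict], None], get_registry: Callable[[], Any]):
    """Build an emit function that mirrors each event onto a hook registry."""
    cache: Dict[str, Any] = {}

    def _emit(event: str, session_id: str, payload: dict) -> None:
        # Primary job first: deliver the event to the TUI client.
        write_json({"event": event, "session_id": session_id, "payload": payload})
        # Side effect: mirror to the registry; failures must never propagate.
        try:
            if "registry" not in cache:
                cache["registry"] = get_registry()  # resolved once, on first emit
            cache["registry"].emit_sync(
                f"tui:{event}", {"session_id": session_id, "payload": payload}
            )
        except Exception:
            pass

    return _emit
```

Resolving the registry inside the try block means even a failing import or factory cannot break dispatch.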

Wildcard semantics unchanged — handlers registered for 'tui:*' fire for
every tui:<anything> event, just like the existing 'command:*' pattern.

Test coverage: +10 unit tests in tests/gateway/test_hooks.py covering
register/unregister, emit_sync sync+async+wildcard+exception paths, and
default-registry singleton behavior. New tests/test_tui_gateway_hook_bridge.py
exercises the _emit → registry plumbing end-to-end including subscriber
exception isolation, wildcard subscriptions, and the lazy resolve cache.

Docs: website/docs/user-guide/features/hooks.md gains a 'tui:*' events
table, the new 'Programmatic registration' section, and a note that
each Hermes process has its own registry (gateway-discovered hooks are
loaded independently in each process).

Test counts: 38 hooks tests (was 28), 6 new bridge tests.
Full ./scripts/run_tests.sh tests/gateway/ tests/test_tui_gateway* run:
6127 tests passed, 0 failed (272 files, 52s on 24 workers).
2026-05-29 11:58:20 +10:00
15 changed files with 2764 additions and 14 deletions


@@ -1218,6 +1218,23 @@ def main(
# List available distributions
python batch_runner.py --list_distributions
"""
    # Cross-process hook delivery: start the forwarder if a dashboard
    # is reachable. No-op when no dashboard is running or when
    # ``HERMES_HOOK_FORWARDER=0`` is set. Currently the batch runner
    # emits no hooks directly (only the gateway and TUI do today), but
    # wiring here future-proofs batch-spawned agent runs that may emit
    # hooks via tools. Worker processes spawned by ``multiprocessing.Pool``
    # below would each need their own forwarder; that's deferred to a
    # follow-up. See gateway/hook_forwarder.py + DESIGN-cross-process-hooks.md.
    try:
        from gateway import hook_forwarder
        from gateway.hooks import get_default_registry

        hook_forwarder.start_if_dashboard_available(
            get_default_registry(), src="batch"
        )
    except Exception:
        # Forwarding is best-effort; never let it break the batch runner.
        pass
# Handle list distributions
if list_distributions:
from toolset_distributions import print_distribution_info

16
cli.py

@@ -12515,6 +12515,22 @@ class HermesCLI:
def run(self):
"""Run the interactive CLI loop with persistent input at bottom."""
        # Cross-process hook delivery: start the forwarder if a
        # dashboard is reachable. No-op when no dashboard is running
        # or when ``HERMES_HOOK_FORWARDER=0`` is set. Currently the
        # CLI process emits no hooks directly (only the gateway and
        # TUI do today), but wiring here future-proofs CLI-spawned
        # agent runs that may emit hooks via tools. See
        # gateway/hook_forwarder.py + DESIGN-cross-process-hooks.md.
        try:
            from gateway import hook_forwarder
            from gateway.hooks import get_default_registry

            hook_forwarder.start_if_dashboard_available(
                get_default_registry(), src="cli"
            )
        except Exception:
            pass
# Detect light/dark terminal mode now (before pt grabs the tty).
# Caches the result so subsequent _hex_to_ansi / style calls
# don't risk re-querying mid-render.

450
gateway/hook_forwarder.py Normal file

@@ -0,0 +1,450 @@
"""Cross-process hook forwarder.
Subscribes to a fixed set of namespaces on a process-local
:class:`gateway.hooks.HookRegistry` and POSTs each fired event to the
dashboard's ``/api/hooks/ingest`` endpoint. The dashboard republishes
the event on its own default registry so plugins running in the
dashboard process see events that originated in the gateway, TUI,
subagent, or batch-runner processes.
Design constraints (see ``DESIGN-cross-process-hooks.md``):
* Never blocks the publisher. Handler enqueues onto a bounded queue
and returns immediately; a daemon worker thread does the HTTP POSTs.
* Bounded queue drops *oldest* on overflow. Observability events are
best-effort; recency beats history when the dashboard is slow.
* Loop prevention. Events whose context carries ``_forwarded=True``
are skipped — those came from the ingest endpoint and must not be
shipped back.
* No-op when no dashboard is available. Discovery file absent at
startup ⇒ no registration, no thread. Once a forwarder is running,
its worker re-probes every 30s so a dashboard that restarts (or
goes away and comes back) re-attaches.
* ``HERMES_HOOK_FORWARDER=0`` short-circuits everything for paranoid
security postures, or for tests that don't want the daemon thread.
The forwarder is wired in by long-lived non-dashboard processes
(gateway, TUI, subagents, batch runners) via
:func:`start_if_dashboard_available`.
"""
from __future__ import annotations
import json
import logging
import os
import threading
import time
from pathlib import Path
from queue import Empty, Full, Queue
from typing import Any, Callable, Optional
from hermes_cli.config import get_hermes_home
_log = logging.getLogger(__name__)
# Namespaces the forwarder ships to the dashboard. Picked to cover every
# event type the registry emits today. Adding a new namespace is a
# one-line append here. Wildcards use the existing registry semantics
# (``<namespace>:*`` matches every ``<namespace>:<anything>`` event).
_FORWARDED_NAMESPACES = (
"tui:*",
"agent:*",
"session:*",
"command:*",
"gateway:*",
)
# Bounded queue size per source process. At peak (~70 events/sec, see the
# design doc's "Performance" section) this is ~14 seconds of backlog
# before drop-oldest kicks in. Generous for a purely-observability feed.
_QUEUE_MAX = 1024
# Probe cadence. A running forwarder re-checks the discovery file every
# 30s so a dashboard that restarts or comes back later re-attaches.
# Cheap (one file read + one health GET), so it can run for the life
# of the process.
_PROBE_INTERVAL_S = 30.0
# HTTP timeouts. Connection is loopback in the common case so anything
# beyond 2s probably means the dashboard is wedged; better to drop the
# frame than queue up retries that won't help.
_HTTP_TIMEOUT_S = 2.0
# Error-logging cadence. POST failures get logged once per minute, not
# per failure, so a downed dashboard doesn't spam ``agent.log``.
_ERROR_LOG_INTERVAL_S = 60.0
def _dashboard_discovery_path() -> Path:
"""Return the path the dashboard writes its discovery JSON to.
Always under ``$HERMES_HOME``; no ``/tmp`` fallback. Config
consistency across processes is a precondition, not something the
forwarder patches over.
"""
return get_hermes_home() / "dashboard.json"
def _read_discovery_file() -> Optional[dict]:
    """Read and parse ``dashboard.json``.

    Returns ``None`` when the file is absent, unreadable, malformed, or
    missing required keys. Never raises: callers treat a ``None``
    result as "no dashboard available" and re-probe on the next cycle.
    """
    path = _dashboard_discovery_path()
    try:
        if not path.exists():
            return None
        data = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and the
        # UnicodeDecodeError that read_text() raises on invalid UTF-8.
        return None
    if not isinstance(data, dict):
        return None
    if "url" not in data or "hooks_ingest_token" not in data:
        return None
    return data
def _disabled_by_env() -> bool:
    """Return True when ``HERMES_HOOK_FORWARDER`` is ``0``, ``false``, ``no``, or ``off``."""
    val = os.environ.get("HERMES_HOOK_FORWARDER", "").strip().lower()
    return val in ("0", "false", "no", "off")
# ---------------------------------------------------------------------------
# HookForwarder — per-process singleton-ish
# ---------------------------------------------------------------------------
class _HookForwarder:
"""The actual forwarder.
Tracks the registry it's attached to, the unregister callables (so
:meth:`stop` can clean up), and the worker thread state.
Multiple instances are technically supported but in practice each
process holds at most one — see :func:`start_if_dashboard_available`
and the module-level ``_active`` reference.
"""
    def __init__(self, src: str) -> None:
        self.src = src
        self._queue: Queue[dict] = Queue(maxsize=_QUEUE_MAX)
        self._unregisters: list[Callable[[], None]] = []
        self._stop = threading.Event()
        self._worker: Optional[threading.Thread] = None
        # Discovery state, refreshed by the worker thread on each probe
        # cycle (every ``_PROBE_INTERVAL_S`` seconds) and cleared on a 401,
        # so a dashboard restart (with a new token) auto-recovers.
        self._discovery: Optional[dict] = None
        self._discovery_lock = threading.Lock()
        # Error-log rate limit state.
        self._last_error_log_at: float = 0.0
        self._error_count_since_log: int = 0
# -- registry side ---------------------------------------------------
    def _handler(self, event_type: str, context: dict) -> None:
        """Sync hook handler: enqueue the event for the worker thread."""
        # Loop prevention: events forwarded *into* this process must not
        # be shipped back. The dashboard's ingest endpoint stamps every
        # republished context with ``_forwarded=True``.
        if context.get("_forwarded") is True:
            return
        frame = {"event_type": event_type, "context": context, "src": self.src}
        try:
            self._queue.put_nowait(frame)
        except Full:
            # Drop oldest, enqueue newest. Observability is best-effort.
            try:
                self._queue.get_nowait()
            except Empty:
                pass
            try:
                self._queue.put_nowait(frame)
            except Full:
                # Another publisher refilled the freed slot first. Give
                # up silently and drop this event.
                pass
def register(self, registry: "object") -> None:
"""Register the handler for every forwarded namespace."""
for pattern in _FORWARDED_NAMESPACES:
unreg = registry.register( # type: ignore[union-attr]
pattern,
self._handler,
name=f"hook_forwarder({pattern}→dashboard)",
)
self._unregisters.append(unreg)
def unregister(self) -> None:
"""Remove every handler the forwarder installed. Idempotent."""
while self._unregisters:
try:
self._unregisters.pop()()
except Exception: # pragma: no cover — defensive
pass
# -- worker thread ---------------------------------------------------
def start_worker(self) -> None:
"""Spawn the daemon worker thread that drains the queue."""
if self._worker is not None and self._worker.is_alive():
return
self._stop.clear()
self._worker = threading.Thread(
target=self._worker_loop,
name=f"hook-forwarder-{self.src}",
daemon=True,
)
self._worker.start()
def stop_worker(self, *, join_timeout: float = 1.0) -> None:
"""Signal the worker to exit and wait briefly for it to do so.
Idempotent. Safe to call from any thread. Daemon threads die
with the process anyway; the join is only there so tests don't
race on lingering threads between cases.
"""
self._stop.set()
if self._worker is not None:
try:
self._worker.join(timeout=join_timeout)
except RuntimeError:
pass
def _worker_loop(self) -> None:
"""Drain the queue, POSTing each frame, until ``_stop`` is set."""
# httpx is imported lazily so processes that never produce events
# don't pay the import cost. In practice the gateway and TUI
# both have httpx loaded by the time we get here, but defer
# anyway to be polite.
import httpx
next_probe_at: float = 0.0
with httpx.Client(timeout=_HTTP_TIMEOUT_S) as client:
while not self._stop.is_set():
# Probe the discovery file periodically so a dashboard
# that comes up later (or restarts with a new token)
# auto-attaches.
now = time.monotonic()
if now >= next_probe_at:
next_probe_at = now + _PROBE_INTERVAL_S
self._refresh_discovery(client)
# Block on the queue. Short timeout so we re-check the
# stop flag and probe interval often enough.
try:
frame = self._queue.get(timeout=1.0)
except Empty:
continue
with self._discovery_lock:
discovery = self._discovery
if discovery is None:
# No dashboard available right now. Drop the frame;
# the next probe will re-acquire discovery, and any
# events fired between now and then are lost (the
# design accepts best-effort delivery).
continue
self._post_frame(client, discovery, frame)
def _refresh_discovery(self, client: "Any") -> None: # client: httpx.Client
"""Reload the discovery file and probe the dashboard's health.
Updates ``self._discovery`` to the new value (or ``None`` if the
dashboard is unreachable). Called from the worker thread.
"""
data = _read_discovery_file()
if data is None:
with self._discovery_lock:
self._discovery = None
return
url = data["url"].rstrip("/")
try:
resp = client.get(f"{url}/api/hooks/health", timeout=_HTTP_TIMEOUT_S)
if resp.status_code != 200:
with self._discovery_lock:
self._discovery = None
return
except Exception:
with self._discovery_lock:
self._discovery = None
return
with self._discovery_lock:
self._discovery = data
def _post_frame(
self,
client: "Any", # httpx.Client
discovery: dict,
frame: dict,
) -> None:
"""POST one frame to ``/api/hooks/ingest``.
Errors are logged at most once per minute, regardless of how
many failures accumulate. A 401 invalidates the cached
discovery so the next probe re-reads the token (which may have
rotated on a dashboard restart).
"""
url = discovery["url"].rstrip("/") + "/api/hooks/ingest"
token = discovery["hooks_ingest_token"]
# The context is sent as-is; no filtering happens here. ``_forwarded``
# and ``_forwarded_from`` are added by the dashboard on republish, and
# contexts already carrying ``_forwarded=True`` never reach this point
# because ``_handler`` drops them before enqueueing.
try:
resp = client.post(
url,
json={
"event_type": frame["event_type"],
"context": frame["context"],
"src": frame["src"],
},
headers={"Authorization": f"Bearer {token}"},
timeout=_HTTP_TIMEOUT_S,
)
except Exception as exc:
self._log_error(f"POST {url} failed: {exc}")
return
if resp.status_code == 401:
# Token rotated (dashboard restarted with a new one).
# Invalidate the cached discovery; the next probe will
# re-read the file and pick up the new token.
self._log_error(
"POST /api/hooks/ingest returned 401 — token rotated; "
"invalidating discovery cache"
)
with self._discovery_lock:
self._discovery = None
return
if resp.status_code >= 400:
self._log_error(
f"POST /api/hooks/ingest returned {resp.status_code}: "
f"{resp.text[:200]!r}"
)
def _log_error(self, message: str) -> None:
"""Rate-limited error logging — once per minute per process."""
self._error_count_since_log += 1
now = time.monotonic()
if now - self._last_error_log_at < _ERROR_LOG_INTERVAL_S:
return
suppressed = self._error_count_since_log - 1
suffix = f" ({suppressed} similar errors suppressed)" if suppressed else ""
_log.warning("[hook_forwarder] %s%s", message, suffix)
self._last_error_log_at = now
self._error_count_since_log = 0
# ---------------------------------------------------------------------------
# Module-level singleton
# ---------------------------------------------------------------------------
# Per-process forwarder. Multiple ``start_if_dashboard_available`` calls
# in the same process are idempotent (they re-use this instance).
_active: Optional[_HookForwarder] = None
_active_lock = threading.Lock()
def start_if_dashboard_available(
registry: "object",
*,
src: str = "unknown",
) -> Optional[_HookForwarder]:
"""Wire the forwarder into ``registry`` if a dashboard is reachable.
Idempotent: repeated calls in the same process are no-ops. Returns
the active forwarder if one is running (the same instance on every
subsequent call within the process), or ``None`` if the forwarder
was suppressed (no dashboard, or ``HERMES_HOOK_FORWARDER=0``).
The ``src`` argument is a short tag identifying the source process
(``"gateway"``, ``"tui"``, ``"subagent"``, ``"batch"``). It's
included in the wire frame for the dashboard's diagnostic logging
and the republished context's ``_forwarded_from`` field, so a
subscriber can tell which process originated each event.
Args:
registry: The :class:`gateway.hooks.HookRegistry` whose events
should be forwarded. Normally
:func:`gateway.hooks.get_default_registry`.
src: Source-process tag. See above.
Returns:
The active forwarder, or ``None`` if forwarding is suppressed.
"""
global _active
if _disabled_by_env():
_log.debug(
"[hook_forwarder] HERMES_HOOK_FORWARDER=0 — skipping start"
)
return None
discovery = _read_discovery_file()
if discovery is None:
_log.debug(
"[hook_forwarder] no dashboard.json — forwarder not started"
)
return None
with _active_lock:
if _active is not None:
# Already started in this process; nothing to do.
return _active
fwd = _HookForwarder(src=src)
try:
fwd.register(registry)
except Exception as e:
_log.warning(
"[hook_forwarder] failed to register handlers: %s", e
)
return None
fwd.start_worker()
_active = fwd
_log.info(
"[hook_forwarder] started for src=%s, forwarding %d namespaces",
src,
len(_FORWARDED_NAMESPACES),
)
return fwd
def stop() -> None:
"""Tear down the active forwarder. Idempotent.
Primarily for tests; production processes just rely on the daemon
thread dying with the process.
"""
global _active
with _active_lock:
if _active is None:
return
_active.unregister()
_active.stop_worker()
_active = None
def _reset_for_tests() -> None:
"""Test helper — clears active state without preserving its
side-effects (registered handlers etc.)."""
stop()
def is_active() -> bool:
"""Return whether a forwarder is currently registered in this process."""
return _active is not None


@@ -2,19 +2,40 @@
Event Hook System
A lightweight event-driven system that fires handlers at key lifecycle points.
Hooks are discovered from ~/.hermes/hooks/ directories, each containing:
- HOOK.yaml (metadata: name, description, events list)
- handler.py (Python handler with async def handle(event_type, context))
Events:
- gateway:startup -- Gateway process starts
- session:start -- New session created (first message of a new session)
- session:end -- Session ends (user ran /new or /reset)
- session:reset -- Session reset completed (new session entry created)
- agent:start -- Agent begins processing a message
- agent:step -- Each turn in the tool-calling loop
- agent:end -- Agent finishes processing
- command:* -- Any slash command executed (wildcard match)
There are two ways to register a handler:
1. **File-system discovery** — drop a directory into ``~/.hermes/hooks/``
containing ``HOOK.yaml`` (metadata: name, description, events list) and
``handler.py`` (with ``def handle(event_type, context)``, sync or async).
These are loaded by :meth:`HookRegistry.discover_and_load` at gateway
startup.
2. **Programmatic registration** — call :meth:`HookRegistry.register` from
inside the process. Useful for plugins that ship their own bundled hooks
without expecting the user to maintain a ``~/.hermes/hooks/`` entry. Pairs
with :func:`get_default_registry` so plugins don't have to hold a registry
reference threaded through every call site.
Events fired today:
- ``gateway:startup`` — Gateway process starts
- ``session:start`` — New session created (first message of a new session)
- ``session:end`` — Session ends (user ran /new or /reset)
- ``session:reset`` — Session reset completed (new session entry created)
- ``agent:start`` — Agent begins processing a message
- ``agent:step`` — Each turn in the tool-calling loop
- ``agent:end`` — Agent finishes processing
- ``command:*`` — Any slash command executed (wildcard match)
- ``tui:<sub-event>`` — Any TUI gateway dispatch event mirrored to the
bus (``tui:tool.start``, ``tui:message.delta``,
``tui:reasoning.available``, etc.). Subscribe
with the full name for one event, or
``tui:*`` for all of them.
Wildcards match one colon-separated namespace level: a handler registered for
``foo:*`` fires for every ``foo:<anything>`` event, but not for
``bar:something``.
Errors in hooks are caught and logged but never block the main pipeline.
"""
@@ -32,6 +53,12 @@ from hermes_cli.config import get_hermes_home
HOOKS_DIR = get_hermes_home() / "hooks"
# Tracks handler functions we've already warned about for emit_sync's
# async-without-loop case, so each bad combination only logs once per
# process instead of flooding stderr on every event.
_ASYNC_NO_LOOP_WARNED: "set[int]" = set()
class HookRegistry:
"""
Discovers, loads, and fires event hooks.
@@ -52,6 +79,60 @@ class HookRegistry:
"""Return metadata about all loaded hooks."""
return list(self._loaded_hooks)
def register(
self,
event_type: str,
handler: Callable,
*,
name: Optional[str] = None,
) -> Callable[[], None]:
"""Programmatically register a handler for ``event_type``.
Intended for in-process plugins, tests, and built-in hooks. Pairs with
the file-system discovery path (HOOK.yaml + handler.py) — both share
the same dispatch and wildcard rules.
The handler signature matches discovered hooks: ``handle(event_type,
context)`` where ``handler`` may be sync or async.
Returns a callable that, when invoked, removes this specific handler
registration from the registry. Other handlers for the same event are
unaffected.
Args:
event_type: Event identifier such as ``agent:start`` or
``tui:tool.start``. May also be a wildcard like ``command:*``.
handler: Function or coroutine function to invoke when the
event fires.
name: Optional friendly name recorded alongside the
registration metadata for listing/debugging. Defaults to the
handler's ``__name__``.
Returns:
A no-arg callable that unregisters this handler when called.
"""
self._handlers.setdefault(event_type, []).append(handler)
meta = {
"name": name or getattr(handler, "__name__", "<anonymous>"),
"description": "(registered programmatically)",
"events": [event_type],
"path": "<programmatic>",
}
self._loaded_hooks.append(meta)
def _unregister() -> None:
try:
self._handlers.get(event_type, []).remove(handler)
except ValueError:
pass
try:
self._loaded_hooks.remove(meta)
except ValueError:
pass
return _unregister
def _register_builtin_hooks(self) -> None:
"""Register built-in hooks that are always active.
@@ -208,3 +289,146 @@ class HookRegistry:
except Exception as e:
print(f"[hooks] Error in handler for '{event_type}': {e}", flush=True)
return results
def emit_sync(
self,
event_type: str,
context: Optional[Dict[str, Any]] = None,
) -> None:
"""Fire handlers from a synchronous caller.
Companion to :meth:`emit` for hot-path callers that cannot await — most
notably ``tui_gateway/server.py:_emit``, which serves both async dispatch
paths and sync callback paths and must remain ``def`` (not ``async
def``).
Behavior:
- Sync handlers run immediately, in registration order. Exceptions are
caught and logged so a buggy handler can't break the host pipeline.
- Async handlers (coroutine functions) are scheduled via
``asyncio.ensure_future`` if a running event loop is available in the
current thread. If no loop is running, the handler is **skipped** and
a one-time warning is logged per handler — async handlers in a
purely sync process don't have a way to make forward progress.
Like :meth:`emit`, never raises and never blocks waiting on async
handlers — fire-and-forget for the async case.
Args:
event_type: The event identifier (e.g. ``tui:tool.start``).
context: Optional dict with event-specific data.
"""
if context is None:
context = {}
for fn in self._resolve_handlers(event_type):
try:
result = fn(event_type, context)
except Exception as e:
print(f"[hooks] Error in handler for '{event_type}': {e}", flush=True)
continue
if not asyncio.iscoroutine(result):
continue
# Coroutine returned — needs a loop to make progress.
try:
loop = asyncio.get_running_loop()
except RuntimeError:
# No running loop in this thread.
handler_id = id(fn)
if handler_id not in _ASYNC_NO_LOOP_WARNED:
_ASYNC_NO_LOOP_WARNED.add(handler_id)
handler_name = getattr(fn, "__name__", "<anonymous>")
print(
f"[hooks] Skipping async handler {handler_name!r} for "
f"'{event_type}' — emit_sync called with no running "
f"event loop. Subsequent skips for this handler are "
f"silent.",
flush=True,
)
# Close the coroutine to suppress "coroutine was never
# awaited" RuntimeWarning noise.
try:
result.close()
except Exception:
pass
continue
try:
# ensure_future schedules the coroutine on the loop and
# returns immediately. Exceptions inside the coroutine
# surface via the task's done callback (or asyncio's
# default exception handler) — we don't await here.
task = asyncio.ensure_future(result, loop=loop)
task.add_done_callback(_log_task_exception)
except Exception as e:
print(
f"[hooks] Failed to schedule async handler for "
f"'{event_type}': {e}",
flush=True,
)
def _log_task_exception(task: "asyncio.Task[Any]") -> None:
"""Surface exceptions from scheduled async hook handlers.
Without this callback, an exception inside a fire-and-forget handler
coroutine becomes "Task exception was never retrieved" noise from
asyncio's default exception handler at GC time. Logging it explicitly
keeps the failure mode visible and consistent with the sync path.
"""
if task.cancelled():
return
exc = task.exception()
if exc is not None:
print(f"[hooks] Async handler raised: {exc}", flush=True)
# ── Module-level default registry ──────────────────────────────────
#
# Plugins and in-process callers (TUI gateway's ``_emit`` etc.) need a
# stable place to find "the" registry without threading a reference
# through every API. The gateway process installs its own
# ``self.hooks`` instance as the default during startup so file-system
# discovery and built-in hooks share state with programmatic
# registrations. Other processes (TUI) lazily get their own default on
# first access and run ``discover_and_load()`` themselves.
_default_registry: Optional["HookRegistry"] = None
def get_default_registry() -> "HookRegistry":
"""Return the process-wide default :class:`HookRegistry`.
Lazily creates one (without auto-running discovery) on first call. Callers
that need file-system hook discovery should invoke
:meth:`HookRegistry.discover_and_load` themselves after first access — the
gateway already does this for the registry it installs as the default.
"""
global _default_registry
if _default_registry is None:
_default_registry = HookRegistry()
return _default_registry
def install_as_default(registry: "HookRegistry") -> None:
"""Install ``registry`` as the process-wide default.
Intended for the gateway and other long-lived hosts that want their own
:class:`HookRegistry` instance to be visible to in-process plugins through
:func:`get_default_registry`. Idempotent — installing the same registry
twice is a no-op; installing a different registry replaces the previous
default.
"""
global _default_registry
_default_registry = registry
def _reset_default_registry_for_tests() -> None:
"""Test helper — clears the cached default so each test starts fresh."""
global _default_registry
_default_registry = None
_ASYNC_NO_LOOP_WARNED.clear()
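
Reviewer sketch of the `emit_sync` contract described in the docstring above: sync handlers run inline, async handlers are scheduled only when a loop is running, and are otherwise closed and skipped. `MiniRegistry` and `demo` are hypothetical stand-ins written for illustration; they match exact event names only and leave out wildcard resolution and the one-time warning.

```python
import asyncio
from typing import Any, Callable, Dict, List, Optional, Tuple

Handler = Callable[[str, Dict[str, Any]], Any]


class MiniRegistry:
    """Hypothetical stand-in for HookRegistry (exact-match events only)."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Handler]] = {}

    def register(self, event_type: str, fn: Handler) -> None:
        self._handlers.setdefault(event_type, []).append(fn)

    def emit_sync(self, event_type: str, context: Optional[Dict[str, Any]] = None) -> None:
        context = {} if context is None else context
        for fn in self._handlers.get(event_type, []):
            try:
                result = fn(event_type, context)
            except Exception as exc:  # a buggy handler must not break the caller
                print(f"[hooks] handler failed: {exc}", flush=True)
                continue
            if not asyncio.iscoroutine(result):
                continue  # plain sync handler, already done
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                result.close()  # no loop: skip, and avoid "never awaited" noise
                continue
            asyncio.ensure_future(result, loop=loop)  # fire-and-forget


def demo() -> Tuple[List[str], List[str]]:
    reg = MiniRegistry()
    seen: List[str] = []

    async def on_event(_event: str, _ctx: Dict[str, Any]) -> None:
        seen.append("async")

    reg.register("evt:x", lambda _e, _c: seen.append("sync"))
    reg.register("evt:x", on_event)

    reg.emit_sync("evt:x")  # called with no running loop
    without_loop = list(seen)
    seen.clear()

    async def driver() -> None:
        reg.emit_sync("evt:x")
        await asyncio.sleep(0)  # yield once so the scheduled task can run

    asyncio.run(driver())
    return without_loop, list(seen)
```

The second half of `demo` mirrors `test_async_handler_scheduled_when_loop_running`: the caller has to yield to the loop before the async handler's side effects become visible.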


@@ -1862,8 +1862,23 @@ class GatewayRunner:
self.pairing_store = PairingStore()
# Event hook system
from gateway.hooks import HookRegistry
from gateway.hooks import HookRegistry, install_as_default
self.hooks = HookRegistry()
# Expose this registry as the process-wide default so in-process
# plugins (and any other component that uses ``get_default_registry()``)
# share state with file-system-discovered hooks loaded into ``self.hooks``.
install_as_default(self.hooks)
# Cross-process delivery: start the hook forwarder if a
# dashboard is reachable. No-op when the dashboard isn't
# running, or when ``HERMES_HOOK_FORWARDER=0`` is set. See
# gateway/hook_forwarder.py + DESIGN-cross-process-hooks.md.
try:
from gateway import hook_forwarder
hook_forwarder.start_if_dashboard_available(self.hooks, src="gateway")
except Exception as e: # pragma: no cover — defensive
# Forwarder failure must never break gateway startup. Log
# and move on; dashboard plugins just won't see gateway events.
print(f"[gateway] hook forwarder start failed: {e}", flush=True)
# Per-chat voice reply mode: "off" | "voice_only" | "all"
self._voice_mode: Dict[str, str] = self._load_voice_modes()
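
Reviewer sketch of the default-registry pattern that the `install_as_default(self.hooks)` call relies on. `Registry`, `get_default` and `install` are hypothetical names for illustration, not the real `gateway.hooks` API.

```python
from typing import Callable, Dict, List, Optional


class Registry:
    """Hypothetical placeholder for HookRegistry."""

    def __init__(self) -> None:
        self.handlers: Dict[str, List[Callable]] = {}


_default: Optional[Registry] = None


def get_default() -> Registry:
    """Return the process-wide default, creating it lazily on first use."""
    global _default
    if _default is None:
        _default = Registry()
    return _default


def install(registry: Registry) -> None:
    """Replace the process-wide default with the host's own registry."""
    global _default
    _default = registry
```

One consequence of plain replacement: anything registered on a lazily created default before `install` runs stays on the old object and is not carried over. Installing the registry early in startup, as the hunk above does, avoids that ordering problem.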


@@ -41,6 +41,12 @@ _GATE_PUBLIC_PREFIXES: tuple[str, ...] = (
"/ds-assets/",
"/fonts/",
"/fonts-terminal/",
# Cross-process hook delivery — bearer-token authenticated independently
# of the OAuth gate (see hermes_cli/hook_ingest.py + DESIGN-cross-
# process-hooks.md). Forwarders running outside any logged-in browser
# session need to reach these endpoints; the bearer token is the
# security boundary.
"/api/hooks/",
)
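
Reviewer sketch of the bearer-token boundary that the comment above refers to: a random token is generated once per dashboard start and compared in constant time. `make_token` and `auth_ok` are hypothetical helper names; the calls themselves (`secrets.token_urlsafe`, `hmac.compare_digest`) are the ones the ingest module uses.

```python
import hmac
import secrets


def make_token() -> str:
    """Generate a fresh URL-safe token (32 random bytes)."""
    return secrets.token_urlsafe(32)


def auth_ok(authorization_header: str, expected_token: str) -> bool:
    """Constant-time check of an ``Authorization: Bearer <token>`` header."""
    if not expected_token:
        return False  # no token issued yet: refuse everything
    expected = f"Bearer {expected_token}"
    return hmac.compare_digest(authorization_header.encode(), expected.encode())
```

Because the whole `/api/hooks/` prefix is public to the OAuth gate, the empty-token guard matters: without it, a request sent before the discovery file is written would be compared against the bare string `Bearer `.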

hermes_cli/hook_ingest.py (new file, 282 lines)

@@ -0,0 +1,282 @@
"""Hook registry cross-process delivery — dashboard-side endpoints.
The companion to ``gateway/hook_forwarder.py``. Provides:
* ``write_dashboard_discovery_file()`` / ``remove_dashboard_discovery_file()`` —
drops/cleans ``$HERMES_HOME/dashboard.json`` with the bound dashboard URL and
a freshly-generated bearer token. Long-lived source processes (gateway,
TUI, subagents) read this file on startup to find the dashboard and
authenticate ingest POSTs.
* :func:`build_hook_router` — returns a FastAPI router with two endpoints:
- ``GET /api/hooks/health`` — unauthenticated reachability probe. The
forwarder GETs this before each round of POSTs so a downed dashboard
fails fast without spamming the ingest endpoint with retries.
- ``POST /api/hooks/ingest`` — accepts ``{event_type, context, src}``
frames from forwarders and republishes them via
``get_default_registry().emit_sync(event_type, context)``. Bearer
token from ``dashboard.json:hooks_ingest_token`` is the security
boundary, independent of the dashboard's session token / OAuth
gate. Loop-prevention: the republished context is stamped with
``_forwarded=True`` and ``_forwarded_from=<src>`` so source-side
forwarders skip these events if they ever round-trip back.
See ``DESIGN-cross-process-hooks.md`` for the full design rationale.
"""
from __future__ import annotations
import hmac
import json
import logging
import os
import secrets
import stat
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from fastapi import APIRouter, HTTPException, Request
from hermes_cli.config import get_hermes_home
_log = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Discovery file
# ---------------------------------------------------------------------------
#
# The dashboard writes this file on startup and removes it on shutdown.
# Forwarders running in non-dashboard processes read it to discover the
# dashboard URL + bearer token. Lives under HERMES_HOME so it's
# profile-aware; no /tmp fallback.
_DISCOVERY_FILENAME = "dashboard.json"
# Token used by forwarders to authenticate POST /api/hooks/ingest.
# Generated fresh on each dashboard startup; lives only in memory + the
# 0600-mode discovery file. Stored at module level so the ingest
# endpoint can compare against it without re-reading the file on every
# request.
_HOOKS_INGEST_TOKEN: str = ""
def _discovery_path() -> Path:
"""Return the absolute path of ``$HERMES_HOME/dashboard.json``."""
return get_hermes_home() / _DISCOVERY_FILENAME
def write_dashboard_discovery_file(host: str, port: int) -> str:
"""Generate a fresh hooks-ingest token and write the discovery file.
The discovery file is written atomically (write to ``.tmp``, rename
to final path) so a forwarder that reads concurrently never sees a
partially-written file. The file mode is ``0600`` — owner-only —
so a same-host non-root user can't read the token without already
having compromised the dashboard user's account.
Safe to call repeatedly: each call in the same process rotates the
token and overwrites the file. Not thread-safe; should
only be called from the dashboard's startup path.
Args:
host: The host the dashboard bound to (``127.0.0.1``, ``0.0.0.0``,
a specific interface, …). Written verbatim into the discovery
file so forwarders dial the right address.
port: The port the dashboard bound to.
Returns:
The freshly-generated bearer token (also stored module-locally
so :func:`_hook_ingest_auth_ok` can validate POSTs).
"""
global _HOOKS_INGEST_TOKEN
token = secrets.token_urlsafe(32)
_HOOKS_INGEST_TOKEN = token
payload = {
"url": f"http://{host}:{port}",
"hooks_ingest_token": token,
"pid": os.getpid(),
"started_at": datetime.now(timezone.utc).isoformat(),
}
path = _discovery_path()
path.parent.mkdir(parents=True, exist_ok=True)
tmp_path = path.with_suffix(".json.tmp")
try:
tmp_path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
# 0600 — owner read+write, nothing for group or other. Same
# posture as ~/.hermes/auth.json.
os.chmod(tmp_path, stat.S_IRUSR | stat.S_IWUSR)
tmp_path.replace(path)
except OSError as exc:
_log.warning(
"[hooks-ingest] failed to write dashboard discovery file at %s: %s",
path,
exc,
)
try:
tmp_path.unlink(missing_ok=True)
except OSError:
pass
raise
_log.debug("[hooks-ingest] wrote %s", path)
return token
def remove_dashboard_discovery_file() -> None:
"""Delete the discovery file. Idempotent; safe to call from atexit."""
global _HOOKS_INGEST_TOKEN
try:
_discovery_path().unlink(missing_ok=True)
except OSError:
pass
_HOOKS_INGEST_TOKEN = ""
_log.debug("[hooks-ingest] removed discovery file")
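
Reviewer note with a possible variant, not the code in this commit. `write_text` followed by `os.chmod` creates the temp file with umask-default permissions for a brief moment before tightening them. The sketch below uses `tempfile.mkstemp`, which creates the file owner-only from the start on POSIX, and then renames it into place. `write_private_json` is a hypothetical helper name.

```python
import json
import os
import stat
import tempfile
from pathlib import Path
from typing import Any, Dict


def write_private_json(path: Path, payload: Dict[str, Any]) -> None:
    """Atomically write ``payload`` as JSON so readers never see a partial file."""
    path.parent.mkdir(parents=True, exist_ok=True)
    # mkstemp creates the file readable and writable only by the owner (POSIX).
    fd, tmp_name = tempfile.mkstemp(
        dir=path.parent, prefix=path.name + ".", suffix=".tmp"
    )
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            json.dump(payload, fh, indent=2)
        # Explicit 0600 anyway, to keep the intent visible.
        os.chmod(tmp_name, stat.S_IRUSR | stat.S_IWUSR)
        # Atomic rename on the same filesystem.
        os.replace(tmp_name, path)
    except OSError:
        try:
            os.unlink(tmp_name)
        except OSError:
            pass
        raise
```

The temp file lives in the same directory as the target so the final rename stays on one filesystem.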
# ---------------------------------------------------------------------------
# Auth helper
# ---------------------------------------------------------------------------
def _hook_ingest_auth_ok(request: Request) -> bool:
"""True if the request carries a valid hooks-ingest bearer token.
The token is independent of the dashboard session token / OAuth
cookie; only forwarders that have read ``dashboard.json`` know it.
Constant-time comparison via :func:`hmac.compare_digest`.
"""
if not _HOOKS_INGEST_TOKEN:
# No token yet — discovery file not written; refuse everything.
return False
auth = request.headers.get("authorization", "")
expected = f"Bearer {_HOOKS_INGEST_TOKEN}"
return hmac.compare_digest(auth.encode(), expected.encode())
# ---------------------------------------------------------------------------
# FastAPI router
# ---------------------------------------------------------------------------
def build_hook_router() -> APIRouter:
"""Return a FastAPI router with ``/health`` and ``/ingest`` endpoints.
Caller mounts at ``/api/hooks`` to get the documented endpoints
(``GET /api/hooks/health`` and ``POST /api/hooks/ingest``).
"""
router = APIRouter()
@router.get("/health")
async def hook_health() -> dict:
"""Unauthenticated reachability probe for the forwarder."""
return {"ok": True}
@router.post("/ingest")
async def hook_ingest(request: Request) -> dict:
"""Republish a forwarded event on the dashboard's local registry.
Wire shape (POST body)::
{
"event_type": "agent:start",
"context": {"platform": "telegram", "user_id": "u-1", ...},
"src": "gateway"
}
The context is republished with ``_forwarded=True`` and
``_forwarded_from=<src>`` stamped on it so source-side
forwarders skip the event if it ever round-trips back (closing
the loop).
Returns ``{"ok": True}`` on success, raises 401 on missing/bad
token, 400 on malformed body.
``emit_sync`` is the right entry point: sync handlers run inline
in this coroutine, i.e. on the dashboard's event-loop thread, so
they should stay fast (push-to-queue style, as typical plugin
handlers are); async handlers are scheduled on the running event
loop via the existing ``asyncio.ensure_future`` path inside
``emit_sync``.
"""
if not _hook_ingest_auth_ok(request):
raise HTTPException(status_code=401, detail="Unauthorized")
try:
body = await request.json()
except Exception as exc:
raise HTTPException(
status_code=400, detail=f"invalid JSON body: {exc}"
) from exc
if not isinstance(body, dict):
raise HTTPException(status_code=400, detail="body must be a JSON object")
event_type = body.get("event_type")
if not isinstance(event_type, str) or not event_type:
raise HTTPException(
status_code=400, detail="missing or empty 'event_type'"
)
context = body.get("context") or {}
if not isinstance(context, dict):
raise HTTPException(
status_code=400, detail="'context' must be a JSON object"
)
src = body.get("src", "?")
if not isinstance(src, str):
src = "?"
# Stamp the context with forwarding metadata so:
# 1. Source-side forwarders skip this event if it ever round-trips
# back to them (loop prevention).
# 2. Subscribers can filter forwarded-vs-original on
# ``context["_forwarded"]`` (useful for hooks that fire in
# every process and don't want 2x firing).
context = {**context, "_forwarded": True, "_forwarded_from": src}
# Lazy import to avoid a hard dependency on gateway.hooks at
# module-load time (web_server can be imported in contexts
# where gateway.hooks isn't ready).
from gateway.hooks import get_default_registry
try:
get_default_registry().emit_sync(event_type, context)
except Exception as exc:
# emit_sync swallows handler exceptions internally — if we
# see one here it's a registry-level bug. Log it but still
# return 200 so the forwarder doesn't retry-storm.
_log.warning(
"[hooks-ingest] emit_sync raised for %s: %s", event_type, exc
)
return {"ok": True}
return router
# ---------------------------------------------------------------------------
# Test helper
# ---------------------------------------------------------------------------
def _reset_for_tests() -> None:
"""Clear the cached token + remove any leftover discovery file."""
global _HOOKS_INGEST_TOKEN
_HOOKS_INGEST_TOKEN = ""
remove_dashboard_discovery_file()
def get_current_token_for_tests() -> str:
"""Read the currently-active hooks-ingest token.
Test-only accessor; do not call from production code.
"""
return _HOOKS_INGEST_TOKEN
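
Reviewer sketch of the ingest body handling as a pure function, which makes the validation and stamping rules easy to unit-test without FastAPI. `prepare_forwarded_event` and `should_forward` are hypothetical names. The source-side check in `should_forward` is an assumption inferred from the forwarder tests (only a literal `True` suppresses forwarding), since `gateway/hook_forwarder.py` itself is not shown here.

```python
from typing import Any, Dict, Tuple


def prepare_forwarded_event(body: Any) -> Tuple[str, Dict[str, Any]]:
    """Validate an ingest frame and return (event_type, stamped_context)."""
    if not isinstance(body, dict):
        raise ValueError("body must be a JSON object")
    event_type = body.get("event_type")
    if not isinstance(event_type, str) or not event_type:
        raise ValueError("missing or empty 'event_type'")
    context = body.get("context") or {}
    if not isinstance(context, dict):
        raise ValueError("'context' must be a JSON object")
    src = body.get("src", "?")
    if not isinstance(src, str):
        src = "?"
    # Copy instead of mutating, then stamp the loop-prevention markers.
    return event_type, {**context, "_forwarded": True, "_forwarded_from": src}


def should_forward(context: Dict[str, Any]) -> bool:
    """Assumed source-side rule: skip only events stamped with a literal True."""
    return context.get("_forwarded") is not True
```

A frame that passes through `prepare_forwarded_event` always fails `should_forward`, which is what closes the source-to-dashboard loop.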


@@ -120,6 +120,12 @@ _PUBLIC_API_PATHS: frozenset = frozenset({
"/api/model/info",
"/api/dashboard/themes",
"/api/dashboard/plugins",
# Cross-process hook delivery: these bypass the session-token check
# below. /api/hooks/ingest authenticates inside its route handler
# (bearer token from dashboard.json, generated fresh each dashboard
# startup); /api/hooks/health is an unauthenticated reachability probe.
"/api/hooks/health",
"/api/hooks/ingest",
})
@@ -4815,6 +4821,13 @@ def _mount_plugin_api_routes():
# Mount plugin API routes before the SPA catch-all.
_mount_plugin_api_routes()
# Mount the cross-process hook ingest router (POST /api/hooks/ingest +
# GET /api/hooks/health). Ingest has its own bearer-token auth
# (independent of _SESSION_TOKEN / the OAuth gate); health is
# unauthenticated. See hermes_cli/hook_ingest.py and
# DESIGN-cross-process-hooks.md.
from hermes_cli.hook_ingest import build_hook_router as _build_hook_router # noqa: E402
app.include_router(_build_hook_router(), prefix="/api/hooks")
# Mount the dashboard auth routes (/login, /auth/*, /api/auth/*) before the
# SPA catch-all so /{full_path:path} doesn't swallow them. These are
# always mounted — the gate middleware decides whether to enforce auth,
@@ -4910,6 +4923,29 @@ def start_server(
app.state.bound_host = host
app.state.bound_port = port
# Cross-process hook delivery: write the discovery file so source
# processes (gateway, TUI, subagents) can find this dashboard and
# authenticate POSTs to /api/hooks/ingest. The file is removed via
# atexit so a clean shutdown stops orphan forwarders from trying to
# POST to a port that's now closed.
import atexit
from hermes_cli.hook_ingest import (
remove_dashboard_discovery_file,
write_dashboard_discovery_file,
)
try:
write_dashboard_discovery_file(host, port)
atexit.register(remove_dashboard_discovery_file)
except OSError as exc:
# Non-fatal — the dashboard still works without cross-process
# hook delivery. Warn so the operator knows the orb (or any
# other event-driven plugin) won't see gateway/TUI events.
_log.warning(
"Failed to write hook discovery file: %s. Cross-process hook "
"delivery to dashboard plugins will be unavailable.",
exc,
)
if open_browser:
import webbrowser
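
Reviewer sketch of the source side of the handshake: read the discovery file written above and build the authenticated POST. It uses the standard library's `urllib.request.Request` so it runs without dependencies; the commit message says the real forwarder uses httpx. `build_ingest_request` is a hypothetical helper, and nothing is sent over the network here.

```python
import json
from pathlib import Path
from typing import Any, Dict, Optional
from urllib.request import Request


def build_ingest_request(
    discovery_path: Path, event_type: str, context: Dict[str, Any], src: str
) -> Optional[Request]:
    """Return a ready-to-send POST, or None if discovery is missing or invalid."""
    try:
        info = json.loads(discovery_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        return None  # file absent, unreadable, or not valid JSON
    if not isinstance(info, dict):
        return None
    url = info.get("url")
    token = info.get("hooks_ingest_token")
    if not isinstance(url, str) or not isinstance(token, str) or not url or not token:
        return None
    body = json.dumps(
        {"event_type": event_type, "context": context, "src": src}
    ).encode("utf-8")
    return Request(
        url.rstrip("/") + "/api/hooks/ingest",
        data=body,
        method="POST",
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )
```

Returning `None` for every malformed case mirrors the no-op behavior the forwarder tests expect when `dashboard.json` is absent, garbled, or missing keys.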


@@ -0,0 +1,405 @@
"""Unit tests for ``gateway/hook_forwarder.py``.
Covers:
* No-op behavior when discovery file absent / env var disables
* Idempotent ``start_if_dashboard_available`` (repeat calls re-use same forwarder)
* Handler registration on every forwarded namespace
* Loop prevention via ``_forwarded`` context flag
* Queue overflow drops oldest, enqueues newest
* ``stop()`` and ``_reset_for_tests`` are clean and idempotent
* Error-log rate limiting
The HTTP-POST path is exercised in the integration test
(``test_hook_forwarder_integration.py``) where a real FastAPI app
stands in for the dashboard. Unit tests here focus on the
registry-side behaviors that don't need an HTTP server.
"""
from __future__ import annotations
import json
import os
import time
from pathlib import Path
from unittest.mock import patch
import pytest
from gateway import hook_forwarder
from gateway.hooks import HookRegistry
@pytest.fixture(autouse=True)
def _clean_forwarder():
"""Reset forwarder state + env between tests so they're independent."""
hook_forwarder._reset_for_tests()
# Make sure HERMES_HOOK_FORWARDER isn't sticky from a previous test.
saved = os.environ.pop("HERMES_HOOK_FORWARDER", None)
yield
hook_forwarder._reset_for_tests()
if saved is not None:
os.environ["HERMES_HOOK_FORWARDER"] = saved
def _write_discovery(tmp_path: Path, *, url: str = "http://127.0.0.1:9119") -> Path:
"""Write a valid dashboard.json into tmp_path and return its path."""
discovery = tmp_path / "dashboard.json"
discovery.write_text(
json.dumps(
{
"url": url,
"hooks_ingest_token": "test-token-abc",
"pid": 99999,
"started_at": "2026-05-29T12:00:00Z",
}
)
)
return discovery
# ---------------------------------------------------------------------------
# No-op paths
# ---------------------------------------------------------------------------
class TestStartIfDashboardAvailable:
def test_no_dashboard_json_returns_none(self, tmp_path, monkeypatch):
"""Missing discovery file ⇒ forwarder doesn't start."""
monkeypatch.setattr(hook_forwarder, "get_hermes_home", lambda: tmp_path)
reg = HookRegistry()
result = hook_forwarder.start_if_dashboard_available(reg, src="gateway")
assert result is None
assert not hook_forwarder.is_active()
# No handlers registered on the registry.
assert reg._handlers == {}
def test_env_disabled_returns_none_even_with_dashboard(
self, tmp_path, monkeypatch
):
"""``HERMES_HOOK_FORWARDER=0`` short-circuits even when discovery
is otherwise valid."""
monkeypatch.setattr(hook_forwarder, "get_hermes_home", lambda: tmp_path)
_write_discovery(tmp_path)
monkeypatch.setenv("HERMES_HOOK_FORWARDER", "0")
reg = HookRegistry()
result = hook_forwarder.start_if_dashboard_available(reg, src="gateway")
assert result is None
assert reg._handlers == {}
@pytest.mark.parametrize("value", ["0", "false", "FALSE", "No", "off"])
def test_env_disabled_accepts_common_falsy_values(
self, tmp_path, monkeypatch, value
):
monkeypatch.setattr(hook_forwarder, "get_hermes_home", lambda: tmp_path)
_write_discovery(tmp_path)
monkeypatch.setenv("HERMES_HOOK_FORWARDER", value)
reg = HookRegistry()
assert hook_forwarder.start_if_dashboard_available(reg, src="gateway") is None
def test_malformed_discovery_json_returns_none(self, tmp_path, monkeypatch):
"""Garbage in ``dashboard.json`` ⇒ forwarder doesn't start."""
monkeypatch.setattr(hook_forwarder, "get_hermes_home", lambda: tmp_path)
(tmp_path / "dashboard.json").write_text("{not json")
reg = HookRegistry()
assert hook_forwarder.start_if_dashboard_available(reg, src="gateway") is None
def test_discovery_missing_required_keys_returns_none(
self, tmp_path, monkeypatch
):
"""``dashboard.json`` without ``url`` or ``hooks_ingest_token`` ⇒
no forwarder."""
monkeypatch.setattr(hook_forwarder, "get_hermes_home", lambda: tmp_path)
(tmp_path / "dashboard.json").write_text(
json.dumps({"pid": 123}) # missing url + token
)
reg = HookRegistry()
assert hook_forwarder.start_if_dashboard_available(reg, src="gateway") is None
def test_repeated_calls_are_idempotent(self, tmp_path, monkeypatch):
"""Two ``start_if_dashboard_available`` in the same process re-use
the same forwarder instance and don't double-register handlers."""
monkeypatch.setattr(hook_forwarder, "get_hermes_home", lambda: tmp_path)
_write_discovery(tmp_path)
reg = HookRegistry()
first = hook_forwarder.start_if_dashboard_available(reg, src="gateway")
try:
second = hook_forwarder.start_if_dashboard_available(reg, src="tui")
assert first is not None
assert second is first # same instance
# And handlers registered exactly once per namespace.
for pattern in hook_forwarder._FORWARDED_NAMESPACES:
assert len(reg._handlers[pattern]) == 1
finally:
hook_forwarder.stop()
# ---------------------------------------------------------------------------
# Registration coverage
# ---------------------------------------------------------------------------
class TestRegistration:
def test_registers_every_forwarded_namespace(self, tmp_path, monkeypatch):
"""Forwarder subscribes to all five canonical namespaces."""
monkeypatch.setattr(hook_forwarder, "get_hermes_home", lambda: tmp_path)
_write_discovery(tmp_path)
reg = HookRegistry()
try:
hook_forwarder.start_if_dashboard_available(reg, src="gateway")
for pattern in (
"tui:*",
"agent:*",
"session:*",
"command:*",
"gateway:*",
):
assert pattern in reg._handlers
assert len(reg._handlers[pattern]) == 1
finally:
hook_forwarder.stop()
def test_stop_unregisters_all_handlers(self, tmp_path, monkeypatch):
"""``stop()`` removes every handler the forwarder installed."""
monkeypatch.setattr(hook_forwarder, "get_hermes_home", lambda: tmp_path)
_write_discovery(tmp_path)
reg = HookRegistry()
hook_forwarder.start_if_dashboard_available(reg, src="gateway")
for pattern in hook_forwarder._FORWARDED_NAMESPACES:
assert pattern in reg._handlers
hook_forwarder.stop()
for pattern in hook_forwarder._FORWARDED_NAMESPACES:
assert reg._handlers.get(pattern, []) == []
def test_stop_is_idempotent(self, tmp_path, monkeypatch):
"""Calling ``stop()`` twice doesn't raise."""
monkeypatch.setattr(hook_forwarder, "get_hermes_home", lambda: tmp_path)
_write_discovery(tmp_path)
reg = HookRegistry()
hook_forwarder.start_if_dashboard_available(reg, src="gateway")
hook_forwarder.stop()
hook_forwarder.stop() # second call is no-op
# ---------------------------------------------------------------------------
# Handler behavior — loop prevention + queue management
# ---------------------------------------------------------------------------
class TestHandlerBehavior:
"""Tests the handler in isolation — skipping the worker thread by
constructing the ``_HookForwarder`` directly so we can inspect the
queue without racing on a daemon thread."""
def test_handler_enqueues_event(self):
fwd = hook_forwarder._HookForwarder(src="gateway")
fwd._handler("agent:start", {"platform": "telegram", "user_id": "u-1"})
assert fwd._queue.qsize() == 1
frame = fwd._queue.get_nowait()
assert frame == {
"event_type": "agent:start",
"context": {"platform": "telegram", "user_id": "u-1"},
"src": "gateway",
}
def test_handler_skips_forwarded_events(self):
"""The ``_forwarded=True`` flag closes the source ↔ dashboard loop."""
fwd = hook_forwarder._HookForwarder(src="gateway")
# This came from the ingest endpoint; must not be shipped back.
fwd._handler(
"agent:start",
{"platform": "telegram", "_forwarded": True, "_forwarded_from": "gateway"},
)
assert fwd._queue.qsize() == 0
def test_handler_does_not_skip_explicit_false_forwarded(self):
"""Only the literal ``True`` triggers loop prevention; a ``False``
or ``None`` value is ignored (the event is still enqueued)."""
fwd = hook_forwarder._HookForwarder(src="gateway")
fwd._handler("agent:start", {"_forwarded": False})
fwd._handler("agent:start", {"_forwarded": None})
assert fwd._queue.qsize() == 2
def test_handler_drops_oldest_on_queue_full(self):
"""At ``_QUEUE_MAX`` capacity, oldest frame is evicted to make
room for newest."""
fwd = hook_forwarder._HookForwarder(src="gateway")
# Replace the queue with a tiny one so we can exercise overflow
# without filling 1024 slots.
from queue import Queue as _Queue
fwd._queue = _Queue(maxsize=3)
for i in range(5):
fwd._handler("agent:step", {"iteration": i})
# Only the last 3 should survive (iterations 2, 3, 4).
iterations = []
while not fwd._queue.empty():
iterations.append(fwd._queue.get_nowait()["context"]["iteration"])
assert iterations == [2, 3, 4]
def test_handler_never_raises_on_pathological_queue(self):
"""Even if the queue is wedged (both put_nowait calls fail), the
handler returns cleanly — never propagates to the publisher."""
fwd = hook_forwarder._HookForwarder(src="gateway")
from queue import Queue as _Queue, Full as _Full
# Build a queue stub whose put_nowait always raises Full.
class _StubbedFull(_Queue):
def __init__(self):
super().__init__(maxsize=1)
def put_nowait(self, item):
raise _Full()
def get_nowait(self):
from queue import Empty
raise Empty()
fwd._queue = _StubbedFull() # type: ignore[assignment]
# Should not raise.
fwd._handler("agent:step", {"iteration": 1})
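
Reviewer sketch of the drop-oldest policy these tests exercise, written as a standalone function over `queue.Queue`. `put_drop_oldest` is a hypothetical name; the real handler lives in `gateway/hook_forwarder.py`, which is not part of this excerpt.

```python
from queue import Empty, Full, Queue
from typing import Any


def put_drop_oldest(q: Queue, item: Any) -> bool:
    """Enqueue without blocking; evict the oldest item if the queue is full.

    Returns False if the item still could not be enqueued. Never raises.
    """
    try:
        q.put_nowait(item)
        return True
    except Full:
        pass
    try:
        q.get_nowait()  # make room by discarding the oldest frame
    except Empty:
        pass  # a consumer drained it in the meantime
    try:
        q.put_nowait(item)
        return True
    except Full:
        return False  # lost a race with another producer; give up quietly
```

Dropping the oldest frame keeps the publisher non-blocking and favors recent events, which suits a best-effort dashboard feed better than blocking or rejecting new frames.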
# ---------------------------------------------------------------------------
# Discovery refresh / probe behavior — exercised without starting the worker
# ---------------------------------------------------------------------------
class TestDiscoveryRefresh:
def test_refresh_picks_up_new_token(self, tmp_path, monkeypatch):
"""A dashboard restart writes a new token; the next probe must
pick it up."""
monkeypatch.setattr(hook_forwarder, "get_hermes_home", lambda: tmp_path)
discovery_path = tmp_path / "dashboard.json"
discovery_path.write_text(
json.dumps({"url": "http://127.0.0.1:9119", "hooks_ingest_token": "first"})
)
fwd = hook_forwarder._HookForwarder(src="gateway")
# Stub the http client's get() to always return 200 — the
# discovery file is what we want to test here, not the probe.
class _Stub200:
status_code = 200
class _StubClient:
def get(self, *a, **kw):
return _Stub200()
fwd._refresh_discovery(_StubClient())
assert fwd._discovery is not None
assert fwd._discovery["hooks_ingest_token"] == "first"
# Now the dashboard "restarts" with a new token.
discovery_path.write_text(
json.dumps({"url": "http://127.0.0.1:9119", "hooks_ingest_token": "second"})
)
fwd._refresh_discovery(_StubClient())
assert fwd._discovery["hooks_ingest_token"] == "second"
def test_refresh_invalidates_on_probe_failure(self, tmp_path, monkeypatch):
"""If the health probe fails, discovery is cleared even though
the file still exists."""
monkeypatch.setattr(hook_forwarder, "get_hermes_home", lambda: tmp_path)
_write_discovery(tmp_path)
fwd = hook_forwarder._HookForwarder(src="gateway")
# First probe succeeds.
class _Stub200:
status_code = 200
class _OkClient:
def get(self, *a, **kw):
return _Stub200()
fwd._refresh_discovery(_OkClient())
assert fwd._discovery is not None
# Now make the probe fail — connection refused, dashboard gone.
class _FailingClient:
def get(self, *a, **kw):
raise ConnectionError("refused")
fwd._refresh_discovery(_FailingClient())
assert fwd._discovery is None
def test_refresh_invalidates_on_non_200_response(self, tmp_path, monkeypatch):
monkeypatch.setattr(hook_forwarder, "get_hermes_home", lambda: tmp_path)
_write_discovery(tmp_path)
fwd = hook_forwarder._HookForwarder(src="gateway")
class _Stub503:
status_code = 503
class _BadClient:
def get(self, *a, **kw):
return _Stub503()
fwd._refresh_discovery(_BadClient())
assert fwd._discovery is None
# ---------------------------------------------------------------------------
# Error-log rate limiting
# ---------------------------------------------------------------------------
class TestErrorRateLimit:
def test_first_error_logs_immediately(self, caplog):
fwd = hook_forwarder._HookForwarder(src="gateway")
caplog.set_level("WARNING", logger="gateway.hook_forwarder")
fwd._log_error("first error")
assert any("first error" in r.message for r in caplog.records)
def test_subsequent_errors_suppressed_until_window_elapses(self, caplog):
fwd = hook_forwarder._HookForwarder(src="gateway")
caplog.set_level("WARNING", logger="gateway.hook_forwarder")
fwd._log_error("first error")
before = len(caplog.records)
# Many follow-up errors within the same minute — all suppressed.
for _ in range(10):
fwd._log_error("nth error")
assert len(caplog.records) == before
def test_suppression_count_surfaces_on_next_log(self, caplog, monkeypatch):
fwd = hook_forwarder._HookForwarder(src="gateway")
caplog.set_level("WARNING", logger="gateway.hook_forwarder")
fwd._log_error("first error")
for _ in range(5):
fwd._log_error("suppressed")
# Simulate the rate-limit window elapsing.
fwd._last_error_log_at = time.monotonic() - hook_forwarder._ERROR_LOG_INTERVAL_S - 1
fwd._log_error("after window")
# Last record should include the "5 similar errors suppressed" suffix.
last = caplog.records[-1].message
assert "after window" in last
assert "5 similar errors suppressed" in last
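
Reviewer sketch of the rate-limiting behavior asserted above: the first error logs immediately, repeats inside the window are counted, and the count is appended to the next message that gets through. `RateLimitedLogger`, its `sink`, and the injectable `clock` are hypothetical; the suffix wording is copied from the test expectation.

```python
import time
from typing import Callable, Optional


class RateLimitedLogger:
    """Emit at most one message per interval and report how many were suppressed."""

    def __init__(
        self,
        sink: Callable[[str], None],
        interval_s: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._sink = sink
        self._interval_s = interval_s
        self._clock = clock
        self._last_at: Optional[float] = None
        self._suppressed = 0

    def error(self, message: str) -> None:
        now = self._clock()
        if self._last_at is not None and now - self._last_at < self._interval_s:
            self._suppressed += 1  # inside the window: count, don't log
            return
        if self._suppressed:
            message = f"{message} ({self._suppressed} similar errors suppressed)"
        self._sink(message)
        self._last_at = now
        self._suppressed = 0
```

Injecting the clock lets a test advance time directly instead of rewriting a private timestamp, as `test_suppression_count_surfaces_on_next_log` does with `_last_error_log_at`.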


@@ -316,3 +316,256 @@ class TestEmitCollect:
await reg.emit_collect("agent:start") # no context arg
assert captured == [("agent:start", {})]
class TestRegister:
"""Tests for the programmatic ``HookRegistry.register`` API."""
def test_registers_handler(self):
reg = HookRegistry()
calls: list = []
def handler(event_type, context):
calls.append((event_type, context))
reg.register("agent:start", handler)
assert "agent:start" in reg._handlers
assert reg._handlers["agent:start"] == [handler]
def test_records_metadata_in_loaded_hooks(self):
reg = HookRegistry()
def my_handler(_e, _c):
return None
reg.register("tui:tool.start", my_handler)
assert len(reg.loaded_hooks) == 1
meta = reg.loaded_hooks[0]
assert meta["name"] == "my_handler"
assert meta["events"] == ["tui:tool.start"]
assert meta["path"] == "<programmatic>"
def test_custom_name_override(self):
reg = HookRegistry()
reg.register("agent:end", lambda _e, _c: None, name="orb-collector")
assert reg.loaded_hooks[0]["name"] == "orb-collector"
def test_returns_working_unregister(self):
reg = HookRegistry()
def handler(_e, _c):
return None
unregister = reg.register("agent:start", handler)
assert handler in reg._handlers["agent:start"]
assert len(reg.loaded_hooks) == 1
unregister()
assert handler not in reg._handlers["agent:start"]
assert len(reg.loaded_hooks) == 0
def test_unregister_is_idempotent(self):
reg = HookRegistry()
unregister = reg.register("agent:start", lambda _e, _c: None)
unregister()
# Second call should not raise.
unregister()
def test_multiple_handlers_same_event(self):
reg = HookRegistry()
calls: list = []
def h1(_e, _c):
calls.append("h1")
def h2(_e, _c):
calls.append("h2")
reg.register("agent:start", h1)
reg.register("agent:start", h2)
assert reg._handlers["agent:start"] == [h1, h2]
assert len(reg.loaded_hooks) == 2
def test_unregister_does_not_affect_other_handlers(self):
reg = HookRegistry()
def h1(_e, _c):
return None
def h2(_e, _c):
return None
unreg1 = reg.register("agent:start", h1)
reg.register("agent:start", h2)
unreg1()
assert h1 not in reg._handlers["agent:start"]
assert h2 in reg._handlers["agent:start"]
class TestEmitSync:
"""Tests for the synchronous emit path used from hot non-async callers."""
def test_fires_sync_handler(self):
reg = HookRegistry()
calls: list = []
reg.register(
"tui:tool.start",
lambda e, c: calls.append((e, c)),
)
reg.emit_sync("tui:tool.start", {"session_id": "s1", "payload": {"name": "foo"}})
assert calls == [("tui:tool.start", {"session_id": "s1", "payload": {"name": "foo"}})]
def test_default_context_when_none(self):
reg = HookRegistry()
seen: list = []
reg.register("evt:x", lambda _e, c: seen.append(c))
reg.emit_sync("evt:x") # no context arg
assert seen == [{}]
def test_sync_handler_exception_isolated(self):
reg = HookRegistry()
calls: list = []
def bad(_e, _c):
raise RuntimeError("boom")
def good(_e, _c):
calls.append("good")
reg.register("evt:x", bad)
reg.register("evt:x", good)
# Must not raise; second handler still fires.
reg.emit_sync("evt:x", {})
assert calls == ["good"]
def test_wildcard_matching(self):
reg = HookRegistry()
calls: list = []
reg.register("tui:*", lambda e, _c: calls.append(e))
reg.register("tui:tool.start", lambda e, _c: calls.append(f"exact:{e}"))
reg.emit_sync("tui:tool.start", {})
# Exact match first, then wildcard.
assert calls == ["exact:tui:tool.start", "tui:tool.start"]
def test_no_handlers_does_not_raise(self):
reg = HookRegistry()
# Just shouldn't blow up.
reg.emit_sync("nobody:listening", {"foo": "bar"})
def test_async_handler_skipped_with_no_loop(self, capsys):
from gateway.hooks import _reset_default_registry_for_tests
_reset_default_registry_for_tests()
reg = HookRegistry()
marker: list = []
async def async_handler(_e, _c):
marker.append("ran")
reg.register("evt:x", async_handler, name="async_handler_unique")
# First emit logs a warning and skips.
reg.emit_sync("evt:x", {})
captured = capsys.readouterr()
# The warning uses the handler's __name__ for diagnostic clarity.
assert "async_handler" in captured.out
assert "Skipping async handler" in captured.out
assert marker == [] # async handler never ran
# Second emit is silent (warning suppressed).
reg.emit_sync("evt:x", {})
captured = capsys.readouterr()
assert captured.out == ""
assert marker == []
def test_async_handler_scheduled_when_loop_running(self):
import asyncio as _asyncio
reg = HookRegistry()
marker: list = []
async def async_handler(_e, _c):
marker.append("ran")
reg.register("evt:x", async_handler)
async def driver():
reg.emit_sync("evt:x", {})
# Yield to the loop so the scheduled task can run.
await _asyncio.sleep(0)
await _asyncio.sleep(0)
_asyncio.run(driver())
assert marker == ["ran"]
class TestDefaultRegistry:
"""Tests for the module-level default-registry singleton."""
def test_get_default_returns_same_instance(self):
from gateway.hooks import (
_reset_default_registry_for_tests,
get_default_registry,
)
_reset_default_registry_for_tests()
first = get_default_registry()
second = get_default_registry()
assert first is second
def test_install_as_default_replaces(self):
from gateway.hooks import (
_reset_default_registry_for_tests,
get_default_registry,
install_as_default,
)
_reset_default_registry_for_tests()
custom = HookRegistry()
install_as_default(custom)
assert get_default_registry() is custom
def test_install_then_get_picks_up_handlers(self):
from gateway.hooks import (
_reset_default_registry_for_tests,
get_default_registry,
install_as_default,
)
_reset_default_registry_for_tests()
custom = HookRegistry()
install_as_default(custom)
calls: list = []
get_default_registry().register("agent:x", lambda _e, _c: calls.append("hit"))
# Same handler is visible on the installed instance.
custom.emit_sync("agent:x", {})
assert calls == ["hit"]
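The `TestEmitSync` cases above pin three behaviors of the synchronous emit path: exact subscribers fire before namespace wildcards, a missing context defaults to `{}`, and an exception in one handler does not stop the rest. The sketch below illustrates those rules only. `MiniRegistry` is a hypothetical stand-in written for this note, not the real `HookRegistry`, and it leaves out async handlers and `loaded_hooks`.

```python
from __future__ import annotations

from collections import defaultdict
from typing import Callable

Handler = Callable[[str, dict], object]


class MiniRegistry:
    """Hypothetical stand-in for HookRegistry (illustration only)."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = defaultdict(list)

    def register(self, pattern: str, handler: Handler) -> Callable[[], None]:
        """Subscribe a handler and return a callable that unsubscribes it."""
        self._handlers[pattern].append(handler)

        def unregister() -> None:
            if handler in self._handlers[pattern]:
                self._handlers[pattern].remove(handler)

        return unregister

    def emit_sync(self, event: str, context: dict | None = None) -> None:
        """Fire exact subscribers first, then the ``<namespace>:*`` wildcard."""
        ctx = {} if context is None else context
        namespace = event.split(":", 1)[0]
        targets = list(self._handlers.get(event, []))
        targets += list(self._handlers.get(f"{namespace}:*", []))
        for handler in targets:
            try:
                handler(event, ctx)
            except Exception:
                # One failing subscriber must not block the others.
                continue


if __name__ == "__main__":
    reg = MiniRegistry()
    seen: list[str] = []
    reg.register("tui:*", lambda e, _c: seen.append(e))
    reg.register("tui:tool.start", lambda e, _c: seen.append(f"exact:{e}"))
    reg.emit_sync("tui:tool.start")
    print(seen)
```

Copying the handler lists before iterating means a handler that unregisters itself during dispatch cannot disturb the loop. Whether the real registry does the same is not shown in this diff.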


@@ -0,0 +1,406 @@
"""Unit tests for ``hermes_cli/hook_ingest.py``.
Covers:
* Discovery file write/read/remove lifecycle
* File permissions (0600) and atomic replace semantics
* Auth gating on the ingest endpoint (401 without token, 200 with)
* Body validation (400 on bad shape, missing keys, non-dict context)
* Forwarded events are stamped with ``_forwarded=True`` and
``_forwarded_from=<src>`` and republished via ``emit_sync``
* Health endpoint returns 200 unconditionally and is unauthenticated
* `--insecure` mode does not disable the ingest endpoint (the bearer
token is the security boundary regardless of bind address)
"""
from __future__ import annotations
import json
import os
import stat
from pathlib import Path
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from gateway.hooks import (
HookRegistry,
_reset_default_registry_for_tests,
get_default_registry,
install_as_default,
)
from hermes_cli import hook_ingest
@pytest.fixture(autouse=True)
def _isolate(tmp_path, monkeypatch):
    """Each test gets a fresh hermes home + fresh registry singleton +
    fresh ingest-token state. Without this the module-level
    ``_HOOKS_INGEST_TOKEN`` from one test would leak into the next."""
    monkeypatch.setattr(hook_ingest, "get_hermes_home", lambda: tmp_path)
    hook_ingest._reset_for_tests()
    _reset_default_registry_for_tests()
    yield
    hook_ingest._reset_for_tests()
    _reset_default_registry_for_tests()


@pytest.fixture
def fresh_registry():
    """Install a clean ``HookRegistry`` as the process default and return it."""
    reg = HookRegistry()
    install_as_default(reg)
    return reg


@pytest.fixture
def hook_app(fresh_registry):
    """A FastAPI app with the hook router mounted at /api/hooks."""
    app = FastAPI()
    app.include_router(hook_ingest.build_hook_router(), prefix="/api/hooks")
    return app


@pytest.fixture
def client(hook_app):
    return TestClient(hook_app)
# ---------------------------------------------------------------------------
# Discovery file lifecycle
# ---------------------------------------------------------------------------
class TestDiscoveryFile:
def test_write_creates_file_with_expected_shape(self, tmp_path):
token = hook_ingest.write_dashboard_discovery_file("127.0.0.1", 9119)
path = tmp_path / "dashboard.json"
assert path.exists()
data = json.loads(path.read_text())
assert data["url"] == "http://127.0.0.1:9119"
assert data["hooks_ingest_token"] == token
assert isinstance(data["pid"], int)
assert "started_at" in data
# ISO-8601 timestamp.
assert "T" in data["started_at"]
def test_write_returns_fresh_token_each_call(self, tmp_path):
first = hook_ingest.write_dashboard_discovery_file("127.0.0.1", 9119)
second = hook_ingest.write_dashboard_discovery_file("127.0.0.1", 9119)
assert first != second # 32-byte urlsafe tokens collide ~never
def test_write_records_non_loopback_bind(self, tmp_path):
"""``--insecure`` mode binds non-loopback; the discovery file
must reflect that so forwarders dial the right address."""
hook_ingest.write_dashboard_discovery_file("0.0.0.0", 9119)
data = json.loads((tmp_path / "dashboard.json").read_text())
assert data["url"] == "http://0.0.0.0:9119"
def test_write_creates_parent_dir_if_missing(self, monkeypatch, tmp_path):
"""``$HERMES_HOME`` might not exist yet on first startup."""
nested = tmp_path / "new" / "hermes"
monkeypatch.setattr(hook_ingest, "get_hermes_home", lambda: nested)
hook_ingest.write_dashboard_discovery_file("127.0.0.1", 9119)
assert (nested / "dashboard.json").exists()
def test_file_mode_is_0600(self, tmp_path):
"""Discovery file holds a bearer token in cleartext; must be
owner-only readable."""
hook_ingest.write_dashboard_discovery_file("127.0.0.1", 9119)
path = tmp_path / "dashboard.json"
mode = stat.S_IMODE(path.stat().st_mode)
# 0o600 = read+write for owner, nothing for group/other.
assert mode == 0o600, f"expected 0600, got {oct(mode)}"
def test_remove_deletes_file(self, tmp_path):
hook_ingest.write_dashboard_discovery_file("127.0.0.1", 9119)
assert (tmp_path / "dashboard.json").exists()
hook_ingest.remove_dashboard_discovery_file()
assert not (tmp_path / "dashboard.json").exists()
def test_remove_is_idempotent(self):
# Never written.
hook_ingest.remove_dashboard_discovery_file()
# And again — must not raise.
hook_ingest.remove_dashboard_discovery_file()
def test_remove_clears_in_memory_token(self):
"""Once removed, no one can authenticate to ingest anymore."""
hook_ingest.write_dashboard_discovery_file("127.0.0.1", 9119)
assert hook_ingest.get_current_token_for_tests() != ""
hook_ingest.remove_dashboard_discovery_file()
assert hook_ingest.get_current_token_for_tests() == ""
# ---------------------------------------------------------------------------
# /health endpoint
# ---------------------------------------------------------------------------
class TestHealthEndpoint:
def test_health_returns_200_unauthenticated(self, client):
r = client.get("/api/hooks/health")
assert r.status_code == 200
assert r.json() == {"ok": True}
def test_health_returns_200_with_no_token_set(self, client):
"""Health probe must succeed even before any token has been
written (forwarder probes before discovery is established)."""
# No write_dashboard_discovery_file call — token stays "".
r = client.get("/api/hooks/health")
assert r.status_code == 200
# ---------------------------------------------------------------------------
# /ingest endpoint — auth
# ---------------------------------------------------------------------------
class TestIngestAuth:
def test_ingest_401_without_token(self, client):
token = hook_ingest.write_dashboard_discovery_file("127.0.0.1", 9119)
del token # noqa: F841 — we intentionally don't send it.
r = client.post(
"/api/hooks/ingest",
json={"event_type": "agent:start", "context": {}, "src": "gateway"},
)
assert r.status_code == 401
def test_ingest_401_with_wrong_token(self, client):
hook_ingest.write_dashboard_discovery_file("127.0.0.1", 9119)
r = client.post(
"/api/hooks/ingest",
json={"event_type": "agent:start", "context": {}, "src": "gateway"},
headers={"Authorization": "Bearer wrong"},
)
assert r.status_code == 401
def test_ingest_401_when_no_token_set(self, client):
"""Discovery file never written ⇒ ingest refuses everything."""
r = client.post(
"/api/hooks/ingest",
json={"event_type": "agent:start", "context": {}, "src": "gateway"},
headers={"Authorization": "Bearer something"},
)
assert r.status_code == 401
def test_ingest_200_with_valid_token(self, client):
token = hook_ingest.write_dashboard_discovery_file("127.0.0.1", 9119)
r = client.post(
"/api/hooks/ingest",
json={"event_type": "agent:start", "context": {}, "src": "gateway"},
headers={"Authorization": f"Bearer {token}"},
)
assert r.status_code == 200
assert r.json() == {"ok": True}
def test_ingest_works_for_non_loopback_bind(self, client):
"""``--insecure`` doesn't disable ingest. The bearer token IS
the security boundary regardless of bind address (see
DESIGN-cross-process-hooks.md "Bind-address independence")."""
token = hook_ingest.write_dashboard_discovery_file("0.0.0.0", 9119)
r = client.post(
"/api/hooks/ingest",
json={"event_type": "agent:start", "context": {}, "src": "gateway"},
headers={"Authorization": f"Bearer {token}"},
)
assert r.status_code == 200
# ---------------------------------------------------------------------------
# /ingest endpoint — body validation
# ---------------------------------------------------------------------------
class TestIngestBodyValidation:
@pytest.fixture
def auth_headers(self):
token = hook_ingest.write_dashboard_discovery_file("127.0.0.1", 9119)
return {"Authorization": f"Bearer {token}"}
    def test_400_on_non_json_body(self, client, auth_headers):
        # FastAPI would answer 422 if it parsed the body itself and failed.
        # The route instead calls request.json() directly and converts the
        # resulting exception into a 400.
        r = client.post(
            "/api/hooks/ingest",
            content=b"not json",
            headers={**auth_headers, "Content-Type": "application/json"},
        )
        assert r.status_code == 400
def test_400_on_non_dict_body(self, client, auth_headers):
r = client.post(
"/api/hooks/ingest",
json=["this", "is", "an", "array"],
headers=auth_headers,
)
assert r.status_code == 400
assert "object" in r.json()["detail"]
def test_400_on_missing_event_type(self, client, auth_headers):
r = client.post(
"/api/hooks/ingest",
json={"context": {}, "src": "gateway"},
headers=auth_headers,
)
assert r.status_code == 400
assert "event_type" in r.json()["detail"]
def test_400_on_empty_event_type(self, client, auth_headers):
r = client.post(
"/api/hooks/ingest",
json={"event_type": "", "context": {}, "src": "gateway"},
headers=auth_headers,
)
assert r.status_code == 400
def test_400_on_non_dict_context(self, client, auth_headers):
r = client.post(
"/api/hooks/ingest",
json={"event_type": "agent:start", "context": "string", "src": "gateway"},
headers=auth_headers,
)
assert r.status_code == 400
def test_200_with_missing_optional_fields(self, client, auth_headers):
"""src and context are optional; default to "?" and {}."""
r = client.post(
"/api/hooks/ingest",
json={"event_type": "agent:start"},
headers=auth_headers,
)
assert r.status_code == 200
# ---------------------------------------------------------------------------
# /ingest endpoint — republish behavior
# ---------------------------------------------------------------------------
class TestIngestRepublish:
def test_republishes_via_emit_sync(self, client, fresh_registry):
captured: list = []
fresh_registry.register("agent:start", lambda e, c: captured.append((e, c)))
token = hook_ingest.write_dashboard_discovery_file("127.0.0.1", 9119)
r = client.post(
"/api/hooks/ingest",
json={
"event_type": "agent:start",
"context": {"platform": "telegram", "user_id": "u-1"},
"src": "gateway",
},
headers={"Authorization": f"Bearer {token}"},
)
assert r.status_code == 200
assert len(captured) == 1
event, ctx = captured[0]
assert event == "agent:start"
# Original context keys preserved.
assert ctx["platform"] == "telegram"
assert ctx["user_id"] == "u-1"
# Forwarding metadata stamped.
assert ctx["_forwarded"] is True
assert ctx["_forwarded_from"] == "gateway"
def test_forwarded_stamp_overrides_caller_provided_value(
self, client, fresh_registry
):
"""If a malicious/buggy caller tries to set _forwarded=False to
smuggle the event past loop prevention, the endpoint overrides
it. This isn't a security boundary (the auth gate is) but a
defensive sanity check."""
captured: list = []
fresh_registry.register("agent:start", lambda e, c: captured.append(c))
token = hook_ingest.write_dashboard_discovery_file("127.0.0.1", 9119)
client.post(
"/api/hooks/ingest",
json={
"event_type": "agent:start",
"context": {"_forwarded": False},
"src": "gateway",
},
headers={"Authorization": f"Bearer {token}"},
)
assert captured[0]["_forwarded"] is True
def test_wildcard_handlers_see_forwarded_events(self, client, fresh_registry):
"""A subscriber to ``tui:*`` sees forwarded ``tui:tool.start`` events."""
captured: list = []
fresh_registry.register("tui:*", lambda e, c: captured.append(e))
token = hook_ingest.write_dashboard_discovery_file("127.0.0.1", 9119)
client.post(
"/api/hooks/ingest",
json={
"event_type": "tui:tool.start",
"context": {"session_id": "s-1", "payload": {"name": "search_files"}},
"src": "tui",
},
headers={"Authorization": f"Bearer {token}"},
)
assert captured == ["tui:tool.start"]
def test_src_defaults_to_question_mark(self, client, fresh_registry):
captured: list = []
fresh_registry.register("agent:start", lambda e, c: captured.append(c))
token = hook_ingest.write_dashboard_discovery_file("127.0.0.1", 9119)
client.post(
"/api/hooks/ingest",
json={"event_type": "agent:start", "context": {}},
headers={"Authorization": f"Bearer {token}"},
)
assert captured[0]["_forwarded_from"] == "?"
def test_non_string_src_is_normalized(self, client, fresh_registry):
"""If something weird sends src=123, we don't propagate the bad type."""
captured: list = []
fresh_registry.register("agent:start", lambda e, c: captured.append(c))
token = hook_ingest.write_dashboard_discovery_file("127.0.0.1", 9119)
r = client.post(
"/api/hooks/ingest",
json={"event_type": "agent:start", "context": {}, "src": 123},
headers={"Authorization": f"Bearer {token}"},
)
assert r.status_code == 200
assert captured[0]["_forwarded_from"] == "?"
def test_handler_exception_does_not_break_ingest(self, client, fresh_registry):
"""A buggy subscriber raising in emit_sync must not 500 the
ingest endpoint — emit_sync swallows handler exceptions, but
the route also has a top-level try/except defensive layer."""
fresh_registry.register("agent:end", lambda _e, _c: 1 / 0)
token = hook_ingest.write_dashboard_discovery_file("127.0.0.1", 9119)
r = client.post(
"/api/hooks/ingest",
json={"event_type": "agent:end", "context": {}, "src": "gateway"},
headers={"Authorization": f"Bearer {token}"},
)
assert r.status_code == 200
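The body-validation and republish tests above imply a small normalization step between the decoded JSON body and the `emit_sync` call. The sketch below restates those rules as a pure function. `normalize_ingest_body` is a hypothetical helper invented for this note; the actual route in `hermes_cli/hook_ingest.py` is not shown in this diff and may be structured differently. A `ValueError` here stands in for the HTTP 400 the endpoint returns.

```python
from __future__ import annotations


def normalize_ingest_body(body: object) -> tuple[str, dict]:
    """Validate a decoded JSON body and return (event_type, stamped context).

    Hypothetical helper: raises ValueError where the real route answers 400.
    """
    if not isinstance(body, dict):
        raise ValueError("body must be a JSON object")

    event_type = body.get("event_type")
    if not isinstance(event_type, str) or not event_type:
        raise ValueError("event_type must be a non-empty string")

    # context is optional and defaults to an empty object.
    context = body.get("context", {})
    if not isinstance(context, dict):
        raise ValueError("context must be a JSON object")

    # src is optional; anything that is not a string collapses to "?".
    src = body.get("src", "?")
    if not isinstance(src, str):
        src = "?"

    # Stamp last so a caller-supplied _forwarded=False cannot survive.
    stamped = dict(context)
    stamped["_forwarded"] = True
    stamped["_forwarded_from"] = src
    return event_type, stamped


if __name__ == "__main__":
    print(normalize_ingest_body({"event_type": "agent:start", "src": 123}))
```

Stamping after copying the caller's context is what makes the override test pass: the endpoint's values always win, regardless of what the sender put in `_forwarded`.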


@@ -0,0 +1,319 @@
"""End-to-end integration test for cross-process hook delivery.

Wires together (in the SAME process to avoid subprocess complexity, but
end-to-end through real HTTP + a real FastAPI app):

    Source HookRegistry ──forwarder──HTTP──> /api/hooks/ingest ──> Dashboard HookRegistry
                                                                       └─> handler fires

This validates the wire-format contract between the forwarder and the
ingest endpoint: anything the forwarder produces must be accepted and
republished correctly by the ingest endpoint.

The test runs uvicorn in a daemon thread so we have a real OS socket
forwarders can POST to. ``dashboard.json`` is written into a temp
$HERMES_HOME so the forwarder finds the test server instead of any
real running dashboard on the dev box.
"""
from __future__ import annotations
import json
import socket
import threading
import time
from pathlib import Path
from typing import Callable
import pytest
import uvicorn
from fastapi import FastAPI
from gateway import hook_forwarder
from gateway.hooks import (
HookRegistry,
_reset_default_registry_for_tests,
install_as_default,
)
from hermes_cli import hook_ingest
def _free_port() -> int:
    """Find an unused TCP port on loopback by binding port 0."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("127.0.0.1", 0))
        return s.getsockname()[1]


def _wait_until(predicate: Callable[[], bool], *, timeout: float = 5.0) -> bool:
    """Poll a predicate until it returns True or the timeout elapses.

    Cheaper than ``time.sleep`` calls littered through the test body;
    keeps test runtime down when the system is fast and bounded when
    it's slow.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(0.02)
    return False
@pytest.fixture
def hermes_home(tmp_path, monkeypatch):
"""Each test gets a fresh $HERMES_HOME so ``dashboard.json`` writes
don't bleed across tests (or into the real dev box's home)."""
monkeypatch.setattr(hook_ingest, "get_hermes_home", lambda: tmp_path)
monkeypatch.setattr(hook_forwarder, "get_hermes_home", lambda: tmp_path)
hook_ingest._reset_for_tests()
hook_forwarder._reset_for_tests()
yield tmp_path
hook_ingest._reset_for_tests()
hook_forwarder._reset_for_tests()
@pytest.fixture
def dashboard_server(hermes_home):
"""Start a real uvicorn server with the hook router mounted, write a
matching ``dashboard.json``, and tear it all down on test exit."""
# Build the dashboard's FastAPI app. Just the hook router — no need
# to load the full web_server which would pull in the SPA build,
# auth providers, etc.
app = FastAPI()
app.include_router(hook_ingest.build_hook_router(), prefix="/api/hooks")
# Install a fresh default registry on the (test) dashboard side.
# This is what the ingest endpoint republishes events onto.
_reset_default_registry_for_tests()
dashboard_reg = HookRegistry()
install_as_default(dashboard_reg)
port = _free_port()
config = uvicorn.Config(app, host="127.0.0.1", port=port, log_level="warning")
server = uvicorn.Server(config)
server_thread = threading.Thread(target=server.run, daemon=True)
server_thread.start()
# Wait for the server to start accepting connections.
if not _wait_until(lambda: server.started, timeout=5.0):
raise RuntimeError("uvicorn failed to start within 5s")
# Drop the discovery file pointing at our test server.
hook_ingest.write_dashboard_discovery_file("127.0.0.1", port)
yield {"app": app, "port": port, "registry": dashboard_reg}
# Teardown: gracefully shut down the server.
server.should_exit = True
server_thread.join(timeout=5.0)
_reset_default_registry_for_tests()
# ---------------------------------------------------------------------------
# End-to-end: source → forwarder → HTTP → ingest → dashboard registry
# ---------------------------------------------------------------------------
def test_end_to_end_event_delivery(dashboard_server, hermes_home):
"""A single fired event in the source registry reaches a subscriber
on the dashboard registry via real HTTP."""
captured: list = []
dashboard_server["registry"].register(
"agent:start", lambda e, c: captured.append((e, c))
)
# Source-side registry — simulates the gateway process.
source_reg = HookRegistry()
hook_forwarder.start_if_dashboard_available(source_reg, src="gateway")
try:
source_reg.emit_sync(
"agent:start",
{"platform": "telegram", "user_id": "u-1", "session_id": "s-1"},
)
# Wait for the daemon worker thread to flush the queue + POST.
assert _wait_until(lambda: len(captured) >= 1, timeout=10.0), (
"Event never reached the dashboard subscriber"
)
event, ctx = captured[0]
assert event == "agent:start"
# Original context survived the round trip.
assert ctx["platform"] == "telegram"
assert ctx["user_id"] == "u-1"
assert ctx["session_id"] == "s-1"
# And the ingest endpoint stamped forwarding metadata.
assert ctx["_forwarded"] is True
assert ctx["_forwarded_from"] == "gateway"
finally:
hook_forwarder.stop()
def test_multiple_namespaces_round_trip(dashboard_server, hermes_home):
"""The forwarder covers every namespace the design promises."""
captured: dict = {ns: [] for ns in ("tui:*", "agent:*", "session:*", "command:*")}
for ns in captured:
# Closure over ns — bind it as a default arg to avoid late binding.
dashboard_server["registry"].register(
ns, lambda e, _c, _ns=ns: captured[_ns].append(e)
)
source_reg = HookRegistry()
hook_forwarder.start_if_dashboard_available(source_reg, src="gateway")
try:
# One event per namespace.
source_reg.emit_sync("tui:tool.start", {"session_id": "s", "payload": {}})
source_reg.emit_sync("agent:start", {"user_id": "u"})
source_reg.emit_sync("session:reset", {"session_key": "k"})
source_reg.emit_sync("command:reset", {"command": "reset"})
assert _wait_until(
lambda: all(len(v) >= 1 for v in captured.values()),
timeout=10.0,
), f"Some events missing: {captured}"
assert captured["tui:*"] == ["tui:tool.start"]
assert captured["agent:*"] == ["agent:start"]
assert captured["session:*"] == ["session:reset"]
assert captured["command:*"] == ["command:reset"]
finally:
hook_forwarder.stop()
def test_loop_prevention_forwarded_events_not_reshipped(dashboard_server, hermes_home):
"""An event whose context already has ``_forwarded=True`` is NOT
shipped by the source-side forwarder.
This is what closes the loop: dashboard republishes an event into
its own registry with ``_forwarded=True``, and if a forwarder were
also running in that process (it isn't, by design — but defense in
depth) it would skip the event instead of round-tripping it back.
"""
# Counter on the dashboard side — bumps every time the ingest
# endpoint fires the agent:start handler.
ingest_hits: list = []
dashboard_server["registry"].register(
"agent:start", lambda _e, _c: ingest_hits.append(1)
)
source_reg = HookRegistry()
hook_forwarder.start_if_dashboard_available(source_reg, src="gateway")
try:
# Fire an event that's already marked as forwarded — simulates
# an event that came back to the source process somehow. The
# forwarder must skip it (no POST to /ingest).
source_reg.emit_sync(
"agent:start",
{
"platform": "telegram",
"_forwarded": True,
"_forwarded_from": "dashboard-echo",
},
)
# Wait long enough that if a POST were going to happen, it
# would have. Then verify NO ingest hits occurred.
time.sleep(0.3)
assert ingest_hits == [], (
f"Forwarder shipped a _forwarded=True event: {ingest_hits}"
)
finally:
hook_forwarder.stop()
def test_forwarder_recovers_from_dashboard_restart(dashboard_server, hermes_home):
"""If the dashboard rotates its token (restart), the forwarder's
next probe re-reads ``dashboard.json`` and picks up the new token.
Unit-tested in detail in ``test_hook_forwarder.py::TestDiscoveryRefresh``.
Here we pin the integration: after a token rotation + probe, events
still land at the dashboard.
"""
captured: list = []
dashboard_server["registry"].register(
"agent:start", lambda _e, c: captured.append(c)
)
source_reg = HookRegistry()
hook_forwarder.start_if_dashboard_available(source_reg, src="gateway")
try:
# First event lands cleanly with the original token.
source_reg.emit_sync("agent:start", {"seq": 1})
assert _wait_until(lambda: len(captured) >= 1, timeout=5.0)
assert captured[0]["seq"] == 1
# Simulate dashboard restart: token rotates in dashboard.json
# AND in the ingest endpoint's module state (we're sharing the
# same hook_ingest module so write_dashboard_discovery_file
# updates both).
port = dashboard_server["port"]
hook_ingest.write_dashboard_discovery_file("127.0.0.1", port)
# Force a discovery refresh directly — simulates what the
# forwarder's worker does on its 30s probe cycle. Using the
# active forwarder's HTTP client this way avoids a 30s
# real-time wait.
import httpx
with httpx.Client(timeout=2.0) as client:
fwd = hook_forwarder._active
assert fwd is not None
fwd._refresh_discovery(client)
source_reg.emit_sync("agent:start", {"seq": 2})
assert _wait_until(lambda: len(captured) >= 2, timeout=10.0)
assert captured[1]["seq"] == 2
finally:
hook_forwarder.stop()
def test_no_dashboard_available_silent_noop(hermes_home):
"""When no dashboard is reachable (no discovery file), the forwarder
is a complete no-op — no thread spawned, no handlers registered."""
# Don't start the dashboard fixture — there's nothing to forward to.
source_reg = HookRegistry()
result = hook_forwarder.start_if_dashboard_available(source_reg, src="gateway")
assert result is None
assert source_reg._handlers == {}
# And firing an event doesn't cause anything to happen.
source_reg.emit_sync("agent:start", {}) # must not raise
def test_no_dashboard_then_dashboard_starts_later(hermes_home):
    """``start_if_dashboard_available`` is a one-shot check at call time.

    If no dashboard is running when the source process starts, the
    forwarder doesn't start. Documented behavior: source-process
    consumers call ``start_if_dashboard_available`` once at startup, so
    if a dashboard appears later, only newly started processes pick it
    up. (A long-running process with no forwarder never gets one
    retroactively unless it calls the function again.)

    This test pins that contract so we notice if we accidentally add
    automatic retry behavior. That would be a behavior change worth
    discussing, not something to slip in unnoticed.
    """
    source_reg = HookRegistry()
    # First call: no dashboard yet.
    result = hook_forwarder.start_if_dashboard_available(source_reg, src="gateway")
    assert result is None
    # Dashboard appears.
    port = _free_port()
    hook_ingest.write_dashboard_discovery_file("127.0.0.1", port)
    # An explicit second call does start the forwarder. (The wire-up
    # sites only call once at startup, but the function itself can be
    # called again, which is useful for tests and lets callers invoke
    # it from a post-config hook if they want.)
    result = hook_forwarder.start_if_dashboard_available(source_reg, src="gateway")
    assert result is not None
    hook_forwarder.stop()
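The loop-prevention test above, together with the commit message's description of a bounded queue that drops the oldest frame on overflow, suggests two small source-side rules. The sketch below shows one way to express them with the standard library. `should_forward` and `enqueue_drop_oldest` are hypothetical names chosen for this note; the actual `gateway/hook_forwarder.py` internals are not part of this diff.

```python
import queue


def should_forward(context: dict) -> bool:
    """Skip events the ingest endpoint already stamped as forwarded."""
    return context.get("_forwarded") is not True


def enqueue_drop_oldest(q: queue.Queue, frame: dict) -> None:
    """Put a frame without blocking; evict the oldest frame when full.

    Assumes q was created with a positive maxsize.
    """
    while True:
        try:
            q.put_nowait(frame)
            return
        except queue.Full:
            try:
                q.get_nowait()  # discard the oldest queued frame
            except queue.Empty:
                # A consumer drained the queue in between; just retry.
                pass


if __name__ == "__main__":
    frames: queue.Queue = queue.Queue(maxsize=2)
    for seq in (1, 2, 3):
        ctx = {"seq": seq}
        if should_forward(ctx):
            enqueue_drop_oldest(frames, {"event_type": "agent:start", "context": ctx})
    print([frames.get_nowait()["context"]["seq"] for _ in range(frames.qsize())])
```

Dropping the oldest frame favors recent events when the dashboard is slow or down, which matches the best-effort, never-block-the-publisher goal stated in the commit message.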


@@ -0,0 +1,185 @@
"""Tests for the TUI gateway → ``gateway.hooks`` bridge.
Every call into ``tui_gateway.server._emit`` should mirror the event onto the
process-wide ``HookRegistry`` under the ``tui:`` namespace so in-process
plugins can subscribe via :func:`gateway.hooks.get_default_registry`.
The mirror runs as a side-effect after ``write_json`` and is wrapped in a
broad try/except so a buggy subscriber can never break the main JSON-RPC
dispatch path.
"""
from __future__ import annotations
from unittest.mock import patch
import pytest
from gateway.hooks import (
HookRegistry,
_reset_default_registry_for_tests,
get_default_registry,
install_as_default,
)
from tui_gateway import server
@pytest.fixture(autouse=True)
def _reset_registry_and_module_cache():
"""Reset the default registry and the TUI module-level cache before each test.
Without this the cache from a previous test (or a previous run within the
same process) would shadow our fresh install_as_default call and the
mirrored event would land on the wrong registry.
"""
_reset_default_registry_for_tests()
# Force the deferred-import cache in the TUI module to re-resolve.
server._hook_registry = None
# Also reset the forwarder-start sentinel so each test's first emit
# re-evaluates "is a dashboard reachable?" against the fixture's
# state instead of remembering a previous test's outcome.
server._forwarder_started = False
yield
_reset_default_registry_for_tests()
server._hook_registry = None
server._forwarder_started = False
class _StubTransport:
"""Captures write_json calls so the test doesn't actually touch stdout."""
def __init__(self):
self.written: list[dict] = []
def write(self, obj):
self.written.append(obj)
return True
def test_emit_mirrors_to_default_registry():
transport = _StubTransport()
captured: list = []
reg = HookRegistry()
install_as_default(reg)
reg.register("tui:tool.start", lambda e, c: captured.append((e, c)))
with patch.object(server, "_stdio_transport", transport):
server._emit("tool.start", "sid-123", {"name": "search_files"})
# The JSON-RPC event was written as before.
assert len(transport.written) == 1
assert transport.written[0]["params"]["type"] == "tool.start"
# The hook bus saw a tui:-prefixed mirror.
assert captured == [
(
"tui:tool.start",
{"session_id": "sid-123", "payload": {"name": "search_files"}},
)
]
def test_emit_with_no_payload_yields_empty_payload_dict():
transport = _StubTransport()
captured: list = []
reg = HookRegistry()
install_as_default(reg)
reg.register("tui:session.info", lambda _e, c: captured.append(c))
with patch.object(server, "_stdio_transport", transport):
server._emit("session.info", "sid-1", None)
assert captured == [{"session_id": "sid-1", "payload": {}}]
def test_emit_subscriber_exception_does_not_break_dispatch():
transport = _StubTransport()
reg = HookRegistry()
install_as_default(reg)
def broken(_e, _c):
raise RuntimeError("subscriber blew up")
reg.register("tui:tool.start", broken)
# If _publish_tui_hook propagated, this would raise.
with patch.object(server, "_stdio_transport", transport):
server._emit("tool.start", "sid-1", {"name": "x"})
# JSON-RPC event still landed on stdout — host pipeline intact.
assert len(transport.written) == 1
assert transport.written[0]["params"]["type"] == "tool.start"
def test_wildcard_subscriber_sees_all_tui_events():
transport = _StubTransport()
seen_types: list = []
reg = HookRegistry()
install_as_default(reg)
reg.register("tui:*", lambda e, _c: seen_types.append(e))
with patch.object(server, "_stdio_transport", transport):
server._emit("tool.start", "s", {})
server._emit("message.delta", "s", {"text": "hi"})
server._emit("session.info", "s", {})
assert seen_types == [
"tui:tool.start",
"tui:message.delta",
"tui:session.info",
]
def test_emit_does_not_blow_up_when_no_subscribers():
transport = _StubTransport()
# No registry installed beyond the lazy default — and no handlers.
with patch.object(server, "_stdio_transport", transport):
server._emit("tool.start", "s", {"name": "x"})
# Dispatch worked, no subscribers fired (default registry is empty).
assert len(transport.written) == 1
def test_hook_registry_resolved_lazily_via_get_default_registry():
"""The TUI module caches whatever ``get_default_registry`` returns at first
use. Re-set the default before the first ``_emit`` and confirm the cache
picks up the new instance, not a stale or never-installed one."""
transport = _StubTransport()
captured: list = []
custom = HookRegistry()
install_as_default(custom)
custom.register("tui:tool.start", lambda _e, c: captured.append(c))
# Sanity: server._hook_registry starts unset thanks to the fixture.
assert server._hook_registry is None
with patch.object(server, "_stdio_transport", transport):
server._emit("tool.start", "s", {"x": 1})
# The cache now points at the installed registry.
assert server._hook_registry is custom
assert captured == [{"session_id": "s", "payload": {"x": 1}}]
# Subsequent emits keep using the cached reference even if the default
# is swapped — the contract is "resolve once, cache thereafter."
new_reg = HookRegistry()
install_as_default(new_reg)
second: list = []
new_reg.register("tui:tool.start", lambda _e, c: second.append(c))
custom.register("tui:tool.start", lambda _e, c: captured.append({"second": c}))
with patch.object(server, "_stdio_transport", transport):
server._emit("tool.start", "s", {"x": 2})
# The cached registry (``custom``) saw the new event, the freshly-installed
# ``new_reg`` did not.
assert second == []
# ``custom`` has two handlers registered now (the original lambda still
# fires on every event, plus the second one that wraps payload in
# ``{"second": ...}``). Both fire on the second ``_emit`` call.
assert captured == [
{"session_id": "s", "payload": {"x": 1}},
{"session_id": "s", "payload": {"x": 2}},
{"second": {"session_id": "s", "payload": {"x": 2}}},
]
assert get_default_registry() is new_reg
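The last test above pins a "resolve once, cache thereafter" contract: the first emit captures whatever the default registry is at that moment, and later swaps of the default are not observed. A minimal sketch of that pattern follows. `_cached_registry`, `get_cached_registry`, and `reset_cache_for_tests` are illustrative names for this note, not identifiers from `tui_gateway/server.py`.

```python
from typing import Callable, Optional

# Module-level cache; None means "not resolved yet".
_cached_registry: Optional[object] = None


def get_cached_registry(resolver: Callable[[], object]) -> object:
    """Call resolver on first use only, then keep returning that result."""
    global _cached_registry
    if _cached_registry is None:
        _cached_registry = resolver()
    return _cached_registry


def reset_cache_for_tests() -> None:
    """Clear the cache so the next call resolves again (test helper)."""
    global _cached_registry
    _cached_registry = None


if __name__ == "__main__":
    first, second = object(), object()
    a = get_cached_registry(lambda: first)
    b = get_cached_registry(lambda: second)  # resolver is ignored here
    print(a is first, b is first)
```

The trade-off is the one the test documents: steady-state lookups are cheap, but anything that replaces the default after the first call is invisible until the cache is reset, which is why the autouse fixture clears `server._hook_registry` around every test.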


@@ -388,6 +388,53 @@ def _emit(event: str, sid: str, payload: dict | None = None):
    if payload is not None:
        params["payload"] = payload
    write_json({"jsonrpc": "2.0", "method": "event", "params": params})
    _publish_tui_hook(event, sid, payload)


def _publish_tui_hook(event: str, sid: str, payload: dict | None) -> None:
    """Mirror a TUI gateway dispatch event onto the hook bus.

    Every call from ``_emit`` produces a ``tui:<event>`` hook event so plugins
    can observe TUI activity (tool starts, message deltas, etc.) without
    forking ``_emit`` itself. Wrapped in a broad try/except: a bus subscriber
    bug must never break the main JSON-RPC dispatch path that already wrote
    the event to stdout above.

    The import is deferred to first call to keep TUI cold-start cheap; the
    gateway hook module isn't otherwise needed in the TUI process.

    On first call we also opportunistically start the cross-process hook
    forwarder so dashboard plugins see TUI events. No-op when no dashboard
    is running.
    """
    try:
        global _hook_registry, _forwarder_started  # noqa: PLW0603
        if _hook_registry is None:
            from gateway.hooks import get_default_registry  # local import

            _hook_registry = get_default_registry()
        if not _forwarder_started:
            _forwarder_started = True
            try:
                from gateway import hook_forwarder

                hook_forwarder.start_if_dashboard_available(
                    _hook_registry, src="tui"
                )
            except Exception:
                # Forwarder failure must never break TUI dispatch.
                pass
        _hook_registry.emit_sync(  # type: ignore[union-attr]
            f"tui:{event}",
            {"session_id": sid, "payload": payload or {}},
        )
    except Exception:
        # Never propagate. Hook bus is best-effort.
        pass


# Lazily-resolved on first ``_emit`` call. Cached in module scope so the
# steady-state cost of mirroring is a dict lookup, not an import.
_hook_registry: "object | None" = None
_forwarder_started: bool = False


def _status_update(sid: str, kind: str, text: str | None = None):


@@ -82,10 +82,99 @@ async def handle(event_type: str, context: dict):
| `agent:step` | Each iteration of the tool-calling loop | `platform`, `user_id`, `session_id`, `iteration`, `tool_names` |
| `agent:end` | Agent finishes processing | `platform`, `user_id`, `session_id`, `message`, `response` |
| `command:*` | Any slash command executed | `platform`, `user_id`, `command`, `args` |
| `tui:<sub-event>` | TUI dispatch event mirrored to the bus (see below) | `session_id`, `payload` (raw TUI payload) |
#### Wildcard Matching
Handlers registered for `command:*` fire for any `command:` event (`command:model`, `command:reset`, etc.). Monitor all slash commands with a single subscription.
Handlers registered for `command:*` fire for any `command:<name>` event (`command:model`, `command:reset`, etc.) — useful for monitoring all slash commands with a single subscription. The same pattern works for `tui:*` to receive every TUI dispatch event.
Wildcards match **one** colon-separated namespace level: `foo:*` matches every `foo:<anything>` event, but does not cross another colon (a handler for `agent:*` does not fire for an unrelated `gateway:startup`).
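The matching rule above can be sketched as a small predicate. This is one reading of the documented behavior, not the Hermes matcher: `matches` is a hypothetical helper name, and rejecting events with a second colon (`foo:a:b`) follows the "does not cross another colon" wording.

```python
def matches(pattern: str, event_type: str) -> bool:
    """Return True if ``event_type`` is selected by ``pattern``.

    Hypothetical helper: an exact match, or a trailing ``:*`` that covers
    exactly one namespace level.
    """
    if not pattern.endswith(":*"):
        return pattern == event_type
    prefix = pattern[:-1]  # "command:*" -> "command:"
    if not event_type.startswith(prefix):
        return False
    rest = event_type[len(prefix):]
    # One level only: the remainder must be non-empty and contain no colon.
    return bool(rest) and ":" not in rest
```

Under this reading, `matches("command:*", "command:model")` is true, while `matches("agent:*", "gateway:startup")` and `matches("foo:*", "foo:a:b")` are both false.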
#### `tui:*` events
The TUI gateway (`hermes --tui` or the embedded dashboard PTY) mirrors every JSON-RPC event it sends to the front-end onto the hook bus under the `tui:` namespace. Each handler receives `context = {"session_id": str, "payload": dict}` — the `payload` matches the JSON-RPC event payload the TUI wrote to stdout for that frame.
Common sub-events:
| Sub-event | When it fires |
|-----------|--------------|
| `tui:tool.start` | A tool call begins |
| `tui:tool.progress` | A long-running tool reports progress |
| `tui:tool.complete` | A tool call finishes |
| `tui:message.start` / `tui:message.delta` / `tui:message.complete` | Assistant message lifecycle |
| `tui:reasoning.available` | Reasoning content for the latest turn |
| `tui:thinking.delta` | Streamed thinking text |
| `tui:session.info` | Session metadata changed (model, tools, etc.) |
| `tui:status.update` | Inline status line update |
| `tui:error` | Error frame |
This list isn't exhaustive: anything `_emit` writes ends up on the bus, so the full set is whatever `tui_gateway/server.py:_emit` sends today. Payload shapes are documented alongside the front-end consumers in `ui-tui/src/`.
Subscribers fire **synchronously** inside the TUI's hot dispatch path, so handlers should be cheap (push to a queue, set an `asyncio.Event`, etc.) and never block. Exceptions inside a handler are caught and logged — they never break TUI dispatch.
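A minimal sketch of the "push to a queue" pattern, assuming a handler with the documented `(event_type, context)` signature. The queue size, the worker, and the `"grep"` payload are illustrative choices, and the handler is called directly here instead of through a registry so the example stays self-contained.

```python
import queue
import threading

events = queue.Queue(maxsize=256)  # Illustrative bound, not a Hermes default.
processed = []


def on_tui_event(event_type, context):
    """Hot-path handler: hand the event off and return immediately."""
    try:
        events.put_nowait((event_type, context))
    except queue.Full:
        pass  # Drop rather than block the dispatch path.


def worker():
    """Slow work happens here, off the dispatch path."""
    while True:
        item = events.get()
        if item is None:  # Sentinel: stop the worker.
            break
        processed.append(item)


thread = threading.Thread(target=worker, daemon=True)
thread.start()

# Simulate the registry invoking the handler, then shut the worker down.
on_tui_event("tui:tool.start", {"session_id": "s1", "payload": {"name": "grep"}})
events.put(None)
thread.join(timeout=2)
```

The handler itself only performs a non-blocking enqueue, so a slow consumer costs dropped events, not a stalled TUI.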
### Programmatic registration
Discovery from `~/.hermes/hooks/` is the common path, but plugins, tests, and built-in code can also register handlers in-process:
```python
from gateway.hooks import get_default_registry

def on_tool_start(event_type, context):
    name = context["payload"].get("name", "?")
    print(f"[my-plugin] {event_type} -> {name}")

unregister = get_default_registry().register("tui:tool.start", on_tool_start)

# ... later, to clean up:
unregister()
```
`HookRegistry.register(event_type, handler, *, name=None)` accepts the same handler signature as discovered hooks (`handle(event_type, context)`, sync or async) and returns a no-arg callable that removes that specific handler. Other handlers on the same event are unaffected.
For callers that cannot `await` — the TUI's `_emit` is a good example — fire events via the synchronous path:
```python
registry.emit_sync("tui:tool.start", {"session_id": "s1", "payload": {...}})
```
`emit_sync` runs sync handlers immediately; async handlers are scheduled on the running event loop when one is available, or skipped with a one-time per-handler warning when there isn't.
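To make the dispatch rules concrete, here is a toy registry that follows the behavior described above. It is a sketch under stated assumptions, not the Hermes `HookRegistry`: `MiniRegistry` is a hypothetical name, it supports exact event names only (no wildcards), and the logger name is arbitrary.

```python
import asyncio
import inspect
import logging

log = logging.getLogger("mini_hooks")


class MiniRegistry:
    """Toy stand-in for a hook registry; not the Hermes implementation."""

    def __init__(self):
        self._handlers = {}   # event_type -> list of handlers
        self._warned = set()  # ids of async handlers already warned about
        self._tasks = set()   # keep references so tasks are not collected

    def register(self, event_type, handler):
        self._handlers.setdefault(event_type, []).append(handler)

        def unregister():
            handlers = self._handlers.get(event_type, [])
            if handler in handlers:
                handlers.remove(handler)

        return unregister

    def emit_sync(self, event_type, context):
        for handler in list(self._handlers.get(event_type, [])):
            try:
                if inspect.iscoroutinefunction(handler):
                    self._schedule(handler, event_type, context)
                else:
                    handler(event_type, context)
            except Exception:
                # A failing subscriber must not break the publisher.
                log.exception("hook handler failed for %s", event_type)

    def _schedule(self, handler, event_type, context):
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No running loop: skip, warning once per handler.
            if id(handler) not in self._warned:
                self._warned.add(id(handler))
                log.warning("no running loop; skipping %r", handler)
            return
        task = loop.create_task(handler(event_type, context))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
```

Called from plain synchronous code, only the sync handlers run. Called from inside a coroutine, async handlers are scheduled as tasks and run once the caller yields to the loop.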
### Per-process registries
Each Hermes process (gateway, TUI, dashboard, batch-runner, CLI) has its **own** `HookRegistry`. Hooks dropped into `~/.hermes/hooks/` are discovered independently in each process. A handler registered programmatically in the gateway's registry is **not** visible to the TUI's registry and vice-versa.
For programmatic handlers (especially dashboard plugins) that want to observe events from other processes, Hermes ships a built-in **cross-process forwarder** that bridges the gateway/TUI/CLI/batch-runner processes to the dashboard process. See "Cross-process delivery" below.
For file-system hooks (`HOOK.yaml` + `handler.py`), the simplest pattern is still: drop the hook directory, every process picks it up on startup. With cross-process delivery enabled, this means a file-system hook subscribed to e.g. `agent:end` will fire **twice** for each event — once in the gateway process (where the event originates) and once in the dashboard process (where it's republished by the forwarder). Filter on `context.get("_forwarded")` if you only want one or the other.
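As a sketch of that filter, a handler that acts only on the copy fired in the originating process could look like this. The `_forwarded` and `_forwarded_from` keys come from the text; the `seen_locally` list and the `"gateway"` source label are illustrative.

```python
seen_locally = []


def handle(event_type, context):
    """Act only on the copy fired in the originating process."""
    if context.get("_forwarded"):
        return  # Republished by the dashboard ingest endpoint; skip it.
    seen_locally.append((event_type, context.get("session_id")))


# The same logical event, once as fired at the source and once as republished.
handle("agent:end", {"session_id": "s1"})
handle(
    "agent:end",
    {"session_id": "s1", "_forwarded": True, "_forwarded_from": "gateway"},
)
```

Inverting the check (`if not context.get("_forwarded"): return`) keeps only the dashboard-side copy.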
### Cross-process delivery
In a typical `hermes dashboard` + `hermes start` deployment, the dashboard and the gateway are separate processes. Events fired in the gateway (`agent:*`, `session:*`, `command:*`) and the TUI sidecar (`tui:*`) need to reach dashboard plugins (e.g. the orb visualization) that subscribe via `get_default_registry().register(...)` in the dashboard process.
Hermes handles this automatically. The dashboard writes a discovery file at `~/.hermes/dashboard.json` on startup containing its bound URL and a freshly-generated bearer token (mode 0600, readable and writable by the owner only). Non-dashboard processes (gateway, TUI, CLI, batch-runner) read this file on startup, register a built-in forwarder for the canonical namespaces (`tui:*`, `agent:*`, `session:*`, `command:*`, `gateway:*`), and POST every fired event to the dashboard's `POST /api/hooks/ingest` endpoint. The dashboard republishes each forwarded event on its own default registry via `emit_sync`, with `context["_forwarded"] = True` and `context["_forwarded_from"] = "<src>"` added so:
1. Source-side forwarders skip the event if it ever round-trips back (loop prevention).
2. Handlers can filter on `_forwarded` if they want to dedupe between the source-process firing and the dashboard-republish.
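The two roles of the marker can be sketched as a pair of helpers. The function names `mark_forwarded` and `should_forward` are hypothetical; only the `_forwarded` and `_forwarded_from` keys come from the description above.

```python
def mark_forwarded(context: dict, src: str) -> dict:
    """Ingest side: copy the context and tag it before republishing."""
    tagged = dict(context)  # Shallow copy; the caller's dict is left alone.
    tagged["_forwarded"] = True
    tagged["_forwarded_from"] = src
    return tagged


def should_forward(context: dict) -> bool:
    """Source side: never reship an event that already crossed the bridge."""
    return not context.get("_forwarded", False)


original = {"session_id": "s1", "payload": {}}
republished = mark_forwarded(original, "tui")
```

Here `should_forward(original)` is true and `should_forward(republished)` is false, which is what breaks a potential forwarding loop.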
**Behavior:**
- **Zero-config.** When both processes are running, no setup is needed; the discovery file appears and disappears with the dashboard.
- **Bind-address independent.** The forwarder works regardless of whether the dashboard binds loopback (`127.0.0.1`) or a public interface (`--insecure`). The bearer token in `dashboard.json` is the security boundary, not the bind address.
- **No-op when no dashboard is running.** The forwarder doesn't register handlers; events fire in their source process and stay there.
- **Best-effort.** A bounded queue (1024 frames per source process) drops oldest on overflow. POSTs use a 2-second timeout. Recency beats history when the dashboard is slow.
- **Non-blocking.** The forwarder's handler enqueues onto a queue and returns immediately. A daemon thread does the HTTP POSTs. The hot publisher path is never blocked on network I/O.
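The drop-oldest queue behind the best-effort and non-blocking properties can be approximated with a bounded `collections.deque`. This is a sketch of the technique, not the forwarder's code: `DropOldestBuffer` and its `dropped` counter are hypothetical names.

```python
import collections
import threading


class DropOldestBuffer:
    """Bounded buffer that discards the oldest item when full."""

    def __init__(self, maxlen=1024):
        self._items = collections.deque(maxlen=maxlen)
        self._cond = threading.Condition()
        self.dropped = 0

    def put(self, item):
        """Publisher side: never waits for the consumer or the network."""
        with self._cond:
            if len(self._items) == self._items.maxlen:
                self.dropped += 1  # append() below evicts the oldest entry.
            self._items.append(item)
            self._cond.notify()

    def get(self, timeout=None):
        """Worker side: returns the oldest item, or None on timeout."""
        with self._cond:
            if not self._cond.wait_for(lambda: self._items, timeout=timeout):
                return None
            return self._items.popleft()
```

A daemon thread would loop on `get()` and perform the HTTP POST for each frame, so the publisher only ever pays for a short lock and an append.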
**Disabling:** set `HERMES_HOOK_FORWARDER=0` (or `false`/`no`/`off`) in the source process's environment. Useful for paranoid security postures or for debugging hook double-fires.
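One way such a switch might be parsed is shown below. The variable name and the four disabling values come from the text; the helper name `forwarder_enabled`, the case-insensitive comparison, and the whitespace trimming are assumptions.

```python
import os

_DISABLED_VALUES = {"0", "false", "no", "off"}


def forwarder_enabled(environ=os.environ) -> bool:
    """Return False only when the variable holds a disabling value."""
    value = environ.get("HERMES_HOOK_FORWARDER", "")
    # Assumption: values are compared case-insensitively after trimming.
    return value.strip().lower() not in _DISABLED_VALUES
```

With this reading, an unset variable leaves the forwarder enabled, matching the zero-config default.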
**Failure modes:**
| Scenario | Behavior |
|---|---|
| Dashboard not running | `dashboard.json` absent; forwarder no-op until next probe (30s). |
| Dashboard restarts (new token) | Forwarder's next probe re-reads the file and picks up the new token. |
| Dashboard up but slow | Forwarder queue overflows and drops oldest. |
| Forwarder POST timeout | Logged once per minute, not per failure. Queue drains as POSTs return. |
| `--insecure` mode | Forwarder works identically to loopback mode. Plaintext-over-LAN; same exposure as the rest of `--insecure`. |
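The "logged once per minute" behavior in the table is a plain rate limit. A minimal sketch, assuming a monotonic clock and a hypothetical `RateLimiter` class (the real forwarder may structure this differently):

```python
import time


class RateLimiter:
    """Allow an action at most once per ``interval`` seconds."""

    def __init__(self, interval=60.0, clock=time.monotonic):
        self._interval = interval
        self._clock = clock  # Injectable so the behavior can be tested.
        self._last = None

    def allow(self) -> bool:
        now = self._clock()
        if self._last is None or now - self._last >= self._interval:
            self._last = now
            return True
        return False
```

A caller would wrap its error logging in `if limiter.allow(): ...`, so a downed dashboard produces one log line per minute instead of one per failed POST.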
### Examples