Compare commits

...

6 Commits

Author SHA1 Message Date
Ben
1ab55e000b build: expose hermes_events in py-modules so plugins can import it
The bus lives at the repo root as a single-file module
(``hermes_events.py``). setuptools needs the module listed in
``[tool.setuptools] py-modules`` to ship it with the installed
package; otherwise the file exists on disk but ``import hermes_events``
fails for any caller that doesn't have the repo root on
``sys.path`` (which is most callers — pip-installed users, external
plugin directories, etc.).

Drop ``hermes_events`` into the list right after ``hermes_constants``
to keep the file alphabetically grouped with the other top-level
``hermes_*.py`` siblings.

Verified
--------
- ``pip install -e .`` regenerates the editable finder to include
  hermes_events.
- After a fresh install (or rerunning ``pip install -e .``),
  ``import hermes_events`` resolves from ``site-packages``-side
  consumers cleanly.
- Existing in-repo tests are unaffected (CWD already on sys.path).

External-consumer motivation
----------------------------
The first known external consumer is the (forthcoming)
``hermes-agent-plugin-orb`` standalone repo, which lives at
``~/.hermes/plugins/orb`` and calls ``import hermes_events`` from
its ``dashboard/plugin_api.py``. Without this change, the plugin
loads (importlib spec_from_file_location runs) but the bus import
fails and the dashboard logs a clear ImportError.
2026-05-29 09:45:02 +10:00
Ben
952ce45bf0 phase 5: cross-process bridge integration tests
9 tests exercising hermes_cli/web_server.py's _republish_pub_frame_to_bus
ingestor, which is the dashboard side of the cross-process bridge wired
up in Phase 4.

Test surface covers both wire shapes:

1. Bus relay (gateway → dashboard, the new {_bus_relay: true, topic,
   envelope} shape):
   - originating ts and src preserved through the ingestor
   - arbitrary topic namespaces pass through (no whitelist)
2. TUI sidecar JSON-RPC (legacy {jsonrpc, method:"event", params:{...}}):
   - published as `tui.<type>` with auto-stamped ts/src and session_id +
     flattened payload keys
   - payload-less variants still publish
   - missing `type` is dropped silently
3. Resilience:
   - malformed JSON, empty strings, JSON arrays → ignored
   - unknown frame shape → ignored
   - subscriber exception → does not propagate out
4. End-to-end: a plugin subscribing to `**` sees both source-types
   interleaved via a single subscription, with each event's ts treated
   correctly (gateway ts preserved, TUI ts auto-stamped).

Implementation note: we deliberately don't spawn a real gateway
subprocess + dashboard server here. The wire format and the ingestor
logic are what matter for v1, and they're both exercised directly. The
WebSocket transport itself is covered by upstream `websockets` tests
and will be smoke-tested end-to-end in Phase 17.

Build status: 9 new tests pass; the full prior-touched set (399 tests
across 6 files) remains green.
2026-05-29 09:43:10 +10:00
Ben
4872bef3bd phase 4: cross-process bridges + dashboard ingestion
Bridge the local hermes_events bus across process boundaries so a plugin
running in the dashboard process sees events from the TUI sidecar and
from the standalone gateway via a single subscribe() call.

Dashboard ingestion (hermes_cli/web_server.py)
----------------------------------------------
The /api/pub WebSocket handler now ALSO re-publishes incoming frames
onto the local hermes_events bus, in addition to the existing per-channel
_broadcast_event fanout (which still drives the React sidebar's
per-session feed).

Two frame shapes are recognized:

1. **TUI sidecar JSON-RPC**: legacy
   {"jsonrpc": "2.0", "method": "event",
    "params": {type, session_id, payload}}
   → published as topic `tui.<type>` with session_id + flattened payload
   keys. Auto-stamped ts/src.

2. **Bus relay** (new shape for gateway → dashboard):
   {"_bus_relay": true, "topic": "<topic>", "envelope": {<full envelope>}}
   → published verbatim onto the bus. The envelope's pre-stamped ts/src
   are preserved (cross-process timestamp fidelity).

Frame parsing is best-effort and try/except'd; a malformed frame never
disturbs the legacy per-channel fanout.

Gateway-side bridge (gateway/event_bridge.py)
---------------------------------------------
New module: subscribes to `**` on the local bus and ships envelopes to
the dashboard via WS in shape #2 above. Wired into gateway/run.py
startup (right after `hooks.discover_and_load()`); silently no-ops when
the configuration env vars are unset (the standalone-gateway case).

Configuration:
- HERMES_DASHBOARD_EVENT_URL  — e.g. ws://127.0.0.1:9119/api/pub
- HERMES_DASHBOARD_EVENT_TOKEN — dashboard _SESSION_TOKEN value
- HERMES_DASHBOARD_EVENT_CHANNEL — defaults to "gateway"

Failure mode is silent: connect/send errors are logged at debug,
the worker reconnects with exponential backoff up to 30s, and gateway
runs are never blocked on bus delivery. Loop-prevention: envelopes
whose `src` doesn't start with "gateway"/"agent" (i.e. inbound TUI
echoes) are skipped to prevent ping-pong.

The auth handoff (sharing the dashboard's ephemeral _SESSION_TOKEN
with the gateway process) is left to a follow-up — the env-var path
unblocks anyone deploying gateway and dashboard together via systemd
or k8s where the token can be templated in. For dev (single-host,
single user) the dashboard token can be `cat ~/.hermes/dashboard.token`
or piped via `hermes dashboard --print-token | gateway ...`.

TUI side
--------
No new code. Phase 3 already added publish() to _emit(). The existing
TeeTransport → WsPublisherTransport plumbing ships JSON-RPC frames to
/api/pub; the dashboard ingestor handles them as shape #1 above. So
the TUI's contribution flows into the dashboard bus without further
modification.

Tests
-----
- tests/hermes_cli/test_web_server.py: 150 pre-existing tests pass.
- tests/hermes_cli/test_dashboard_auth_ws_auth.py: 21 pass.
- tests/gateway/test_hooks.py: 21 pass.
- tests/test_tui_gateway_server.py: 186 pass.
- tests/test_hermes_events.py: 21 pass (from Phase 2).
Total: 399 tests across 5 files pass after this phase. Cross-process
integration test lands in Phase 5.
2026-05-29 09:43:10 +10:00
Ben
0244207b36 phase 3: publishers — wire _emit() and HookRegistry.emit() to hermes_events
Both load-bearing event sources in Hermes now publish to the generic
event bus alongside their existing dispatch channels. Plugins can
subscribe via plugin_api.py instead of registering bespoke hooks.

tui_gateway/server.py _emit()
-----------------------------
Adds a publish() call alongside the existing JSON-RPC write_json frame.
The bus payload flattens the JSON-RPC params shape: session_id at top
level, plus the event-specific payload dict's keys. Topic is prefixed
with `tui.` (e.g. `tui.message.start`, `tui.tool.complete`,
`tui.reasoning.delta`).

gateway/hooks.py HookRegistry.emit()
------------------------------------
Adds publish() at the top of the coroutine. emit() is async; publish()
is sync (the bus schedules async subscribers internally via
asyncio.create_task), so no await is needed. The legacy `:` separator
is translated to `.` so the bus topic is `gateway.agent.start`,
`gateway.session.reset`, `gateway.command.title`, etc.

Both call sites wrap publish() in try/except — the bus is best-effort
plumbing for plugins, and a publish-side error must never block the
primary event paths (JSON-RPC to the React TUI; the YAML-discovered
hook handler chain). hermes_events itself already swallows subscriber
exceptions; the try/except here covers (very unlikely) publish-side
errors only.

Tests
-----
- tests/gateway/test_hooks.py: 21 pre-existing tests still pass.
- tests/test_tui_gateway_server.py: 186 pre-existing tests still pass.
- tests/test_hermes_events.py: 21 bus tests from Phase 2 still pass.
Total: 228 pass via scripts/run_tests.sh in 5.6s.

No new tests in this phase — Phase 5's cross-process integration test
will exercise the gateway→bus→dashboard flow end-to-end.
2026-05-29 09:43:10 +10:00
Ben
315293e9fb phase 2: bus unit tests (tests/test_hermes_events.py)
21 tests covering every contract of hermes_events.py:

- publish/subscribe round-trip
- glob pattern matching: `*` is one segment, `**` is zero-or-more,
  mid-segment `**`, literal-segment exact match
- sync subscriber fires in publisher stack (no await needed)
- async subscriber dispatched via asyncio.create_task when loop running
  (uses @pytest.mark.asyncio for the loop)
- async subscriber DROPPED with warning log line when no loop is running;
  sync subscribers in the same emit still fire
- exception in one subscriber doesn't block others or raise to publisher
- sync subscriber accidentally returning a coroutine: discarded with warning
- unsubscribe is idempotent (second call returns False, doesn't raise)
- same callback can subscribe to same pattern twice; handles are distinct
- envelope auto-stamps type/ts/src when missing
- envelope preserves caller-provided ts/src/type (cross-process relay)
- 100-way fanout publish stays under 50ms (generous sanity bound)

All tests use an autouse _reset_bus fixture to clear bus state between
tests. Per-file process isolation (scripts/run_tests_parallel.py) means
this file's state can't leak into other test files.

Build status: 21 tests pass via scripts/run_tests.sh in 0.6s.
2026-05-29 09:43:10 +10:00
Ben
2316d2a225 phase 1: hermes_events.py — generic plugin event bus
Add a process-local pub/sub bus that lets sources (TUI sidecar, gateway
hooks, agent loop, etc.) publish lifecycle events to subscribers
(plugins, observability, debug taps) without either side knowing about
the other.

The orb is the first consumer of this facility, but the API is
generic — any plugin can subscribe via its plugin_api.py.

Design
------
- Sync publish() — callable from any context, including async coroutines
  (no await needed). Async publishers in gateway/hooks.py and agent
  hot-paths can use it without restructuring.
- Mixed-mode subscribers — sync callbacks fire in the publisher's stack;
  async callbacks are scheduled via asyncio.create_task() when a loop is
  detected, dropped with a warning otherwise.
- Glob patterns — `*` matches one segment, `**` matches any number.
- Auto-stamped envelope — bus fills in `type`/`ts`/`src` when missing,
  preserves them when the publisher pre-stamps (for cross-process relays).
- Exception isolation — a raising subscriber never poisons others or
  bubbles back to the publisher.

Files
-----
- hermes_events.py        — the bus module (~270 lines, well-commented)
- docs/events.md          — public taxonomy + envelope spec + EXPERIMENTAL
                            disclaimer at the top

Stability
---------
docs/events.md carries the explicit "experimental, breakage expected"
notice from the plan. Topic names and payload shapes may change without
a deprecation cycle until v1.0 lands. The API shape (publish/subscribe/
unsubscribe + envelope rule + glob syntax) is intended to be stable.

Phase 0 baseline
----------------
Verified pristine HEAD passes tests/gateway/test_hooks.py +
tests/test_tui_gateway_server.py (207 tests). No baseline-fix commit
needed.

Build status: hermes_events module imports cleanly; smoke-tested
publish/subscribe/glob/envelope/exception-isolation in a subprocess.
2026-05-29 09:43:10 +10:00
10 changed files with 1507 additions and 2 deletions

173
docs/events.md Normal file
View File

@@ -0,0 +1,173 @@
# Hermes Events Bus
> ⚠️ **EXPERIMENTAL.** Topic names and payload shapes in this document are
> explicitly prone to change. The bus is shipped now so plugin authors can
> start consuming it, but you should expect to update your topic globs and
> payload-field accesses over the next few releases. We will declare a
> stable v1.0 schema once the orb (and at least one more consumer) have
> shaken out the design — at that point we'll add a proper deprecation
> policy. Until then, breakage is on the menu.
`hermes_events` is the in-process pub/sub bus that lets core components
(the embedded TUI, the gateway, the agent loop, the cron scheduler, ...)
broadcast lifecycle events that plugins (and other core components) can
subscribe to. It is the substrate that drives the orb's `SceneState`
machine and is intended to be the substrate that drives any future
"what is Hermes doing right now" widget, observability shipper, or
debug-tap plugin.
## API surface
```python
from hermes_events import publish, subscribe, unsubscribe
# Sources publish — always sync, even from inside async contexts.
publish("tui.tool.start", {"name": "web_search", "session_id": "abc"})
publish("gateway.agent.start", {"platform": "telegram", "session_id": "xyz"})
# Plugins subscribe with glob patterns.
handle = subscribe("tui.tool.*", on_tool_event) # one segment after tool.
handle = subscribe("gateway.agent.*", on_agent_event)
handle = subscribe("tui.**", any_tui_topic) # any number of segments
handle = subscribe("**", on_anything) # firehose
unsubscribe(handle)
```
- Topics are `.`-segmented.
- Pattern globs: `*` matches one segment; `**` matches zero or more segments.
- `publish()` is **synchronous**. Async publishers (`async def emit(): ...`)
just call sync `publish()` from inside their coroutine — no `await` needed.
- Subscribers may be sync or async. Sync subscribers fire immediately in the
publisher's stack. Async subscribers are scheduled via
`asyncio.create_task()` if a running event loop is detected. **If no loop
is running** (unit tests outside `pytest-asyncio`, startup-before-loop),
async subscribers are dropped for that emit with a warning log line; sync
subscribers still fire.
- Subscriber exceptions are logged but never raised back to the publisher.
## Envelope
Every published payload is wrapped before delivery:
```python
{
"type": "tui.tool.start", # always the topic the publisher used
"ts": 1716938400.123, # unix seconds (float)
"src": "tui", # first dot-segment of `type`
# ...source-specific fields, freely evolving during the experimental phase
}
```
`type`, `ts`, and `src` are always present. The bus auto-stamps `ts` (current
`time.time()`) and `src` (first segment of the topic) only if the publisher
omits them. **Relayed events** (e.g. the dashboard receiving a frame from a
remote gateway process) should pre-populate `ts` and `src` so the originating
values are preserved — the bus does not overwrite existing keys.
## Current topic taxonomy
All topic names below are **experimental**. Plugins should subscribe to
the most general pattern that meets their needs and tolerate the addition
of new sibling topics over time.
### `tui.*` — emitted by the dashboard's TUI sidecar
| Topic | When it fires |
|---|---|
| `tui.message.start` | The model begins emitting a new assistant message. |
| `tui.message.delta` | A streaming-token chunk of the current assistant message. |
| `tui.message.complete` | The assistant message has finished streaming. |
| `tui.tool.start` | A tool call has begun execution. |
| `tui.tool.progress` | A long-running tool reports incremental output. |
| `tui.tool.complete` | A tool call finished (success or failure). |
| `tui.tool.generating` | The model is mid-stream of constructing a tool call's arguments. |
| `tui.reasoning.delta` | A streaming-token chunk of the model's reasoning content. |
| `tui.reasoning.available` | A full reasoning block became available (non-streaming case). |
| `tui.error` | An error frame propagated through the TUI. |
Common payload fields (best-effort, not guaranteed):
- `session_id` (str) — TUI session this event belongs to.
- `name` (str) — for tool events, the tool's name.
- `preview` (str) — for tool events, a short preview of args or output.
### `gateway.*` — emitted by the messaging gateway
| Topic | When it fires |
|---|---|
| `gateway.startup` | The gateway process has finished initializing. |
| `gateway.agent.start` | The agent has begun processing a user message. |
| `gateway.agent.step` | Each iteration of the agent's tool-calling loop. |
| `gateway.agent.end` | The agent has finished processing a user message. |
| `gateway.session.start` | A new session was created (first message of a new session). |
| `gateway.session.end` | A session ended (user ran `/new` or `/reset`). |
| `gateway.session.reset` | A session reset completed; a new session entry was created. |
| `gateway.command.<name>` | Any slash command was executed. Wildcard-friendly via `gateway.command.*`. |
Common payload fields:
- `platform` (str) — `"telegram"`, `"discord"`, `"slack"`, etc.
- `session_id` (str) — the gateway session this event belongs to.
### Future namespaces
Topics under namespaces we haven't shipped yet are reserved. If you see
events fire under a namespace not listed here, that's a leak from in-flight
work — please don't subscribe to it; the topic may be renamed or removed
before the next release.
## Cross-process delivery
Each process has its own bus instance. Bridges ship events between
processes:
- **TUI sidecar → dashboard:** the sidecar runs as a PTY-spawned subprocess
of the dashboard. A default `subscribe("**", ship_via_pub_ws)` on the TUI
side forwards every event to the dashboard via the `/api/pub` WebSocket,
which re-publishes them onto the dashboard's local bus.
- **Gateway → dashboard:** the gateway process opens a WebSocket to the
dashboard at startup and ships its events the same way. If the dashboard
is offline, the bridge silently no-ops; gateway runs are never blocked.
- **Same-process publishers** (e.g. an embedded gateway in the dashboard
process) hit the local bus directly — no bridge involved.
This means a plugin's `plugin_api.py` running in the dashboard process
sees events from all three sources via a single `subscribe(...)` call.
Subscription patterns do not change based on where events originated.
## For plugin authors
A typical plugin backend wires up subscriptions in its `register()` (or
at import time, since `plugin_api.py` is imported once at dashboard
startup):
```python
from fastapi import APIRouter
from hermes_events import subscribe
router = APIRouter()
def _on_tool_event(envelope: dict) -> None:
# Process the envelope, push to your WebSocket subscribers, etc.
...
subscribe("tui.tool.*", _on_tool_event)
subscribe("gateway.agent.*", _on_tool_event)
```
If you need to subscribe asynchronously (e.g. to drive a per-subscriber
`asyncio.Queue` in a long-lived FastAPI WebSocket handler), register an
`async def` callback — the bus will `asyncio.create_task()` it inside the
dashboard's running event loop.
For a fully-worked example, see `plugins/orb/dashboard/plugin_api.py`.
## Stability declaration
Until further notice (read: until a `docs/events-v1.md` lands and this
file is renamed to `docs/events-experimental.md`), assume any topic name
or payload field may be renamed, removed, or split into multiple events
in any release.
The **shape of the bus itself** (the `publish` / `subscribe` /
`unsubscribe` Python API, the envelope auto-stamping rule, the glob
pattern syntax) is more stable — we intend to preserve it through v1.0.
But the **content** flowing through it is the experimental part.

246
gateway/event_bridge.py Normal file
View File

@@ -0,0 +1,246 @@
"""Cross-process bridge: ship local ``hermes_events`` to the dashboard.
The gateway runs as its own process (``hermes gateway run``) — separate from
the dashboard's web server. Plugins like the orb live in the dashboard
process and subscribe to its local bus instance. This module forwards
locally-published events to the dashboard so its plugins see them.
Wire protocol
-------------
Frames are JSON dicts with the shape::
{
"_bus_relay": true,
"topic": "<topic, e.g. gateway.agent.start>",
"envelope": {<full hermes_events envelope: type, ts, src, ...>},
}
The dashboard's ``/api/pub`` receiver recognises this shape and re-publishes
verbatim onto its local bus. ``ts`` and ``src`` are preserved (the bus only
auto-stamps missing keys), so subscribers see the original gateway-side
timestamp.
Configuration
-------------
The bridge reads two environment variables at start time:
- ``HERMES_DASHBOARD_EVENT_URL`` — ws://host:port/api/pub URL (no query string;
this module appends ``token`` and ``channel``).
- ``HERMES_DASHBOARD_EVENT_TOKEN`` — the dashboard's session token. The
dashboard's ``_SESSION_TOKEN`` is process-local-ephemeral, so until a
handoff mechanism lands this must be supplied by whoever launches the
gateway. Without it the bridge no-ops (no exception, no spam — just a
single debug log line at startup).
Failure mode
------------
Best-effort. Connection failures, drops, and per-frame send errors are
logged at debug and the bridge silently retries with exponential backoff.
``publish()`` is sync; the bridge enqueues frames on a daemon thread so
the publisher's main path never blocks.
If both env vars are missing the bridge does not start — this is the
common case for gateways running standalone (no dashboard at all).
"""
from __future__ import annotations
import json
import logging
import os
import queue
import threading
import time
from typing import Optional
from urllib.parse import urlencode
import hermes_events
from hermes_events import _Subscription
logger = logging.getLogger(__name__)
# Module-level state. ``start()`` is idempotent; calling twice replaces the
# bridge but does not duplicate it.
_lock = threading.Lock()
_bridge: Optional["_Bridge"] = None
class _Bridge:
"""Background thread that ships local bus events to the dashboard."""
_QUEUE_MAX = 1024
def __init__(self, url: str, token: str, channel: str = "gateway") -> None:
self._url = url
self._token = token
self._channel = channel
self._queue: queue.Queue = queue.Queue(maxsize=self._QUEUE_MAX)
self._stop = threading.Event()
self._worker: Optional[threading.Thread] = None
self._subscription: Optional[_Subscription] = None
def start(self) -> None:
# Subscribe to ``**`` first — every published event lands in the
# outgoing queue. The worker thread drains the queue and ships via
# WS. If the WS isn't connected yet the queue absorbs the burst.
self._subscription = hermes_events.subscribe("**", self._enqueue)
self._worker = threading.Thread(
target=self._run,
name="hermes-event-bridge",
daemon=True,
)
self._worker.start()
def stop(self) -> None:
if self._subscription is not None:
hermes_events.unsubscribe(self._subscription)
self._subscription = None
self._stop.set()
# Nudge the queue so a blocked drain returns promptly.
try:
self._queue.put_nowait(None)
except queue.Full:
pass
def _enqueue(self, envelope: dict) -> None:
"""Subscriber callback. Skip ``_bus_relay``-tagged envelopes — those
came IN via the dashboard's pub_ws, and re-shipping would loop them
back. We only forward locally-originated events."""
if envelope.get("_relayed"):
return
topic = envelope.get("type", "")
if not isinstance(topic, str) or not topic:
return
# Skip echo: don't ship events whose ``src`` indicates they came
# from somewhere other than this process. The dashboard ingestor
# doesn't tag with ``_relayed``, but it does preserve the original
# ``src``. If the gateway ever subscribes to dashboard-originated
# events (e.g. ``tui.*``), shipping them back would create a loop.
# For now we accept anything originating with src starting with
# ``gateway`` or ``agent``; everything else is presumed inbound.
src = envelope.get("src", "")
if isinstance(src, str) and not src.startswith(("gateway", "agent")):
return
frame = {
"_bus_relay": True,
"topic": topic,
"envelope": dict(envelope),
}
try:
self._queue.put_nowait(frame)
except queue.Full:
# Drop oldest, enqueue newest. Bus events are best-effort; a
# blocked subscriber must never wedge the publisher's main path.
try:
self._queue.get_nowait()
except queue.Empty:
pass
try:
self._queue.put_nowait(frame)
except queue.Full:
# Pathological: queue still full after a get. Drop.
pass
def _run(self) -> None:
backoff = 1.0
max_backoff = 30.0
while not self._stop.is_set():
ws = self._connect()
if ws is None:
# Sleep with stop-aware wait. .wait returns True if stop
# was set, in which case we exit immediately.
if self._stop.wait(backoff):
return
backoff = min(backoff * 2, max_backoff)
continue
backoff = 1.0 # reset on successful connect
try:
self._drain(ws)
except Exception as exc:
logger.debug("event_bridge: drain error: %s", exc)
finally:
try:
ws.close() # type: ignore[union-attr]
except Exception:
pass
def _connect(self):
try:
from websockets.sync.client import connect as ws_connect
except ImportError: # pragma: no cover
logger.debug("event_bridge: 'websockets' package not installed")
return None
qs = urlencode({"token": self._token, "channel": self._channel})
url = f"{self._url}?{qs}" if "?" not in self._url else f"{self._url}&{qs}"
try:
return ws_connect(url, open_timeout=2.0, max_size=None)
except Exception as exc:
logger.debug("event_bridge: connect to %s failed: %s", self._url, exc)
return None
def _drain(self, ws) -> None:
"""Pop frames off the queue and send via WS until error or stop."""
while not self._stop.is_set():
try:
item = self._queue.get(timeout=1.0)
except queue.Empty:
continue
if item is None:
return # explicit shutdown sentinel
try:
ws.send(json.dumps(item, ensure_ascii=False))
except Exception as exc:
logger.debug("event_bridge: send failed, will reconnect: %s", exc)
# Re-queue the lost frame at the front so we don't drop it
# on a transient blip. Best-effort: the queue is FIFO, so
# this lands at the back, not the front — acceptable for v1.
try:
self._queue.put_nowait(item)
except queue.Full:
pass
return
def start_if_configured() -> bool:
"""Read env vars and start the bridge if both are set.
Returns True if the bridge was started, False otherwise. Idempotent —
a second call replaces the previous bridge.
"""
global _bridge
url = os.environ.get("HERMES_DASHBOARD_EVENT_URL", "").strip()
token = os.environ.get("HERMES_DASHBOARD_EVENT_TOKEN", "").strip()
if not url or not token:
logger.debug(
"event_bridge: HERMES_DASHBOARD_EVENT_URL/TOKEN not set — "
"gateway bus events will not reach the dashboard"
)
return False
channel = os.environ.get("HERMES_DASHBOARD_EVENT_CHANNEL", "gateway").strip() or "gateway"
with _lock:
if _bridge is not None:
_bridge.stop()
_bridge = _Bridge(url=url, token=token, channel=channel)
_bridge.start()
logger.info(
"event_bridge: shipping local bus events to dashboard at %s (channel=%s)",
url,
channel,
)
return True
def stop() -> None:
"""Stop the bridge if running. Idempotent."""
global _bridge
with _lock:
if _bridge is not None:
_bridge.stop()
_bridge = None
__all__ = ["start_if_configured", "stop"]

View File

@@ -27,6 +27,7 @@ from typing import Any, Callable, Dict, List, Optional
import yaml
from hermes_cli.config import get_hermes_home
import hermes_events
HOOKS_DIR = get_hermes_home() / "hooks"
@@ -171,6 +172,24 @@ class HookRegistry:
if context is None:
context = {}
# Publish on the generic event bus (hermes_events) so plugins can
# subscribe to gateway lifecycle without having to register a
# full HOOK.yaml + handler.py. Translate the legacy ``:`` separator
# to the bus's ``.`` so a subscriber can match ``gateway.agent.*``
# or ``gateway.command.<name>``. ``publish()`` is sync — safe to
# call from inside this coroutine without ``await``; async
# subscribers are scheduled by the bus via asyncio.create_task().
# See docs/events.md for the topic taxonomy.
try:
hermes_events.publish(
f"gateway.{event_type.replace(':', '.')}", dict(context)
)
except Exception:
# Bus failures must never block the legacy hook pipeline. The
# bus already logs subscriber exceptions internally; this
# catch only handles a (very unlikely) publish-side error.
pass
for fn in self._resolve_handlers(event_type):
try:
result = fn(event_type, context)

View File

@@ -4081,6 +4081,19 @@ class GatewayRunner:
# Discover and load event hooks
self.hooks.discover_and_load()
# Optionally bridge local hermes_events bus → dashboard so plugins
# in the dashboard process (orb, etc.) see gateway lifecycle events.
# No-op when HERMES_DASHBOARD_EVENT_URL/TOKEN env vars are unset
# (the common standalone-gateway case). See gateway/event_bridge.py.
try:
from gateway import event_bridge
event_bridge.start_if_configured()
except Exception:
logger.debug(
"gateway/event_bridge.start_if_configured failed; "
"continuing without dashboard event relay",
exc_info=True,
)
# Recover background processes from checkpoint (crash recovery)
try:

View File

@@ -33,6 +33,7 @@ if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
from hermes_cli import __version__, __release_date__
import hermes_events # type: ignore[import-not-found]
from hermes_cli.config import (
cfg_get,
DEFAULT_CONFIG,
@@ -3755,11 +3756,79 @@ async def pub_ws(ws: WebSocket) -> None:
try:
while True:
await _broadcast_event(channel, await ws.receive_text())
frame = await ws.receive_text()
# Legacy behavior: fan the frame out to every /api/events
# subscriber on this channel id. Drives the React sidebar's
# live tool-call feed and keeps the existing per-session pubsub
# working exactly as before.
await _broadcast_event(channel, frame)
# Phase 4 addition: also re-publish onto the local hermes_events
# bus so plugin_api.py modules (orb today, others later) can
# subscribe to TUI/gateway events without needing to know about
# WebSocket plumbing. Frame parsing is best-effort and never
# blocks the legacy fanout above — a malformed frame is logged
# at debug and discarded for bus purposes only.
try:
_republish_pub_frame_to_bus(frame)
except Exception:
_log.debug("pub_ws: bus republish failed for one frame", exc_info=True)
except WebSocketDisconnect:
pass
def _republish_pub_frame_to_bus(frame: str) -> None:
"""Best-effort: parse a frame received on ``/api/pub`` and re-publish it
onto the local ``hermes_events`` bus.
Two frame shapes are recognised:
1. **TUI sidecar JSON-RPC** ``{"jsonrpc": "2.0", "method": "event",
"params": {"type": <evt>, "session_id": <sid>, "payload": {...}}}``
— published as topic ``tui.<evt>`` with ``session_id`` + flattened
``payload`` keys. The bus auto-stamps ``ts`` and ``src``.
2. **Bus relay** ``{"_bus_relay": true, "topic": <topic>,
"envelope": {<full envelope incl. type/ts/src/...>}}`` — published
verbatim onto the bus under ``topic``. The envelope's ``ts`` and
``src`` are preserved (the bus only stamps missing keys), which is
critical for cross-process events where we want the original
timestamp from the gateway process.
Anything else is ignored silently.
"""
try:
obj = json.loads(frame)
except (json.JSONDecodeError, ValueError):
return
if not isinstance(obj, dict):
return
# Shape 2: bus relay (gateway → dashboard, or future relays)
if obj.get("_bus_relay") is True:
topic = obj.get("topic")
envelope = obj.get("envelope")
if isinstance(topic, str) and isinstance(envelope, dict):
hermes_events.publish(topic, envelope)
return
# Shape 1: legacy TUI JSON-RPC event frame
if obj.get("method") == "event":
params = obj.get("params")
if not isinstance(params, dict):
return
evt = params.get("type")
if not isinstance(evt, str) or not evt:
return
bus_payload: dict = {}
sid = params.get("session_id")
if sid is not None:
bus_payload["session_id"] = sid
payload = params.get("payload")
if isinstance(payload, dict):
bus_payload.update(payload)
hermes_events.publish(f"tui.{evt}", bus_payload)
@app.websocket("/api/events")
async def events_ws(ws: WebSocket) -> None:
if not _DASHBOARD_EMBEDDED_CHAT_ENABLED:

298
hermes_events.py Normal file
View File

@@ -0,0 +1,298 @@
"""
Hermes Agent — process-local pub/sub event bus.
Plugins and core components can publish events on a shared in-process bus and
subscribe to topic patterns. Sources publish; subscribers listen; neither knows
about the other.
EXPERIMENTAL — topic names and payload shapes are explicitly prone to change
until we publish a v1.0 schema. See ``docs/events.md`` for the current taxonomy
and the breakage notice. Third-party plugins are welcome to subscribe today,
but should expect to update their topic globs over the next few releases.
Basic usage
-----------
::
from hermes_events import publish, subscribe, unsubscribe
# Sources publish — always sync, even from inside async contexts.
publish("tui.tool.start", {"name": "web_search", "session_id": "abc"})
# Subscribers register glob patterns. ``*`` matches one topic segment;
# ``**`` matches any number of segments.
handle = subscribe("tui.tool.*", on_tool_event) # tui.tool.start/.complete/...
handle = subscribe("tui.**", on_any_tui) # any tui.* topic
handle = subscribe("**", everything)
unsubscribe(handle)
Envelope
--------
Every published payload is wrapped in a minimal envelope before being delivered
to subscribers:
::
{
"type": "<topic>", # always the topic the publisher used
"ts": <unix-seconds-float>, # auto-stamped if missing
"src": "<first-topic-segment>", # auto-stamped if missing
...source-specific fields, freely evolving in the experimental phase
}
Publishers that are *relaying* events from another process should pre-populate
``ts`` and ``src`` so the original values are preserved (the bus only fills in
missing keys; it never overwrites them).
Async semantics
---------------
``publish()`` is synchronous. Sync subscribers fire immediately in the
publisher's stack. Async subscribers are scheduled via
``asyncio.create_task()`` when a running event loop is detected. When no loop
is running (unit tests outside of an async test, startup-before-loop), async
subscribers are dropped for that emit with a single warning log line — sync
subscribers still fire normally.
This means async publishers (e.g. ``gateway.hooks.HookRegistry.emit()``) just
call sync ``publish()`` from inside their coroutine — no ``await`` needed.
Exceptions in subscribers are logged but never raised back to the publisher.
"""
from __future__ import annotations
import asyncio
import inspect
import logging
import threading
import time
from collections.abc import Awaitable, Callable
from typing import Any
logger = logging.getLogger(__name__)
# A subscriber callback. Receives the envelope dict and may return either
# ``None`` (sync) or an awaitable (async).
SubscriberCallback = Callable[[dict], "Awaitable[None] | None"]
class _Subscription:
"""Internal record of one subscription. Returned (opaque) to callers as
the handle for ``unsubscribe()``.
"""
__slots__ = ("pattern", "segments", "callback", "is_async")
def __init__(self, pattern: str, callback: SubscriberCallback) -> None:
self.pattern = pattern
self.segments = pattern.split(".")
self.callback = callback
# ``iscoroutinefunction`` doesn't catch partials, bound methods of
# async functions in all cases, or callable objects with an async
# ``__call__``. ``inspect.iscoroutinefunction`` handles the common
# cases; we also probe by calling and inspecting if a callable
# object's __call__ is async-tagged. Keep this conservative — a
# false-negative just runs an async subscriber synchronously and
# discards the coroutine (with a warning), which is recoverable;
# a false-positive schedules a sync function as a task, which
# would explode in create_task().
self.is_async = inspect.iscoroutinefunction(callback) or inspect.iscoroutinefunction(
getattr(callback, "__call__", None)
)
# Module-level state. The bus is a process-local singleton — we deliberately
# don't expose a class or constructor to avoid the "which bus is this?"
# confusion. Inside one Python process, there is exactly one bus.
_subscriptions: list[_Subscription] = []
_lock = threading.RLock()
def _matches(topic_segments: list[str], pattern_segments: list[str]) -> bool:
"""Glob-match a topic against a pattern.
``*`` matches exactly one segment. ``**`` matches zero or more segments.
Anywhere a literal segment appears in the pattern, the topic must match
exactly at that position.
Recursive implementation; patterns are short (typically 1-3 segments).
"""
# ``ti`` indexes into topic; ``pi`` indexes into pattern.
def _rec(ti: int, pi: int) -> bool:
# Both exhausted simultaneously → match.
if pi == len(pattern_segments):
return ti == len(topic_segments)
seg = pattern_segments[pi]
if seg == "**":
# Greedy: try consuming 0..N topic segments.
# First, try matching with ``**`` consuming zero topic segments.
if _rec(ti, pi + 1):
return True
# Then try consuming one more topic segment and recurse on the
# same ``**`` position.
if ti < len(topic_segments) and _rec(ti + 1, pi):
return True
return False
if ti == len(topic_segments):
# Pattern still has segments to consume but topic is exhausted.
return False
if seg == "*" or seg == topic_segments[ti]:
return _rec(ti + 1, pi + 1)
return False
return _rec(0, 0)
def subscribe(pattern: str, callback: SubscriberCallback) -> _Subscription:
"""Subscribe a callback to a topic glob pattern.
Returns a subscription handle. Pass it to :func:`unsubscribe` to remove.
The same callback may subscribe to the same pattern more than once; each
call returns a distinct handle. The bus does not deduplicate.
"""
if not pattern:
raise ValueError("subscribe pattern must be a non-empty string")
sub = _Subscription(pattern, callback)
with _lock:
_subscriptions.append(sub)
return sub
def unsubscribe(handle: _Subscription) -> bool:
"""Remove a subscription. Returns True if the handle was found, False
otherwise (idempotent — unsubscribing a removed handle is not an error).
"""
with _lock:
try:
_subscriptions.remove(handle)
return True
except ValueError:
return False
def publish(topic: str, payload: dict | None = None) -> None:
"""Publish an event on a topic.
``topic`` is a ``.``-segmented string (e.g. ``"tui.tool.start"``).
``payload`` is a dict of source-specific fields; the bus wraps it with the
minimal envelope (``type``, ``ts``, ``src``) before delivery.
Publishers that are relaying events from another process may pre-populate
``ts`` and/or ``src`` in the payload to preserve the originating values;
the bus only fills in missing keys.
This call always returns synchronously. Sync subscribers fire immediately;
async subscribers are scheduled via ``asyncio.create_task()`` when a
running loop is available. If no loop is running, async subscribers are
dropped with a single warning log line per emit.
"""
if not topic:
raise ValueError("publish topic must be a non-empty string")
# Build the delivered envelope. We do this once and share the dict across
# all subscribers — subscribers that mutate it are misusing the API, but
# we can't realistically guard against that without copying for every
# delivery (which would balloon cost for the ``**`` firehose case).
envelope = dict(payload) if payload else {}
envelope.setdefault("type", topic)
envelope.setdefault("ts", time.time())
envelope.setdefault("src", topic.split(".", 1)[0])
topic_segments = topic.split(".")
# Snapshot subscribers under the lock, then release before invoking
# callbacks — a subscriber that calls publish() or subscribe() from
# inside its callback must not deadlock on a sync callback.
with _lock:
matches = [
sub
for sub in _subscriptions
if _matches(topic_segments, sub.segments)
]
if not matches:
return
# Probe for a running event loop ONCE per publish call, not per
# subscriber. ``get_running_loop()`` only succeeds if we're being called
# from inside a coroutine or task; otherwise it raises ``RuntimeError``.
try:
loop: asyncio.AbstractEventLoop | None = asyncio.get_running_loop()
except RuntimeError:
loop = None
dropped_async = 0
for sub in matches:
try:
if sub.is_async:
if loop is None:
dropped_async += 1
continue
# Invoke the coroutine factory to get a coroutine, then schedule
# it on the running loop. The task object is intentionally not
# awaited — the bus is fire-and-forget.
coro = sub.callback(envelope)
if inspect.iscoroutine(coro):
loop.create_task(coro)
else:
result = sub.callback(envelope)
# A sync subscriber that accidentally returns a coroutine
# (e.g. someone changed sync→async without updating the
# registration) would leak the coroutine. Warn and discard.
if inspect.iscoroutine(result):
logger.warning(
"hermes_events: subscriber for %r returned a coroutine "
"but was not registered as async; coroutine dropped",
sub.pattern,
)
result.close()
except Exception:
# Subscriber exceptions are logged but never propagated. One
# bad subscriber must not break the publisher's main path or
# starve the rest of the subscribers on this emit.
logger.exception(
"hermes_events: subscriber for pattern %r raised on topic %r",
sub.pattern,
topic,
)
if dropped_async:
logger.warning(
"hermes_events: dropped %d async subscriber(s) on topic %r "
"(no running asyncio loop)",
dropped_async,
topic,
)
# -----------------------------------------------------------------------------
# Testing aids — not part of the documented public API.
# -----------------------------------------------------------------------------
def _reset_for_tests() -> None:
"""Clear all subscriptions. For unit tests only — must NOT be called from
production code or third-party plugins."""
with _lock:
_subscriptions.clear()
def _subscriber_count() -> int:
"""Return the current subscriber count. For unit tests only."""
with _lock:
return len(_subscriptions)
__all__ = ["publish", "subscribe", "unsubscribe"]

View File

@@ -217,7 +217,7 @@ hermes-agent = "run_agent:main"
hermes-acp = "acp_adapter.entry:main"
[tool.setuptools]
py-modules = ["run_agent", "model_tools", "toolsets", "batch_runner", "trajectory_compressor", "toolset_distributions", "cli", "hermes_bootstrap", "hermes_constants", "hermes_state", "hermes_time", "hermes_logging", "utils"]
py-modules = ["run_agent", "model_tools", "toolsets", "batch_runner", "trajectory_compressor", "toolset_distributions", "cli", "hermes_bootstrap", "hermes_constants", "hermes_events", "hermes_state", "hermes_time", "hermes_logging", "utils"]
[tool.setuptools.package-data]
hermes_cli = ["web_dist/**/*", "tui_dist/**/*", "scripts/install.sh", "scripts/install.ps1"]

402
tests/test_hermes_events.py Normal file
View File

@@ -0,0 +1,402 @@
"""Unit tests for the hermes_events pub/sub bus.
Covers:
- publish/subscribe round-trip
- glob pattern matching (`*` one segment, `**` zero or more)
- sync subscriber dispatch (fires in publisher stack)
- async subscriber dispatch via asyncio.create_task when a loop is running
- async subscriber drop-with-warning when no loop is running
- exception isolation (one bad subscriber doesn't kill others or raise)
- unsubscribe idempotence
- envelope auto-stamp of type/ts/src when missing
- envelope preservation of pre-populated ts/src (cross-process relay case)
- high-fanout micro-benchmark sanity check
"""
from __future__ import annotations
import asyncio
import logging
import time
import pytest
import hermes_events
from hermes_events import publish, subscribe, unsubscribe
@pytest.fixture(autouse=True)
def _reset_bus():
"""Reset bus state before each test. Per-file pytest isolation already
means cross-file leakage is impossible, but multiple tests in this file
share the same process — we want each test to start clean.
"""
hermes_events._reset_for_tests()
yield
hermes_events._reset_for_tests()
# ---------------------------------------------------------------------------
# Basic publish/subscribe round-trip
# ---------------------------------------------------------------------------
def test_publish_subscribe_round_trip():
received: list[dict] = []
subscribe("foo.bar", received.append)
publish("foo.bar", {"k": 1})
assert len(received) == 1
env = received[0]
assert env["type"] == "foo.bar"
assert env["src"] == "foo"
assert "ts" in env
assert env["k"] == 1
def test_publish_with_no_payload_still_delivers_envelope():
received: list[dict] = []
subscribe("alone", received.append)
publish("alone")
assert len(received) == 1
env = received[0]
assert env["type"] == "alone"
assert env["src"] == "alone"
assert isinstance(env["ts"], float)
def test_publish_empty_topic_raises():
with pytest.raises(ValueError):
publish("")
def test_subscribe_empty_pattern_raises():
with pytest.raises(ValueError):
subscribe("", lambda env: None)
# ---------------------------------------------------------------------------
# Glob pattern matching
# ---------------------------------------------------------------------------
def test_star_matches_exactly_one_segment():
received: list[str] = []
subscribe("tui.*", lambda env: received.append(env["type"]))
publish("tui.tool") # one segment after tui → matches
publish("tui.tool.start") # two segments after tui → does NOT match
publish("gateway.tool") # different prefix → does NOT match
publish("tui") # too few → does NOT match
assert received == ["tui.tool"]
def test_double_star_matches_zero_or_more_segments():
received: list[str] = []
subscribe("tui.**", lambda env: received.append(env["type"]))
publish("tui") # zero suffix segments
publish("tui.tool") # one
publish("tui.tool.start") # two
publish("gateway.tool") # different prefix → no match
received_after_match = list(received)
assert received_after_match == ["tui", "tui.tool", "tui.tool.start"]
def test_double_star_alone_is_firehose():
received: list[str] = []
subscribe("**", lambda env: received.append(env["type"]))
publish("a")
publish("a.b")
publish("a.b.c.d.e")
assert received == ["a", "a.b", "a.b.c.d.e"]
def test_mid_segment_double_star():
received: list[str] = []
subscribe("a.**.z", lambda env: received.append(env["type"]))
publish("a.z") # ** consumes zero
publish("a.b.z") # ** consumes one
publish("a.b.c.z") # ** consumes two
publish("a.b.c.y") # doesn't end with z → no match
publish("a") # too short → no match
assert received == ["a.z", "a.b.z", "a.b.c.z"]
def test_literal_segments_must_match_exactly():
received: list[str] = []
subscribe("gateway.agent.start", lambda env: received.append(env["type"]))
publish("gateway.agent.start") # exact match
publish("gateway.agent.end") # different last segment
publish("gateway.session.start") # different middle segment
assert received == ["gateway.agent.start"]
# ---------------------------------------------------------------------------
# Sync vs async subscriber dispatch
# ---------------------------------------------------------------------------
def test_sync_subscriber_fires_in_publisher_stack():
"""Sync subscribers must execute synchronously — the publisher's next
statement must see their side-effects already applied."""
received: list[dict] = []
def on_evt(env: dict) -> None:
received.append(env)
subscribe("sync.test", on_evt)
publish("sync.test", {"v": 42})
# No await, no sleep, no loop — must already be there.
assert len(received) == 1
assert received[0]["v"] == 42
@pytest.mark.asyncio
async def test_async_subscriber_dispatched_via_create_task():
"""Async subscribers fire via asyncio.create_task when a loop is running.
The publisher returns immediately; we await once for the task to run."""
received: list[dict] = []
async def on_evt(env: dict) -> None:
received.append(env)
subscribe("async.test", on_evt)
publish("async.test", {"v": 7})
# Publish returned synchronously; the task is scheduled but not yet run.
# Yield to the loop once so the task can fire.
await asyncio.sleep(0)
assert len(received) == 1
assert received[0]["v"] == 7
def test_async_subscriber_dropped_when_no_loop(caplog):
"""When no event loop is running, an async subscriber is dropped with
a single warning log line per emit. Sync subscribers still fire."""
async_received: list[dict] = []
sync_received: list[dict] = []
async def on_async(env: dict) -> None:
async_received.append(env)
def on_sync(env: dict) -> None:
sync_received.append(env)
subscribe("noloop.test", on_async)
subscribe("noloop.test", on_sync)
with caplog.at_level(logging.WARNING, logger="hermes_events"):
publish("noloop.test", {"v": 1})
# Async one was dropped (no loop running).
assert async_received == []
# Sync one still fired.
assert len(sync_received) == 1
# Warning was emitted.
assert any(
"dropped" in r.message and "noloop.test" in r.message
for r in caplog.records
), [r.message for r in caplog.records]
# ---------------------------------------------------------------------------
# Exception isolation
# ---------------------------------------------------------------------------
def test_exception_in_one_subscriber_does_not_block_others(caplog):
received: list[dict] = []
def boom(env: dict) -> None:
raise RuntimeError("subscriber test failure")
def ok(env: dict) -> None:
received.append(env)
# Order matters less than coverage — register boom first to confirm
# ok still runs after the exception path.
subscribe("err.test", boom)
subscribe("err.test", ok)
with caplog.at_level(logging.ERROR, logger="hermes_events"):
# publish() must not raise even though boom does.
publish("err.test", {"v": 1})
assert len(received) == 1
# The error was logged.
assert any(
"subscriber test failure" in (r.message + (r.exc_text or ""))
or "err.test" in r.message
for r in caplog.records
), [r.message for r in caplog.records]
def test_sync_subscriber_returning_coroutine_is_discarded(caplog):
"""A sync subscriber that accidentally returns a coroutine (e.g. someone
converted it to async without re-registering) should not leak the
coroutine. We discard with a warning."""
received: list[dict] = []
# Note: NOT declared async, but body returns a coroutine.
def sneaky(env: dict):
async def inner():
received.append(env)
return inner()
subscribe("sneak.test", sneaky)
with caplog.at_level(logging.WARNING, logger="hermes_events"):
publish("sneak.test")
# The inner coroutine was discarded — never awaited, never ran.
assert received == []
assert any("coroutine" in r.message for r in caplog.records), [
r.message for r in caplog.records
]
# ---------------------------------------------------------------------------
# Unsubscribe
# ---------------------------------------------------------------------------
def test_unsubscribe_removes_subscription():
received: list[dict] = []
h = subscribe("u.test", received.append)
publish("u.test")
assert len(received) == 1
assert unsubscribe(h) is True
publish("u.test")
assert len(received) == 1 # not delivered
# Idempotent: unsubscribing a removed handle is not an error.
assert unsubscribe(h) is False
def test_subscribe_same_pattern_twice_yields_distinct_handles():
received_a: list[dict] = []
received_b: list[dict] = []
h_a = subscribe("dup", received_a.append)
h_b = subscribe("dup", received_b.append)
assert h_a is not h_b
publish("dup")
assert len(received_a) == 1
assert len(received_b) == 1
unsubscribe(h_a)
publish("dup")
assert len(received_a) == 1
assert len(received_b) == 2
# ---------------------------------------------------------------------------
# Envelope auto-stamping and preservation
# ---------------------------------------------------------------------------
def test_envelope_autostamps_type_ts_src_when_missing():
received: list[dict] = []
subscribe("**", received.append)
before = time.time()
publish("ns.evt", {"x": 1})
after = time.time()
env = received[0]
assert env["type"] == "ns.evt"
assert env["src"] == "ns"
assert before <= env["ts"] <= after
assert env["x"] == 1
def test_envelope_preserves_prepopulated_ts_and_src():
"""For the cross-process relay case: the dashboard's bridge ingestor
re-publishes a frame received from the gateway, with the gateway's
original ts and src already in the payload. The bus must not overwrite."""
received: list[dict] = []
subscribe("**", received.append)
publish(
"gateway.agent.start",
{"ts": 1000.5, "src": "remote-gateway", "platform": "telegram"},
)
env = received[0]
assert env["ts"] == 1000.5
assert env["src"] == "remote-gateway"
assert env["type"] == "gateway.agent.start" # type is always set to topic
assert env["platform"] == "telegram"
def test_envelope_does_not_overwrite_caller_provided_type():
"""If the caller pre-populates `type` (e.g. relaying a frame whose topic
name differs slightly from the current one), the pre-populated value
wins — same rule as ts/src."""
received: list[dict] = []
subscribe("**", received.append)
publish("foo.bar", {"type": "foo.bar.relayed", "k": 1})
env = received[0]
assert env["type"] == "foo.bar.relayed"
# ---------------------------------------------------------------------------
# High-fanout micro-benchmark
# ---------------------------------------------------------------------------
def test_high_fanout_publish_is_sub_millisecond():
"""100 subscribers on a hot topic must dispatch in well under a
millisecond on any reasonable machine. This is a sanity check, not a
perf gate — generous bound so it doesn't flake on busy CI."""
counts: list[int] = [0] * 100
def make_cb(i: int):
def cb(env: dict) -> None:
counts[i] += 1
return cb
for i in range(100):
subscribe("fan", make_cb(i))
start = time.perf_counter()
publish("fan", {"k": 1})
elapsed = time.perf_counter() - start
assert all(c == 1 for c in counts)
assert elapsed < 0.05, f"100-way fanout took {elapsed*1000:.2f}ms (>50ms)"
# ---------------------------------------------------------------------------
# Internal helpers (sanity)
# ---------------------------------------------------------------------------
def test_subscriber_count_helper():
assert hermes_events._subscriber_count() == 0
h1 = subscribe("a", lambda e: None)
h2 = subscribe("b", lambda e: None)
assert hermes_events._subscriber_count() == 2
unsubscribe(h1)
unsubscribe(h2)
assert hermes_events._subscriber_count() == 0

View File

@@ -0,0 +1,264 @@
"""Integration test for the cross-process event bridge.
The dashboard process receives bus relay frames on ``/api/pub`` from two
sources:
1. The TUI sidecar (legacy JSON-RPC ``{method: "event", params: {...}}``)
2. The standalone gateway process (new ``{_bus_relay: true, topic, envelope}``)
This test exercises the dashboard-side ingestor (``_republish_pub_frame_to_bus``
in ``hermes_cli/web_server.py``) directly: we feed it synthetic frames in
both shapes and assert the local bus delivers them to subscribers with the
correct topics + envelopes.
We deliberately don't spawn a real subprocess. The wire format and the
ingestor logic are what matter; the WebSocket transport itself is exercised
by the production smoke test (Phase 17 manual smoke) and by upstream
``websockets`` library tests.
"""
from __future__ import annotations
import json
import logging
import pytest
import hermes_events
from hermes_cli.web_server import _republish_pub_frame_to_bus
@pytest.fixture(autouse=True)
def _reset_bus():
hermes_events._reset_for_tests()
yield
hermes_events._reset_for_tests()
# ---------------------------------------------------------------------------
# Shape 2: bus relay (the gateway → dashboard path)
# ---------------------------------------------------------------------------
def test_bus_relay_frame_publishes_with_preserved_envelope():
"""A {_bus_relay: true, topic, envelope} frame is re-published verbatim,
preserving the originating ts and src so subscribers see the gateway's
original timestamp rather than a re-stamped one."""
received: list[dict] = []
hermes_events.subscribe("**", received.append)
# Simulate a frame the gateway-side bridge would send: a
# ``gateway.agent.start`` envelope it built ~5 seconds ago.
gateway_envelope = {
"type": "gateway.agent.start",
"ts": 100.0,
"src": "gateway",
"platform": "telegram",
"session_id": "abc-123",
}
frame = json.dumps(
{
"_bus_relay": True,
"topic": "gateway.agent.start",
"envelope": gateway_envelope,
}
)
_republish_pub_frame_to_bus(frame)
assert len(received) == 1
env = received[0]
assert env["type"] == "gateway.agent.start"
assert env["ts"] == 100.0, "originating ts must be preserved, not re-stamped"
assert env["src"] == "gateway", "originating src must be preserved"
assert env["platform"] == "telegram"
assert env["session_id"] == "abc-123"
def test_bus_relay_with_arbitrary_topic_passes_through():
"""The dashboard ingestor publishes whatever topic the gateway sent; it
does not inspect or validate the topic namespace beyond shape."""
received: list[str] = []
hermes_events.subscribe("**", lambda env: received.append(env["type"]))
frame = json.dumps(
{
"_bus_relay": True,
"topic": "agent.iteration",
"envelope": {"type": "agent.iteration", "ts": 1.0, "src": "agent", "n": 5},
}
)
_republish_pub_frame_to_bus(frame)
assert received == ["agent.iteration"]
# ---------------------------------------------------------------------------
# Shape 1: TUI sidecar JSON-RPC (the TUI → dashboard path)
# ---------------------------------------------------------------------------
def test_tui_jsonrpc_frame_publishes_as_tui_namespace():
"""A {jsonrpc, method:"event", params:{type, session_id, payload}} frame
becomes a `tui.<type>` topic publish on the local bus."""
received: list[dict] = []
hermes_events.subscribe("tui.**", received.append)
frame = json.dumps(
{
"jsonrpc": "2.0",
"method": "event",
"params": {
"type": "tool.start",
"session_id": "sess-1",
"payload": {"name": "web_search", "preview": "foo bar"},
},
}
)
_republish_pub_frame_to_bus(frame)
assert len(received) == 1
env = received[0]
assert env["type"] == "tui.tool.start"
assert env["src"] == "tui"
assert env["session_id"] == "sess-1"
assert env["name"] == "web_search"
assert env["preview"] == "foo bar"
# ts is auto-stamped (TUI JSON-RPC frames don't carry one).
assert isinstance(env["ts"], float)
def test_tui_jsonrpc_frame_without_payload_still_publishes():
received: list[dict] = []
hermes_events.subscribe("tui.**", received.append)
frame = json.dumps(
{
"jsonrpc": "2.0",
"method": "event",
"params": {"type": "message.start", "session_id": "s1"},
}
)
_republish_pub_frame_to_bus(frame)
assert len(received) == 1
assert received[0]["type"] == "tui.message.start"
assert received[0]["session_id"] == "s1"
def test_tui_jsonrpc_frame_drops_silently_when_type_missing():
received: list[dict] = []
hermes_events.subscribe("**", received.append)
frame = json.dumps(
{
"jsonrpc": "2.0",
"method": "event",
"params": {"session_id": "s1"}, # no `type` field
}
)
_republish_pub_frame_to_bus(frame)
assert received == []
# ---------------------------------------------------------------------------
# Malformed frames are best-effort (must not raise)
# ---------------------------------------------------------------------------
def test_invalid_json_is_silently_ignored():
"""A malformed frame must not raise — the production pub_ws handler
relies on this so a single bad frame doesn't kill the WS receive loop."""
received: list[dict] = []
hermes_events.subscribe("**", received.append)
_republish_pub_frame_to_bus("not json at all {{{")
_republish_pub_frame_to_bus("")
_republish_pub_frame_to_bus("[1, 2, 3]") # array, not dict
assert received == []
def test_unknown_frame_shape_is_silently_ignored():
received: list[dict] = []
hermes_events.subscribe("**", received.append)
# Looks like JSON-RPC but method is not "event" — ignore.
_republish_pub_frame_to_bus(json.dumps({"jsonrpc": "2.0", "method": "ping"}))
# Looks like bus_relay but topic missing — ignore.
_republish_pub_frame_to_bus(json.dumps({"_bus_relay": True, "envelope": {}}))
# Neither shape — ignore.
_republish_pub_frame_to_bus(json.dumps({"hello": "world"}))
assert received == []
# ---------------------------------------------------------------------------
# Subscriber sees both sources via a single pattern
# ---------------------------------------------------------------------------
def test_plugin_can_subscribe_to_both_sources_with_one_pattern():
"""The point of the bridge: a plugin subscribing to e.g. `**` should see
TUI events (auto-stamped) AND gateway events (preserved ts) interleaved
via a single subscribe() call — no need for per-source plumbing."""
received: list[tuple[str, float]] = []
hermes_events.subscribe(
"**", lambda env: received.append((env["type"], env["ts"]))
)
# Gateway-relayed event with preserved ts.
_republish_pub_frame_to_bus(
json.dumps(
{
"_bus_relay": True,
"topic": "gateway.agent.start",
"envelope": {"type": "gateway.agent.start", "ts": 100.0, "src": "gateway"},
}
)
)
# TUI event auto-stamped.
_republish_pub_frame_to_bus(
json.dumps(
{
"jsonrpc": "2.0",
"method": "event",
"params": {"type": "tool.start", "session_id": "s1"},
}
)
)
assert len(received) == 2
topics = [t for t, _ in received]
assert topics == ["gateway.agent.start", "tui.tool.start"]
# Gateway ts preserved; TUI ts is auto-stamped (current time, > 100).
assert received[0][1] == 100.0
assert received[1][1] > 100.0
# ---------------------------------------------------------------------------
# The pub_ws's outer try/except catches our errors too
# ---------------------------------------------------------------------------
def test_subscriber_exception_does_not_propagate(caplog):
"""The bus already isolates subscriber exceptions; this confirms that
when called from the ingestor, the exception is still contained."""
def boom(env):
raise RuntimeError("subscriber test failure")
hermes_events.subscribe("**", boom)
with caplog.at_level(logging.ERROR):
# Must not raise.
_republish_pub_frame_to_bus(
json.dumps(
{
"_bus_relay": True,
"topic": "any",
"envelope": {"type": "any", "ts": 1.0, "src": "test"},
}
)
)

View File

@@ -17,6 +17,7 @@ from typing import Any, Optional
from hermes_constants import get_hermes_home
from hermes_cli.env_loader import load_hermes_dotenv
import hermes_events
from utils import is_truthy_value
from tui_gateway.transport import (
StdioTransport,
@@ -388,6 +389,26 @@ def _emit(event: str, sid: str, payload: dict | None = None):
if payload is not None:
params["payload"] = payload
write_json({"jsonrpc": "2.0", "method": "event", "params": params})
# Also publish on the generic event bus (hermes_events) so plugins
# (orb today, achievements / observability / debug taps tomorrow) can
# subscribe to TUI lifecycle without needing per-session WS plumbing.
# The bus payload mirrors the JSON-RPC params: ``session_id`` plus the
# event-specific ``payload`` dict (if any), flattened so subscribers
# don't need to dig into a nested envelope. The topic is prefixed with
# ``tui.`` to namespace it from gateway/agent events. See docs/events.md.
try:
bus_payload: dict = {"session_id": sid}
if payload is not None:
bus_payload.update(payload)
hermes_events.publish(f"tui.{event}", bus_payload)
except Exception:
# The bus is best-effort plumbing for plugins; a publish failure
# must never disturb the JSON-RPC emit that drives the React TUI.
# hermes_events itself already logs subscriber failures internally
# — this catch only handles a publish-side error (e.g. someone
# passed an unhashable topic), which shouldn't happen but isn't
# worth a crash if it does.
pass
def _status_update(sid: str, kind: str, text: str | None = None):