Mirror of https://github.com/NousResearch/hermes-agent.git
Synced 2026-06-10 20:29:00 +08:00

Compare commits: docs/execu ... hermes-eve (6 commits)

| Author | SHA1 | Date |
|---|---|---|
| | 1ab55e000b | |
| | 952ce45bf0 | |
| | 4872bef3bd | |
| | 0244207b36 | |
| | 315293e9fb | |
| | 2316d2a225 | |

173
docs/events.md
Normal file
@@ -0,0 +1,173 @@
# Hermes Events Bus

> ⚠️ **EXPERIMENTAL.** Topic names and payload shapes in this document are
> explicitly prone to change. The bus is shipped now so plugin authors can
> start consuming it, but you should expect to update your topic globs and
> payload-field accesses over the next few releases. We will declare a
> stable v1.0 schema once the orb (and at least one more consumer) have
> shaken out the design; at that point we'll add a proper deprecation
> policy. Until then, breakage is on the menu.

`hermes_events` is the in-process pub/sub bus that lets core components
(the embedded TUI, the gateway, the agent loop, the cron scheduler, ...)
broadcast lifecycle events that plugins (and other core components) can
subscribe to. It is the substrate that drives the orb's `SceneState`
machine and is intended to be the substrate that drives any future
"what is Hermes doing right now" widget, observability shipper, or
debug-tap plugin.

## API surface

```python
from hermes_events import publish, subscribe, unsubscribe

# Sources publish; always sync, even from inside async contexts.
publish("tui.tool.start", {"name": "web_search", "session_id": "abc"})
publish("gateway.agent.start", {"platform": "telegram", "session_id": "xyz"})

# Plugins subscribe with glob patterns.
handle = subscribe("tui.tool.*", on_tool_event)       # one segment after tool.
handle = subscribe("gateway.agent.*", on_agent_event)
handle = subscribe("tui.**", any_tui_topic)           # any number of segments
handle = subscribe("**", on_anything)                 # firehose
unsubscribe(handle)
```

- Topics are `.`-segmented.
- Pattern globs: `*` matches one segment; `**` matches zero or more segments.
- `publish()` is **synchronous**. Async publishers (`async def emit(): ...`)
  just call sync `publish()` from inside their coroutine; no `await` needed.
- Subscribers may be sync or async. Sync subscribers fire immediately in the
  publisher's stack. Async subscribers are scheduled via
  `asyncio.create_task()` if a running event loop is detected. **If no loop
  is running** (unit tests outside `pytest-asyncio`, startup-before-loop),
  async subscribers are dropped for that emit with a warning log line; sync
  subscribers still fire.
- Subscriber exceptions are logged but never raised back to the publisher.
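
The glob rules above can be sketched as a small standalone matcher. This is an illustrative sketch only, not the bus's own matcher: the function name `topic_matches` is hypothetical, and the only behavior assumed is what the bullets state (`*` consumes exactly one segment, `**` consumes zero or more).

```python
def topic_matches(topic: str, pattern: str) -> bool:
    """Return True if a dot-segmented topic matches a glob pattern.

    Hypothetical helper for illustration; not part of hermes_events.
    """
    t = topic.split(".")
    p = pattern.split(".")

    def rec(ti: int, pi: int) -> bool:
        # Pattern exhausted: match only if the topic is exhausted too.
        if pi == len(p):
            return ti == len(t)
        if p[pi] == "**":
            # Let ``**`` consume 0, 1, 2, ... remaining topic segments.
            return any(rec(k, pi + 1) for k in range(ti, len(t) + 1))
        # Topic exhausted while the pattern still expects a segment.
        if ti == len(t):
            return False
        # ``*`` matches any single segment; otherwise require equality.
        if p[pi] == "*" or p[pi] == t[ti]:
            return rec(ti + 1, pi + 1)
        return False

    return rec(0, 0)
```

With this sketch, `tui.tool.*` matches `tui.tool.start` but not `tui.tool` or `tui.tool.start.extra`, while `tui.**` matches `tui`, `tui.error`, and `tui.tool.start` alike.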

## Envelope

Every published payload is wrapped before delivery:

```python
{
    "type": "tui.tool.start",   # always the topic the publisher used
    "ts": 1716938400.123,       # unix seconds (float)
    "src": "tui",               # first dot-segment of `type`
    # ...source-specific fields, freely evolving during the experimental phase
}
```

`type`, `ts`, and `src` are always present. The bus auto-stamps `ts` (current
`time.time()`) and `src` (first segment of the topic) only if the publisher
omits them. **Relayed events** (e.g. the dashboard receiving a frame from a
remote gateway process) should pre-populate `ts` and `src` so the originating
values are preserved; the bus does not overwrite existing keys.
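
The stamping rule can be illustrated with a short sketch. It assumes only what the paragraph above states; the helper name `wrap` is hypothetical and this is not presented as the bus's actual code.

```python
from __future__ import annotations

import time


def wrap(topic: str, payload: dict | None = None) -> dict:
    """Build an envelope following the documented stamping rule.

    Hypothetical helper for illustration; not part of hermes_events.
    """
    envelope = dict(payload) if payload else {}
    # ``type`` is always the topic the publisher used.
    envelope["type"] = topic
    # ``ts`` and ``src`` are filled in only when the publisher omitted them,
    # so relayed events keep their originating values.
    envelope.setdefault("ts", time.time())
    envelope.setdefault("src", topic.split(".", 1)[0])
    return envelope
```

A locally published event gets a fresh `ts` and a `src` derived from its topic, whereas a relayed payload that already carries `ts` and `src` passes through with those values untouched.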

## Current topic taxonomy

All topic names below are **experimental**. Plugins should subscribe to
the most general pattern that meets their needs and tolerate the addition
of new sibling topics over time.

### `tui.*` (emitted by the dashboard's TUI sidecar)

| Topic | When it fires |
|---|---|
| `tui.message.start` | The model begins emitting a new assistant message. |
| `tui.message.delta` | A streaming-token chunk of the current assistant message. |
| `tui.message.complete` | The assistant message has finished streaming. |
| `tui.tool.start` | A tool call has begun execution. |
| `tui.tool.progress` | A long-running tool reports incremental output. |
| `tui.tool.complete` | A tool call finished (success or failure). |
| `tui.tool.generating` | The model is mid-stream of constructing a tool call's arguments. |
| `tui.reasoning.delta` | A streaming-token chunk of the model's reasoning content. |
| `tui.reasoning.available` | A full reasoning block became available (non-streaming case). |
| `tui.error` | An error frame propagated through the TUI. |

Common payload fields (best-effort, not guaranteed):
- `session_id` (str): TUI session this event belongs to.
- `name` (str): for tool events, the tool's name.
- `preview` (str): for tool events, a short preview of args or output.
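
Because these fields are best-effort, a subscriber is safer reading them defensively. The sketch below shows one way to do that; the function name `summarize_tool_event` and the placeholder strings are made up for illustration, and only the `type`, `session_id`, `name`, and `preview` keys come from this document.

```python
def summarize_tool_event(envelope: dict) -> str:
    """Format a one-line summary without assuming any optional field exists.

    Hypothetical subscriber helper for illustration.
    """
    topic = envelope.get("type", "")
    # Last topic segment, e.g. "start" / "progress" / "complete".
    phase = topic.rsplit(".", 1)[-1] if topic else "unknown"
    name = envelope.get("name") or "<unnamed tool>"
    session = envelope.get("session_id") or "<no session>"
    preview = envelope.get("preview") or ""
    line = f"[{session}] {name}: {phase}"
    return f"{line} ({preview})" if preview else line
```

Using `.get()` with fallbacks means a payload field that is renamed or dropped in a later release degrades the summary instead of raising `KeyError` inside the subscriber.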

### `gateway.*` (emitted by the messaging gateway)

| Topic | When it fires |
|---|---|
| `gateway.startup` | The gateway process has finished initializing. |
| `gateway.agent.start` | The agent has begun processing a user message. |
| `gateway.agent.step` | Each iteration of the agent's tool-calling loop. |
| `gateway.agent.end` | The agent has finished processing a user message. |
| `gateway.session.start` | A new session was created (first message of a new session). |
| `gateway.session.end` | A session ended (user ran `/new` or `/reset`). |
| `gateway.session.reset` | A session reset completed; a new session entry was created. |
| `gateway.command.<name>` | Any slash command was executed. Wildcard-friendly via `gateway.command.*`. |

Common payload fields:
- `platform` (str): `"telegram"`, `"discord"`, `"slack"`, etc.
- `session_id` (str): the gateway session this event belongs to.
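
The gateway topics are derived from the legacy hook event names by swapping the `:` separator for `.` and adding the `gateway.` prefix, as the hook-registry change in this diff does inline. A minimal sketch of that mapping follows; the function name `hook_event_to_topic` is hypothetical, and the sample event names are assumed from the topic table above.

```python
def hook_event_to_topic(event_type: str) -> str:
    """Map a legacy ``:``-separated hook event name to a bus topic.

    Hypothetical helper; the diff performs this translation inline.
    """
    return f"gateway.{event_type.replace(':', '.')}"
```

For example, a hook event named `agent:start` would surface on the bus as `gateway.agent.start`, which a plugin can match with `gateway.agent.*`.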

### Future namespaces

Topics under namespaces we haven't shipped yet are reserved. If you see
events fire under a namespace not listed here, that's a leak from in-flight
work. Please don't subscribe to it; the topic may be renamed or removed
before the next release.

## Cross-process delivery

Each process has its own bus instance. Bridges ship events between
processes:

- **TUI sidecar → dashboard:** the sidecar runs as a PTY-spawned subprocess
  of the dashboard. A default `subscribe("**", ship_via_pub_ws)` on the TUI
  side forwards every event to the dashboard via the `/api/pub` WebSocket,
  which re-publishes them onto the dashboard's local bus.
- **Gateway → dashboard:** when `HERMES_DASHBOARD_EVENT_URL` and
  `HERMES_DASHBOARD_EVENT_TOKEN` are set, the gateway process opens a
  WebSocket to the dashboard at startup and ships its events the same way.
  If the dashboard is offline, the bridge buffers a bounded number of frames
  and keeps retrying in the background; gateway runs are never blocked.
- **Same-process publishers** (e.g. an embedded gateway in the dashboard
  process) hit the local bus directly; no bridge is involved.

This means a plugin's `plugin_api.py` running in the dashboard process
sees events from all three sources via a single `subscribe(...)` call.
Subscription patterns do not change based on where events originated.
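
To make the relay concrete, here is a rough sketch of packing an envelope into the `_bus_relay` frame shape used by the gateway bridge in this diff, and of validating it on the receiving side. The helper names `make_relay_frame` and `parse_relay_frame` are hypothetical; only the frame keys (`_bus_relay`, `topic`, `envelope`) come from the source.

```python
from __future__ import annotations

import json


def make_relay_frame(envelope: dict) -> str:
    """Serialize an envelope as a bus-relay frame (hypothetical helper)."""
    frame = {
        "_bus_relay": True,
        "topic": envelope["type"],
        "envelope": dict(envelope),  # copy so later mutation cannot leak in
    }
    return json.dumps(frame, ensure_ascii=False)


def parse_relay_frame(text: str) -> tuple[str, dict] | None:
    """Return (topic, envelope) for a valid relay frame, else None."""
    try:
        obj = json.loads(text)
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        return None
    if not isinstance(obj, dict) or obj.get("_bus_relay") is not True:
        return None
    topic, envelope = obj.get("topic"), obj.get("envelope")
    if isinstance(topic, str) and isinstance(envelope, dict):
        return topic, envelope
    return None
```

Because the envelope travels with its original `ts` and `src`, re-publishing the parsed result on the receiving bus keeps the sender's timestamp rather than stamping a new one.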

## For plugin authors

A typical plugin backend wires up subscriptions in its `register()` (or
at import time, since `plugin_api.py` is imported once at dashboard
startup):

```python
from fastapi import APIRouter
from hermes_events import subscribe

router = APIRouter()

def _on_tool_event(envelope: dict) -> None:
    # Process the envelope, push to your WebSocket subscribers, etc.
    ...

subscribe("tui.tool.*", _on_tool_event)
subscribe("gateway.agent.*", _on_tool_event)
```

If you need to subscribe asynchronously (e.g. to drive a per-subscriber
`asyncio.Queue` in a long-lived FastAPI WebSocket handler), register an
`async def` callback; the bus will `asyncio.create_task()` it inside the
dashboard's running event loop.
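
The scheduling behavior for async callbacks can be sketched in isolation. The snippet below is a simplified stand-in, not the bus implementation: `dispatch`, `_pending`, and `demo` are hypothetical names, and the logic only mirrors the rule described in this document (sync callbacks run inline, async callbacks become tasks when a loop is running and are dropped otherwise).

```python
from __future__ import annotations

import asyncio
import inspect

# Keep strong references so scheduled tasks are not garbage-collected early.
_pending: set[asyncio.Task] = set()


def dispatch(callback, envelope: dict) -> str:
    """Deliver one envelope and report what happened (illustration only)."""
    if not inspect.iscoroutinefunction(callback):
        callback(envelope)  # sync subscriber: runs in the caller's stack
        return "called"
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        return "dropped"  # no running loop: async subscriber is skipped
    task = loop.create_task(callback(envelope))
    _pending.add(task)
    task.add_done_callback(_pending.discard)
    return "scheduled"


async def demo() -> list:
    """Feed an asyncio.Queue from an async subscriber."""
    events: asyncio.Queue = asyncio.Queue()

    async def on_event(envelope: dict) -> None:
        await events.put(envelope)

    status = dispatch(on_event, {"type": "tui.tool.start"})
    item = await asyncio.wait_for(events.get(), timeout=1.0)
    return [status, item["type"]]
```

Running `asyncio.run(demo())` exercises the scheduled path, while calling `dispatch` with an `async def` callback from plain synchronous code takes the dropped path. That is why subscriptions meant to feed a queue should be exercised from code that runs inside the event loop.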

For a fully-worked example, see `plugins/orb/dashboard/plugin_api.py`.

## Stability declaration

Until further notice (read: until a `docs/events-v1.md` lands and this
file is renamed to `docs/events-experimental.md`), assume any topic name
or payload field may be renamed, removed, or split into multiple events
in any release.

The **shape of the bus itself** (the `publish` / `subscribe` /
`unsubscribe` Python API, the envelope auto-stamping rule, the glob
pattern syntax) is more stable; we intend to preserve it through v1.0.
But the **content** flowing through it is the experimental part.

246
gateway/event_bridge.py
Normal file
@@ -0,0 +1,246 @@
"""Cross-process bridge: ship local ``hermes_events`` to the dashboard.

The gateway runs as its own process (``hermes gateway run``), separate from
the dashboard's web server. Plugins like the orb live in the dashboard
process and subscribe to its local bus instance. This module forwards
locally-published events to the dashboard so its plugins see them.

Wire protocol
-------------
Frames are JSON dicts with the shape::

    {
        "_bus_relay": true,
        "topic": "<topic, e.g. gateway.agent.start>",
        "envelope": {<full hermes_events envelope: type, ts, src, ...>},
    }

The dashboard's ``/api/pub`` receiver recognises this shape and re-publishes
verbatim onto its local bus. ``ts`` and ``src`` are preserved (the bus only
auto-stamps missing keys), so subscribers see the original gateway-side
timestamp.

Configuration
-------------
The bridge reads these environment variables at start time:

- ``HERMES_DASHBOARD_EVENT_URL``: ws://host:port/api/pub URL (no query string;
  this module appends ``token`` and ``channel``).
- ``HERMES_DASHBOARD_EVENT_TOKEN``: the dashboard's session token. The
  dashboard's ``_SESSION_TOKEN`` is process-local-ephemeral, so until a
  handoff mechanism lands this must be supplied by whoever launches the
  gateway. Without it the bridge no-ops (no exception, no spam; just a
  single debug log line at startup).
- ``HERMES_DASHBOARD_EVENT_CHANNEL``: optional channel name sent with the
  connection; defaults to ``gateway``.

Failure mode
------------
Best-effort. Connection failures, drops, and per-frame send errors are
logged at debug and the bridge silently retries; failed connection attempts
back off exponentially. ``publish()`` is sync; the bridge's subscriber only
enqueues frames (without blocking) and a daemon thread ships them, so the
publisher's main path never blocks.

If either the URL or the token env var is missing the bridge does not start.
This is the common case for gateways running standalone (no dashboard at all).
"""

from __future__ import annotations

import json
import logging
import os
import queue
import threading
import time
from typing import Optional
from urllib.parse import urlencode

import hermes_events
from hermes_events import _Subscription

logger = logging.getLogger(__name__)

# Module-level state. ``start_if_configured()`` is idempotent; calling it
# twice replaces the bridge but does not duplicate it.
_lock = threading.Lock()
_bridge: Optional["_Bridge"] = None


class _Bridge:
    """Background thread that ships local bus events to the dashboard."""

    _QUEUE_MAX = 1024

    def __init__(self, url: str, token: str, channel: str = "gateway") -> None:
        self._url = url
        self._token = token
        self._channel = channel
        self._queue: queue.Queue = queue.Queue(maxsize=self._QUEUE_MAX)
        self._stop = threading.Event()
        self._worker: Optional[threading.Thread] = None
        self._subscription: Optional[_Subscription] = None

    def start(self) -> None:
        # Subscribe to ``**`` first so every published event lands in the
        # outgoing queue. The worker thread drains the queue and ships via
        # WS. If the WS isn't connected yet the queue absorbs the burst.
        self._subscription = hermes_events.subscribe("**", self._enqueue)
        self._worker = threading.Thread(
            target=self._run,
            name="hermes-event-bridge",
            daemon=True,
        )
        self._worker.start()

    def stop(self) -> None:
        if self._subscription is not None:
            hermes_events.unsubscribe(self._subscription)
            self._subscription = None
        self._stop.set()
        # Nudge the queue so a blocked drain returns promptly.
        try:
            self._queue.put_nowait(None)
        except queue.Full:
            pass

    def _enqueue(self, envelope: dict) -> None:
        """Subscriber callback. Skip ``_relayed``-tagged envelopes: those
        came IN via the dashboard's pub_ws, and re-shipping would loop them
        back. We only forward locally-originated events."""
        if envelope.get("_relayed"):
            return
        topic = envelope.get("type", "")
        if not isinstance(topic, str) or not topic:
            return
        # Skip echo: don't ship events whose ``src`` indicates they came
        # from somewhere other than this process. The dashboard ingestor
        # doesn't tag with ``_relayed``, but it does preserve the original
        # ``src``. If the gateway ever subscribes to dashboard-originated
        # events (e.g. ``tui.*``), shipping them back would create a loop.
        # For now we forward only events whose ``src`` starts with
        # ``gateway`` or ``agent``; everything else is presumed inbound.
        src = envelope.get("src", "")
        if isinstance(src, str) and not src.startswith(("gateway", "agent")):
            return
        frame = {
            "_bus_relay": True,
            "topic": topic,
            "envelope": dict(envelope),
        }
        try:
            self._queue.put_nowait(frame)
        except queue.Full:
            # Drop oldest, enqueue newest. Bus events are best-effort; a
            # blocked subscriber must never wedge the publisher's main path.
            try:
                self._queue.get_nowait()
            except queue.Empty:
                pass
            try:
                self._queue.put_nowait(frame)
            except queue.Full:
                # Pathological: queue still full after a get. Drop.
                pass

    def _run(self) -> None:
        backoff = 1.0
        max_backoff = 30.0
        while not self._stop.is_set():
            ws = self._connect()
            if ws is None:
                # Sleep with stop-aware wait. .wait returns True if stop
                # was set, in which case we exit immediately.
                if self._stop.wait(backoff):
                    return
                backoff = min(backoff * 2, max_backoff)
                continue
            backoff = 1.0  # reset on successful connect
            try:
                self._drain(ws)
            except Exception as exc:
                logger.debug("event_bridge: drain error: %s", exc)
            finally:
                try:
                    ws.close()  # type: ignore[union-attr]
                except Exception:
                    pass

    def _connect(self):
        try:
            from websockets.sync.client import connect as ws_connect
        except ImportError:  # pragma: no cover
            logger.debug("event_bridge: 'websockets' package not installed")
            return None
        qs = urlencode({"token": self._token, "channel": self._channel})
        url = f"{self._url}?{qs}" if "?" not in self._url else f"{self._url}&{qs}"
        try:
            return ws_connect(url, open_timeout=2.0, max_size=None)
        except Exception as exc:
            logger.debug("event_bridge: connect to %s failed: %s", self._url, exc)
            return None

    def _drain(self, ws) -> None:
        """Pop frames off the queue and send via WS until error or stop."""
        while not self._stop.is_set():
            try:
                item = self._queue.get(timeout=1.0)
            except queue.Empty:
                continue
            if item is None:
                return  # explicit shutdown sentinel
            try:
                ws.send(json.dumps(item, ensure_ascii=False))
            except Exception as exc:
                logger.debug("event_bridge: send failed, will reconnect: %s", exc)
                # Re-queue the lost frame so a transient blip doesn't drop
                # it. Best-effort: the queue is FIFO, so the frame lands at
                # the back rather than the front, which is acceptable for v1.
                try:
                    self._queue.put_nowait(item)
                except queue.Full:
                    pass
                return


def start_if_configured() -> bool:
    """Read env vars and start the bridge if both are set.

    Returns True if the bridge was started, False otherwise. Idempotent:
    a second call replaces the previous bridge.
    """
    global _bridge

    url = os.environ.get("HERMES_DASHBOARD_EVENT_URL", "").strip()
    token = os.environ.get("HERMES_DASHBOARD_EVENT_TOKEN", "").strip()
    if not url or not token:
        logger.debug(
            "event_bridge: HERMES_DASHBOARD_EVENT_URL/TOKEN not set — "
            "gateway bus events will not reach the dashboard"
        )
        return False

    channel = os.environ.get("HERMES_DASHBOARD_EVENT_CHANNEL", "gateway").strip() or "gateway"

    with _lock:
        if _bridge is not None:
            _bridge.stop()
        _bridge = _Bridge(url=url, token=token, channel=channel)
        _bridge.start()

    logger.info(
        "event_bridge: shipping local bus events to dashboard at %s (channel=%s)",
        url,
        channel,
    )
    return True


def stop() -> None:
    """Stop the bridge if running. Idempotent."""
    global _bridge
    with _lock:
        if _bridge is not None:
            _bridge.stop()
            _bridge = None


__all__ = ["start_if_configured", "stop"]
@@ -27,6 +27,7 @@ from typing import Any, Callable, Dict, List, Optional
 import yaml
 
 from hermes_cli.config import get_hermes_home
+import hermes_events
 
 
 HOOKS_DIR = get_hermes_home() / "hooks"
@@ -171,6 +172,24 @@ class HookRegistry:
         if context is None:
             context = {}
 
+        # Publish on the generic event bus (hermes_events) so plugins can
+        # subscribe to gateway lifecycle without having to register a
+        # full HOOK.yaml + handler.py. Translate the legacy ``:`` separator
+        # to the bus's ``.`` so a subscriber can match ``gateway.agent.*``
+        # or ``gateway.command.<name>``. ``publish()`` is sync, so it is safe
+        # to call from inside this coroutine without ``await``; async
+        # subscribers are scheduled by the bus via asyncio.create_task().
+        # See docs/events.md for the topic taxonomy.
+        try:
+            hermes_events.publish(
+                f"gateway.{event_type.replace(':', '.')}", dict(context)
+            )
+        except Exception:
+            # Bus failures must never block the legacy hook pipeline. The
+            # bus already logs subscriber exceptions internally; this
+            # catch only handles a (very unlikely) publish-side error.
+            pass
+
         for fn in self._resolve_handlers(event_type):
             try:
                 result = fn(event_type, context)
@@ -4081,6 +4081,19 @@ class GatewayRunner:
         # Discover and load event hooks
         self.hooks.discover_and_load()
 
+        # Optionally bridge local hermes_events bus → dashboard so plugins
+        # in the dashboard process (orb, etc.) see gateway lifecycle events.
+        # No-op when HERMES_DASHBOARD_EVENT_URL/TOKEN env vars are unset
+        # (the common standalone-gateway case). See gateway/event_bridge.py.
+        try:
+            from gateway import event_bridge
+            event_bridge.start_if_configured()
+        except Exception:
+            logger.debug(
+                "gateway/event_bridge.start_if_configured failed; "
+                "continuing without dashboard event relay",
+                exc_info=True,
+            )
 
         # Recover background processes from checkpoint (crash recovery)
         try:
@@ -33,6 +33,7 @@ if str(PROJECT_ROOT) not in sys.path:
     sys.path.insert(0, str(PROJECT_ROOT))
 
 from hermes_cli import __version__, __release_date__
+import hermes_events  # type: ignore[import-not-found]
 from hermes_cli.config import (
     cfg_get,
     DEFAULT_CONFIG,
@@ -3755,11 +3756,79 @@ async def pub_ws(ws: WebSocket) -> None:
 
     try:
         while True:
-            await _broadcast_event(channel, await ws.receive_text())
+            frame = await ws.receive_text()
+            # Legacy behavior: fan the frame out to every /api/events
+            # subscriber on this channel id. Drives the React sidebar's
+            # live tool-call feed and keeps the existing per-session pubsub
+            # working exactly as before.
+            await _broadcast_event(channel, frame)
+            # Phase 4 addition: also re-publish onto the local hermes_events
+            # bus so plugin_api.py modules (orb today, others later) can
+            # subscribe to TUI/gateway events without needing to know about
+            # WebSocket plumbing. Frame parsing is best-effort and never
+            # blocks the legacy fanout above; a malformed frame is logged
+            # at debug and discarded for bus purposes only.
+            try:
+                _republish_pub_frame_to_bus(frame)
+            except Exception:
+                _log.debug("pub_ws: bus republish failed for one frame", exc_info=True)
     except WebSocketDisconnect:
         pass
 
 
+def _republish_pub_frame_to_bus(frame: str) -> None:
+    """Best-effort: parse a frame received on ``/api/pub`` and re-publish it
+    onto the local ``hermes_events`` bus.
+
+    Two frame shapes are recognised:
+
+    1. **TUI sidecar JSON-RPC** ``{"jsonrpc": "2.0", "method": "event",
+       "params": {"type": <evt>, "session_id": <sid>, "payload": {...}}}``:
+       published as topic ``tui.<evt>`` with ``session_id`` + flattened
+       ``payload`` keys. The bus auto-stamps ``ts`` and ``src``.
+
+    2. **Bus relay** ``{"_bus_relay": true, "topic": <topic>,
+       "envelope": {<full envelope incl. type/ts/src/...>}}``: published
+       verbatim onto the bus under ``topic``. The envelope's ``ts`` and
+       ``src`` are preserved (the bus only stamps missing keys), which is
+       critical for cross-process events where we want the original
+       timestamp from the gateway process.
+
+    Anything else is ignored silently.
+    """
+    try:
+        obj = json.loads(frame)
+    except (json.JSONDecodeError, ValueError):
+        return
+    if not isinstance(obj, dict):
+        return
+
+    # Shape 2: bus relay (gateway → dashboard, or future relays)
+    if obj.get("_bus_relay") is True:
+        topic = obj.get("topic")
+        envelope = obj.get("envelope")
+        if isinstance(topic, str) and isinstance(envelope, dict):
+            hermes_events.publish(topic, envelope)
+        return
+
+    # Shape 1: legacy TUI JSON-RPC event frame
+    if obj.get("method") == "event":
+        params = obj.get("params")
+        if not isinstance(params, dict):
+            return
+        evt = params.get("type")
+        if not isinstance(evt, str) or not evt:
+            return
+        bus_payload: dict = {}
+        sid = params.get("session_id")
+        if sid is not None:
+            bus_payload["session_id"] = sid
+        payload = params.get("payload")
+        if isinstance(payload, dict):
+            bus_payload.update(payload)
+        hermes_events.publish(f"tui.{evt}", bus_payload)
+
+
 @app.websocket("/api/events")
 async def events_ws(ws: WebSocket) -> None:
     if not _DASHBOARD_EMBEDDED_CHAT_ENABLED:
298
hermes_events.py
Normal file
@@ -0,0 +1,298 @@
"""
Hermes Agent: process-local pub/sub event bus.

Plugins and core components can publish events on a shared in-process bus and
subscribe to topic patterns. Sources publish; subscribers listen; neither knows
about the other.

EXPERIMENTAL: topic names and payload shapes are explicitly prone to change
until we publish a v1.0 schema. See ``docs/events.md`` for the current taxonomy
and the breakage notice. Third-party plugins are welcome to subscribe today,
but should expect to update their topic globs over the next few releases.

Basic usage
-----------

::

    from hermes_events import publish, subscribe, unsubscribe

    # Sources publish; always sync, even from inside async contexts.
    publish("tui.tool.start", {"name": "web_search", "session_id": "abc"})

    # Subscribers register glob patterns. ``*`` matches one topic segment;
    # ``**`` matches any number of segments.
    handle = subscribe("tui.tool.*", on_tool_event)   # tui.tool.start/.complete/...
    handle = subscribe("tui.**", on_any_tui)          # any topic under tui
    handle = subscribe("**", everything)
    unsubscribe(handle)

Envelope
--------

Every published payload is wrapped in a minimal envelope before being delivered
to subscribers:

::

    {
        "type": "<topic>",                # always the topic the publisher used
        "ts": <unix-seconds-float>,       # auto-stamped if missing
        "src": "<first-topic-segment>",   # auto-stamped if missing
        ...source-specific fields, freely evolving in the experimental phase
    }

Publishers that are *relaying* events from another process should pre-populate
``ts`` and ``src`` so the original values are preserved (the bus only fills in
missing keys; it never overwrites them).

Async semantics
---------------

``publish()`` is synchronous. Sync subscribers fire immediately in the
publisher's stack. Async subscribers are scheduled via
``asyncio.create_task()`` when a running event loop is detected. When no loop
is running (unit tests outside of an async test, startup-before-loop), async
subscribers are dropped for that emit with a single warning log line; sync
subscribers still fire normally.

This means async publishers (e.g. ``gateway.hooks.HookRegistry.emit()``) just
call sync ``publish()`` from inside their coroutine; no ``await`` needed.

Exceptions in subscribers are logged but never raised back to the publisher.
"""

from __future__ import annotations

import asyncio
import inspect
import logging
import threading
import time
from collections.abc import Awaitable, Callable
from typing import Any

logger = logging.getLogger(__name__)


# A subscriber callback. Receives the envelope dict and may return either
# ``None`` (sync) or an awaitable (async).
SubscriberCallback = Callable[[dict], "Awaitable[None] | None"]


class _Subscription:
    """Internal record of one subscription. Returned (opaque) to callers as
    the handle for ``unsubscribe()``.
    """

    __slots__ = ("pattern", "segments", "callback", "is_async")

    def __init__(self, pattern: str, callback: SubscriberCallback) -> None:
        self.pattern = pattern
        self.segments = pattern.split(".")
        self.callback = callback
        # ``inspect.iscoroutinefunction`` handles the common cases but can
        # miss some (for example callable objects with an async
        # ``__call__``), so we also inspect the callable's ``__call__``
        # attribute; nothing is invoked here. Keep this conservative: a
        # false-negative just runs an async subscriber synchronously and
        # discards the coroutine (with a warning), which is recoverable;
        # a false-positive schedules a sync function as a task, which
        # would explode in create_task().
        self.is_async = inspect.iscoroutinefunction(callback) or inspect.iscoroutinefunction(
            getattr(callback, "__call__", None)
        )


# Module-level state. The bus is a process-local singleton; we deliberately
# don't expose a class or constructor to avoid the "which bus is this?"
# confusion. Inside one Python process, there is exactly one bus.
_subscriptions: list[_Subscription] = []
_lock = threading.RLock()


def _matches(topic_segments: list[str], pattern_segments: list[str]) -> bool:
    """Glob-match a topic against a pattern.

    ``*`` matches exactly one segment. ``**`` matches zero or more segments.
    Anywhere a literal segment appears in the pattern, the topic must match
    exactly at that position.

    Recursive implementation; patterns are short (typically 1-3 segments).
    """
    # ``ti`` indexes into topic; ``pi`` indexes into pattern.
    def _rec(ti: int, pi: int) -> bool:
        # Pattern exhausted: match only if the topic is exhausted too.
        if pi == len(pattern_segments):
            return ti == len(topic_segments)

        seg = pattern_segments[pi]

        if seg == "**":
            # Try consuming 0..N topic segments, shortest first.
            # First, try matching with ``**`` consuming zero topic segments.
            if _rec(ti, pi + 1):
                return True
            # Then try consuming one more topic segment and recurse on the
            # same ``**`` position.
            if ti < len(topic_segments) and _rec(ti + 1, pi):
                return True
            return False

        if ti == len(topic_segments):
            # Pattern still has segments to consume but topic is exhausted.
            return False

        if seg == "*" or seg == topic_segments[ti]:
            return _rec(ti + 1, pi + 1)

        return False

    return _rec(0, 0)


def subscribe(pattern: str, callback: SubscriberCallback) -> _Subscription:
    """Subscribe a callback to a topic glob pattern.

    Returns a subscription handle. Pass it to :func:`unsubscribe` to remove.

    The same callback may subscribe to the same pattern more than once; each
    call returns a distinct handle. The bus does not deduplicate.
    """
    if not pattern:
        raise ValueError("subscribe pattern must be a non-empty string")

    sub = _Subscription(pattern, callback)
    with _lock:
        _subscriptions.append(sub)
    return sub


def unsubscribe(handle: _Subscription) -> bool:
    """Remove a subscription. Returns True if the handle was found, False
    otherwise (idempotent: unsubscribing a removed handle is not an error).
    """
    with _lock:
        try:
            _subscriptions.remove(handle)
            return True
        except ValueError:
            return False


def publish(topic: str, payload: dict | None = None) -> None:
    """Publish an event on a topic.

    ``topic`` is a ``.``-segmented string (e.g. ``"tui.tool.start"``).
    ``payload`` is a dict of source-specific fields; the bus wraps it with the
    minimal envelope (``type``, ``ts``, ``src``) before delivery.

    Publishers that are relaying events from another process may pre-populate
    ``ts`` and/or ``src`` in the payload to preserve the originating values;
    the bus only fills in missing keys.

    This call always returns synchronously. Sync subscribers fire immediately;
    async subscribers are scheduled via ``asyncio.create_task()`` when a
    running loop is available. If no loop is running, async subscribers are
    dropped with a single warning log line per emit.
    """
    if not topic:
        raise ValueError("publish topic must be a non-empty string")

    # Build the delivered envelope. We do this once and share the dict across
    # all subscribers. Subscribers that mutate it are misusing the API, but
    # we can't realistically guard against that without copying for every
    # delivery (which would balloon cost for the ``**`` firehose case).
envelope = dict(payload) if payload else {}
|
||||
envelope.setdefault("type", topic)
|
||||
envelope.setdefault("ts", time.time())
|
||||
envelope.setdefault("src", topic.split(".", 1)[0])
|
||||
|
||||
topic_segments = topic.split(".")
|
||||
|
||||
# Snapshot subscribers under the lock, then release before invoking
|
||||
# callbacks — a subscriber that calls publish() or subscribe() from
|
||||
    # inside its callback must not deadlock on the bus lock.
|
||||
with _lock:
|
||||
matches = [
|
||||
sub
|
||||
for sub in _subscriptions
|
||||
if _matches(topic_segments, sub.segments)
|
||||
]
|
||||
|
||||
if not matches:
|
||||
return
|
||||
|
||||
# Probe for a running event loop ONCE per publish call, not per
|
||||
# subscriber. ``get_running_loop()`` only succeeds if we're being called
|
||||
# from inside a coroutine or task; otherwise it raises ``RuntimeError``.
|
||||
try:
|
||||
loop: asyncio.AbstractEventLoop | None = asyncio.get_running_loop()
|
||||
except RuntimeError:
|
||||
loop = None
|
||||
|
||||
dropped_async = 0
|
||||
|
||||
for sub in matches:
|
||||
try:
|
||||
if sub.is_async:
|
||||
if loop is None:
|
||||
dropped_async += 1
|
||||
continue
|
||||
# Invoke the coroutine factory to get a coroutine, then schedule
|
||||
# it on the running loop. The task object is intentionally not
|
||||
# awaited — the bus is fire-and-forget.
|
||||
coro = sub.callback(envelope)
|
||||
if inspect.iscoroutine(coro):
|
||||
loop.create_task(coro)
|
||||
else:
|
||||
result = sub.callback(envelope)
|
||||
# A sync subscriber that accidentally returns a coroutine
|
||||
# (e.g. someone changed sync→async without updating the
|
||||
# registration) would leak the coroutine. Warn and discard.
|
||||
if inspect.iscoroutine(result):
|
||||
logger.warning(
|
||||
"hermes_events: subscriber for %r returned a coroutine "
|
||||
"but was not registered as async; coroutine dropped",
|
||||
sub.pattern,
|
||||
)
|
||||
result.close()
|
||||
except Exception:
|
||||
# Subscriber exceptions are logged but never propagated. One
|
||||
# bad subscriber must not break the publisher's main path or
|
||||
# starve the rest of the subscribers on this emit.
|
||||
logger.exception(
|
||||
"hermes_events: subscriber for pattern %r raised on topic %r",
|
||||
sub.pattern,
|
||||
topic,
|
||||
)
|
||||
|
||||
if dropped_async:
|
||||
logger.warning(
|
||||
"hermes_events: dropped %d async subscriber(s) on topic %r "
|
||||
"(no running asyncio loop)",
|
||||
dropped_async,
|
||||
topic,
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Testing aids — not part of the documented public API.
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _reset_for_tests() -> None:
|
||||
"""Clear all subscriptions. For unit tests only — must NOT be called from
|
||||
production code or third-party plugins."""
|
||||
with _lock:
|
||||
_subscriptions.clear()
|
||||
|
||||
|
||||
def _subscriber_count() -> int:
|
||||
"""Return the current subscriber count. For unit tests only."""
|
||||
with _lock:
|
||||
return len(_subscriptions)
|
||||
|
||||
|
||||
__all__ = ["publish", "subscribe", "unsubscribe"]
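The envelope rule described in the `publish()` docstring (fill in `type`, `ts`, and `src` only when missing) is easiest to see pulled out of the dispatch path. A minimal sketch, where `build_envelope` is a hypothetical helper name introduced for illustration:

```python
import time


def build_envelope(topic, payload=None):
    """Copy the payload and fill in any missing envelope keys."""
    envelope = dict(payload) if payload else {}  # never mutate the caller's dict
    envelope.setdefault("type", topic)
    envelope.setdefault("ts", time.time())
    envelope.setdefault("src", topic.split(".", 1)[0])
    return envelope


# Locally published event: all three keys are stamped by the bus.
fresh = build_envelope("tui.tool.start", {"name": "web_search"})

# Relayed event: the originating ts/src survive untouched.
relayed = build_envelope(
    "gateway.agent.start", {"ts": 1000.5, "src": "remote-gateway"}
)
```

Because `setdefault` is used for all three keys, a caller-supplied `type` also wins over the topic, which is the behaviour the envelope tests assert.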
|
||||
@@ -217,7 +217,7 @@ hermes-agent = "run_agent:main"
|
||||
hermes-acp = "acp_adapter.entry:main"
|
||||
|
||||
[tool.setuptools]
|
||||
-py-modules = ["run_agent", "model_tools", "toolsets", "batch_runner", "trajectory_compressor", "toolset_distributions", "cli", "hermes_bootstrap", "hermes_constants", "hermes_state", "hermes_time", "hermes_logging", "utils"]
+py-modules = ["run_agent", "model_tools", "toolsets", "batch_runner", "trajectory_compressor", "toolset_distributions", "cli", "hermes_bootstrap", "hermes_constants", "hermes_events", "hermes_state", "hermes_time", "hermes_logging", "utils"]
|
||||
|
||||
[tool.setuptools.package-data]
|
||||
hermes_cli = ["web_dist/**/*", "tui_dist/**/*", "scripts/install.sh", "scripts/install.ps1"]
|
||||
|
||||
402
tests/test_hermes_events.py
Normal file
@@ -0,0 +1,402 @@
|
||||
"""Unit tests for the hermes_events pub/sub bus.
|
||||
|
||||
Covers:
|
||||
- publish/subscribe round-trip
|
||||
- glob pattern matching (`*` one segment, `**` zero or more)
|
||||
- sync subscriber dispatch (fires in publisher stack)
|
||||
- async subscriber dispatch via asyncio.create_task when a loop is running
|
||||
- async subscriber drop-with-warning when no loop is running
|
||||
- exception isolation (one bad subscriber doesn't kill others or raise)
|
||||
- unsubscribe idempotence
|
||||
- envelope auto-stamp of type/ts/src when missing
|
||||
- envelope preservation of pre-populated ts/src (cross-process relay case)
|
||||
- high-fanout micro-benchmark sanity check
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
import hermes_events
|
||||
from hermes_events import publish, subscribe, unsubscribe
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _reset_bus():
|
||||
"""Reset bus state before each test. Per-file pytest isolation already
|
||||
means cross-file leakage is impossible, but multiple tests in this file
|
||||
share the same process — we want each test to start clean.
|
||||
"""
|
||||
hermes_events._reset_for_tests()
|
||||
yield
|
||||
hermes_events._reset_for_tests()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Basic publish/subscribe round-trip
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_publish_subscribe_round_trip():
|
||||
received: list[dict] = []
|
||||
subscribe("foo.bar", received.append)
|
||||
|
||||
publish("foo.bar", {"k": 1})
|
||||
|
||||
assert len(received) == 1
|
||||
env = received[0]
|
||||
assert env["type"] == "foo.bar"
|
||||
assert env["src"] == "foo"
|
||||
assert "ts" in env
|
||||
assert env["k"] == 1
|
||||
|
||||
|
||||
def test_publish_with_no_payload_still_delivers_envelope():
|
||||
received: list[dict] = []
|
||||
subscribe("alone", received.append)
|
||||
|
||||
publish("alone")
|
||||
|
||||
assert len(received) == 1
|
||||
env = received[0]
|
||||
assert env["type"] == "alone"
|
||||
assert env["src"] == "alone"
|
||||
assert isinstance(env["ts"], float)
|
||||
|
||||
|
||||
def test_publish_empty_topic_raises():
|
||||
with pytest.raises(ValueError):
|
||||
publish("")
|
||||
|
||||
|
||||
def test_subscribe_empty_pattern_raises():
|
||||
with pytest.raises(ValueError):
|
||||
subscribe("", lambda env: None)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Glob pattern matching
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_star_matches_exactly_one_segment():
|
||||
received: list[str] = []
|
||||
subscribe("tui.*", lambda env: received.append(env["type"]))
|
||||
|
||||
publish("tui.tool") # one segment after tui → matches
|
||||
publish("tui.tool.start") # two segments after tui → does NOT match
|
||||
publish("gateway.tool") # different prefix → does NOT match
|
||||
publish("tui") # too few → does NOT match
|
||||
|
||||
assert received == ["tui.tool"]
|
||||
|
||||
|
||||
def test_double_star_matches_zero_or_more_segments():
|
||||
received: list[str] = []
|
||||
subscribe("tui.**", lambda env: received.append(env["type"]))
|
||||
|
||||
publish("tui") # zero suffix segments
|
||||
publish("tui.tool") # one
|
||||
publish("tui.tool.start") # two
|
||||
publish("gateway.tool") # different prefix → no match
|
||||
received_after_match = list(received)
|
||||
|
||||
assert received_after_match == ["tui", "tui.tool", "tui.tool.start"]
|
||||
|
||||
|
||||
def test_double_star_alone_is_firehose():
|
||||
received: list[str] = []
|
||||
subscribe("**", lambda env: received.append(env["type"]))
|
||||
|
||||
publish("a")
|
||||
publish("a.b")
|
||||
publish("a.b.c.d.e")
|
||||
|
||||
assert received == ["a", "a.b", "a.b.c.d.e"]
|
||||
|
||||
|
||||
def test_mid_segment_double_star():
|
||||
received: list[str] = []
|
||||
subscribe("a.**.z", lambda env: received.append(env["type"]))
|
||||
|
||||
publish("a.z") # ** consumes zero
|
||||
publish("a.b.z") # ** consumes one
|
||||
publish("a.b.c.z") # ** consumes two
|
||||
publish("a.b.c.y") # doesn't end with z → no match
|
||||
publish("a") # too short → no match
|
||||
|
||||
assert received == ["a.z", "a.b.z", "a.b.c.z"]
|
||||
|
||||
|
||||
def test_literal_segments_must_match_exactly():
|
||||
received: list[str] = []
|
||||
subscribe("gateway.agent.start", lambda env: received.append(env["type"]))
|
||||
|
||||
publish("gateway.agent.start") # exact match
|
||||
publish("gateway.agent.end") # different last segment
|
||||
publish("gateway.session.start") # different middle segment
|
||||
|
||||
assert received == ["gateway.agent.start"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Sync vs async subscriber dispatch
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_sync_subscriber_fires_in_publisher_stack():
|
||||
"""Sync subscribers must execute synchronously — the publisher's next
|
||||
statement must see their side-effects already applied."""
|
||||
received: list[dict] = []
|
||||
|
||||
def on_evt(env: dict) -> None:
|
||||
received.append(env)
|
||||
|
||||
subscribe("sync.test", on_evt)
|
||||
publish("sync.test", {"v": 42})
|
||||
|
||||
# No await, no sleep, no loop — must already be there.
|
||||
assert len(received) == 1
|
||||
assert received[0]["v"] == 42
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_async_subscriber_dispatched_via_create_task():
|
||||
"""Async subscribers fire via asyncio.create_task when a loop is running.
|
||||
The publisher returns immediately; we await once for the task to run."""
|
||||
received: list[dict] = []
|
||||
|
||||
async def on_evt(env: dict) -> None:
|
||||
received.append(env)
|
||||
|
||||
subscribe("async.test", on_evt)
|
||||
publish("async.test", {"v": 7})
|
||||
|
||||
# Publish returned synchronously; the task is scheduled but not yet run.
|
||||
# Yield to the loop once so the task can fire.
|
||||
await asyncio.sleep(0)
|
||||
|
||||
assert len(received) == 1
|
||||
assert received[0]["v"] == 7
|
||||
|
||||
|
||||
def test_async_subscriber_dropped_when_no_loop(caplog):
|
||||
"""When no event loop is running, an async subscriber is dropped with
|
||||
a single warning log line per emit. Sync subscribers still fire."""
|
||||
async_received: list[dict] = []
|
||||
sync_received: list[dict] = []
|
||||
|
||||
async def on_async(env: dict) -> None:
|
||||
async_received.append(env)
|
||||
|
||||
def on_sync(env: dict) -> None:
|
||||
sync_received.append(env)
|
||||
|
||||
subscribe("noloop.test", on_async)
|
||||
subscribe("noloop.test", on_sync)
|
||||
|
||||
with caplog.at_level(logging.WARNING, logger="hermes_events"):
|
||||
publish("noloop.test", {"v": 1})
|
||||
|
||||
# Async one was dropped (no loop running).
|
||||
assert async_received == []
|
||||
# Sync one still fired.
|
||||
assert len(sync_received) == 1
|
||||
|
||||
# Warning was emitted.
|
||||
assert any(
|
||||
"dropped" in r.message and "noloop.test" in r.message
|
||||
for r in caplog.records
|
||||
), [r.message for r in caplog.records]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Exception isolation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_exception_in_one_subscriber_does_not_block_others(caplog):
|
||||
received: list[dict] = []
|
||||
|
||||
def boom(env: dict) -> None:
|
||||
raise RuntimeError("subscriber test failure")
|
||||
|
||||
def ok(env: dict) -> None:
|
||||
received.append(env)
|
||||
|
||||
# Order matters less than coverage — register boom first to confirm
|
||||
# ok still runs after the exception path.
|
||||
subscribe("err.test", boom)
|
||||
subscribe("err.test", ok)
|
||||
|
||||
with caplog.at_level(logging.ERROR, logger="hermes_events"):
|
||||
# publish() must not raise even though boom does.
|
||||
publish("err.test", {"v": 1})
|
||||
|
||||
assert len(received) == 1
|
||||
# The error was logged.
|
||||
assert any(
|
||||
"subscriber test failure" in (r.message + (r.exc_text or ""))
|
||||
or "err.test" in r.message
|
||||
for r in caplog.records
|
||||
), [r.message for r in caplog.records]
|
||||
|
||||
|
||||
def test_sync_subscriber_returning_coroutine_is_discarded(caplog):
|
||||
"""A sync subscriber that accidentally returns a coroutine (e.g. someone
|
||||
converted it to async without re-registering) should not leak the
|
||||
coroutine. We discard with a warning."""
|
||||
received: list[dict] = []
|
||||
|
||||
# Note: NOT declared async, but body returns a coroutine.
|
||||
def sneaky(env: dict):
|
||||
async def inner():
|
||||
received.append(env)
|
||||
return inner()
|
||||
|
||||
subscribe("sneak.test", sneaky)
|
||||
|
||||
with caplog.at_level(logging.WARNING, logger="hermes_events"):
|
||||
publish("sneak.test")
|
||||
|
||||
# The inner coroutine was discarded — never awaited, never ran.
|
||||
assert received == []
|
||||
assert any("coroutine" in r.message for r in caplog.records), [
|
||||
r.message for r in caplog.records
|
||||
]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Unsubscribe
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_unsubscribe_removes_subscription():
|
||||
received: list[dict] = []
|
||||
h = subscribe("u.test", received.append)
|
||||
|
||||
publish("u.test")
|
||||
assert len(received) == 1
|
||||
|
||||
assert unsubscribe(h) is True
|
||||
publish("u.test")
|
||||
assert len(received) == 1 # not delivered
|
||||
|
||||
# Idempotent: unsubscribing a removed handle is not an error.
|
||||
assert unsubscribe(h) is False
|
||||
|
||||
|
||||
def test_subscribe_same_pattern_twice_yields_distinct_handles():
|
||||
received_a: list[dict] = []
|
||||
received_b: list[dict] = []
|
||||
|
||||
h_a = subscribe("dup", received_a.append)
|
||||
h_b = subscribe("dup", received_b.append)
|
||||
assert h_a is not h_b
|
||||
|
||||
publish("dup")
|
||||
assert len(received_a) == 1
|
||||
assert len(received_b) == 1
|
||||
|
||||
unsubscribe(h_a)
|
||||
publish("dup")
|
||||
assert len(received_a) == 1
|
||||
assert len(received_b) == 2
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Envelope auto-stamping and preservation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_envelope_autostamps_type_ts_src_when_missing():
|
||||
received: list[dict] = []
|
||||
subscribe("**", received.append)
|
||||
|
||||
before = time.time()
|
||||
publish("ns.evt", {"x": 1})
|
||||
after = time.time()
|
||||
|
||||
env = received[0]
|
||||
assert env["type"] == "ns.evt"
|
||||
assert env["src"] == "ns"
|
||||
assert before <= env["ts"] <= after
|
||||
assert env["x"] == 1
|
||||
|
||||
|
||||
def test_envelope_preserves_prepopulated_ts_and_src():
|
||||
"""For the cross-process relay case: the dashboard's bridge ingestor
|
||||
re-publishes a frame received from the gateway, with the gateway's
|
||||
original ts and src already in the payload. The bus must not overwrite."""
|
||||
received: list[dict] = []
|
||||
subscribe("**", received.append)
|
||||
|
||||
publish(
|
||||
"gateway.agent.start",
|
||||
{"ts": 1000.5, "src": "remote-gateway", "platform": "telegram"},
|
||||
)
|
||||
|
||||
env = received[0]
|
||||
assert env["ts"] == 1000.5
|
||||
assert env["src"] == "remote-gateway"
|
||||
    assert env["type"] == "gateway.agent.start"  # type defaults to the topic when not pre-populated
|
||||
assert env["platform"] == "telegram"
|
||||
|
||||
|
||||
def test_envelope_does_not_overwrite_caller_provided_type():
|
||||
"""If the caller pre-populates `type` (e.g. relaying a frame whose topic
|
||||
name differs slightly from the current one), the pre-populated value
|
||||
wins — same rule as ts/src."""
|
||||
received: list[dict] = []
|
||||
subscribe("**", received.append)
|
||||
|
||||
publish("foo.bar", {"type": "foo.bar.relayed", "k": 1})
|
||||
|
||||
env = received[0]
|
||||
assert env["type"] == "foo.bar.relayed"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# High-fanout micro-benchmark
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_high_fanout_publish_is_sub_millisecond():
|
||||
    """100 subscribers on a hot topic should dispatch in well under a
|
||||
millisecond on any reasonable machine. This is a sanity check, not a
|
||||
    perf gate: the asserted bound is a generous 50 ms so it doesn't flake on busy CI."""
|
||||
counts: list[int] = [0] * 100
|
||||
|
||||
def make_cb(i: int):
|
||||
def cb(env: dict) -> None:
|
||||
counts[i] += 1
|
||||
return cb
|
||||
|
||||
for i in range(100):
|
||||
subscribe("fan", make_cb(i))
|
||||
|
||||
start = time.perf_counter()
|
||||
publish("fan", {"k": 1})
|
||||
elapsed = time.perf_counter() - start
|
||||
|
||||
assert all(c == 1 for c in counts)
|
||||
assert elapsed < 0.05, f"100-way fanout took {elapsed*1000:.2f}ms (>50ms)"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Internal helpers (sanity)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_subscriber_count_helper():
|
||||
assert hermes_events._subscriber_count() == 0
|
||||
h1 = subscribe("a", lambda e: None)
|
||||
h2 = subscribe("b", lambda e: None)
|
||||
assert hermes_events._subscriber_count() == 2
|
||||
unsubscribe(h1)
|
||||
unsubscribe(h2)
|
||||
assert hermes_events._subscriber_count() == 0
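Two properties these tests lean on, exception isolation and the ability to call back into the bus from inside a subscriber, both follow from snapshotting the subscriber list under the lock and invoking callbacks outside it. The sketch below is a deliberately reduced stand-in (no topics, no patterns, no async); `MiniBus` and its logger name are hypothetical and exist only for this illustration.

```python
import logging
import threading

logger = logging.getLogger("mini_bus")


class MiniBus:
    def __init__(self):
        self._lock = threading.Lock()  # plain, non-reentrant lock
        self._subs = []

    def subscribe(self, callback):
        with self._lock:
            self._subs.append(callback)
        return callback

    def unsubscribe(self, callback):
        with self._lock:
            try:
                self._subs.remove(callback)
                return True
            except ValueError:
                return False

    def publish(self, event):
        # Snapshot under the lock, then release before calling anyone.
        with self._lock:
            snapshot = list(self._subs)
        for cb in snapshot:
            try:
                cb(event)
            except Exception:
                # Log and keep going: one bad subscriber must not starve the rest.
                logger.exception("subscriber %r raised", cb)


bus = MiniBus()
got = []


def boom(event):
    raise RuntimeError("subscriber test failure")


def once(event):
    got.append(event)
    bus.unsubscribe(once)  # re-entrant call; safe because the lock is released


bus.subscribe(boom)
bus.subscribe(once)
bus.publish(1)
bus.publish(2)
```

If `publish` held the lock while invoking callbacks, the `unsubscribe` call inside `once` would block forever on the non-reentrant lock.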
|
||||
264
tests/test_hermes_events_bridge.py
Normal file
@@ -0,0 +1,264 @@
|
||||
"""Integration test for the cross-process event bridge.
|
||||
|
||||
The dashboard process receives bus relay frames on ``/api/pub`` from two
|
||||
sources:
|
||||
|
||||
1. The TUI sidecar (legacy JSON-RPC ``{method: "event", params: {...}}``)
|
||||
2. The standalone gateway process (new ``{_bus_relay: true, topic, envelope}``)
|
||||
|
||||
This test exercises the dashboard-side ingestor (``_republish_pub_frame_to_bus``
|
||||
in ``hermes_cli/web_server.py``) directly: we feed it synthetic frames in
|
||||
both shapes and assert the local bus delivers them to subscribers with the
|
||||
correct topics + envelopes.
|
||||
|
||||
We deliberately don't spawn a real subprocess. The wire format and the
|
||||
ingestor logic are what matter; the WebSocket transport itself is exercised
|
||||
by the production smoke test (Phase 17 manual smoke) and by upstream
|
||||
``websockets`` library tests.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
|
||||
import pytest
|
||||
|
||||
import hermes_events
|
||||
from hermes_cli.web_server import _republish_pub_frame_to_bus
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _reset_bus():
|
||||
hermes_events._reset_for_tests()
|
||||
yield
|
||||
hermes_events._reset_for_tests()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Shape 2: bus relay (the gateway → dashboard path)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_bus_relay_frame_publishes_with_preserved_envelope():
|
||||
"""A {_bus_relay: true, topic, envelope} frame is re-published verbatim,
|
||||
preserving the originating ts and src so subscribers see the gateway's
|
||||
original timestamp rather than a re-stamped one."""
|
||||
received: list[dict] = []
|
||||
hermes_events.subscribe("**", received.append)
|
||||
|
||||
# Simulate a frame the gateway-side bridge would send: a
|
||||
# ``gateway.agent.start`` envelope it built ~5 seconds ago.
|
||||
gateway_envelope = {
|
||||
"type": "gateway.agent.start",
|
||||
"ts": 100.0,
|
||||
"src": "gateway",
|
||||
"platform": "telegram",
|
||||
"session_id": "abc-123",
|
||||
}
|
||||
frame = json.dumps(
|
||||
{
|
||||
"_bus_relay": True,
|
||||
"topic": "gateway.agent.start",
|
||||
"envelope": gateway_envelope,
|
||||
}
|
||||
)
|
||||
|
||||
_republish_pub_frame_to_bus(frame)
|
||||
|
||||
assert len(received) == 1
|
||||
env = received[0]
|
||||
assert env["type"] == "gateway.agent.start"
|
||||
assert env["ts"] == 100.0, "originating ts must be preserved, not re-stamped"
|
||||
assert env["src"] == "gateway", "originating src must be preserved"
|
||||
assert env["platform"] == "telegram"
|
||||
assert env["session_id"] == "abc-123"
|
||||
|
||||
|
||||
def test_bus_relay_with_arbitrary_topic_passes_through():
|
||||
"""The dashboard ingestor publishes whatever topic the gateway sent; it
|
||||
does not inspect or validate the topic namespace beyond shape."""
|
||||
received: list[str] = []
|
||||
hermes_events.subscribe("**", lambda env: received.append(env["type"]))
|
||||
|
||||
frame = json.dumps(
|
||||
{
|
||||
"_bus_relay": True,
|
||||
"topic": "agent.iteration",
|
||||
"envelope": {"type": "agent.iteration", "ts": 1.0, "src": "agent", "n": 5},
|
||||
}
|
||||
)
|
||||
_republish_pub_frame_to_bus(frame)
|
||||
|
||||
assert received == ["agent.iteration"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Shape 1: TUI sidecar JSON-RPC (the TUI → dashboard path)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_tui_jsonrpc_frame_publishes_as_tui_namespace():
|
||||
"""A {jsonrpc, method:"event", params:{type, session_id, payload}} frame
|
||||
becomes a `tui.<type>` topic publish on the local bus."""
|
||||
received: list[dict] = []
|
||||
hermes_events.subscribe("tui.**", received.append)
|
||||
|
||||
frame = json.dumps(
|
||||
{
|
||||
"jsonrpc": "2.0",
|
||||
"method": "event",
|
||||
"params": {
|
||||
"type": "tool.start",
|
||||
"session_id": "sess-1",
|
||||
"payload": {"name": "web_search", "preview": "foo bar"},
|
||||
},
|
||||
}
|
||||
)
|
||||
_republish_pub_frame_to_bus(frame)
|
||||
|
||||
assert len(received) == 1
|
||||
env = received[0]
|
||||
assert env["type"] == "tui.tool.start"
|
||||
assert env["src"] == "tui"
|
||||
assert env["session_id"] == "sess-1"
|
||||
assert env["name"] == "web_search"
|
||||
assert env["preview"] == "foo bar"
|
||||
# ts is auto-stamped (TUI JSON-RPC frames don't carry one).
|
||||
assert isinstance(env["ts"], float)
|
||||
|
||||
|
||||
def test_tui_jsonrpc_frame_without_payload_still_publishes():
|
||||
received: list[dict] = []
|
||||
hermes_events.subscribe("tui.**", received.append)
|
||||
|
||||
frame = json.dumps(
|
||||
{
|
||||
"jsonrpc": "2.0",
|
||||
"method": "event",
|
||||
"params": {"type": "message.start", "session_id": "s1"},
|
||||
}
|
||||
)
|
||||
_republish_pub_frame_to_bus(frame)
|
||||
|
||||
assert len(received) == 1
|
||||
assert received[0]["type"] == "tui.message.start"
|
||||
assert received[0]["session_id"] == "s1"
|
||||
|
||||
|
||||
def test_tui_jsonrpc_frame_drops_silently_when_type_missing():
|
||||
received: list[dict] = []
|
||||
hermes_events.subscribe("**", received.append)
|
||||
|
||||
frame = json.dumps(
|
||||
{
|
||||
"jsonrpc": "2.0",
|
||||
"method": "event",
|
||||
"params": {"session_id": "s1"}, # no `type` field
|
||||
}
|
||||
)
|
||||
_republish_pub_frame_to_bus(frame)
|
||||
|
||||
assert received == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Malformed frames are best-effort (must not raise)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_invalid_json_is_silently_ignored():
|
||||
"""A malformed frame must not raise — the production pub_ws handler
|
||||
relies on this so a single bad frame doesn't kill the WS receive loop."""
|
||||
received: list[dict] = []
|
||||
hermes_events.subscribe("**", received.append)
|
||||
|
||||
_republish_pub_frame_to_bus("not json at all {{{")
|
||||
_republish_pub_frame_to_bus("")
|
||||
_republish_pub_frame_to_bus("[1, 2, 3]") # array, not dict
|
||||
|
||||
assert received == []
|
||||
|
||||
|
||||
def test_unknown_frame_shape_is_silently_ignored():
|
||||
received: list[dict] = []
|
||||
hermes_events.subscribe("**", received.append)
|
||||
|
||||
# Looks like JSON-RPC but method is not "event" — ignore.
|
||||
_republish_pub_frame_to_bus(json.dumps({"jsonrpc": "2.0", "method": "ping"}))
|
||||
# Looks like bus_relay but topic missing — ignore.
|
||||
_republish_pub_frame_to_bus(json.dumps({"_bus_relay": True, "envelope": {}}))
|
||||
# Neither shape — ignore.
|
||||
_republish_pub_frame_to_bus(json.dumps({"hello": "world"}))
|
||||
|
||||
assert received == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Subscriber sees both sources via a single pattern
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_plugin_can_subscribe_to_both_sources_with_one_pattern():
|
||||
"""The point of the bridge: a plugin subscribing to e.g. `**` should see
|
||||
TUI events (auto-stamped) AND gateway events (preserved ts) interleaved
|
||||
via a single subscribe() call — no need for per-source plumbing."""
|
||||
received: list[tuple[str, float]] = []
|
||||
hermes_events.subscribe(
|
||||
"**", lambda env: received.append((env["type"], env["ts"]))
|
||||
)
|
||||
|
||||
# Gateway-relayed event with preserved ts.
|
||||
_republish_pub_frame_to_bus(
|
||||
json.dumps(
|
||||
{
|
||||
"_bus_relay": True,
|
||||
"topic": "gateway.agent.start",
|
||||
"envelope": {"type": "gateway.agent.start", "ts": 100.0, "src": "gateway"},
|
||||
}
|
||||
)
|
||||
)
|
||||
# TUI event auto-stamped.
|
||||
_republish_pub_frame_to_bus(
|
||||
json.dumps(
|
||||
{
|
||||
"jsonrpc": "2.0",
|
||||
"method": "event",
|
||||
"params": {"type": "tool.start", "session_id": "s1"},
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
assert len(received) == 2
|
||||
topics = [t for t, _ in received]
|
||||
assert topics == ["gateway.agent.start", "tui.tool.start"]
|
||||
# Gateway ts preserved; TUI ts is auto-stamped (current time, > 100).
|
||||
assert received[0][1] == 100.0
|
||||
assert received[1][1] > 100.0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# The pub_ws's outer try/except catches our errors too
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_subscriber_exception_does_not_propagate(caplog):
|
||||
"""The bus already isolates subscriber exceptions; this confirms that
|
||||
when called from the ingestor, the exception is still contained."""
|
||||
|
||||
def boom(env):
|
||||
raise RuntimeError("subscriber test failure")
|
||||
|
||||
hermes_events.subscribe("**", boom)
|
||||
|
||||
with caplog.at_level(logging.ERROR):
|
||||
# Must not raise.
|
||||
_republish_pub_frame_to_bus(
|
||||
json.dumps(
|
||||
{
|
||||
"_bus_relay": True,
|
||||
"topic": "any",
|
||||
"envelope": {"type": "any", "ts": 1.0, "src": "test"},
|
||||
}
|
||||
)
|
||||
)
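The source of `_republish_pub_frame_to_bus` is not part of this diff, so the following is only a sketch of the frame normalization that the tests above imply, under the assumption that both wire shapes reduce to a `(topic, payload)` pair before being handed to `publish()`. `normalize_frame` is a hypothetical name, and the real ingestor may validate differently.

```python
import json


def normalize_frame(raw):
    """Return (topic, payload) for a recognised frame, or None to ignore it."""
    try:
        frame = json.loads(raw)
    except (TypeError, ValueError):
        return None  # malformed JSON must never raise into the receive loop
    if not isinstance(frame, dict):
        return None

    # Shape 2: bus relay from the standalone gateway process.
    if frame.get("_bus_relay") is True:
        topic = frame.get("topic")
        envelope = frame.get("envelope")
        if isinstance(topic, str) and topic and isinstance(envelope, dict):
            return topic, dict(envelope)  # ts/src pass through untouched
        return None

    # Shape 1: legacy TUI JSON-RPC event.
    if frame.get("method") == "event":
        params = frame.get("params")
        if not isinstance(params, dict):
            return None
        event_type = params.get("type")
        if not isinstance(event_type, str) or not event_type:
            return None
        payload = {"session_id": params.get("session_id")}
        extra = params.get("payload")
        if isinstance(extra, dict):
            payload.update(extra)  # flatten, mirroring the TUI-side emit
        return f"tui.{event_type}", payload

    return None
```

Returning `None` for every unrecognised shape keeps the caller's loop trivial: publish when a pair comes back, otherwise move on to the next frame.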
|
||||
@@ -17,6 +17,7 @@ from typing import Any, Optional
|
||||
|
||||
from hermes_constants import get_hermes_home
|
||||
from hermes_cli.env_loader import load_hermes_dotenv
|
||||
import hermes_events
|
||||
from utils import is_truthy_value
|
||||
from tui_gateway.transport import (
|
||||
StdioTransport,
|
||||
@@ -388,6 +389,26 @@ def _emit(event: str, sid: str, payload: dict | None = None):
|
||||
if payload is not None:
|
||||
params["payload"] = payload
|
||||
write_json({"jsonrpc": "2.0", "method": "event", "params": params})
|
||||
# Also publish on the generic event bus (hermes_events) so plugins
|
||||
# (orb today, achievements / observability / debug taps tomorrow) can
|
||||
# subscribe to TUI lifecycle without needing per-session WS plumbing.
|
||||
# The bus payload mirrors the JSON-RPC params: ``session_id`` plus the
|
||||
# event-specific ``payload`` dict (if any), flattened so subscribers
|
||||
# don't need to dig into a nested envelope. The topic is prefixed with
|
||||
# ``tui.`` to namespace it from gateway/agent events. See docs/events.md.
|
||||
try:
|
||||
bus_payload: dict = {"session_id": sid}
|
||||
if payload is not None:
|
||||
bus_payload.update(payload)
|
||||
hermes_events.publish(f"tui.{event}", bus_payload)
|
||||
except Exception:
|
||||
# The bus is best-effort plumbing for plugins; a publish failure
|
||||
# must never disturb the JSON-RPC emit that drives the React TUI.
|
||||
# hermes_events itself already logs subscriber failures internally
|
||||
# — this catch only handles a publish-side error (e.g. someone
|
||||
        # passed an empty or non-string topic), which shouldn't happen but isn't
|
||||
# worth a crash if it does.
|
||||
pass
|
||||
|
||||
|
||||
def _status_update(sid: str, kind: str, text: str | None = None):
|
||||
|
||||