mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-20 00:50:40 +08:00
Compare commits
2 Commits
bb/desktop
...
opencode-p
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
eba8cc564e | ||
|
|
d2c53ff558 |
@@ -62,33 +62,55 @@ live platform adapter's capability methods.
|
||||
|
||||
The connector normalizes each platform wire event into a `MessageEvent`
|
||||
(`gateway/platforms/base.py`) and delivers it to the gateway. **Inbound is
|
||||
delivered over a signed HTTP POST, not the outbound `/relay` WebSocket** (see
|
||||
the transport note below). The gateway keys the session via `build_session_key()`
|
||||
delivered over the gateway's OUTBOUND `/relay` WebSocket** (see the transport
|
||||
note below) — the connector pushes an `inbound` frame down the socket the
|
||||
gateway already dialed. The gateway keys the session via `build_session_key()`
|
||||
from the embedded `SessionSource` — so populating the right discriminators is
|
||||
the single highest-correctness responsibility of the connector.
|
||||
|
||||
### Inbound transport (signed HTTP POST, not the outbound WS)
|
||||
### Inbound transport (WS back-channel, not HTTP)
|
||||
|
||||
The gateway dials **out** to the connector's `/relay` WebSocket for the
|
||||
handshake + outbound actions (§4) + its own `/stop` egress (§5). Inbound,
|
||||
however, is delivered the other way: the connector **POSTs** the normalized
|
||||
event to the gateway's inbound endpoint (`HttpGatewayDelivery` on the connector;
|
||||
`gateway/relay/inbound_receiver.py` on the gateway). The reason is
|
||||
multi-instance: the connector instance that owns a platform's socket (and thus
|
||||
produces inbound events) is generally **not** the instance a given gateway
|
||||
dialed its outbound WS into, so inbound must target a tenant **endpoint** (which
|
||||
may load-balance across gateway instances) rather than ride one gateway's
|
||||
outbound socket. Each delivery is HMAC-signed with the per-tenant **delivery
|
||||
key** (§6.1); the gateway verifies the signature over the exact raw bytes before
|
||||
accepting the event. Two POST targets:
|
||||
handshake + outbound actions (§4) + its own `/stop` egress (§5). Inbound rides
|
||||
the **same socket** in the other direction: the connector pushes an `inbound`
|
||||
frame (and `interrupt_inbound` for §5) down the gateway's outbound WS. There is
|
||||
**no gateway-side inbound HTTP endpoint** — a gateway need not (and, when hosted,
|
||||
cannot) expose any inbound port; everything flows over the connection it
|
||||
initiated.
|
||||
|
||||
**Multi-instance routing.** The connector instance that owns a platform's socket
|
||||
(and thus produces inbound events) is generally **not** the instance the gateway
|
||||
dialed its outbound WS into. The producing instance therefore publishes the
|
||||
event on the connector's internal **relay bus** (Redis pub/sub; `RelayBus` in
|
||||
`src/core/relayBus.ts`) keyed by tenant. Every connector instance subscribes and
|
||||
routes each message to its **local** sessions for that tenant
|
||||
(`RelayServer.routeBusMessage`); the single instance that actually holds the
|
||||
gateway's socket delivers it, and instances with no local session for the tenant
|
||||
no-op. Cross-instance delivery is thus an in-cluster Redis hop, not a public
|
||||
HTTP call.
|
||||
|
||||
Frames (connector → gateway, over the WS):
|
||||
|
||||
- `{"type":"inbound", "event": <MessageEvent>, "bufferId"?}`
|
||||
- `{"type":"interrupt_inbound", "session_key", "chat_id"}` (§5)
|
||||
|
||||
**Trust.** The WS upgrade is authenticated with the gateway's per-gateway secret
|
||||
(§6.1), so the channel is trusted end to end — inbound frames are not separately
|
||||
HMAC-signed (the authenticated socket subsumes the per-delivery origin proof the
|
||||
old HTTP path needed). The relay-bus hop is inside the connector trust domain
|
||||
(same as the lease/buffer/capability stores).
|
||||
|
||||
> Earlier drafts of this contract delivered inbound over a signed **HTTP POST**
|
||||
> to a `gatewayEndpoint` (`HttpGatewayDelivery` + a gateway-side
|
||||
> `inbound_receiver`), HMAC-signed with a per-tenant delivery key. That required
|
||||
> every gateway to expose a reachable inbound URL — impossible for hosted
|
||||
> gateways, which have no public IP. The WS back-channel above replaces it; the
|
||||
> per-tenant delivery key is retained at provision for forward-compat but is no
|
||||
> longer used for inbound. `gatewayEndpoint` remains only for the **passthrough
|
||||
> plane** (Class-2/3 webhooks like Discord interactions / Twilio), which is a
|
||||
> separate synchronous-forward path and out of scope for this section.
|
||||
|
||||
- `POST {gatewayEndpoint}` → `{"type":"message", "event": <MessageEvent>}`
|
||||
- `POST {gatewayEndpoint}/interrupt` → `{"type":"interrupt", "session_key", "reason"?}` (§5)
|
||||
|
||||
> An earlier draft of this contract delivered inbound over the WS `inbound`
|
||||
> frame. That only works single-instance and predates the multi-instance
|
||||
> socket-ownership + channel-auth model; the signed-HTTP path above is the
|
||||
> shipped design.
|
||||
|
||||
### SessionSource fields (the wire surface)
|
||||
|
||||
@@ -178,13 +200,15 @@ gateway holds zero capability material). Source of truth:
|
||||
mid-turn `/stop` over the outbound WS. The connector MUST forward it to the
|
||||
gateway instance running that `session_key` (the routing invariant).
|
||||
- **Connector → gateway:** an inbound interrupt for a `session_key` is delivered
|
||||
as a **signed HTTP POST** to `{gatewayEndpoint}/interrupt` (§3 transport note),
|
||||
and bridged by the adapter's `on_interrupt(session_key, chat_id)` into the
|
||||
existing per-session interrupt mechanism, cancelling exactly that turn
|
||||
as an `interrupt_inbound` frame down the gateway's outbound WS (§3 transport
|
||||
note) — routed cross-instance via the relay bus to whichever instance holds
|
||||
the socket — and bridged by the adapter's `on_interrupt(session_key, chat_id)`
|
||||
into the existing per-session interrupt mechanism, cancelling exactly that turn
|
||||
(siblings untouched).
|
||||
|
||||
The gateway→connector `/stop` rides the outbound WS; the connector→gateway
|
||||
interrupt rides the same signed-HTTP inbound path as a normalized event.
|
||||
Both directions ride the gateway's outbound WS: the gateway→connector `/stop`
|
||||
egresses over it, and the connector→gateway interrupt rides the same `inbound`
|
||||
back-channel as a normalized event.
|
||||
|
||||
---
|
||||
|
||||
@@ -231,20 +255,21 @@ only in transport. See `docs/capability-trust-boundary.md` (connector repo:
|
||||
|
||||
A2 makes the connector the sole holder of platform secrets while the gateway may
|
||||
be **customer-managed and internet-exposed**, so the connector⇄gateway channel
|
||||
is itself authenticated. The gateway holds two enrollment-issued credentials
|
||||
(`hermes gateway enroll` → connector `/relay/enroll`): a **per-gateway secret**
|
||||
and a **per-tenant delivery key**. Both are HMAC-SHA256 schemes with a
|
||||
multi-secret rotation verify list (gateway side: `gateway/relay/auth.py`;
|
||||
connector side: `src/core/relayAuthToken.ts` + `src/core/deliverySigning.ts`).
|
||||
is itself authenticated. The gateway holds an enrollment- or provision-issued
|
||||
**per-gateway secret** (`hermes gateway enroll` → connector `/relay/enroll`, or
|
||||
managed self-provision → `/relay/provision`) that authenticates its outbound WS
|
||||
upgrade. It is an HMAC-SHA256 scheme with a multi-secret rotation verify list
|
||||
(gateway side: `gateway/relay/auth.py`; connector side:
|
||||
`src/core/relayAuthToken.ts`).
|
||||
|
||||
| Leg | Credential | Mechanism |
|
||||
|-----|-----------|-----------|
|
||||
| Gateway → connector WS upgrade | per-gateway secret | An `Authorization` bearer header on the `/relay` upgrade. The token is `base64url(payload:exp:sig)` where `payload = gatewayId` and `sig = HMAC(payload:exp, secret)`. Connector verifies and rejects the upgrade (**close 4401**) on mismatch/absence/revocation. The authenticated tenant comes from the connector's store, never the `hello` frame. |
|
||||
| Connector → gateway inbound POST | per-tenant delivery key | Two headers: `x-relay-timestamp` (unix seconds) and `x-relay-signature` (hex `HMAC(ts.rawBody, deliveryKey)`). Gateway verifies over the **exact raw bytes** within a ±300s replay window before accepting the event; rejects **401** otherwise. |
|
||||
| Connector → gateway inbound (`inbound` / `interrupt_inbound` frames) | — (rides the authenticated WS) | Inbound is pushed down the gateway's already-authenticated outbound socket (§3), so no per-message signature is needed. A **per-tenant delivery key** is still issued at enroll/provision and retained for forward-compat, but is no longer used to sign inbound. |
|
||||
|
||||
This is the **channel** authenticator — distinct from platform crypto, which the
|
||||
relay path still sheds entirely (§6). The gateway holds zero platform secrets;
|
||||
these two keys authenticate only the connector link. Full threat model +
|
||||
the per-gateway secret authenticates only the connector link. Full threat model +
|
||||
enrollment/rotation/kill-switch design: `docs/connector-gateway-auth-design.md`
|
||||
(connector repo).
|
||||
|
||||
|
||||
@@ -79,40 +79,6 @@ def relay_connection_auth() -> tuple[Optional[str], Optional[str]]:
|
||||
return (gateway_id or None, secret or None)
|
||||
|
||||
|
||||
def relay_inbound_config() -> tuple[Optional[str], Optional[str], int]:
|
||||
"""Resolve (delivery_key, bind_host, bind_port) for the inbound receiver.
|
||||
|
||||
The connector delivers normalized inbound events to this gateway over a
|
||||
SIGNED HTTP POST (not the outbound WS), verified with the per-tenant delivery
|
||||
key issued at enrollment (``GATEWAY_RELAY_DELIVERY_KEY``). The receiver only
|
||||
starts when a delivery key AND a bind port are configured — a gateway with no
|
||||
public inbound URL (e.g. a purely outbound dev run) simply doesn't run it.
|
||||
|
||||
Env first (Docker), then ``gateway.relay_delivery_key`` /
|
||||
``gateway.relay_inbound_host`` / ``gateway.relay_inbound_port`` in config.yaml.
|
||||
Port 0 (default/unset) -> receiver disabled.
|
||||
"""
|
||||
key = os.environ.get("GATEWAY_RELAY_DELIVERY_KEY", "").strip()
|
||||
host = os.environ.get("GATEWAY_RELAY_INBOUND_HOST", "").strip()
|
||||
port_raw = os.environ.get("GATEWAY_RELAY_INBOUND_PORT", "").strip()
|
||||
if not (key and port_raw):
|
||||
try:
|
||||
from gateway.run import _load_gateway_config # late import to avoid cycle
|
||||
|
||||
cfg = (_load_gateway_config().get("gateway") or {})
|
||||
key = key or str(cfg.get("relay_delivery_key", "") or "").strip()
|
||||
host = host or str(cfg.get("relay_inbound_host", "") or "").strip()
|
||||
if not port_raw:
|
||||
port_raw = str(cfg.get("relay_inbound_port", "") or "").strip()
|
||||
except Exception: # noqa: BLE001 - config absence/parse must never crash registration
|
||||
pass
|
||||
try:
|
||||
port = int(port_raw) if port_raw else 0
|
||||
except ValueError:
|
||||
port = 0
|
||||
return (key or None, host or "0.0.0.0", port)
|
||||
|
||||
|
||||
def relay_endpoint() -> Optional[str]:
|
||||
"""The gateway's own PUBLIC inbound URL, asserted to the connector at provision.
|
||||
|
||||
@@ -318,8 +284,11 @@ def self_provision_if_managed() -> bool:
|
||||
logger.warning("relay self-provision failed (%s); gateway will boot without relay auth", exc)
|
||||
return False
|
||||
|
||||
# Set creds in-process so register_relay_adapter() + relay_inbound_config()
|
||||
# read them from os.environ. Never logged.
|
||||
# Set creds in-process so register_relay_adapter() reads them from os.environ
|
||||
# (the per-gateway secret authenticates the outbound WS upgrade). The delivery
|
||||
# key is still issued by the connector and persisted for forward-compat, but
|
||||
# inbound now rides the WS (no HTTP receiver), so it is not consumed here.
|
||||
# Never logged.
|
||||
os.environ["GATEWAY_RELAY_ID"] = str(result.get("gatewayId") or gateway_id)
|
||||
os.environ["GATEWAY_RELAY_SECRET"] = str(result.get("secret") or "")
|
||||
os.environ["GATEWAY_RELAY_DELIVERY_KEY"] = str(result.get("deliveryKey") or "")
|
||||
|
||||
@@ -58,10 +58,6 @@ class RelayAdapter(BasePlatformAdapter):
|
||||
# Capability surface read by stream_consumer (getattr(..., 4096)).
|
||||
self.MAX_MESSAGE_LENGTH = descriptor.max_message_length
|
||||
self.supports_code_blocks = descriptor.markdown_dialect not in ("", "plain")
|
||||
# Inbound delivery receiver (signed connector→gateway HTTP POSTs). Built
|
||||
# lazily in connect() when a delivery key + bind port are configured; a
|
||||
# purely-outbound dev gateway runs without it. See inbound_receiver.py.
|
||||
self._inbound_runner: Any = None
|
||||
|
||||
# ── capability surface (from descriptor) ─────────────────────────────
|
||||
@property
|
||||
@@ -80,6 +76,12 @@ class RelayAdapter(BasePlatformAdapter):
|
||||
if self._transport is None:
|
||||
raise RuntimeError("RelayAdapter has no transport configured")
|
||||
self._transport.set_inbound_handler(self._on_inbound)
|
||||
# Inbound interrupts (connector -> owning gateway) arrive as
|
||||
# interrupt_inbound frames over the SAME outbound WS; bridge them to the
|
||||
# adapter's interrupt path. WS-only: there is no inbound HTTP receiver.
|
||||
set_interrupt = getattr(self._transport, "set_interrupt_inbound_handler", None)
|
||||
if callable(set_interrupt):
|
||||
set_interrupt(self.on_interrupt)
|
||||
ok = await self._transport.connect()
|
||||
if not ok:
|
||||
return False
|
||||
@@ -92,40 +94,12 @@ class RelayAdapter(BasePlatformAdapter):
|
||||
logger.warning("relay handshake failed: %s", exc)
|
||||
return False
|
||||
self._apply_descriptor(descriptor)
|
||||
# Start the signed inbound-delivery receiver if configured (the connector
|
||||
# POSTs normalized events to it over HTTP, verified with the tenant
|
||||
# delivery key). Non-fatal: a receiver bind failure must not fail the
|
||||
# outbound connection — the gateway can still send.
|
||||
await self._maybe_start_inbound_receiver()
|
||||
# Inbound (messages + interrupts) is delivered over the outbound WS via
|
||||
# the connector's relay bus — there is NO inbound HTTP endpoint (hosted
|
||||
# gateways have no public IP). The transport's reader already dispatches
|
||||
# `inbound` / `interrupt_inbound` frames to the handlers wired above.
|
||||
return True
|
||||
|
||||
async def _maybe_start_inbound_receiver(self) -> None:
|
||||
"""Start the inbound HTTP receiver when a delivery key + port are set."""
|
||||
from gateway.relay import relay_inbound_config
|
||||
|
||||
delivery_key, host, port = relay_inbound_config()
|
||||
if not (delivery_key and port):
|
||||
return # no inbound URL configured -> outbound-only gateway
|
||||
try:
|
||||
from aiohttp import web
|
||||
|
||||
from gateway.relay.inbound_receiver import InboundDeliveryReceiver
|
||||
|
||||
receiver = InboundDeliveryReceiver(
|
||||
delivery_key_verify_list=lambda: [delivery_key],
|
||||
on_message=self._on_inbound,
|
||||
on_interrupt=self.on_interrupt,
|
||||
)
|
||||
runner = web.AppRunner(receiver.build_app(), access_log=None)
|
||||
await runner.setup()
|
||||
site = web.TCPSite(runner, host, port)
|
||||
await site.start()
|
||||
self._inbound_runner = runner
|
||||
logger.info("relay inbound receiver listening on http://%s:%s", host, port)
|
||||
except Exception as exc: # noqa: BLE001 - inbound bind failure must not kill outbound
|
||||
logger.warning("relay inbound receiver failed to start: %s", exc)
|
||||
self._inbound_runner = None
|
||||
|
||||
def _apply_descriptor(self, descriptor: CapabilityDescriptor) -> None:
|
||||
"""Adopt a (re)negotiated descriptor into the live capability surface."""
|
||||
self.descriptor = descriptor
|
||||
@@ -148,12 +122,6 @@ class RelayAdapter(BasePlatformAdapter):
|
||||
await self.interrupt_session_activity(session_key, chat_id)
|
||||
|
||||
async def disconnect(self) -> None:
|
||||
if self._inbound_runner is not None:
|
||||
try:
|
||||
await self._inbound_runner.cleanup()
|
||||
except Exception: # noqa: BLE001 - best-effort teardown
|
||||
pass
|
||||
self._inbound_runner = None
|
||||
if self._transport is not None:
|
||||
await self._transport.disconnect()
|
||||
|
||||
|
||||
@@ -1,204 +0,0 @@
|
||||
"""Gateway-side inbound delivery receiver. EXPERIMENTAL.
|
||||
|
||||
The connector delivers normalized inbound events to a tenant's gateway over a
|
||||
**signed HTTP POST** (connector ``src/relay/httpGatewayDelivery.ts``), NOT over
|
||||
the gateway's outbound ``/relay`` WebSocket: the connector instance that owns a
|
||||
platform socket is generally not the instance a given gateway dialed out to, so
|
||||
inbound is delivered to a tenant ENDPOINT (which may load-balance across gateway
|
||||
instances). Each delivery is HMAC-signed with the per-tenant **delivery key**
|
||||
(``gateway/relay/auth.py``); this receiver verifies the signature over the EXACT
|
||||
raw request bytes before accepting the event.
|
||||
|
||||
Two routes (mirroring the connector's two POST targets):
|
||||
POST {base} {"type":"message", "event": <MessageEvent>, ...}
|
||||
POST {base}/interrupt {"type":"interrupt","session_key": ..., "reason"?}
|
||||
|
||||
The receiver:
|
||||
1. reads the RAW body bytes (never a reparsed/re-serialized form — the HMAC is
|
||||
over the literal bytes the connector signed),
|
||||
2. verifies ``x-relay-signature`` / ``x-relay-timestamp`` against the delivery
|
||||
key verify list (primary + secondary during rotation), within the replay
|
||||
window — rejects 401 on any failure,
|
||||
3. parses the JSON and dispatches: a ``message`` to the inbound handler (the
|
||||
RelayAdapter's ``handle_message`` via the transport's normal path), an
|
||||
``interrupt`` to the interrupt handler.
|
||||
|
||||
EXPERIMENTAL: the transport protocol may change without a deprecation cycle
|
||||
until ≥2 Class-1 platforms validate it. See docs/relay-connector-contract.md.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
from typing import Any, Awaitable, Callable, Optional, Sequence
|
||||
|
||||
from gateway.platforms.base import MessageEvent
|
||||
from gateway.relay.auth import (
|
||||
DELIVERY_SIG_HEADER,
|
||||
DELIVERY_TS_HEADER,
|
||||
verify_delivery_signature,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Callbacks the receiver dispatches verified deliveries to.
|
||||
InboundMessageHandler = Callable[[MessageEvent], Awaitable[None]]
|
||||
InboundInterruptHandler = Callable[[str, str], Awaitable[None]]
|
||||
|
||||
try: # lazy/optional dep — mirrors the other HTTP-receiving adapters
|
||||
from aiohttp import web
|
||||
except ImportError: # pragma: no cover - exercised only when the extra is absent
|
||||
web = None # type: ignore[assignment]
|
||||
|
||||
AIOHTTP_AVAILABLE = web is not None
|
||||
|
||||
|
||||
def _event_from_wire(raw: dict) -> MessageEvent:
|
||||
"""Rebuild a MessageEvent from the connector's normalized inbound payload.
|
||||
|
||||
Identical mapping to the WS transport's ``_event_from_wire`` (the wire shape
|
||||
is the same; only the transport differs). Kept here so the HTTP receiver has
|
||||
no import dependency on the WS transport module.
|
||||
"""
|
||||
from gateway.config import Platform
|
||||
from gateway.platforms.base import MessageType
|
||||
from gateway.session import SessionSource
|
||||
|
||||
src = raw.get("source", {}) or {}
|
||||
platform = src.get("platform", "relay")
|
||||
try:
|
||||
platform_enum = Platform(platform)
|
||||
except ValueError:
|
||||
platform_enum = Platform.RELAY
|
||||
|
||||
source = SessionSource(
|
||||
platform=platform_enum,
|
||||
chat_id=src.get("chat_id", ""),
|
||||
chat_type=src.get("chat_type", "dm"),
|
||||
chat_name=src.get("chat_name"),
|
||||
user_id=src.get("user_id"),
|
||||
user_name=src.get("user_name"),
|
||||
thread_id=src.get("thread_id"),
|
||||
chat_topic=src.get("chat_topic"),
|
||||
user_id_alt=src.get("user_id_alt"),
|
||||
chat_id_alt=src.get("chat_id_alt"),
|
||||
guild_id=src.get("guild_id"),
|
||||
parent_chat_id=src.get("parent_chat_id"),
|
||||
message_id=src.get("message_id"),
|
||||
)
|
||||
try:
|
||||
msg_type = MessageType(raw.get("message_type", "text"))
|
||||
except ValueError:
|
||||
msg_type = MessageType.TEXT
|
||||
|
||||
return MessageEvent(
|
||||
text=raw.get("text", ""),
|
||||
message_type=msg_type,
|
||||
source=source,
|
||||
message_id=raw.get("message_id"),
|
||||
reply_to_message_id=raw.get("reply_to_message_id"),
|
||||
media_urls=raw.get("media_urls") or [],
|
||||
)
|
||||
|
||||
|
||||
class InboundDeliveryReceiver:
|
||||
"""Verifies + dispatches signed connector→gateway inbound deliveries.
|
||||
|
||||
Transport-agnostic core: ``handle_raw`` takes the raw body bytes + headers +
|
||||
which route was hit and returns ``(status, body)``. The aiohttp wiring
|
||||
(``build_app`` / ``serve``) is a thin shell so the verify+dispatch logic is
|
||||
unit-testable without a live socket.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
delivery_key_verify_list: Callable[[], Sequence[str]],
|
||||
on_message: InboundMessageHandler,
|
||||
on_interrupt: Optional[InboundInterruptHandler] = None,
|
||||
max_skew_seconds: int = 300,
|
||||
) -> None:
|
||||
# A callable (not a static list) so a rotated delivery key is picked up
|
||||
# without rebuilding the receiver — mirrors the connector's verify list.
|
||||
self._verify_list = delivery_key_verify_list
|
||||
self._on_message = on_message
|
||||
self._on_interrupt = on_interrupt
|
||||
self._max_skew_seconds = max_skew_seconds
|
||||
|
||||
async def handle_raw(
|
||||
self, *, raw_body: bytes, timestamp: Optional[str], signature: Optional[str], is_interrupt: bool
|
||||
) -> tuple[int, dict]:
|
||||
"""Verify the signature over ``raw_body`` and dispatch. Returns (status, json).
|
||||
|
||||
401 on a missing/invalid/expired signature (never dispatches unverified).
|
||||
400 on malformed JSON. 200 on a verified, dispatched delivery.
|
||||
"""
|
||||
verify_keys = list(self._verify_list() or [])
|
||||
if not verify_keys:
|
||||
# No delivery key provisioned -> we cannot verify -> reject. A gateway
|
||||
# that hasn't enrolled must not accept inbound (fail closed).
|
||||
logger.warning("relay inbound: no delivery key configured; rejecting")
|
||||
return 401, {"error": "no delivery key configured"}
|
||||
|
||||
# Verify over the EXACT raw bytes the connector signed. Decode to text
|
||||
# with the same UTF-8 the connector's JSON.stringify produced; a single
|
||||
# differing byte breaks the HMAC (raw-body-preservation discipline).
|
||||
body_text = raw_body.decode("utf-8", errors="strict")
|
||||
if not verify_delivery_signature(
|
||||
body_text, timestamp, signature, verify_keys, self._max_skew_seconds
|
||||
):
|
||||
return 401, {"error": "invalid delivery signature"}
|
||||
|
||||
try:
|
||||
payload = json.loads(body_text)
|
||||
except json.JSONDecodeError:
|
||||
return 400, {"error": "invalid JSON body"}
|
||||
|
||||
if is_interrupt or payload.get("type") == "interrupt":
|
||||
session_key = str(payload.get("session_key", ""))
|
||||
chat_id = str(payload.get("chat_id", "") or payload.get("reason", "") or "")
|
||||
if self._on_interrupt is not None and session_key:
|
||||
await self._on_interrupt(session_key, chat_id)
|
||||
return 200, {"ok": True}
|
||||
|
||||
# Default: a normalized inbound message event.
|
||||
event_raw = payload.get("event")
|
||||
if not isinstance(event_raw, dict):
|
||||
return 400, {"error": "missing event"}
|
||||
event = _event_from_wire(event_raw)
|
||||
await self._on_message(event)
|
||||
return 200, {"ok": True}
|
||||
|
||||
# ── aiohttp wiring (thin shell over handle_raw) ──────────────────────
|
||||
def build_app(self) -> Any:
|
||||
"""Build an aiohttp Application exposing the delivery + interrupt routes."""
|
||||
if not AIOHTTP_AVAILABLE:
|
||||
raise RuntimeError(
|
||||
"InboundDeliveryReceiver requires the 'aiohttp' package "
|
||||
"(install the messaging extra)."
|
||||
)
|
||||
|
||||
async def _deliver(request: Any) -> Any:
|
||||
return await self._respond(request, is_interrupt=False)
|
||||
|
||||
async def _interrupt(request: Any) -> Any:
|
||||
return await self._respond(request, is_interrupt=True)
|
||||
|
||||
app = web.Application()
|
||||
app.router.add_get("/healthz", lambda _: web.Response(text="ok"))
|
||||
app.router.add_post("/", _deliver)
|
||||
app.router.add_post("/interrupt", _interrupt)
|
||||
return app
|
||||
|
||||
async def _respond(self, request: Any, *, is_interrupt: bool) -> Any:
|
||||
# Read the RAW bytes — do NOT use request.json() (it reparses and we'd
|
||||
# verify over a re-serialized form, breaking the HMAC).
|
||||
raw_body = await request.read()
|
||||
status, body = await self.handle_raw(
|
||||
raw_body=raw_body,
|
||||
timestamp=request.headers.get(DELIVERY_TS_HEADER),
|
||||
signature=request.headers.get(DELIVERY_SIG_HEADER),
|
||||
is_interrupt=is_interrupt,
|
||||
)
|
||||
return web.json_response(body, status=status)
|
||||
@@ -26,6 +26,7 @@ class StubConnector:
|
||||
def __init__(self, descriptor: CapabilityDescriptor) -> None:
|
||||
self._descriptor = descriptor
|
||||
self._inbound: Optional[InboundHandler] = None
|
||||
self._interrupt_inbound: Optional[Any] = None
|
||||
self.connected = False
|
||||
self.sent: List[Dict[str, Any]] = []
|
||||
self.interrupts: List[Dict[str, Any]] = []
|
||||
@@ -51,6 +52,11 @@ class StubConnector:
|
||||
def set_inbound_handler(self, handler: InboundHandler) -> None:
|
||||
self._inbound = handler
|
||||
|
||||
def set_interrupt_inbound_handler(self, handler: Any) -> None:
|
||||
"""Mirror the real WS transport: the adapter registers its interrupt
|
||||
bridge here so connector→gateway interrupt_inbound frames route to it."""
|
||||
self._interrupt_inbound = handler
|
||||
|
||||
async def send_outbound(self, action: Dict[str, Any]) -> Dict[str, Any]:
|
||||
self.sent.append(action)
|
||||
if action.get("op") == "send":
|
||||
@@ -73,3 +79,9 @@ class StubConnector:
|
||||
if self._inbound is None:
|
||||
raise RuntimeError("no inbound handler registered (call adapter.connect first)")
|
||||
await self._inbound(event)
|
||||
|
||||
async def push_interrupt(self, session_key: str, chat_id: str) -> None:
|
||||
"""Simulate the connector delivering an interrupt_inbound over the WS."""
|
||||
if self._interrupt_inbound is None:
|
||||
raise RuntimeError("no interrupt_inbound handler registered (call adapter.connect first)")
|
||||
await self._interrupt_inbound(session_key, chat_id)
|
||||
|
||||
@@ -1,150 +0,0 @@
|
||||
"""Unit tests for gateway/relay/inbound_receiver.py.
|
||||
|
||||
Covers the verify-then-dispatch core (handle_raw): a correctly-signed message
|
||||
delivery is verified + dispatched; an interrupt delivery routes to the interrupt
|
||||
handler; unsigned/tampered/expired/no-key deliveries are rejected 401; malformed
|
||||
JSON is 400. Signatures are produced with the SAME auth primitives the connector
|
||||
uses (gateway/relay/auth.py sign), so this exercises the real verify path.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
from gateway.relay.auth import sign
|
||||
from gateway.relay.inbound_receiver import InboundDeliveryReceiver
|
||||
|
||||
_KEY = "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff"
|
||||
|
||||
|
||||
def _signed(body_obj: dict, key: str = _KEY, ts: int | None = None) -> tuple[bytes, str, str]:
|
||||
"""Serialize compactly (as the connector's JSON.stringify does), sign it."""
|
||||
body = json.dumps(body_obj, separators=(",", ":"))
|
||||
raw = body.encode("utf-8")
|
||||
t = ts if ts is not None else int(time.time())
|
||||
return raw, str(t), sign(f"{t}.{body}", key)
|
||||
|
||||
|
||||
def _receiver(**kw):
|
||||
received: list = []
|
||||
interrupts: list = []
|
||||
|
||||
async def on_message(ev):
|
||||
received.append(ev)
|
||||
|
||||
async def on_interrupt(sk, chat):
|
||||
interrupts.append((sk, chat))
|
||||
|
||||
r = InboundDeliveryReceiver(
|
||||
delivery_key_verify_list=lambda: [_KEY],
|
||||
on_message=on_message,
|
||||
on_interrupt=on_interrupt,
|
||||
**kw,
|
||||
)
|
||||
return r, received, interrupts
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_valid_message_delivery_dispatched():
|
||||
r, received, _ = _receiver()
|
||||
raw, ts, sig = _signed(
|
||||
{
|
||||
"type": "message",
|
||||
"event": {
|
||||
"text": "hello",
|
||||
"message_type": "text",
|
||||
"source": {"platform": "discord", "chat_id": "chan1", "chat_type": "group", "guild_id": "guildA"},
|
||||
},
|
||||
}
|
||||
)
|
||||
status, body = await r.handle_raw(raw_body=raw, timestamp=ts, signature=sig, is_interrupt=False)
|
||||
assert status == 200 and body == {"ok": True}
|
||||
assert len(received) == 1
|
||||
assert received[0].text == "hello"
|
||||
assert received[0].source.guild_id == "guildA"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_valid_interrupt_delivery_routes_to_interrupt_handler():
|
||||
r, _, interrupts = _receiver()
|
||||
raw, ts, sig = _signed({"type": "interrupt", "session_key": "agent:main:discord:group:c:u", "reason": "stop"})
|
||||
status, _ = await r.handle_raw(raw_body=raw, timestamp=ts, signature=sig, is_interrupt=True)
|
||||
assert status == 200
|
||||
assert interrupts and interrupts[0][0] == "agent:main:discord:group:c:u"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_tampered_body_rejected_401():
|
||||
r, received, _ = _receiver()
|
||||
raw, ts, sig = _signed({"type": "message", "event": {"text": "x", "source": {"chat_id": "c"}}})
|
||||
status, _ = await r.handle_raw(raw_body=raw + b" ", timestamp=ts, signature=sig, is_interrupt=False)
|
||||
assert status == 401
|
||||
assert received == []
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_unsigned_rejected_401():
|
||||
r, _, _ = _receiver()
|
||||
raw, _, _ = _signed({"type": "message", "event": {"text": "x", "source": {"chat_id": "c"}}})
|
||||
status, _ = await r.handle_raw(raw_body=raw, timestamp=None, signature=None, is_interrupt=False)
|
||||
assert status == 401
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_expired_timestamp_rejected_401():
|
||||
r, _, _ = _receiver(max_skew_seconds=300)
|
||||
raw, _, sig = _signed({"type": "message", "event": {"text": "x", "source": {"chat_id": "c"}}}, ts=1)
|
||||
# ts=1 (1970) is far outside the 300s window vs now.
|
||||
status, _ = await r.handle_raw(raw_body=raw, timestamp="1", signature=sig, is_interrupt=False)
|
||||
assert status == 401
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_wrong_key_rejected_401():
|
||||
r, _, _ = _receiver()
|
||||
other = "ffeeddccbbaa99887766554433221100ffeeddccbbaa99887766554433221100"
|
||||
raw, ts, sig = _signed({"type": "message", "event": {"text": "x", "source": {"chat_id": "c"}}}, key=other)
|
||||
status, _ = await r.handle_raw(raw_body=raw, timestamp=ts, signature=sig, is_interrupt=False)
|
||||
assert status == 401
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_no_delivery_key_fails_closed_401():
|
||||
async def on_message(ev):
|
||||
pass
|
||||
|
||||
r = InboundDeliveryReceiver(delivery_key_verify_list=lambda: [], on_message=on_message)
|
||||
raw, ts, sig = _signed({"type": "message", "event": {"text": "x", "source": {"chat_id": "c"}}})
|
||||
status, _ = await r.handle_raw(raw_body=raw, timestamp=ts, signature=sig, is_interrupt=False)
|
||||
assert status == 401
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_rotation_secondary_key_accepted():
|
||||
new = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
|
||||
received: list = []
|
||||
|
||||
async def on_message(ev):
|
||||
received.append(ev)
|
||||
|
||||
# Connector still signs with the OLD key (secondary); verify list has both.
|
||||
r = InboundDeliveryReceiver(
|
||||
delivery_key_verify_list=lambda: [new, _KEY], on_message=on_message
|
||||
)
|
||||
raw, ts, sig = _signed({"type": "message", "event": {"text": "x", "source": {"chat_id": "c"}}}, key=_KEY)
|
||||
status, _ = await r.handle_raw(raw_body=raw, timestamp=ts, signature=sig, is_interrupt=False)
|
||||
assert status == 200 and len(received) == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_malformed_json_after_valid_signature_is_400():
|
||||
r, _, _ = _receiver()
|
||||
# Sign a non-JSON body so the signature passes but json.loads fails.
|
||||
raw = b"not json at all"
|
||||
ts = str(int(time.time()))
|
||||
sig = sign(f"{ts}.{raw.decode()}", _KEY)
|
||||
status, body = await r.handle_raw(raw_body=raw, timestamp=ts, signature=sig, is_interrupt=False)
|
||||
assert status == 400
|
||||
@@ -67,3 +67,23 @@ async def test_outbound_interrupt_reaches_connector(adapter):
|
||||
assert stub.interrupts == [
|
||||
{"session_key": "agent:main:discord:group:chanA:userX", "reason": "stop"}
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_connect_wires_inbound_interrupt_over_ws(adapter):
|
||||
"""WS-only inbound: connect() registers BOTH the inbound message handler AND
|
||||
the interrupt_inbound handler on the transport, so a connector-delivered
|
||||
interrupt_inbound frame (no HTTP receiver) reaches the right session."""
|
||||
await adapter.connect()
|
||||
stub = adapter._transport
|
||||
# Both connector->gateway handlers are wired post-connect.
|
||||
assert stub._inbound is not None
|
||||
assert stub._interrupt_inbound is not None
|
||||
|
||||
key = "agent:main:discord:group:chanA:userX"
|
||||
ev = asyncio.Event()
|
||||
adapter._active_sessions[key] = ev
|
||||
|
||||
# Simulate the connector pushing an interrupt_inbound frame down the WS.
|
||||
await stub.push_interrupt(key, chat_id="chanA")
|
||||
assert ev.is_set() is True, "interrupt delivered over the WS must cancel the target turn"
|
||||
|
||||
@@ -48,16 +48,14 @@ def _relay_py_files() -> list[Path]:
|
||||
|
||||
|
||||
# ``auth.py`` is the connector⇄gateway CHANNEL authenticator (the gateway's WS
|
||||
# upgrade bearer + inbound-delivery signature verification). ``inbound_receiver.py``
|
||||
# is the signed-inbound-delivery receiver that USES that channel auth to verify
|
||||
# connector→gateway POSTs. Both are net-new, intended, and the whole point of
|
||||
# authenticating an untrusted/disposable gateway — they are NOT platform crypto.
|
||||
# They use HMAC over the connector's per-gateway / per-tenant secrets (NOT any
|
||||
# platform's signing secret), so they are exempt from the platform-crypto symbol
|
||||
# scan below. The module-import ban (platform-crypto modules) still applies to
|
||||
# every file including these — they import only stdlib hmac/hashlib and each
|
||||
# other, never a platform-crypto module, so they stay clean there.
|
||||
_CHANNEL_AUTH_FILES = {"auth.py", "inbound_receiver.py"}
|
||||
# upgrade bearer). It is net-new, intended, and the whole point of
|
||||
# authenticating an untrusted/disposable gateway — it is NOT platform crypto.
|
||||
# It uses HMAC over the connector's per-gateway secret (NOT any platform's
|
||||
# signing secret), so it is exempt from the platform-crypto symbol scan below.
|
||||
# The module-import ban (platform-crypto modules) still applies to every file
|
||||
# including this one — it imports only stdlib hmac/hashlib, never a
|
||||
# platform-crypto module, so it stays clean there.
|
||||
_CHANNEL_AUTH_FILES = {"auth.py"}
|
||||
|
||||
|
||||
def test_relay_package_imports_no_platform_crypto():
|
||||
|
||||
@@ -8,6 +8,8 @@ TRIGGER logic, in-process env wiring, and fail-soft boot behaviour.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
|
||||
import pytest
|
||||
|
||||
import gateway.relay as relay
|
||||
@@ -126,8 +128,9 @@ def test_provisions_and_sets_env_in_process(monkeypatch):
|
||||
# Creds landed in os.environ (in-process), so register_relay_adapter() reads them.
|
||||
gid, secret = relay.relay_connection_auth()
|
||||
assert gid and secret == "a" * 64
|
||||
key, _host, _port = relay.relay_inbound_config()
|
||||
assert key == "b" * 64
|
||||
# The delivery key is persisted in-process too (issued by the connector,
|
||||
# kept for forward-compat; inbound rides the WS so it isn't consumed).
|
||||
assert os.environ["GATEWAY_RELAY_DELIVERY_KEY"] == "b" * 64
|
||||
|
||||
|
||||
def test_outbound_only_when_no_endpoint(monkeypatch):
|
||||
|
||||
@@ -80,6 +80,65 @@ def test_nullable_type_array_collapsed_to_single_string():
|
||||
assert prop.get("nullable") is True
|
||||
|
||||
|
||||
def test_multitype_array_becomes_anyof_no_branch_dropped():
|
||||
# Ported from anomalyco/opencode#31877: a genuine multi-type array such as
|
||||
# ["number", "string"] (common in MCP tool schemas) must keep BOTH branches
|
||||
# as an anyOf, not silently drop all but the first. Several backends
|
||||
# (llama.cpp, Gemini via OpenAI-compatible transports) reject the array form.
|
||||
tools = [_tool("t", {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"status": {"type": ["number", "string"], "description": "status filter"},
|
||||
},
|
||||
})]
|
||||
out = sanitize_tool_schemas(tools)
|
||||
prop = out[0]["function"]["parameters"]["properties"]["status"]
|
||||
assert "type" not in prop
|
||||
assert prop["anyOf"] == [{"type": "number"}, {"type": "string"}]
|
||||
assert prop.get("nullable") is None
|
||||
# Sibling keywords survive alongside the generated anyOf.
|
||||
assert prop["description"] == "status filter"
|
||||
|
||||
|
||||
def test_multitype_array_with_null_lifts_nullable():
|
||||
tools = [_tool("t", {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"v": {"type": ["integer", "boolean", "null"]},
|
||||
},
|
||||
})]
|
||||
out = sanitize_tool_schemas(tools)
|
||||
prop = out[0]["function"]["parameters"]["properties"]["v"]
|
||||
assert "type" not in prop
|
||||
assert prop["anyOf"] == [{"type": "integer"}, {"type": "boolean"}]
|
||||
assert prop.get("nullable") is True
|
||||
|
||||
|
||||
def test_all_null_type_array_becomes_null_type():
|
||||
tools = [_tool("t", {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"n": {"type": ["null"]},
|
||||
},
|
||||
})]
|
||||
out = sanitize_tool_schemas(tools)
|
||||
prop = out[0]["function"]["parameters"]["properties"]["n"]
|
||||
assert prop["type"] == "null"
|
||||
|
||||
|
||||
def test_single_element_type_array_unwrapped():
|
||||
tools = [_tool("t", {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"s": {"type": ["string"]},
|
||||
},
|
||||
})]
|
||||
out = sanitize_tool_schemas(tools)
|
||||
prop = out[0]["function"]["parameters"]["properties"]["s"]
|
||||
assert prop["type"] == "string"
|
||||
assert prop.get("nullable") is None
|
||||
|
||||
|
||||
def test_anyof_nested_objects_sanitized():
|
||||
tools = [_tool("t", {
|
||||
"type": "object",
|
||||
|
||||
@@ -235,7 +235,9 @@ def _sanitize_node(node: Any, path: str) -> Any:
|
||||
``{"type": <value>}`` so downstream consumers see a dict.
|
||||
- Injects ``properties: {}`` into object-typed nodes missing it.
|
||||
- Normalizes ``type: [X, "null"]`` arrays to single ``type: X`` (keeping
|
||||
``nullable: true`` as a hint).
|
||||
``nullable: true`` as a hint), and multi-type arrays like
|
||||
``["number", "string"]`` to an ``anyOf`` of single-type schemas so no
|
||||
branch is dropped (ported from anomalyco/opencode#31877).
|
||||
- Recurses into ``properties``, ``items``, ``additionalProperties``,
|
||||
``anyOf``, ``oneOf``, ``allOf``, and ``$defs`` / ``definitions``.
|
||||
"""
|
||||
@@ -268,23 +270,39 @@ def _sanitize_node(node: Any, path: str) -> Any:
|
||||
|
||||
out: dict = {}
|
||||
for key, value in node.items():
|
||||
# type: [X, "null"] → type: X (the backend's tool-call parser only
|
||||
# accepts singular string types; nullable is lost but the call still
|
||||
# succeeds, and the model can still pass null on its own.)
|
||||
# JSON Schema ``type`` arrays (e.g. ``["number", "string"]``, common
|
||||
# in MCP tool schemas) are rejected by several tool-call backends:
|
||||
# * llama.cpp's grammar generator only accepts a singular string type.
|
||||
# * Gemini (including OpenAI-compatible transports such as GitHub
|
||||
# Copilot proxying to Gemini) rejects the array form outright —
|
||||
# plain @ai-sdk/google rewrites it, but the OpenAI-compatible path
|
||||
# forwards it verbatim and the backend 400s.
|
||||
#
|
||||
# Normalize per the SDK's behavior:
|
||||
# * single non-null type → ``type: X`` (+ ``nullable: true`` if the
|
||||
# array also contained "null"). No data lost.
|
||||
# * multiple non-null types → ``anyOf`` of single-type schemas, so
|
||||
# EVERY branch survives instead of silently dropping all but the
|
||||
# first. ``null`` is lifted into ``nullable: true``.
|
||||
# * all-null / empty → ``type: "null"`` (or object fallback).
|
||||
# Ported from anomalyco/opencode#31877.
|
||||
if key == "type" and isinstance(value, list):
|
||||
non_null = [t for t in value if t != "null"]
|
||||
if len(non_null) == 1 and isinstance(non_null[0], str):
|
||||
has_null = "null" in value
|
||||
non_null = [t for t in value if isinstance(t, str) and t != "null"]
|
||||
if len(non_null) == 1:
|
||||
out["type"] = non_null[0]
|
||||
if "null" in value:
|
||||
if has_null:
|
||||
out.setdefault("nullable", True)
|
||||
continue
|
||||
# Fallback: pick the first string type, drop the rest.
|
||||
first_str = next((t for t in value if isinstance(t, str) and t != "null"), None)
|
||||
if first_str:
|
||||
out["type"] = first_str
|
||||
if len(non_null) >= 2:
|
||||
# Preserve all branches as a union instead of dropping them.
|
||||
out["anyOf"] = [{"type": t} for t in non_null]
|
||||
if has_null:
|
||||
out.setdefault("nullable", True)
|
||||
continue
|
||||
# All-null or empty list → treat as object.
|
||||
out["type"] = "object"
|
||||
# No usable non-null type: all-null array → type: "null";
|
||||
# otherwise an empty/garbage array → object fallback.
|
||||
out["type"] = "null" if has_null else "object"
|
||||
continue
|
||||
|
||||
if key in {"properties", "$defs", "definitions"} and isinstance(value, dict):
|
||||
|
||||
Reference in New Issue
Block a user