Compare commits

2 Commits

Author SHA1 Message Date
teknium1
eba8cc564e fix(schema): preserve multi-type arrays as anyOf instead of dropping branches
Port from anomalyco/opencode#31877: JSON Schema type arrays like
["number","string"] (common in MCP tool schemas) were collapsed to the
first non-null type, silently dropping every other branch. Several
tool-call backends reject the array form outright — llama.cpp's grammar
generator and Gemini via OpenAI-compatible transports (e.g. GitHub
Copilot proxying to Gemini) 400 on it.

_sanitize_node now mirrors @ai-sdk/google: a single non-null type stays
type:X (+nullable if null was present), multiple non-null types become
an anyOf of single-type schemas so no branch is lost, and an all-null
array becomes type:null. Single-null collapse is unchanged.

Verified nested (object props, array items) survive the full sanitize
pipeline — combinator stripping is top-level-only and nullable-union
collapse only fires on single-survivor unions, so multi-type anyOf is
left intact.
2026-06-18 17:03:20 -07:00
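
The type-array handling described in the first commit message can be sketched roughly as follows. This is an illustration only, not the actual `_sanitize_node` code: the helper name `sanitize_type_array` is hypothetical, and two details are assumptions the commit message does not settle, namely that sibling keywords stay on the outer node and that `nullable` is also set when `null` accompanies several non-null types.

```python
from typing import Any


def sanitize_type_array(node: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical helper: rewrite a schema node whose ``type`` is a list."""
    types = node.get("type")
    if not isinstance(types, list) or not types:
        return node  # scalar, absent, or empty type: leave untouched

    # Assumption: keywords other than ``type`` are kept on the outer node.
    rest = {k: v for k, v in node.items() if k != "type"}
    non_null = [t for t in types if t != "null"]
    has_null = "null" in types

    if not non_null:
        # All-null array collapses to a plain null type.
        return {**rest, "type": "null"}

    if len(non_null) == 1:
        # Single surviving type stays ``type: X`` (+ nullable if null was present).
        out = {**rest, "type": non_null[0]}
    else:
        # Several non-null types become an anyOf of single-type schemas,
        # so no branch is dropped.
        out = {**rest, "anyOf": [{"type": t} for t in non_null]}

    if has_null:
        out["nullable"] = True  # assumption for the multi-type case
    return out
```

With this sketch, `{"type": ["number", "string"]}` keeps both branches under `anyOf` instead of collapsing to `number`.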
Ben Barclay
d2c53ff558 feat(relay): WS-only inbound on the gateway adapter (Phase 3) (#48294)
The connector now delivers inbound (messages + interrupts) over the gateway's
OUTBOUND /relay WebSocket, not a signed HTTP POST to an inbound endpoint. The
gateway needs no inbound HTTP port — which is what makes hosted gateways (no
public IP) able to receive inbound at all.

- gateway/relay/adapter.py: connect() wires set_interrupt_inbound_handler(
  self.on_interrupt) so connector->gateway interrupt_inbound frames bridge into
  the existing per-session interrupt path (the inbound message handler was
  already wired). Removed _maybe_start_inbound_receiver() + the _inbound_runner
  lifecycle — there is no HTTP receiver anymore.
- gateway/relay/inbound_receiver.py: deleted (the signed-HTTP InboundDelivery
  receiver).
- gateway/relay/__init__.py: removed relay_inbound_config() (dead with the
  receiver gone). The delivery key is still set in-process by self-provision for
  forward-compat but is no longer consumed for inbound.
- docs/relay-connector-contract.md: §3 rewritten — inbound is the WS back-channel
  routed cross-instance via the connector's relay bus; §5 interrupt + §6 auth
  table updated; the old signed-HTTP-POST + per-tenant-delivery-key-signing path
  is documented as superseded. gatewayEndpoint noted as passthrough-plane only.

Tests: stub_connector grows set_interrupt_inbound_handler + push_interrupt;
new test_relay_interrupt case proves connect() wires BOTH inbound handlers and an
interrupt_inbound frame over the WS cancels the right session. Removed the
HTTP-receiver test; updated the crypto-shedding scan + self-provision delivery-key
assertion. 88 relay tests pass.

EXPERIMENTAL. Pairs with gateway-gateway (relay bus + WsGatewayDelivery) and the
NAS GATEWAY_RELAY_URL stamp. The cross-repo E2E (connector repo) proves the full
multi-instance path against this production adapter code.
2026-06-19 09:33:15 +10:00
11 changed files with 207 additions and 489 deletions

@@ -62,33 +62,55 @@ live platform adapter's capability methods.
The connector normalizes each platform wire event into a `MessageEvent`
(`gateway/platforms/base.py`) and delivers it to the gateway. **Inbound is
delivered over a signed HTTP POST, not the outbound `/relay` WebSocket** (see
the transport note below). The gateway keys the session via `build_session_key()`
delivered over the gateway's OUTBOUND `/relay` WebSocket** (see the transport
note below) — the connector pushes an `inbound` frame down the socket the
gateway already dialed. The gateway keys the session via `build_session_key()`
from the embedded `SessionSource` — so populating the right discriminators is
the single highest-correctness responsibility of the connector.
### Inbound transport (signed HTTP POST, not the outbound WS)
### Inbound transport (WS back-channel, not HTTP)
The gateway dials **out** to the connector's `/relay` WebSocket for the
handshake + outbound actions (§4) + its own `/stop` egress (§5). Inbound,
however, is delivered the other way: the connector **POSTs** the normalized
event to the gateway's inbound endpoint (`HttpGatewayDelivery` on the connector;
`gateway/relay/inbound_receiver.py` on the gateway). The reason is
multi-instance: the connector instance that owns a platform's socket (and thus
produces inbound events) is generally **not** the instance a given gateway
dialed its outbound WS into, so inbound must target a tenant **endpoint** (which
may load-balance across gateway instances) rather than ride one gateway's
outbound socket. Each delivery is HMAC-signed with the per-tenant **delivery
key** (§6.1); the gateway verifies the signature over the exact raw bytes before
accepting the event. Two POST targets:
handshake + outbound actions (§4) + its own `/stop` egress (§5). Inbound rides
the **same socket** in the other direction: the connector pushes an `inbound`
frame (and `interrupt_inbound` for §5) down the gateway's outbound WS. There is
**no gateway-side inbound HTTP endpoint** — a gateway need not (and, when hosted,
cannot) expose any inbound port; everything flows over the connection it
initiated.
**Multi-instance routing.** The connector instance that owns a platform's socket
(and thus produces inbound events) is generally **not** the instance the gateway
dialed its outbound WS into. The producing instance therefore publishes the
event on the connector's internal **relay bus** (Redis pub/sub; `RelayBus` in
`src/core/relayBus.ts`) keyed by tenant. Every connector instance subscribes and
routes each message to its **local** sessions for that tenant
(`RelayServer.routeBusMessage`); the single instance that actually holds the
gateway's socket delivers it, and instances with no local session for the tenant
no-op. Cross-instance delivery is thus an in-cluster Redis hop, not a public
HTTP call.
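
The routing rule above can be illustrated with a small in-memory model. It is a sketch only: the real relay bus is Redis pub/sub in the connector's TypeScript code, and every name here (`InMemoryBus`, `ConnectorInstance`, `attach_gateway`, `publish_inbound`) is hypothetical. It shows the one property that matters: every instance sees the message, but only the instance holding the tenant's gateway socket delivers it.

```python
from collections import defaultdict
from typing import Any, Callable

Frame = dict[str, Any]
Socket = Callable[[Frame], None]  # stand-in for "send this frame down a gateway WS"


class InMemoryBus:
    """Stand-in for Redis pub/sub: every subscriber sees every message."""

    def __init__(self) -> None:
        self._subscribers: list[Callable[[str, Frame], None]] = []

    def subscribe(self, callback: Callable[[str, Frame], None]) -> None:
        self._subscribers.append(callback)

    def publish(self, tenant: str, frame: Frame) -> None:
        for callback in self._subscribers:
            callback(tenant, frame)


class ConnectorInstance:
    """Hypothetical connector instance holding zero or more gateway sockets."""

    def __init__(self, bus: InMemoryBus) -> None:
        self._bus = bus
        self._local_sessions: dict[str, list[Socket]] = defaultdict(list)
        bus.subscribe(self._route_bus_message)

    def attach_gateway(self, tenant: str, socket: Socket) -> None:
        # A gateway dialed its outbound WS into this instance.
        self._local_sessions[tenant].append(socket)

    def publish_inbound(self, tenant: str, event: Frame) -> None:
        # The instance owning the platform socket publishes; it does not
        # need to know which instance holds the gateway's socket.
        self._bus.publish(tenant, {"type": "inbound", "event": event})

    def _route_bus_message(self, tenant: str, frame: Frame) -> None:
        # Deliver only to local sessions; no local session means a no-op.
        for socket in self._local_sessions.get(tenant, []):
            socket(frame)
```

For example, if instance `a` produces an event and instance `b` holds the gateway socket for `tenant-1`, calling `a.publish_inbound("tenant-1", ...)` results in exactly one delivery, made by `b`.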
Frames (connector → gateway, over the WS):
- `{"type":"inbound", "event": <MessageEvent>, "bufferId"?}`
- `{"type":"interrupt_inbound", "session_key", "chat_id"}` (§5)
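
A gateway-side reader for the two frames listed above might dispatch them along these lines. This is a minimal sketch, not the real transport: the class `FrameDispatcher` and its `dispatch` method are hypothetical, and only the setter names `set_inbound_handler` and `set_interrupt_inbound_handler` come from this change. Unlike the real transport, the handler here receives the raw event dict rather than a rebuilt `MessageEvent`.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable, Optional

InboundHandler = Callable[[dict[str, Any]], Awaitable[None]]
InterruptHandler = Callable[[str, str], Awaitable[None]]


class FrameDispatcher:
    """Hypothetical reader-side dispatch for connector -> gateway WS frames."""

    def __init__(self) -> None:
        self._on_inbound: Optional[InboundHandler] = None
        self._on_interrupt: Optional[InterruptHandler] = None

    def set_inbound_handler(self, handler: InboundHandler) -> None:
        self._on_inbound = handler

    def set_interrupt_inbound_handler(self, handler: InterruptHandler) -> None:
        self._on_interrupt = handler

    async def dispatch(self, raw: str) -> bool:
        """Route one text frame; return True if a handler consumed it."""
        frame = json.loads(raw)
        if not isinstance(frame, dict):
            return False
        kind = frame.get("type")
        if kind == "inbound" and self._on_inbound is not None:
            event = frame.get("event")
            if isinstance(event, dict):
                await self._on_inbound(event)
                return True
        elif kind == "interrupt_inbound" and self._on_interrupt is not None:
            session_key = str(frame.get("session_key", ""))
            if session_key:
                await self._on_interrupt(session_key, str(frame.get("chat_id", "")))
                return True
        return False  # unknown type, missing handler, or malformed frame
```

Because both frame types arrive on the socket the gateway already dialed, no listener or bind port appears anywhere in this path.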
**Trust.** The WS upgrade is authenticated with the gateway's per-gateway secret
(§6.1), so the channel is trusted end to end — inbound frames are not separately
HMAC-signed (the authenticated socket subsumes the per-delivery origin proof the
old HTTP path needed). The relay-bus hop is inside the connector trust domain
(same as the lease/buffer/capability stores).
> Earlier drafts of this contract delivered inbound over a signed **HTTP POST**
> to a `gatewayEndpoint` (`HttpGatewayDelivery` + a gateway-side
> `inbound_receiver`), HMAC-signed with a per-tenant delivery key. That required
> every gateway to expose a reachable inbound URL — impossible for hosted
> gateways, which have no public IP. The WS back-channel above replaces it; the
> per-tenant delivery key is retained at provision for forward-compat but is no
> longer used for inbound. `gatewayEndpoint` remains only for the **passthrough
> plane** (Class-2/3 webhooks like Discord interactions / Twilio), which is a
> separate synchronous-forward path and out of scope for this section.
- `POST {gatewayEndpoint}` → `{"type":"message", "event": <MessageEvent>}`
- `POST {gatewayEndpoint}/interrupt` → `{"type":"interrupt", "session_key", "reason"?}` (§5)
> An earlier draft of this contract delivered inbound over the WS `inbound`
> frame. That only works single-instance and predates the multi-instance
> socket-ownership + channel-auth model; the signed-HTTP path above is the
> shipped design.
### SessionSource fields (the wire surface)
@@ -178,13 +200,15 @@ gateway holds zero capability material). Source of truth:
mid-turn `/stop` over the outbound WS. The connector MUST forward it to the
gateway instance running that `session_key` (the routing invariant).
- **Connector → gateway:** an inbound interrupt for a `session_key` is delivered
as a **signed HTTP POST** to `{gatewayEndpoint}/interrupt` (§3 transport note),
and bridged by the adapter's `on_interrupt(session_key, chat_id)` into the
existing per-session interrupt mechanism, cancelling exactly that turn
as an `interrupt_inbound` frame down the gateway's outbound WS (§3 transport
note) — routed cross-instance via the relay bus to whichever instance holds
the socket — and bridged by the adapter's `on_interrupt(session_key, chat_id)`
into the existing per-session interrupt mechanism, cancelling exactly that turn
(siblings untouched).
The gateway→connector `/stop` rides the outbound WS; the connector→gateway
interrupt rides the same signed-HTTP inbound path as a normalized event.
Both directions ride the gateway's outbound WS: the gateway→connector `/stop`
egresses over it, and the connector→gateway interrupt rides the same `inbound`
back-channel as a normalized event.
---
@@ -231,20 +255,21 @@ only in transport. See `docs/capability-trust-boundary.md` (connector repo:
A2 makes the connector the sole holder of platform secrets while the gateway may
be **customer-managed and internet-exposed**, so the connector⇄gateway channel
is itself authenticated. The gateway holds two enrollment-issued credentials
(`hermes gateway enroll` → connector `/relay/enroll`): a **per-gateway secret**
and a **per-tenant delivery key**. Both are HMAC-SHA256 schemes with a
multi-secret rotation verify list (gateway side: `gateway/relay/auth.py`;
connector side: `src/core/relayAuthToken.ts` + `src/core/deliverySigning.ts`).
is itself authenticated. The gateway holds an enrollment- or provision-issued
**per-gateway secret** (`hermes gateway enroll` → connector `/relay/enroll`, or
managed self-provision → `/relay/provision`) that authenticates its outbound WS
upgrade. It is an HMAC-SHA256 scheme with a multi-secret rotation verify list
(gateway side: `gateway/relay/auth.py`; connector side:
`src/core/relayAuthToken.ts`).
| Leg | Credential | Mechanism |
|-----|-----------|-----------|
| Gateway → connector WS upgrade | per-gateway secret | An `Authorization` bearer header on the `/relay` upgrade. The token is `base64url(payload:exp:sig)` where `payload = gatewayId` and `sig = HMAC(payload:exp, secret)`. Connector verifies and rejects the upgrade (**close 4401**) on mismatch/absence/revocation. The authenticated tenant comes from the connector's store, never the `hello` frame. |
| Connector → gateway inbound POST | per-tenant delivery key | Two headers: `x-relay-timestamp` (unix seconds) and `x-relay-signature` (hex `HMAC(ts.rawBody, deliveryKey)`). Gateway verifies over the **exact raw bytes** within a ±300s replay window before accepting the event; rejects **401** otherwise. |
| Connector → gateway inbound (`inbound` / `interrupt_inbound` frames) | — (rides the authenticated WS) | Inbound is pushed down the gateway's already-authenticated outbound socket (§3), so no per-message signature is needed. A **per-tenant delivery key** is still issued at enroll/provision and retained for forward-compat, but is no longer used to sign inbound. |
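
The upgrade token shape in the first table row, `base64url(payload:exp:sig)` with `sig = HMAC(payload:exp, secret)`, can be sketched as below. This is not the code in `gateway/relay/auth.py`: the function names are hypothetical, and the hex signature encoding, the UTF-8 encoding of the secret, and the 300-second lifetime are assumptions made for illustration.

```python
import base64
import hashlib
import hmac
import time
from typing import Optional, Sequence


def _sign(message: str, secret: str) -> str:
    # Assumption: HMAC-SHA256 over UTF-8 bytes, hex-encoded.
    return hmac.new(secret.encode(), message.encode(), hashlib.sha256).hexdigest()


def mint_upgrade_token(gateway_id: str, secret: str, ttl_seconds: int = 300) -> str:
    """Hypothetical: build the bearer token for the /relay WS upgrade."""
    exp = int(time.time()) + ttl_seconds
    signed = f"{gateway_id}:{exp}"
    token = f"{signed}:{_sign(signed, secret)}".encode()
    return base64.urlsafe_b64encode(token).rstrip(b"=").decode()


def verify_upgrade_token(
    token: str, verify_secrets: Sequence[str], now: Optional[int] = None
) -> Optional[str]:
    """Return the gateway id if any secret in the rotation list verifies."""
    padded = token + "=" * (-len(token) % 4)
    try:
        decoded = base64.urlsafe_b64decode(padded).decode()
        payload, exp_raw, sig = decoded.rsplit(":", 2)
        exp = int(exp_raw)
    except ValueError:  # covers bad base64, bad UTF-8, and wrong field count
        return None
    if (now if now is not None else int(time.time())) > exp:
        return None  # expired
    for secret in verify_secrets:
        expected = _sign(f"{payload}:{exp_raw}", secret)
        # Constant-time comparison on bytes.
        if hmac.compare_digest(expected.encode(), sig.encode()):
            return payload
    return None
```

Passing a list of secrets to the verifier mirrors the multi-secret rotation verify list: a token signed with either the primary or the secondary secret is accepted while a rotation is in progress.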
This is the **channel** authenticator — distinct from platform crypto, which the
relay path still sheds entirely (§6). The gateway holds zero platform secrets;
these two keys authenticate only the connector link. Full threat model +
the per-gateway secret authenticates only the connector link. Full threat model +
enrollment/rotation/kill-switch design: `docs/connector-gateway-auth-design.md`
(connector repo).

@@ -79,40 +79,6 @@ def relay_connection_auth() -> tuple[Optional[str], Optional[str]]:
return (gateway_id or None, secret or None)
def relay_inbound_config() -> tuple[Optional[str], Optional[str], int]:
"""Resolve (delivery_key, bind_host, bind_port) for the inbound receiver.
The connector delivers normalized inbound events to this gateway over a
SIGNED HTTP POST (not the outbound WS), verified with the per-tenant delivery
key issued at enrollment (``GATEWAY_RELAY_DELIVERY_KEY``). The receiver only
starts when a delivery key AND a bind port are configured — a gateway with no
public inbound URL (e.g. a purely outbound dev run) simply doesn't run it.
Env first (Docker), then ``gateway.relay_delivery_key`` /
``gateway.relay_inbound_host`` / ``gateway.relay_inbound_port`` in config.yaml.
Port 0 (default/unset) -> receiver disabled.
"""
key = os.environ.get("GATEWAY_RELAY_DELIVERY_KEY", "").strip()
host = os.environ.get("GATEWAY_RELAY_INBOUND_HOST", "").strip()
port_raw = os.environ.get("GATEWAY_RELAY_INBOUND_PORT", "").strip()
if not (key and port_raw):
try:
from gateway.run import _load_gateway_config # late import to avoid cycle
cfg = (_load_gateway_config().get("gateway") or {})
key = key or str(cfg.get("relay_delivery_key", "") or "").strip()
host = host or str(cfg.get("relay_inbound_host", "") or "").strip()
if not port_raw:
port_raw = str(cfg.get("relay_inbound_port", "") or "").strip()
except Exception: # noqa: BLE001 - config absence/parse must never crash registration
pass
try:
port = int(port_raw) if port_raw else 0
except ValueError:
port = 0
return (key or None, host or "0.0.0.0", port)
def relay_endpoint() -> Optional[str]:
"""The gateway's own PUBLIC inbound URL, asserted to the connector at provision.
@@ -318,8 +284,11 @@ def self_provision_if_managed() -> bool:
logger.warning("relay self-provision failed (%s); gateway will boot without relay auth", exc)
return False
# Set creds in-process so register_relay_adapter() + relay_inbound_config()
# read them from os.environ. Never logged.
# Set creds in-process so register_relay_adapter() reads them from os.environ
# (the per-gateway secret authenticates the outbound WS upgrade). The delivery
# key is still issued by the connector and persisted for forward-compat, but
# inbound now rides the WS (no HTTP receiver), so it is not consumed here.
# Never logged.
os.environ["GATEWAY_RELAY_ID"] = str(result.get("gatewayId") or gateway_id)
os.environ["GATEWAY_RELAY_SECRET"] = str(result.get("secret") or "")
os.environ["GATEWAY_RELAY_DELIVERY_KEY"] = str(result.get("deliveryKey") or "")

@@ -58,10 +58,6 @@ class RelayAdapter(BasePlatformAdapter):
# Capability surface read by stream_consumer (getattr(..., 4096)).
self.MAX_MESSAGE_LENGTH = descriptor.max_message_length
self.supports_code_blocks = descriptor.markdown_dialect not in ("", "plain")
# Inbound delivery receiver (signed connector→gateway HTTP POSTs). Built
# lazily in connect() when a delivery key + bind port are configured; a
# purely-outbound dev gateway runs without it. See inbound_receiver.py.
self._inbound_runner: Any = None
# ── capability surface (from descriptor) ─────────────────────────────
@property
@@ -80,6 +76,12 @@ class RelayAdapter(BasePlatformAdapter):
if self._transport is None:
raise RuntimeError("RelayAdapter has no transport configured")
self._transport.set_inbound_handler(self._on_inbound)
# Inbound interrupts (connector -> owning gateway) arrive as
# interrupt_inbound frames over the SAME outbound WS; bridge them to the
# adapter's interrupt path. WS-only: there is no inbound HTTP receiver.
set_interrupt = getattr(self._transport, "set_interrupt_inbound_handler", None)
if callable(set_interrupt):
set_interrupt(self.on_interrupt)
ok = await self._transport.connect()
if not ok:
return False
@@ -92,40 +94,12 @@ class RelayAdapter(BasePlatformAdapter):
logger.warning("relay handshake failed: %s", exc)
return False
self._apply_descriptor(descriptor)
# Start the signed inbound-delivery receiver if configured (the connector
# POSTs normalized events to it over HTTP, verified with the tenant
# delivery key). Non-fatal: a receiver bind failure must not fail the
# outbound connection — the gateway can still send.
await self._maybe_start_inbound_receiver()
# Inbound (messages + interrupts) is delivered over the outbound WS via
# the connector's relay bus — there is NO inbound HTTP endpoint (hosted
# gateways have no public IP). The transport's reader already dispatches
# `inbound` / `interrupt_inbound` frames to the handlers wired above.
return True
async def _maybe_start_inbound_receiver(self) -> None:
"""Start the inbound HTTP receiver when a delivery key + port are set."""
from gateway.relay import relay_inbound_config
delivery_key, host, port = relay_inbound_config()
if not (delivery_key and port):
return # no inbound URL configured -> outbound-only gateway
try:
from aiohttp import web
from gateway.relay.inbound_receiver import InboundDeliveryReceiver
receiver = InboundDeliveryReceiver(
delivery_key_verify_list=lambda: [delivery_key],
on_message=self._on_inbound,
on_interrupt=self.on_interrupt,
)
runner = web.AppRunner(receiver.build_app(), access_log=None)
await runner.setup()
site = web.TCPSite(runner, host, port)
await site.start()
self._inbound_runner = runner
logger.info("relay inbound receiver listening on http://%s:%s", host, port)
except Exception as exc: # noqa: BLE001 - inbound bind failure must not kill outbound
logger.warning("relay inbound receiver failed to start: %s", exc)
self._inbound_runner = None
def _apply_descriptor(self, descriptor: CapabilityDescriptor) -> None:
"""Adopt a (re)negotiated descriptor into the live capability surface."""
self.descriptor = descriptor
@@ -148,12 +122,6 @@ class RelayAdapter(BasePlatformAdapter):
await self.interrupt_session_activity(session_key, chat_id)
async def disconnect(self) -> None:
if self._inbound_runner is not None:
try:
await self._inbound_runner.cleanup()
except Exception: # noqa: BLE001 - best-effort teardown
pass
self._inbound_runner = None
if self._transport is not None:
await self._transport.disconnect()

@@ -1,204 +0,0 @@
"""Gateway-side inbound delivery receiver. EXPERIMENTAL.
The connector delivers normalized inbound events to a tenant's gateway over a
**signed HTTP POST** (connector ``src/relay/httpGatewayDelivery.ts``), NOT over
the gateway's outbound ``/relay`` WebSocket: the connector instance that owns a
platform socket is generally not the instance a given gateway dialed out to, so
inbound is delivered to a tenant ENDPOINT (which may load-balance across gateway
instances). Each delivery is HMAC-signed with the per-tenant **delivery key**
(``gateway/relay/auth.py``); this receiver verifies the signature over the EXACT
raw request bytes before accepting the event.
Two routes (mirroring the connector's two POST targets):
POST {base} {"type":"message", "event": <MessageEvent>, ...}
POST {base}/interrupt {"type":"interrupt","session_key": ..., "reason"?}
The receiver:
1. reads the RAW body bytes (never a reparsed/re-serialized form — the HMAC is
over the literal bytes the connector signed),
2. verifies ``x-relay-signature`` / ``x-relay-timestamp`` against the delivery
key verify list (primary + secondary during rotation), within the replay
window — rejects 401 on any failure,
3. parses the JSON and dispatches: a ``message`` to the inbound handler (the
RelayAdapter's ``handle_message`` via the transport's normal path), an
``interrupt`` to the interrupt handler.
EXPERIMENTAL: the transport protocol may change without a deprecation cycle
until ≥2 Class-1 platforms validate it. See docs/relay-connector-contract.md.
"""
from __future__ import annotations
import json
import logging
from typing import Any, Awaitable, Callable, Optional, Sequence
from gateway.platforms.base import MessageEvent
from gateway.relay.auth import (
DELIVERY_SIG_HEADER,
DELIVERY_TS_HEADER,
verify_delivery_signature,
)
logger = logging.getLogger(__name__)
# Callbacks the receiver dispatches verified deliveries to.
InboundMessageHandler = Callable[[MessageEvent], Awaitable[None]]
InboundInterruptHandler = Callable[[str, str], Awaitable[None]]
try: # lazy/optional dep — mirrors the other HTTP-receiving adapters
from aiohttp import web
except ImportError: # pragma: no cover - exercised only when the extra is absent
web = None # type: ignore[assignment]
AIOHTTP_AVAILABLE = web is not None
def _event_from_wire(raw: dict) -> MessageEvent:
"""Rebuild a MessageEvent from the connector's normalized inbound payload.
Identical mapping to the WS transport's ``_event_from_wire`` (the wire shape
is the same; only the transport differs). Kept here so the HTTP receiver has
no import dependency on the WS transport module.
"""
from gateway.config import Platform
from gateway.platforms.base import MessageType
from gateway.session import SessionSource
src = raw.get("source", {}) or {}
platform = src.get("platform", "relay")
try:
platform_enum = Platform(platform)
except ValueError:
platform_enum = Platform.RELAY
source = SessionSource(
platform=platform_enum,
chat_id=src.get("chat_id", ""),
chat_type=src.get("chat_type", "dm"),
chat_name=src.get("chat_name"),
user_id=src.get("user_id"),
user_name=src.get("user_name"),
thread_id=src.get("thread_id"),
chat_topic=src.get("chat_topic"),
user_id_alt=src.get("user_id_alt"),
chat_id_alt=src.get("chat_id_alt"),
guild_id=src.get("guild_id"),
parent_chat_id=src.get("parent_chat_id"),
message_id=src.get("message_id"),
)
try:
msg_type = MessageType(raw.get("message_type", "text"))
except ValueError:
msg_type = MessageType.TEXT
return MessageEvent(
text=raw.get("text", ""),
message_type=msg_type,
source=source,
message_id=raw.get("message_id"),
reply_to_message_id=raw.get("reply_to_message_id"),
media_urls=raw.get("media_urls") or [],
)
class InboundDeliveryReceiver:
"""Verifies + dispatches signed connector→gateway inbound deliveries.
Transport-agnostic core: ``handle_raw`` takes the raw body bytes + headers +
which route was hit and returns ``(status, body)``. The aiohttp wiring
(``build_app`` / ``serve``) is a thin shell so the verify+dispatch logic is
unit-testable without a live socket.
"""
def __init__(
self,
*,
delivery_key_verify_list: Callable[[], Sequence[str]],
on_message: InboundMessageHandler,
on_interrupt: Optional[InboundInterruptHandler] = None,
max_skew_seconds: int = 300,
) -> None:
# A callable (not a static list) so a rotated delivery key is picked up
# without rebuilding the receiver — mirrors the connector's verify list.
self._verify_list = delivery_key_verify_list
self._on_message = on_message
self._on_interrupt = on_interrupt
self._max_skew_seconds = max_skew_seconds
async def handle_raw(
self, *, raw_body: bytes, timestamp: Optional[str], signature: Optional[str], is_interrupt: bool
) -> tuple[int, dict]:
"""Verify the signature over ``raw_body`` and dispatch. Returns (status, json).
401 on a missing/invalid/expired signature (never dispatches unverified).
400 on malformed JSON. 200 on a verified, dispatched delivery.
"""
verify_keys = list(self._verify_list() or [])
if not verify_keys:
# No delivery key provisioned -> we cannot verify -> reject. A gateway
# that hasn't enrolled must not accept inbound (fail closed).
logger.warning("relay inbound: no delivery key configured; rejecting")
return 401, {"error": "no delivery key configured"}
# Verify over the EXACT raw bytes the connector signed. Decode to text
# with the same UTF-8 the connector's JSON.stringify produced; a single
# differing byte breaks the HMAC (raw-body-preservation discipline).
body_text = raw_body.decode("utf-8", errors="strict")
if not verify_delivery_signature(
body_text, timestamp, signature, verify_keys, self._max_skew_seconds
):
return 401, {"error": "invalid delivery signature"}
try:
payload = json.loads(body_text)
except json.JSONDecodeError:
return 400, {"error": "invalid JSON body"}
if is_interrupt or payload.get("type") == "interrupt":
session_key = str(payload.get("session_key", ""))
chat_id = str(payload.get("chat_id", "") or payload.get("reason", "") or "")
if self._on_interrupt is not None and session_key:
await self._on_interrupt(session_key, chat_id)
return 200, {"ok": True}
# Default: a normalized inbound message event.
event_raw = payload.get("event")
if not isinstance(event_raw, dict):
return 400, {"error": "missing event"}
event = _event_from_wire(event_raw)
await self._on_message(event)
return 200, {"ok": True}
# ── aiohttp wiring (thin shell over handle_raw) ──────────────────────
def build_app(self) -> Any:
"""Build an aiohttp Application exposing the delivery + interrupt routes."""
if not AIOHTTP_AVAILABLE:
raise RuntimeError(
"InboundDeliveryReceiver requires the 'aiohttp' package "
"(install the messaging extra)."
)
async def _deliver(request: Any) -> Any:
return await self._respond(request, is_interrupt=False)
async def _interrupt(request: Any) -> Any:
return await self._respond(request, is_interrupt=True)
app = web.Application()
app.router.add_get("/healthz", lambda _: web.Response(text="ok"))
app.router.add_post("/", _deliver)
app.router.add_post("/interrupt", _interrupt)
return app
async def _respond(self, request: Any, *, is_interrupt: bool) -> Any:
# Read the RAW bytes — do NOT use request.json() (it reparses and we'd
# verify over a re-serialized form, breaking the HMAC).
raw_body = await request.read()
status, body = await self.handle_raw(
raw_body=raw_body,
timestamp=request.headers.get(DELIVERY_TS_HEADER),
signature=request.headers.get(DELIVERY_SIG_HEADER),
is_interrupt=is_interrupt,
)
return web.json_response(body, status=status)

@@ -26,6 +26,7 @@ class StubConnector:
def __init__(self, descriptor: CapabilityDescriptor) -> None:
self._descriptor = descriptor
self._inbound: Optional[InboundHandler] = None
self._interrupt_inbound: Optional[Any] = None
self.connected = False
self.sent: List[Dict[str, Any]] = []
self.interrupts: List[Dict[str, Any]] = []
@@ -51,6 +52,11 @@ class StubConnector:
def set_inbound_handler(self, handler: InboundHandler) -> None:
self._inbound = handler
def set_interrupt_inbound_handler(self, handler: Any) -> None:
"""Mirror the real WS transport: the adapter registers its interrupt
bridge here so connector→gateway interrupt_inbound frames route to it."""
self._interrupt_inbound = handler
async def send_outbound(self, action: Dict[str, Any]) -> Dict[str, Any]:
self.sent.append(action)
if action.get("op") == "send":
@@ -73,3 +79,9 @@ class StubConnector:
if self._inbound is None:
raise RuntimeError("no inbound handler registered (call adapter.connect first)")
await self._inbound(event)
async def push_interrupt(self, session_key: str, chat_id: str) -> None:
"""Simulate the connector delivering an interrupt_inbound over the WS."""
if self._interrupt_inbound is None:
raise RuntimeError("no interrupt_inbound handler registered (call adapter.connect first)")
await self._interrupt_inbound(session_key, chat_id)

@@ -1,150 +0,0 @@
"""Unit tests for gateway/relay/inbound_receiver.py.
Covers the verify-then-dispatch core (handle_raw): a correctly-signed message
delivery is verified + dispatched; an interrupt delivery routes to the interrupt
handler; unsigned/tampered/expired/no-key deliveries are rejected 401; malformed
JSON is 400. Signatures are produced with the SAME auth primitives the connector
uses (gateway/relay/auth.py sign), so this exercises the real verify path.
"""
from __future__ import annotations
import json
import time
import pytest
from gateway.relay.auth import sign
from gateway.relay.inbound_receiver import InboundDeliveryReceiver
_KEY = "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff"
def _signed(body_obj: dict, key: str = _KEY, ts: int | None = None) -> tuple[bytes, str, str]:
"""Serialize compactly (as the connector's JSON.stringify does), sign it."""
body = json.dumps(body_obj, separators=(",", ":"))
raw = body.encode("utf-8")
t = ts if ts is not None else int(time.time())
return raw, str(t), sign(f"{t}.{body}", key)
def _receiver(**kw):
received: list = []
interrupts: list = []
async def on_message(ev):
received.append(ev)
async def on_interrupt(sk, chat):
interrupts.append((sk, chat))
r = InboundDeliveryReceiver(
delivery_key_verify_list=lambda: [_KEY],
on_message=on_message,
on_interrupt=on_interrupt,
**kw,
)
return r, received, interrupts
@pytest.mark.asyncio
async def test_valid_message_delivery_dispatched():
r, received, _ = _receiver()
raw, ts, sig = _signed(
{
"type": "message",
"event": {
"text": "hello",
"message_type": "text",
"source": {"platform": "discord", "chat_id": "chan1", "chat_type": "group", "guild_id": "guildA"},
},
}
)
status, body = await r.handle_raw(raw_body=raw, timestamp=ts, signature=sig, is_interrupt=False)
assert status == 200 and body == {"ok": True}
assert len(received) == 1
assert received[0].text == "hello"
assert received[0].source.guild_id == "guildA"
@pytest.mark.asyncio
async def test_valid_interrupt_delivery_routes_to_interrupt_handler():
r, _, interrupts = _receiver()
raw, ts, sig = _signed({"type": "interrupt", "session_key": "agent:main:discord:group:c:u", "reason": "stop"})
status, _ = await r.handle_raw(raw_body=raw, timestamp=ts, signature=sig, is_interrupt=True)
assert status == 200
assert interrupts and interrupts[0][0] == "agent:main:discord:group:c:u"
@pytest.mark.asyncio
async def test_tampered_body_rejected_401():
r, received, _ = _receiver()
raw, ts, sig = _signed({"type": "message", "event": {"text": "x", "source": {"chat_id": "c"}}})
status, _ = await r.handle_raw(raw_body=raw + b" ", timestamp=ts, signature=sig, is_interrupt=False)
assert status == 401
assert received == []
@pytest.mark.asyncio
async def test_unsigned_rejected_401():
r, _, _ = _receiver()
raw, _, _ = _signed({"type": "message", "event": {"text": "x", "source": {"chat_id": "c"}}})
status, _ = await r.handle_raw(raw_body=raw, timestamp=None, signature=None, is_interrupt=False)
assert status == 401
@pytest.mark.asyncio
async def test_expired_timestamp_rejected_401():
r, _, _ = _receiver(max_skew_seconds=300)
raw, _, sig = _signed({"type": "message", "event": {"text": "x", "source": {"chat_id": "c"}}}, ts=1)
# ts=1 (1970) is far outside the 300s window vs now.
status, _ = await r.handle_raw(raw_body=raw, timestamp="1", signature=sig, is_interrupt=False)
assert status == 401
@pytest.mark.asyncio
async def test_wrong_key_rejected_401():
r, _, _ = _receiver()
other = "ffeeddccbbaa99887766554433221100ffeeddccbbaa99887766554433221100"
raw, ts, sig = _signed({"type": "message", "event": {"text": "x", "source": {"chat_id": "c"}}}, key=other)
status, _ = await r.handle_raw(raw_body=raw, timestamp=ts, signature=sig, is_interrupt=False)
assert status == 401
@pytest.mark.asyncio
async def test_no_delivery_key_fails_closed_401():
async def on_message(ev):
pass
r = InboundDeliveryReceiver(delivery_key_verify_list=lambda: [], on_message=on_message)
raw, ts, sig = _signed({"type": "message", "event": {"text": "x", "source": {"chat_id": "c"}}})
status, _ = await r.handle_raw(raw_body=raw, timestamp=ts, signature=sig, is_interrupt=False)
assert status == 401
@pytest.mark.asyncio
async def test_rotation_secondary_key_accepted():
new = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
received: list = []
async def on_message(ev):
received.append(ev)
# Connector still signs with the OLD key (secondary); verify list has both.
r = InboundDeliveryReceiver(
delivery_key_verify_list=lambda: [new, _KEY], on_message=on_message
)
raw, ts, sig = _signed({"type": "message", "event": {"text": "x", "source": {"chat_id": "c"}}}, key=_KEY)
status, _ = await r.handle_raw(raw_body=raw, timestamp=ts, signature=sig, is_interrupt=False)
assert status == 200 and len(received) == 1
@pytest.mark.asyncio
async def test_malformed_json_after_valid_signature_is_400():
r, _, _ = _receiver()
# Sign a non-JSON body so the signature passes but json.loads fails.
raw = b"not json at all"
ts = str(int(time.time()))
sig = sign(f"{ts}.{raw.decode()}", _KEY)
status, body = await r.handle_raw(raw_body=raw, timestamp=ts, signature=sig, is_interrupt=False)
assert status == 400

@@ -67,3 +67,23 @@ async def test_outbound_interrupt_reaches_connector(adapter):
     assert stub.interrupts == [
         {"session_key": "agent:main:discord:group:chanA:userX", "reason": "stop"}
     ]
+
+
+@pytest.mark.asyncio
+async def test_connect_wires_inbound_interrupt_over_ws(adapter):
+    """WS-only inbound: connect() registers BOTH the inbound message handler AND
+    the interrupt_inbound handler on the transport, so a connector-delivered
+    interrupt_inbound frame (no HTTP receiver) reaches the right session."""
+    await adapter.connect()
+    stub = adapter._transport
+    # Both connector->gateway handlers are wired post-connect.
+    assert stub._inbound is not None
+    assert stub._interrupt_inbound is not None
+    key = "agent:main:discord:group:chanA:userX"
+    ev = asyncio.Event()
+    adapter._active_sessions[key] = ev
+    # Simulate the connector pushing an interrupt_inbound frame down the WS.
+    await stub.push_interrupt(key, chat_id="chanA")
+    assert ev.is_set() is True, "interrupt delivered over the WS must cancel the target turn"
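
The wiring this test checks can be sketched with a toy transport and adapter. Everything here is hypothetical (`StubTransport`, `SketchAdapter`, and their method bodies are illustrative stand-ins, not the gateway's classes). The only idea taken from the test is that `connect()` registers an interrupt handler, and that handler sets the per-session `asyncio.Event`.

```python
import asyncio
from typing import Awaitable, Callable, Optional


class StubTransport:
    """Hypothetical stand-in for the relay WebSocket transport."""

    def __init__(self) -> None:
        self._interrupt_inbound: Optional[Callable[[str], Awaitable[None]]] = None

    def set_interrupt_inbound_handler(self, handler: Callable[[str], Awaitable[None]]) -> None:
        self._interrupt_inbound = handler

    async def push_interrupt(self, session_key: str) -> None:
        # Simulates the connector sending an interrupt_inbound frame down the WS.
        if self._interrupt_inbound is not None:
            await self._interrupt_inbound(session_key)


class SketchAdapter:
    """Hypothetical adapter: bridges inbound interrupts to per-session events."""

    def __init__(self, transport: StubTransport) -> None:
        self._transport = transport
        self._active_sessions: dict[str, asyncio.Event] = {}

    async def connect(self) -> None:
        # No HTTP receiver is started; the handler rides the existing socket.
        self._transport.set_interrupt_inbound_handler(self.on_interrupt)

    async def on_interrupt(self, session_key: str) -> None:
        event = self._active_sessions.get(session_key)
        if event is not None:
            event.set()  # signals the running turn for this session to stop


async def demo() -> bool:
    adapter = SketchAdapter(StubTransport())
    await adapter.connect()
    key = "agent:main:discord:group:chanA:userX"
    adapter._active_sessions[key] = asyncio.Event()
    await adapter._transport.push_interrupt(key)
    return adapter._active_sessions[key].is_set()
```

An interrupt for an unknown session key is silently ignored in this sketch; whether the real adapter logs or rejects it is not shown in the diff.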


@@ -48,16 +48,14 @@ def _relay_py_files() -> list[Path]:
 # ``auth.py`` is the connector⇄gateway CHANNEL authenticator (the gateway's WS
-# upgrade bearer + inbound-delivery signature verification). ``inbound_receiver.py``
-# is the signed-inbound-delivery receiver that USES that channel auth to verify
-# connector→gateway POSTs. Both are net-new, intended, and the whole point of
-# authenticating an untrusted/disposable gateway — they are NOT platform crypto.
-# They use HMAC over the connector's per-gateway / per-tenant secrets (NOT any
-# platform's signing secret), so they are exempt from the platform-crypto symbol
-# scan below. The module-import ban (platform-crypto modules) still applies to
-# every file including these — they import only stdlib hmac/hashlib and each
-# other, never a platform-crypto module, so they stay clean there.
-_CHANNEL_AUTH_FILES = {"auth.py", "inbound_receiver.py"}
+# upgrade bearer). It is net-new, intended, and the whole point of
+# authenticating an untrusted/disposable gateway — it is NOT platform crypto.
+# It uses HMAC over the connector's per-gateway secret (NOT any platform's
+# signing secret), so it is exempt from the platform-crypto symbol scan below.
+# The module-import ban (platform-crypto modules) still applies to every file
+# including this one — it imports only stdlib hmac/hashlib, never a
+# platform-crypto module, so it stays clean there.
+_CHANNEL_AUTH_FILES = {"auth.py"}


 def test_relay_package_imports_no_platform_crypto():


@@ -8,6 +8,8 @@ TRIGGER logic, in-process env wiring, and fail-soft boot behaviour.
 from __future__ import annotations

+import os
+
 import pytest

 import gateway.relay as relay
@@ -126,8 +128,9 @@ def test_provisions_and_sets_env_in_process(monkeypatch):
     # Creds landed in os.environ (in-process), so register_relay_adapter() reads them.
     gid, secret = relay.relay_connection_auth()
     assert gid and secret == "a" * 64
-    key, _host, _port = relay.relay_inbound_config()
-    assert key == "b" * 64
+    # The delivery key is persisted in-process too (issued by the connector,
+    # kept for forward-compat; inbound rides the WS so it isn't consumed).
+    assert os.environ["GATEWAY_RELAY_DELIVERY_KEY"] == "b" * 64


 def test_outbound_only_when_no_endpoint(monkeypatch):


@@ -80,6 +80,65 @@ def test_nullable_type_array_collapsed_to_single_string():
     assert prop.get("nullable") is True


+def test_multitype_array_becomes_anyof_no_branch_dropped():
+    # Ported from anomalyco/opencode#31877: a genuine multi-type array such as
+    # ["number", "string"] (common in MCP tool schemas) must keep BOTH branches
+    # as an anyOf, not silently drop all but the first. Several backends
+    # (llama.cpp, Gemini via OpenAI-compatible transports) reject the array form.
+    tools = [_tool("t", {
+        "type": "object",
+        "properties": {
+            "status": {"type": ["number", "string"], "description": "status filter"},
+        },
+    })]
+    out = sanitize_tool_schemas(tools)
+    prop = out[0]["function"]["parameters"]["properties"]["status"]
+    assert "type" not in prop
+    assert prop["anyOf"] == [{"type": "number"}, {"type": "string"}]
+    assert prop.get("nullable") is None
+    # Sibling keywords survive alongside the generated anyOf.
+    assert prop["description"] == "status filter"
+
+
+def test_multitype_array_with_null_lifts_nullable():
+    tools = [_tool("t", {
+        "type": "object",
+        "properties": {
+            "v": {"type": ["integer", "boolean", "null"]},
+        },
+    })]
+    out = sanitize_tool_schemas(tools)
+    prop = out[0]["function"]["parameters"]["properties"]["v"]
+    assert "type" not in prop
+    assert prop["anyOf"] == [{"type": "integer"}, {"type": "boolean"}]
+    assert prop.get("nullable") is True
+
+
+def test_all_null_type_array_becomes_null_type():
+    tools = [_tool("t", {
+        "type": "object",
+        "properties": {
+            "n": {"type": ["null"]},
+        },
+    })]
+    out = sanitize_tool_schemas(tools)
+    prop = out[0]["function"]["parameters"]["properties"]["n"]
+    assert prop["type"] == "null"
+
+
+def test_single_element_type_array_unwrapped():
+    tools = [_tool("t", {
+        "type": "object",
+        "properties": {
+            "s": {"type": ["string"]},
+        },
+    })]
+    out = sanitize_tool_schemas(tools)
+    prop = out[0]["function"]["parameters"]["properties"]["s"]
+    assert prop["type"] == "string"
+    assert prop.get("nullable") is None
+
+
 def test_anyof_nested_objects_sanitized():
     tools = [_tool("t", {
         "type": "object",


@@ -235,7 +235,9 @@ def _sanitize_node(node: Any, path: str) -> Any:
       ``{"type": <value>}`` so downstream consumers see a dict.
     - Injects ``properties: {}`` into object-typed nodes missing it.
     - Normalizes ``type: [X, "null"]`` arrays to single ``type: X`` (keeping
-      ``nullable: true`` as a hint).
+      ``nullable: true`` as a hint), and multi-type arrays like
+      ``["number", "string"]`` to an ``anyOf`` of single-type schemas so no
+      branch is dropped (ported from anomalyco/opencode#31877).
     - Recurses into ``properties``, ``items``, ``additionalProperties``,
       ``anyOf``, ``oneOf``, ``allOf``, and ``$defs`` / ``definitions``.
     """
@@ -268,23 +270,39 @@ def _sanitize_node(node: Any, path: str) -> Any:
     out: dict = {}
     for key, value in node.items():
-        # type: [X, "null"] → type: X (the backend's tool-call parser only
-        # accepts singular string types; nullable is lost but the call still
-        # succeeds, and the model can still pass null on its own.)
+        # JSON Schema ``type`` arrays (e.g. ``["number", "string"]``, common
+        # in MCP tool schemas) are rejected by several tool-call backends:
+        #   * llama.cpp's grammar generator only accepts a singular string type.
+        #   * Gemini (including OpenAI-compatible transports such as GitHub
+        #     Copilot proxying to Gemini) rejects the array form outright —
+        #     plain @ai-sdk/google rewrites it, but the OpenAI-compatible path
+        #     forwards it verbatim and the backend 400s.
+        #
+        # Normalize per the SDK's behavior:
+        #   * single non-null type → ``type: X`` (+ ``nullable: true`` if the
+        #     array also contained "null"). No data lost.
+        #   * multiple non-null types → ``anyOf`` of single-type schemas, so
+        #     EVERY branch survives instead of silently dropping all but the
+        #     first. ``null`` is lifted into ``nullable: true``.
+        #   * all-null / empty → ``type: "null"`` (or object fallback).
+        # Ported from anomalyco/opencode#31877.
         if key == "type" and isinstance(value, list):
-            non_null = [t for t in value if t != "null"]
-            if len(non_null) == 1 and isinstance(non_null[0], str):
+            has_null = "null" in value
+            non_null = [t for t in value if isinstance(t, str) and t != "null"]
+            if len(non_null) == 1:
                 out["type"] = non_null[0]
-                if "null" in value:
+                if has_null:
                     out.setdefault("nullable", True)
                 continue
-            # Fallback: pick the first string type, drop the rest.
-            first_str = next((t for t in value if isinstance(t, str) and t != "null"), None)
-            if first_str:
-                out["type"] = first_str
+            if len(non_null) >= 2:
+                # Preserve all branches as a union instead of dropping them.
+                out["anyOf"] = [{"type": t} for t in non_null]
+                if has_null:
+                    out.setdefault("nullable", True)
                 continue
-            # All-null or empty list → treat as object.
-            out["type"] = "object"
+            # No usable non-null type: all-null array → type: "null";
+            # otherwise an empty/garbage array → object fallback.
+            out["type"] = "null" if has_null else "object"
             continue

         if key in {"properties", "$defs", "definitions"} and isinstance(value, dict):
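
The normalization rules in the hunk above can be sketched as a standalone function to make the four cases easy to see. `normalize_type_array` is a hypothetical helper written for illustration; the real logic lives inline in `_sanitize_node` and writes into the surrounding `out` dict with `setdefault`, which this sketch does not reproduce.

```python
from typing import Any


def normalize_type_array(value: list[Any]) -> dict[str, Any]:
    """Hypothetical helper: map a JSON Schema ``type`` array to backend-safe keywords."""
    has_null = "null" in value
    # Non-string entries are treated as garbage and ignored.
    non_null = [t for t in value if isinstance(t, str) and t != "null"]

    out: dict[str, Any] = {}
    if len(non_null) == 1:
        # Single usable type: keep it as a plain string type.
        out["type"] = non_null[0]
    elif len(non_null) >= 2:
        # Several usable types: keep every branch as a union.
        out["anyOf"] = [{"type": t} for t in non_null]
    else:
        # Nothing usable: all-null becomes "null", anything else falls back to object.
        out["type"] = "null" if has_null else "object"
        return out

    if has_null:
        # "null" is lifted out of the array into a nullable hint.
        out["nullable"] = True
    return out
```

For instance, `normalize_type_array(["integer", "boolean", "null"])` yields an `anyOf` of the two non-null types plus `nullable: True`, matching what `test_multitype_array_with_null_lifts_nullable` asserts on the sanitized property.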