Compare commits

19 Commits

Author SHA1 Message Date
Ben
19399a7e32 test(gateway): live ws-transport round-trip + config-driven registration
- test_ws_transport.py: drives WebSocketRelayTransport against a REAL in-process
  websockets server (not a mock socket): handshake (hello->descriptor), inbound
  frame -> handler, outbound request/response correlation, follow_up routing,
  and clean disconnect failing pending waiters. Skips if websockets is absent.
- test_relay_registration.py: rewritten for the config-driven gate — registers
  when GATEWAY_RELAY_URL is set / an explicit url is passed / force=True; no-op
  without a URL; trailing slash stripped; adapter constructs through the registry.

Full relay suite: 57 passed.
2026-06-10 20:28:47 +10:00
Ben
b075c1ec91 feat(gateway): register relay adapter from config; drop HERMES_GATEWAY_RELAY gate
Wire the relay adapter into gateway startup and make activation config-driven
instead of a dark-launch flag.

- gateway/relay/__init__.py: replace relay_enabled()/HERMES_GATEWAY_RELAY with
  relay_url() (GATEWAY_RELAY_URL env or gateway.relay_url in config.yaml) — the
  same shape as gateway.proxy_url. register_relay_adapter() registers when a URL
  is configured and builds a live WebSocketRelayTransport; with no URL it's a
  no-op (direct/single-tenant deployments unaffected). force=True keeps the
  transport-less adapter for unit tests. relay_platform_identity() reads the
  hello platform/botId from GATEWAY_RELAY_PLATFORM/GATEWAY_RELAY_BOT_ID.
- gateway/run.py: call register_relay_adapter() during GatewayRunner.start(),
  right after plugin discovery, so a configured connector relay is registered
  on every boot. Failures are logged, never block startup.

This removes the dark-launch posture: the relay is on whenever it's configured,
shipping the production end state rather than hiding it behind a flag.
2026-06-10 20:28:39 +10:00
Ben
f325dc71e5 feat(gateway): production WebSocketRelayTransport + descriptor negotiation
Adds the concrete transport behind the RelayTransport Protocol — the missing
'later-phase work' the relay scaffold deferred. The gateway dials OUT to the
connector over a WebSocket and speaks the newline-delimited JSON frame protocol
(docs/relay-connector-contract.md; connector src/relay/protocol.ts):

- connect(): opens the ws, sends hello{platform,botId}, starts a background
  read loop, and resolves handshake() when the connector's descriptor frame
  arrives.
- inbound frames -> the registered InboundHandler (rebuilt into a MessageEvent
  via _event_from_wire, mapping the snake_case SessionSource wire form back
  onto the gateway dataclasses).
- send_outbound / send_follow_up / get_chat_info: request/response correlated
  by a uuid requestId against a per-request future, with a timeout so a caller
  never hangs; send_interrupt is fire-and-forget.
- disconnect(): cancels the reader, closes the ws, and fails any in-flight
  outbound waiters with a structured error.

RelayAdapter.connect() now negotiates the real CapabilityDescriptor from the
transport and adopts it (_apply_descriptor updates MAX_MESSAGE_LENGTH +
markdown surface), replacing the construction-time placeholder. Lazy
'import websockets' mirrors gateway/platforms/feishu.py; WEBSOCKETS_AVAILABLE
gates construction.
2026-06-10 20:28:27 +10:00
Ben
ce120f0473 docs(gateway): rewrite contract §6 to the A2 trust-boundary model
The contract's §6 still said the connector 'forwards the signed body
byte-for-byte so the gateway's existing crypto validates against unmodified
bytes.' That model is incoherent under an untrusted, disposable tenant
gateway on a shared bot:

- re-validating Twilio HMAC / WeCom crypto needs the shared signing secret
  (handing it over IS the cross-tenant leak),
- WeCom payloads are encrypted with that secret (the connector must decrypt
  at the edge just to route),
- a Discord interaction token lives inside the signed body — you can't both
  preserve the bytes and strip the credential.

Rewrites §6 to the actual model: the connector is the SOLE crypto/identity
boundary — verifies/decrypts at the edge, normalizes to a tenant-scoped
MessageEvent, strips shared-identity capabilities into its vault, and
forwards only the sanitized event. The gateway re-validates nothing (the
invariant test from the crypto-shed commit enforces this). Notes that this
unifies the passthrough + relay planes and points to the connector repo's
capability-trust-boundary.md.

Also documents the follow_up op in §4 (token-less capability action added
in the previous commit). The conformance test (§2/§3 tables) stays green;
contract is unpublished/EXPERIMENTAL so no version-bump ceremony. 55 passed.
2026-06-10 19:49:08 +10:00
Ben
6dd4caf378 feat(gateway): token-less follow_up outbound op (A2 capability action)
The relay outbound surface had send/edit/typing but no way to act on a
SHARED-identity capability (e.g. a Discord interaction follow-up token,
~15min) that the connector captured + stripped at the edge. Under A2 that
credential never reaches the gateway, so the gateway can't just 'send with
the token' — it needs a semantic op naming the session it's already in.

Adds the follow_up op end to end on the gateway side:
- RelayTransport.send_follow_up(action): protocol method. Action carries
  op='follow_up' + session_key + kind + content (+ metadata) and NO token.
- RelayAdapter.send_follow_up(session_key, kind, content, metadata): builds
  that action and returns a SendResult. The connector resolves the real
  capability (its resolveOutboundCapability), enforces the tenant match so
  tenant B can't wield tenant A's capability, and egresses; success=False
  when the capability is absent/expired/mismatched (nothing to retry — a
  leaked gateway holds zero capability material).
- StubConnector records follow_ups + a canned next_follow_up_result.

Tests: round-trips without a token; the wire action carries only session
refs (no credential value field — the 'kind' string is a type ref, not the
secret); failure surfaces when the connector can't resolve; no-transport
fails cleanly. 55 passed. §4 doc entry follows in the contract-rewrite commit.
2026-06-10 19:48:00 +10:00
Ben
d603371644 test(gateway): shed platform crypto from the relay path (A2 invariant)
Under the A2 trust model the connector is the SOLE crypto/identity
boundary: it verifies/decrypts every inbound platform payload at the edge
(it holds the tenant secrets), normalizes to a tenant-scoped MessageEvent,
and forwards only the sanitized event. The gateway re-validates nothing —
it cannot without being handed the shared signing secret, which on a
shared bot is itself the cross-tenant leak.

The relay path already imports no platform-crypto today; this locks that
in as an enforced invariant so nobody bolts re-validation (Discord
ed25519, Twilio HMAC, WeCom BizMsgCrypt, generic webhook signature checks)
onto the relay later and silently re-couples the gateway to platform
secrets it must never hold. Verification stays in the direct platform
adapters (gateway/platforms/*) which serve non-relay deployments.

- test_relay_package_imports_no_platform_crypto: AST-walks gateway/relay/*
  and fails on any import of a platform-crypto/verification module.
- test_relay_package_calls_no_signature_verification: fails on any
  verification-symbol reference (ed25519/hmac/bizmsg/verify_*).

Invariants (assert the relation 'relay re-validates nothing'), not frozen
snapshots. Verified the guard bites: injecting a wecom_crypto import makes
it fail, removing it goes green. docs §6 rewrite follows in a later commit.
2026-06-10 19:45:08 +10:00
Ben
4f59b4c657 test(gateway): Telegram relay round-trip (Phase 1 generalization proof)
The Phase 1 exit gate requires BOTH Discord and Telegram to round-trip
through the relay stub, but test_relay_roundtrip.py only covered Discord.
Add the Telegram companion exercising its distinct discriminator profile:

- no guild_id — two chats isolate on chat_id alone
- forum topics share one chat_id and isolate by thread_id (the Telegram
  analog of Discord per-guild isolation), shared across participants by
  default (thread_sessions_per_user=False)
- DM isolation by chat_id
- utf16 len_unit + markdown_v2 dialect round-trip and configure the adapter
- outbound send round-trips through the stub

Proves the CapabilityDescriptor + build_session_key generalize beyond
Discord, not just the struct (which the descriptor unit tests already
covered).
2026-06-09 15:59:51 +10:00
Ben
0d585df15d test(gateway): enforce relay contract-doc ⟷ Python conformance
Add an invariant test pinning docs/relay-connector-contract.md to the
Python source of truth so the doc (which the connector repo mirrors by
hand) cannot silently drift:

- CapabilityDescriptor §2 table ⟷ dataclass fields + required/optional
- SessionSource wire keys (to_dict output) ⟷ §3 documented fields
- per-platform discriminator columns exist as real SessionSource fields
- guard that is_bot stays off the wire until deliberately promoted

Writing the test surfaced a real gap: §3 only enumerated 5 discriminators
in its per-platform table while to_dict() emits 12 keys. Seven wire keys
the connector must populate (chat_name, chat_topic, user_id_alt,
chat_id_alt, parent_chat_id, message_id, user_name) were undocumented —
a connector author reading the doc would never know to set them. Added a
complete SessionSource wire-field table to §3. The connector's existing
contract.ts already carries all 12, so no connector change is needed; the
doc was the lagging artifact.
2026-06-09 15:57:24 +10:00
Ben
ada43042f8 fix(gateway): register relay connection checker
The platform-connected-checker invariant test requires every built-in
Platform enum member to have either a generic token path or a bespoke
entry in _PLATFORM_CONNECTED_CHECKERS. Platform.RELAY was added without
one, so test_all_builtins_have_checker_or_generic_token_path failed.

Relay dials OUT to a connector and is 'connected' once an endpoint URL
is configured (extra['relay_url'] or extra['url']); the capability
descriptor is negotiated at handshake time, so the URL is the only
config-level signal in the experimental phase. Add the checker plus a
synthetic-config case exercising its True path.
2026-06-09 14:08:23 +10:00
Ben
706b359cf8 Merge remote-tracking branch 'origin/main' into feat/gateway-relay-adapter
2026-06-09 13:59:08 +10:00
Ben
5869d594ab test(relay): assert connector stub never leaks into production paths
CI guard: fails if gateway/ or plugins/ ever imports the test-only stub
connector or defines StubConnector. Matches code leaks (imports / class defs),
not prose mentions, so the transport.py docstring reference to the stub's path
is allowed.

Phase 1 complete. Task 1.6 of the gateway-relay plan.
2026-06-08 15:20:51 +10:00
Ben
1b3491e8b5 docs: relay<->connector cross-repo contract (v1, experimental)
Formal interface between the Hermes gateway (RelayAdapter) and the Node
connector repo: handshake, CapabilityDescriptor field table, MessageEvent
inbound envelope with per-platform SessionSource discriminators (Discord
guild_id is REQUIRED for server isolation), outbound action set, /stop
interrupt routing, signed-body verify-at-edge/byte-preserving rule, and the
additive-only contract_version policy. Documents bot-identity-vs-tenant
separation so single-bot consolidation (Phase 6) stays open. Read-first
artifact for the connector implementer.

Phase 1, Task 1.5 of the gateway-relay plan.
2026-06-08 15:19:47 +10:00
Ben
96e138ed24 feat(relay): route mid-turn /stop over relay interrupt channel
RelayAdapter.on_interrupt(session_key, chat_id) bridges a connector-delivered
mid-turn /stop into the existing interrupt_session_activity path, setting the
per-session _active_sessions Event and clearing typing — cancelling exactly the
targeted session's turn without touching siblings (mirrors test_stop_thread_
sibling isolation). Transport.send_interrupt carries the gateway-side egress to
the connector for socket-owner routing.

Phase 1, Task 1.4 of the gateway-relay plan.
2026-06-08 15:18:53 +10:00
Ben
75a12474ec feat(relay): register RelayAdapter through platform registry (flagged off by default)
register_relay_adapter() registers the generic 'relay' platform via the same
PlatformRegistry path as plugin adapters — no core dispatch changes. OFF by
default (dark-launch): only registers when HERMES_GATEWAY_RELAY is truthy (or
force=True for tests), so existing single-tenant/direct deployments are
unaffected. Factory builds a transport-less RelayAdapter with a placeholder
descriptor; the real descriptor is negotiated at handshake.

Phase 1, Task 1.3 of the gateway-relay plan.
2026-06-08 15:17:43 +10:00
Ben
6729118a4a feat(relay): transport protocol + test-only stub connector
Defines RelayTransport (lifecycle/handshake/inbound/outbound/interrupt) as the
gateway<->connector wire contract; RelayAdapter.connect now registers an inbound
handler that bridges connector-delivered MessageEvents into handle_message.
Adds an in-memory StubConnector under tests/ and an E2E round-trip proving:
connect registers the handler, inbound events reach the adapter, guild_id drives
build_session_key isolation (two guilds -> two keys; same guild/channel/user ->
one), outbound send round-trips, get_chat_info is proxied.

Phase 1, Task 1.2 of the gateway-relay plan.
2026-06-08 15:16:41 +10:00
Ben
eaf1721b9f feat(relay): generic RelayAdapter advertising negotiated capabilities
One BasePlatformAdapter subclass that reads its capability profile from a
CapabilityDescriptor: MAX_MESSAGE_LENGTH attribute, message_len_fn (table-driven
by len_unit: chars=len, utf16=Telegram-style code units), supports_draft_streaming.
Implements the four abstract methods (connect/disconnect/send/get_chat_info) by
delegating to an injected RelayTransport (full protocol lands in Task 1.2). Adds
Platform.RELAY enum member. No per-platform gateway code.

Phase 1, Task 1.1 of the gateway-relay plan.
2026-06-08 15:14:08 +10:00
Ben
593fba5f5d feat(relay): derive descriptor from PlatformEntry
CapabilityDescriptor.from_platform_entry() projects an existing PlatformEntry
(label, max_message_length, emoji, platform_hint, pii_safe, name) into a
descriptor, proving the descriptor is a projection of existing config rather
than a parallel concept. Runtime-only capabilities (len_unit, draft/edit/
thread/markdown) are caller-supplied. max_message_length==0 ('no limit') maps
to the stream_consumer 4096 default.

Phase 0 complete. Task 0.3 of the gateway-relay plan.
2026-06-08 15:08:41 +10:00
Ben
2b09c95c33 feat(relay): experimental CapabilityDescriptor schema
Frozen, JSON-serializable handshake payload the connector hands the future
RelayAdapter: char limit, draft-streaming/edit/threading flags, markdown
dialect, len_unit. Mostly a wire projection of PlatformEntry + the adapter
capability methods. contract_version gates additive-only evolution; declared
EXPERIMENTAL until >=2 Class-1 platforms validate it. from_json ignores
unknown keys (forward-compat) and fills optional defaults.

Phase 0, Task 0.2 of the gateway-relay plan.
2026-06-08 15:07:45 +10:00
Ben
812a2977bd test: lock gateway adapter capability surface (relay phase 0)
Behavioral regression harness locking the capability surface that the future
RelayAdapter must reproduce: the abstract-method set (connect/disconnect/send/
get_chat_info), message_len_fn default, supports_draft_streaming default, and
the stream_consumer MAX_MESSAGE_LENGTH attribute read. Passes on main before
any RelayAdapter exists.

Phase 0, Task 0.1 of the gateway-relay plan.
2026-06-08 15:06:25 +10:00
24 changed files with 2434 additions and 0 deletions


@@ -0,0 +1,213 @@
# Relay ↔ Connector Contract (v1, EXPERIMENTAL)

> **Status:** EXPERIMENTAL. This contract MAY CHANGE without a deprecation
> cycle until at least two real Class-1 platforms (Discord + Telegram) have
> validated it. Evolution during the experimental phase is **additive-only**,
> gated by `contract_version`. A breaking change updates both repos in lockstep.

This document is the formal interface between the **Hermes gateway** (Python,
`gateway/relay/`) and the **connector** (Node/TypeScript,
`NousResearch/gateway-gateway`). The connector implementer's first action is to
read this file.

The gateway runs a generic `RelayAdapter` that dials **out** to the connector,
receives a `CapabilityDescriptor` at handshake, then exchanges normalized
`MessageEvent`s (inbound) and actions (outbound) over a per-turn bidirectional
WebSocket. The gateway never learns which concrete platform is fronting it; the
connector owns all platform-specific socket/identity logic.

---

## 1. Handshake

1. Gateway opens the transport (`connect`).
2. Gateway calls `handshake()`; connector returns a `CapabilityDescriptor`
   (section 2) describing the platform this adapter instance fronts.
3. Gateway configures the adapter from the descriptor (char limit, length unit,
   draft/edit/thread/markdown capabilities) and registers an inbound handler.
4. Connector then streams inbound events and accepts outbound actions.

`contract_version` (currently `1`) is carried in the descriptor. The gateway
ignores unknown descriptor fields (forward-compat) and fills missing optional
fields from defaults.
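
The forward-compat rule above (ignore unknown fields, default the optional ones) can be sketched roughly as follows. This is an illustration only, not the code in `gateway/relay/descriptor.py`: `DescriptorSketch` and `parse_descriptor` are hypothetical names, only a subset of the descriptor fields is modeled, and the `pii_safe` default of `False` is an assumption.

```python
import json
from dataclasses import dataclass, fields


@dataclass(frozen=True)
class DescriptorSketch:
    """Hypothetical, trimmed-down stand-in for CapabilityDescriptor."""

    # Required fields (a subset of the real table).
    contract_version: int
    platform: str
    max_message_length: int
    len_unit: str
    # Optional fields with defaults.
    emoji: str = "\U0001f50c"
    pii_safe: bool = False  # assumed default, not stated in the contract


def parse_descriptor(raw: str) -> DescriptorSketch:
    data = json.loads(raw)
    known = {f.name for f in fields(DescriptorSketch)}
    # Keys this side does not know about are dropped (forward-compat).
    kept = {key: value for key, value in data.items() if key in known}
    # Missing optional keys fall back to the dataclass defaults; a missing
    # required key raises TypeError.
    return DescriptorSketch(**kept)


frame = json.dumps({
    "contract_version": 1,
    "platform": "telegram",
    "max_message_length": 4096,
    "len_unit": "utf16",
    "some_future_field": True,  # unknown to this version, silently ignored
})
desc = parse_descriptor(frame)
```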

---

## 2. CapabilityDescriptor (handshake payload)

JSON object. Source of truth: `gateway/relay/descriptor.py`.

| Field | Type | Required | Meaning |
| --- | --- | --- | --- |
| `contract_version` | int | yes | Contract version (additive-only within a version). |
| `platform` | string | yes | Platform name (e.g. `"discord"`, `"telegram"`). |
| `label` | string | yes | Human-readable label. |
| `max_message_length` | int | yes | Char limit; gateway exposes as `MAX_MESSAGE_LENGTH`. 0 → treat as 4096. |
| `supports_draft_streaming` | bool | yes | Native draft-streaming preview support. |
| `supports_edit` | bool | yes | Edit-based streaming possible; if false, consumer degrades to one-message-per-segment. |
| `supports_threads` | bool | yes | `create_handoff_thread` capability. |
| `markdown_dialect` | string | yes | `"plain"`, `"markdown_v2"`, `"discord"`, … (drives `supports_code_blocks`). |
| `len_unit` | string | yes | `"chars"` (builtin len) or `"utf16"` (Telegram UTF-16 code units). |
| `emoji` | string | no | Display emoji (default 🔌). |
| `platform_hint` | string | no | System-prompt platform hint. |
| `pii_safe` | bool | no | Redact PII in session descriptions. |

Most fields are a projection of the gateway's existing `PlatformEntry`; the
runtime-only fields (`len_unit`, `supports_*`, `markdown_dialect`) come from the
live platform adapter's capability methods.
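
To make the difference between the two `len_unit` values concrete, here is a small sketch. The UTF-16 counting mirrors the `_utf16_len` helper in `gateway/relay/adapter.py`; the `fits` helper and its name are hypothetical and only illustrate how `max_message_length` (with 0 treated as 4096) might be applied.

```python
from typing import Callable, Dict


def utf16_len(text: str) -> int:
    """Count UTF-16 code units: characters outside the BMP count as two."""
    return len(text.encode("utf-16-le")) // 2


LEN_FNS: Dict[str, Callable[[str], int]] = {
    "chars": len,
    "utf16": utf16_len,
}


def fits(text: str, max_message_length: int, len_unit: str) -> bool:
    # Hypothetical helper: 0 means "no limit" on the wire, treated as 4096.
    limit = max_message_length or 4096
    return LEN_FNS[len_unit](text) <= limit


emoji_run = "\U0001f600" * 2049  # 2049 code points, 4098 UTF-16 code units
fits_as_chars = fits(emoji_run, 4096, "chars")
fits_as_utf16 = fits(emoji_run, 4096, "utf16")
```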

---

## 3. Inbound: `MessageEvent` envelope

The connector normalizes each platform wire event into a `MessageEvent`
(`gateway/platforms/base.py`) and delivers it to the gateway's inbound handler.
The gateway keys the session via `build_session_key()` from the embedded
`SessionSource`, so populating the right discriminators is the single
highest-correctness responsibility of the connector.

### SessionSource fields (the wire surface)

Source of truth: `SessionSource.to_dict()` in `gateway/session.py`. These are
every key the gateway accepts on the wire. `platform`, `chat_id`, `chat_type`,
`user_id`, `user_name`, `thread_id`, `chat_name`, and `chat_topic` are always
present (may be `null`); the rest are included only when set.

| Field | Type | Always sent | Meaning |
| --- | --- | --- | --- |
| `platform` | string | yes | Platform name (matches the descriptor's `platform`). |
| `chat_id` | string | yes | Primary conversation id (channel/chat). Session-key discriminator. |
| `chat_type` | string | yes | `dm` / `group` / `channel` / `thread` / `forum`. |
| `chat_name` | string\|null | yes | Human-readable chat name. |
| `user_id` | string\|null | yes | Message author id. Session-key discriminator. |
| `user_name` | string\|null | yes | Author display name. |
| `thread_id` | string\|null | yes | Thread/forum-topic id when in a thread. Session-key discriminator. |
| `chat_topic` | string\|null | yes | Channel topic/description (Discord, Slack). |
| `user_id_alt` | string | no | Platform-specific stable alt id (Signal UUID, Feishu union_id). |
| `chat_id_alt` | string | no | Alternate chat id (e.g. Signal group internal id). |
| `guild_id` | string | no | Discord guild / Slack workspace / Matrix server scope. **REQUIRED for Discord server isolation.** Session-key discriminator. |
| `parent_chat_id` | string | no | Parent channel when `chat_id` refers to a thread. |
| `message_id` | string | no | Id of the triggering message (for pin/reply/react). |

> `is_bot` (author-is-a-bot/webhook classification) exists on the gateway-side
> dataclass but is **intentionally NOT on the wire** in v1: it is not part of
> `to_dict()`. Do not add it to the connector's `SessionSource` until it is
> first added here and to `to_dict()` (additive bump).

### SessionSource discriminators per platform

| Platform | chat_id | chat_type | user_id | thread_id | guild_id |
| --- | --- | --- | --- | --- | --- |
| **Discord** | channel id | `dm`/`group`/`thread` | author id | thread channel id (threads) | **guild id** (REQUIRED for server isolation) |
| **Telegram** | chat id | `dm`/`group`/`forum` | from id | forum topic id (forums) | n/a |

**Get Discord's `guild_id` wrong and two servers collide into one session.**
This is the #1 High-severity risk. The gateway's `build_session_key()` is the
conformance oracle: for a given `SessionSource`, the connector's normalization
must produce the same key the Python adapter would. (The Phase-1 stub tests
assert known-input → known-key.)
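
The isolation property can be illustrated with a toy key function. Everything here is an assumption made for the example: `sketch_session_key` is a hypothetical stand-in, and its key format is invented, since the real `build_session_key()` format is not shown in this document. Only the relations matter (different guilds give different keys, forum topics isolate by `thread_id`).

```python
from typing import Any, Dict


def sketch_session_key(source: Dict[str, Any],
                       thread_sessions_per_user: bool = False) -> str:
    """Toy key derivation; NOT the gateway's real build_session_key()."""
    parts = [source["platform"]]
    if source.get("guild_id"):
        parts.append(f"guild={source['guild_id']}")
    parts.append(f"{source['chat_type']}={source['chat_id']}")
    thread_id = source.get("thread_id")
    if thread_id:
        parts.append(f"thread={thread_id}")
    # In this sketch a thread is shared by its participants unless the
    # per-user flag is set; outside threads the author is part of the key.
    if source.get("user_id") and (not thread_id or thread_sessions_per_user):
        parts.append(f"user={source['user_id']}")
    return "|".join(parts)


base = {"platform": "discord", "chat_id": "c1", "chat_type": "group", "user_id": "u1"}
guild_a = sketch_session_key({**base, "guild_id": "A"})
guild_b = sketch_session_key({**base, "guild_id": "B"})
# A connector that forgets guild_id collapses both servers into one key.
missing_a = sketch_session_key(base)
missing_b = sketch_session_key(base)

forum = {"platform": "telegram", "chat_id": "-100", "chat_type": "forum"}
topic_1_alice = sketch_session_key({**forum, "thread_id": "1", "user_id": "alice"})
topic_1_bob = sketch_session_key({**forum, "thread_id": "1", "user_id": "bob"})
topic_2_alice = sketch_session_key({**forum, "thread_id": "2", "user_id": "alice"})
```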

### Bot identity vs tenant (single-bot consolidation, Appendix A)

The envelope carries the **originating bot identity** as a field **distinct from
tenant**. Tenant is resolved from the event's own discriminator (Discord
`guild_id`, Telegram `chat_id`, webhook path/subdomain), **never** from which
token/socket/process delivered it. This keeps one shared bot able to front many
tenants (Phase 6) without overloading an existing field.

---

## 4. Outbound: action set

The gateway calls the transport with action dicts. Source of truth:
`gateway/relay/transport.py` + `gateway/relay/adapter.py`.

| `op` | Fields | Result |
| --- | --- | --- |
| `send` | `chat_id`, `content`, `reply_to?`, `metadata?` | `{success: bool, message_id?, error?}` |
| `edit` | `chat_id`, `message_id`, `content`, `metadata?` | `{success: bool, error?}` |
| `typing` | `chat_id` | `{success: bool}` |
| `follow_up` | `session_key`, `kind`, `content`, `metadata?` | `{success: bool, message_id?, error?}` |

`get_chat_info(chat_id)` is a separate proxied call returning at least
`{name, type}`. Media actions follow the same envelope shape (deferred to a
later contract revision; additive).

**`follow_up` (A2 capability action).** Some inbound payloads carry a credential
that acts on the **shared** bot identity (e.g. a Discord interaction follow-up
token). Per §6 the connector strips that at the edge and binds it in its
capability vault keyed by the session; it **never reaches the gateway**. To use
it, the gateway issues `follow_up` naming the **session it is already in**
(`session_key`) plus the capability `kind` (e.g. `discord.interaction_token`),
and **never a token**. The connector resolves the real value from its vault,
enforces the tenant match (tenant B can never wield tenant A's capability), and
egresses. `success: false` when the capability is absent/expired or the tenant
doesn't match; the gateway has nothing to retry with, by design (a leaked
gateway holds zero capability material). Source of truth:
`gateway/relay/transport.py` (`send_follow_up`) + `gateway/relay/adapter.py`.
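
A minimal sketch of the `follow_up` exchange, under stated assumptions: `build_follow_up_action` and `resolve_follow_up` are hypothetical helper names, the vault layout and the error strings are invented for the example, and the connector side is modeled in Python purely for illustration (the real connector is a separate Node/TypeScript codebase).

```python
from typing import Any, Dict, Optional, Tuple

# Hypothetical vault layout: session_key -> (tenant, kind, secret value).
Vault = Dict[str, Tuple[str, str, str]]


def build_follow_up_action(session_key: str, kind: str, content: str,
                           metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Gateway side: names the session and capability kind, never a token."""
    action: Dict[str, Any] = {
        "op": "follow_up",
        "session_key": session_key,
        "kind": kind,
        "content": content,
    }
    if metadata:
        action["metadata"] = metadata
    return action


def resolve_follow_up(vault: Vault, caller_tenant: str,
                      action: Dict[str, Any]) -> Dict[str, Any]:
    """Toy connector side: look up the capability and enforce the tenant match."""
    entry = vault.get(action["session_key"])
    if entry is None:
        return {"success": False, "error": "capability_absent"}
    tenant, kind, _secret = entry
    if tenant != caller_tenant or kind != action["kind"]:
        return {"success": False, "error": "capability_mismatch"}
    # A real connector would egress to the platform using _secret here.
    return {"success": True, "message_id": "m-1"}


vault: Vault = {"sess-A": ("tenant-a", "discord.interaction_token", "secret-value")}
action = build_follow_up_action("sess-A", "discord.interaction_token", "Done.")
ok = resolve_follow_up(vault, "tenant-a", action)
denied = resolve_follow_up(vault, "tenant-b", action)
absent = resolve_follow_up(vault, "tenant-a",
                           build_follow_up_action("sess-Z", "discord.interaction_token", "Hi"))
```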

---

## 5. Interrupt (`/stop`) routing

- **Gateway → connector:** `send_interrupt(session_key, reason?)` egresses a
  mid-turn `/stop`. The connector MUST forward it down the socket owned by the
  gateway instance running that `session_key` (the routing invariant).
- **Connector → gateway:** an inbound interrupt for a `session_key` is bridged
  by the adapter's `on_interrupt(session_key, chat_id)` into the existing
  per-session interrupt mechanism, cancelling exactly that turn (siblings
  untouched).

The interrupt rides the same per-turn bidirectional socket as inbound/outbound.
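
The "cancel exactly that turn, siblings untouched" behavior can be pictured with one event per session. This is a sketch under assumptions: `InterruptRegistrySketch` is a hypothetical class, the use of `asyncio.Event` is a guess at the mechanism, and the session-key strings are made up.

```python
import asyncio
from typing import Dict, Tuple


class InterruptRegistrySketch:
    """Hypothetical stand-in for the adapter's per-session interrupt state."""

    def __init__(self) -> None:
        self._active: Dict[str, asyncio.Event] = {}

    def begin_turn(self, session_key: str) -> asyncio.Event:
        # One event per running turn; the turn polls or awaits it.
        event = asyncio.Event()
        self._active[session_key] = event
        return event

    def on_interrupt(self, session_key: str) -> bool:
        event = self._active.get(session_key)
        if event is None:
            return False  # no turn is running for that session
        event.set()  # only the targeted session's event is touched
        return True


async def demo() -> Tuple[bool, bool]:
    registry = InterruptRegistrySketch()
    target = registry.begin_turn("discord|guild=1|thread=10")
    sibling = registry.begin_turn("discord|guild=1|thread=11")
    registry.on_interrupt("discord|guild=1|thread=10")
    return target.is_set(), sibling.is_set()


interrupted = asyncio.run(demo())
```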

---

## 6. Trust boundary & signed-body handling (A2)

**The connector is the sole crypto/identity boundary. The gateway re-validates
nothing.**

Webhook signatures (Discord ed25519, Twilio HMAC, WeCom BizMsgCrypt) are
computed over exact raw bytes, and some payloads are *encrypted* with a shared
secret. The connector fronts a **shared** bot for many tenants and holds every
tenant's platform secrets, so it:

- **verifies / decrypts at the edge** (the only place the secrets live),
- **normalizes** the payload into a tenant-scoped `MessageEvent` (§3),
- **strips any shared-identity capability** out of the payload and binds it in
  its capability vault, keyed by the session (see §4 `follow_up`),
- **forwards only the sanitized `MessageEvent`**, never the raw signed body.

The gateway therefore performs **no** platform signature/crypto verification on
the relay path; it trusts the normalized event. This is an enforced invariant on
the gateway side (`tests/gateway/relay/test_relay_sheds_crypto.py`: the relay
package imports/calls no platform-crypto).

**Why not "forward the signed body byte-for-byte so the gateway re-validates"?**
That earlier model is incoherent under an untrusted, disposable tenant gateway:

- Re-validating Twilio HMAC / WeCom crypto would require handing the gateway the
  **shared signing secret**, which is itself the leak, and on a shared bot it's
  a *cross-tenant* leak.
- WeCom payloads are encrypted with the shared secret; the connector must decrypt
  at the edge just to route, so forwarding ciphertext would again require giving
  the gateway the secret.
- A Discord interaction token lives **inside** the signed JSON body: you cannot
  both preserve the bytes and strip the credential; they are the same bytes.

So byte-preservation is abandoned deliberately: the connector re-serializes the
sanitized event and the gateway trusts it. This also unifies the passthrough and
relay planes: both are "verify at the edge → emit a normalized event," differing
only in transport. See `docs/capability-trust-boundary.md` (connector repo:
`gateway-gateway`) for the full A2 rationale and the connector-side vault.
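
The "imports no platform-crypto" invariant is the kind of check an AST walk can enforce. The sketch below is not the contents of `test_relay_sheds_crypto.py`; `forbidden_imports` is a hypothetical helper, the forbidden-segment list is illustrative, and it scans a source string rather than the files under `gateway/relay/`.

```python
import ast
from typing import List

# Illustrative list only; the real test's module list is not shown here.
FORBIDDEN_SEGMENTS = {"wecom_crypto", "nacl", "hmac"}


def forbidden_imports(source: str) -> List[str]:
    """Return the dotted names of imports that touch a forbidden module."""
    hits: List[str] = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            candidates = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom):
            base = node.module or ""
            candidates = [f"{base}.{alias.name}".strip(".") for alias in node.names]
        else:
            continue
        for dotted in candidates:
            # Match on whole path segments so "hmac_docs" would not trip it.
            if FORBIDDEN_SEGMENTS & set(dotted.split(".")):
                hits.append(dotted)
    return sorted(hits)


clean = forbidden_imports("import json\nfrom typing import Any\n# import hmac\n")
leaky = forbidden_imports("from gateway.platforms import wecom_crypto\nimport hmac\n")
```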

---

## 7. Versioning policy

- `contract_version` is an int; bump **only** for additive changes during the
  experimental phase (new optional fields, new `op`s).
- A breaking change (renamed/removed field, changed semantics) requires a
  coordinated update of both repos and a version bump.
- The connector's first PR references the commit SHA of this file it implements
  against.


@@ -163,6 +163,7 @@ class Platform(Enum):
    BLUEBUBBLES = "bluebubbles"
    QQBOT = "qqbot"
    YUANBAO = "yuanbao"
    RELAY = "relay"  # generic relay adapter fronted by the connector (EXPERIMENTAL)
    @classmethod
    def _missing_(cls, value):
        """Accept unknown platform names only for known plugin adapters.
@@ -488,6 +489,13 @@ _PLATFORM_CONNECTED_CHECKERS: dict[Platform, Callable[[PlatformConfig], bool]] =
        (cfg.extra.get("client_id") or os.getenv("DINGTALK_CLIENT_ID"))
        and (cfg.extra.get("client_secret") or os.getenv("DINGTALK_CLIENT_SECRET"))
    ),
    # Relay dials OUT to a connector; it is "connected" once an endpoint URL is
    # configured (extra["relay_url"] or extra["url"]). The capability descriptor
    # is negotiated at handshake time, so the URL is the only config-level
    # signal in the experimental phase. EXPERIMENTAL: may change.
    Platform.RELAY: lambda cfg: bool(
        cfg.extra.get("relay_url") or cfg.extra.get("url")
    ),
}

112
gateway/relay/__init__.py Normal file

@@ -0,0 +1,112 @@
"""Relay/connector support package for the Hermes gateway.

EXPERIMENTAL. This package implements the gateway side of the "Gateway Gateway"
relay design: a generic ``RelayAdapter`` plus the wire-serializable
``CapabilityDescriptor`` the connector hands it at handshake time, and the
production ``WebSocketRelayTransport`` that dials the connector. The public API
(module names, descriptor field set, transport protocol) MAY CHANGE without a
deprecation cycle until at least two real Class-1 platforms (Discord + Telegram)
have shaken out the schema.

See ``docs/relay-connector-contract.md`` for the formal cross-repo interface.

Activation is driven by configuration, not a separate feature flag: the relay
platform is registered when a connector relay URL is configured
(``GATEWAY_RELAY_URL`` env or ``gateway.relay_url`` in config.yaml). Deployments
that don't set it are unaffected, exactly the same shape as ``gateway.proxy_url``.
"""

from __future__ import annotations

import os
from typing import Optional


def relay_url() -> Optional[str]:
    """The connector relay endpoint URL, or None when relay is not configured.

    Checks ``GATEWAY_RELAY_URL`` (convenient for Docker) first, then
    ``gateway.relay_url`` in config.yaml. A non-empty value activates the relay
    platform; absence means a normal direct/single-tenant gateway.
    """
    url = os.environ.get("GATEWAY_RELAY_URL", "").strip()
    if url:
        return url.rstrip("/")
    try:
        from gateway.run import _load_gateway_config  # late import to avoid cycle

        cfg = _load_gateway_config()
        url = (cfg.get("gateway") or {}).get("relay_url", "").strip()
        if url:
            return url.rstrip("/")
    except Exception:  # noqa: BLE001 - config absence/parse must never crash registration
        pass
    return None


def relay_platform_identity() -> tuple[str, str]:
    """Platform + bot id this gateway fronts over the relay (for the handshake hello).

    Defaults to ``("relay", "")``; overridable via ``GATEWAY_RELAY_PLATFORM`` /
    ``GATEWAY_RELAY_BOT_ID`` so one connector can front several platforms.
    """
    platform = os.environ.get("GATEWAY_RELAY_PLATFORM", "relay").strip() or "relay"
    bot_id = os.environ.get("GATEWAY_RELAY_BOT_ID", "").strip()
    return platform, bot_id


def register_relay_adapter(force: bool = False, url: Optional[str] = None) -> bool:
    """Register the generic ``relay`` platform via the platform registry.

    Registers when a relay URL is configured (or ``force=True`` for tests, which
    builds a transport-less adapter: the unit-test posture). Returns True if
    registration happened. Additive: uses the same registry path as plugin
    adapters, so no core dispatch changes are needed.

    When a URL is present the factory builds a live ``WebSocketRelayTransport``;
    the ``RelayAdapter`` negotiates the real ``CapabilityDescriptor`` at
    ``connect()`` time via ``transport.handshake()``.
    """
    resolved_url = url if url is not None else relay_url()
    if not (force or resolved_url):
        return False

    from gateway.platform_registry import PlatformEntry, platform_registry
    from gateway.relay.adapter import RelayAdapter
    from gateway.relay.descriptor import CONTRACT_VERSION, CapabilityDescriptor

    platform, bot_id = relay_platform_identity()

    def _factory(config):
        # Placeholder descriptor; replaced by the negotiated one at connect time
        # when a transport is present. With no URL (force/test) the adapter is
        # transport-less and keeps the placeholder.
        placeholder = CapabilityDescriptor(
            contract_version=CONTRACT_VERSION,
            platform=platform,
            label="Relay",
            max_message_length=4096,
            supports_draft_streaming=False,
            supports_edit=True,
            supports_threads=False,
            markdown_dialect="plain",
            len_unit="chars",
        )
        transport = None
        if resolved_url:
            from gateway.relay.ws_transport import WebSocketRelayTransport

            transport = WebSocketRelayTransport(resolved_url, platform, bot_id)
        return RelayAdapter(config, placeholder, transport=transport)

    platform_registry.register(
        PlatformEntry(
            name="relay",
            label="Relay",
            adapter_factory=_factory,
            check_fn=lambda: True,
            source="builtin",
            emoji="\U0001f50c",
        )
    )
    return True

178
gateway/relay/adapter.py Normal file

@@ -0,0 +1,178 @@
"""RelayAdapter — one generic gateway adapter fronted by the connector. EXPERIMENTAL.

A single ``BasePlatformAdapter`` subclass that, at handshake, receives a
``CapabilityDescriptor`` from the connector telling it which platform it is
fronting and which capabilities to advertise to the ``GatewayStreamConsumer``.
It implements the four abstract methods (``connect`` / ``disconnect`` / ``send``
/ ``get_chat_info``) plus the capability surface (``MAX_MESSAGE_LENGTH``,
``message_len_fn``, ``supports_draft_streaming``) by delegating wire I/O to an
injected transport and reading capabilities off the descriptor.

There is NO per-platform gateway code: the connector is the only side that knows
"this chat_id maps to a Discord channel, send it via the Discord websocket."
The gateway sees an ordinary ``MessageEvent`` in and calls ``adapter.send`` out.

EXPERIMENTAL: the transport protocol and descriptor schema may change without a
deprecation cycle until >=2 Class-1 platforms validate them.
"""

from __future__ import annotations

import logging
from typing import Any, Callable, Dict, Optional

from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import BasePlatformAdapter, SendResult
from gateway.relay.descriptor import CapabilityDescriptor
from gateway.relay.transport import RelayTransport

logger = logging.getLogger(__name__)


def _utf16_len(text: str) -> int:
    """Count UTF-16 code units (Telegram's length unit)."""
    return len(text.encode("utf-16-le")) // 2


# Table-driven length-unit selection from the descriptor's ``len_unit``.
_LEN_FNS: Dict[str, Callable[[str], int]] = {
    "chars": len,
    "utf16": _utf16_len,
}


class RelayAdapter(BasePlatformAdapter):
    """Generic relay adapter advertising a connector-negotiated capability profile."""

    def __init__(
        self,
        config: PlatformConfig,
        descriptor: CapabilityDescriptor,
        transport: Optional[RelayTransport] = None,
    ) -> None:
        # The relay adapter fronts many platforms but presents as a single
        # logical platform to the runner; Platform.RELAY identifies it.
        super().__init__(config, Platform.RELAY)
        self.descriptor = descriptor
        self._transport = transport
        # Capability surface read by stream_consumer (getattr(..., 4096)).
        self.MAX_MESSAGE_LENGTH = descriptor.max_message_length
        self.supports_code_blocks = descriptor.markdown_dialect not in ("", "plain")

    # ── capability surface (from descriptor) ─────────────────────────────
    @property
    def message_len_fn(self) -> Callable[[str], int]:
        return _LEN_FNS.get(self.descriptor.len_unit, len)

    def supports_draft_streaming(
        self,
        chat_type: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> bool:
        return self.descriptor.supports_draft_streaming

    # ── abstract methods (delegated to the transport) ────────────────────
    async def connect(self) -> bool:
        if self._transport is None:
            raise RuntimeError("RelayAdapter has no transport configured")
        self._transport.set_inbound_handler(self._on_inbound)
        ok = await self._transport.connect()
        if not ok:
            return False
        # Negotiate the real capability descriptor from the connector and adopt
        # it — the placeholder passed at construction is replaced by what the
        # connector advertises for the platform this gateway actually fronts.
        try:
            descriptor = await self._transport.handshake()
        except Exception as exc:  # noqa: BLE001 - a failed handshake = a failed connect
            logger.warning("relay handshake failed: %s", exc)
            return False
        self._apply_descriptor(descriptor)
        return True

    def _apply_descriptor(self, descriptor: CapabilityDescriptor) -> None:
        """Adopt a (re)negotiated descriptor into the live capability surface."""
        self.descriptor = descriptor
        self.MAX_MESSAGE_LENGTH = descriptor.max_message_length
        self.supports_code_blocks = descriptor.markdown_dialect not in ("", "plain")

    async def _on_inbound(self, event) -> None:
        """Bridge a connector-delivered MessageEvent into the normal adapter path."""
        await self.handle_message(event)

    async def on_interrupt(self, session_key: str, chat_id: str) -> None:
        """Bridge a connector-delivered /stop into the adapter's interrupt path.

        The connector forwards a mid-turn interrupt down the socket owned by
        the gateway instance running ``session_key``; this routes it to the
        existing per-session interrupt mechanism (sets the
        ``_active_sessions[session_key]`` Event and clears typing), cancelling
        the right turn without touching sibling sessions.
        """
        await self.interrupt_session_activity(session_key, chat_id)

    async def disconnect(self) -> None:
        if self._transport is not None:
            await self._transport.disconnect()

    async def send(
        self,
        chat_id: str,
        content: str,
        reply_to: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> SendResult:
        if self._transport is None:
            return SendResult(success=False, error="no transport")
        result = await self._transport.send_outbound(
            {
                "op": "send",
                "chat_id": chat_id,
                "content": content,
                "reply_to": reply_to,
                "metadata": metadata or {},
            }
        )
        return SendResult(
            success=bool(result.get("success")),
            message_id=result.get("message_id"),
            error=result.get("error"),
        )

    async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
        # Proxied to the connector (it owns the platform connection / cache).
        if self._transport is None:
            return {"name": chat_id, "type": "dm"}
        return await self._transport.get_chat_info(chat_id)

    async def send_follow_up(
        self,
        session_key: str,
        kind: str,
        content: str,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> SendResult:
        """Send via a shared-identity capability bound to a session (A2 outbound).

        The gateway never holds the credential: it names the session it is
        already in plus the capability ``kind``, and the connector resolves the
        real value from its vault and egresses (enforcing the tenant match). Used
        e.g. to post a Discord interaction follow-up as the shared bot without
        the token ever reaching the gateway. See RelayTransport.send_follow_up.
        """
        if self._transport is None:
            return SendResult(success=False, error="no transport")
        result = await self._transport.send_follow_up(
            {
                "op": "follow_up",
                "session_key": session_key,
                "kind": kind,
                "content": content,
                "metadata": metadata or {},
            }
        )
        return SendResult(
            success=bool(result.get("success")),
            message_id=result.get("message_id"),
            error=result.get("error"),
        )

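The difference between the two length units matters for any text outside the Basic Multilingual Plane. The following standalone sketch reuses the same counting technique as `_utf16_len` and the same table-driven lookup as `_LEN_FNS`; the names `utf16_len`, `LEN_FNS` and `message_len` are local to the example, not part of the adapter.

```python
from typing import Callable, Dict


def utf16_len(text: str) -> int:
    """Count UTF-16 code units: each code unit is 2 bytes in utf-16-le."""
    return len(text.encode("utf-16-le")) // 2


LEN_FNS: Dict[str, Callable[[str], int]] = {"chars": len, "utf16": utf16_len}


def message_len(text: str, len_unit: str) -> int:
    """Pick the counter by unit name, falling back to len for unknown units."""
    return LEN_FNS.get(len_unit, len)(text)


sample = "hi \U0001f50c"  # three BMP characters plus one astral-plane emoji
print(message_len(sample, "chars"))  # 4 code points
print(message_len(sample, "utf16"))  # 5 code units (the emoji is a surrogate pair)
```

A chunker that counted `chars` against a UTF-16 limit could therefore overshoot on emoji-heavy messages, which is presumably why the unit travels in the descriptor.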
118
gateway/relay/descriptor.py Normal file

@@ -0,0 +1,118 @@
"""CapabilityDescriptor — the relay handshake payload. EXPERIMENTAL.

The connector hands a ``CapabilityDescriptor`` to the gateway's ``RelayAdapter``
at handshake time; it tells the adapter which platform it is fronting and which
capabilities to advertise to the ``GatewayStreamConsumer`` (char limit,
draft-streaming, edit/threading support, markdown dialect, length unit). It is
the linchpin of the generalization: one gateway adapter serves Discord,
Telegram, Matrix, Signal, ... without per-platform branching.

EXPERIMENTAL: this schema MAY CHANGE without a deprecation cycle until at least
two real Class-1 platforms have validated it. Evolution during the experimental
phase is additive-only, gated by ``contract_version`` (see
docs/relay-connector-contract.md).

Field origins (most are a wire-serializable projection of ``PlatformEntry`` plus
the per-instance capability methods on ``BasePlatformAdapter``):

- ``max_message_length`` -> ``PlatformEntry.max_message_length`` / adapter
  ``MAX_MESSAGE_LENGTH`` attribute (read by stream_consumer).
- ``len_unit`` -> selects which ``message_len_fn`` the adapter installs
  ("chars" = builtin len; "utf16" = Telegram-style UTF-16 code-unit counting).
- ``supports_draft_streaming`` -> adapter ``supports_draft_streaming()`` probe.
- ``supports_edit`` -> whether edit-based streaming is possible (Discord/
  Telegram yes; Signal/SMS no -> consumer degrades to one-message-per-segment).
- ``supports_threads`` -> ``create_handoff_thread`` capability flag.
- ``markdown_dialect`` -> presentation hint (e.g. "markdown_v2", "discord").
- ``emoji`` / ``platform_hint`` / ``pii_safe`` -> ``PlatformEntry`` fields of the
  same name.
"""

from __future__ import annotations

import json
from dataclasses import asdict, dataclass

# Bump additively (never reinterpret an existing field) during the experimental
# phase; a breaking change requires updating both repos in lockstep.
CONTRACT_VERSION = 1


@dataclass(frozen=True)
class CapabilityDescriptor:
    """Immutable capability descriptor negotiated at relay handshake.

    Frozen so a descriptor cannot be mutated after handshake — the adapter
    advertises a fixed capability profile for the life of the connection.
    """

    contract_version: int
    platform: str
    label: str
    max_message_length: int
    supports_draft_streaming: bool
    supports_edit: bool
    supports_threads: bool
    markdown_dialect: str
    len_unit: str  # "chars" | "utf16"
    emoji: str = "\U0001f50c"  # 🔌 default (matches PlatformEntry default)
    platform_hint: str = ""
    pii_safe: bool = False

    def to_json(self) -> str:
        """Serialize to a compact, stable JSON string for the handshake frame."""
        return json.dumps(asdict(self), sort_keys=True, ensure_ascii=False)

    @classmethod
    def from_json(cls, data: str) -> "CapabilityDescriptor":
        """Deserialize from a handshake JSON string.

        Unknown keys are ignored (forward-compat: a newer connector may send
        fields this gateway does not know yet); missing optional keys fall back
        to dataclass defaults.
        """
        raw = json.loads(data)
        known = {f for f in cls.__dataclass_fields__}  # type: ignore[attr-defined]
        filtered = {k: v for k, v in raw.items() if k in known}
        return cls(**filtered)

    @classmethod
    def from_platform_entry(
        cls,
        entry,
        *,
        len_unit: str = "chars",
        supports_draft_streaming: bool = False,
        supports_edit: bool = True,
        supports_threads: bool = False,
        markdown_dialect: str = "plain",
    ) -> "CapabilityDescriptor":
        """Project a ``gateway.platform_registry.PlatformEntry`` into a descriptor.

        Demonstrates the descriptor is a *subset/projection* of what
        ``PlatformEntry`` already encodes, not a parallel concept: ``label``,
        ``max_message_length``, ``emoji``, ``platform_hint``, ``pii_safe`` and
        the platform name come straight off the entry. The runtime capability
        bits that ``PlatformEntry`` does NOT encode (length unit, draft/edit/
        thread/markdown behavior) are supplied by the caller — in production
        the connector fills these from the live adapter's capability methods.

        ``max_message_length`` of 0 on a ``PlatformEntry`` means "no limit";
        we map that to the stream_consumer default of 4096 so the descriptor
        always carries a concrete chunking bound.
        """
        max_len = getattr(entry, "max_message_length", 0) or 4096
        return cls(
            contract_version=CONTRACT_VERSION,
            platform=entry.name,
            label=entry.label,
            max_message_length=max_len,
            supports_draft_streaming=supports_draft_streaming,
            supports_edit=supports_edit,
            supports_threads=supports_threads,
            markdown_dialect=markdown_dialect,
            len_unit=len_unit,
            emoji=getattr(entry, "emoji", "\U0001f50c"),
            platform_hint=getattr(entry, "platform_hint", ""),
            pii_safe=getattr(entry, "pii_safe", False),
        )

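The forward-compatibility rule in `from_json` (drop unknown keys, default the optional ones, still fail on a missing required field) can be seen on a cut-down example. `MiniDescriptor` below is a hypothetical three-field stand-in, not the real dataclass, and it uses `dataclasses.fields` rather than `__dataclass_fields__` purely for readability.

```python
import json
from dataclasses import dataclass, fields


@dataclass(frozen=True)
class MiniDescriptor:
    """Hypothetical three-field stand-in for CapabilityDescriptor."""

    platform: str
    max_message_length: int
    pii_safe: bool = False  # optional: falls back to this default when absent

    @classmethod
    def from_json(cls, data: str) -> "MiniDescriptor":
        raw = json.loads(data)
        known = {f.name for f in fields(cls)}
        # Keys the dataclass does not declare are silently dropped.
        return cls(**{k: v for k, v in raw.items() if k in known})


# A "newer connector" sends a field this gateway has never heard of.
wire = '{"platform": "discord", "max_message_length": 2000, "future_flag": true}'
desc = MiniDescriptor.from_json(wire)
print(desc)
```

Note the asymmetry: an extra key is tolerated, but a payload that omits a required field still raises `TypeError` from the dataclass constructor, so additive evolution is safe only for fields that carry defaults.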
101
gateway/relay/transport.py Normal file

@@ -0,0 +1,101 @@
"""Relay transport protocol — the gateway<->connector wire contract. EXPERIMENTAL.

The ``RelayAdapter`` (gateway side) delegates all wire I/O to a ``RelayTransport``.
The gateway dials OUT to the connector, so a production transport is a WebSocket
client; in tests it is an in-memory stub (``tests/gateway/relay/stub_connector.py``).

This module defines the protocol surface only — no concrete transport. The
contract has four concerns:

  1. Lifecycle: ``connect`` / ``disconnect``.
  2. Handshake: ``handshake`` returns the ``CapabilityDescriptor`` the connector
     advertises for the platform this adapter fronts.
  3. Inbound: ``set_inbound_handler`` registers a callback the transport invokes
     with each normalized ``MessageEvent`` the connector delivers.
  4. Outbound: ``send_outbound`` carries send/edit/typing actions back to the
     connector; ``get_chat_info`` proxies a chat-info lookup; ``send_interrupt``
     routes a mid-turn /stop down the socket that owns the session_key;
     ``send_follow_up`` acts on a shared-identity capability bound to a session.

EXPERIMENTAL: may change without a deprecation cycle until >=2 Class-1 platforms
validate it. See docs/relay-connector-contract.md.
"""

from __future__ import annotations

from typing import Any, Awaitable, Callable, Dict, Optional, Protocol, runtime_checkable

from gateway.platforms.base import MessageEvent
from gateway.relay.descriptor import CapabilityDescriptor

# Callback the transport invokes for each inbound normalized event.
InboundHandler = Callable[[MessageEvent], Awaitable[None]]


@runtime_checkable
class RelayTransport(Protocol):
    """Full gateway<->connector transport contract."""

    async def connect(self) -> bool:
        """Open the connection to the connector; return True on success."""
        ...

    async def disconnect(self) -> None:
        """Close the connection."""
        ...

    async def handshake(self) -> CapabilityDescriptor:
        """Return the capability descriptor the connector advertises."""
        ...

    def set_inbound_handler(self, handler: InboundHandler) -> None:
        """Register the callback invoked with each inbound MessageEvent."""
        ...

    async def send_outbound(self, action: Dict[str, Any]) -> Dict[str, Any]:
        """Carry an outbound action (send/edit/typing) to the connector.

        Returns a result dict; for ``op == "send"`` it carries
        ``success`` and optionally ``message_id`` / ``error``.
        """
        ...

    async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
        """Proxy a chat-info lookup to the connector."""
        ...

    async def send_interrupt(self, session_key: str, reason: Optional[str] = None) -> None:
        """Route a mid-turn /stop to the connector for ``session_key``.

        The connector forwards it down the socket owned by the gateway
        instance running that session (the /stop routing invariant). On the
        gateway side this is the OUTBOUND direction; the actual task
        cancellation happens when the connector echoes an interrupt inbound
        (handled in Task 1.4).
        """
        ...

    async def send_follow_up(self, action: Dict[str, Any]) -> Dict[str, Any]:
        """Act on a shared-identity capability bound to a session (A2 outbound).

        Some platforms hand the connector a credential that acts on the SHARED
        bot identity (e.g. a Discord interaction follow-up token, valid ~15min).
        Under A2 that credential NEVER reaches the gateway — the connector
        stripped it at the edge and bound it in its capability vault keyed by
        the session. To use it, the gateway issues a SEMANTIC action against the
        session it is already in; it never names or holds a token.

        The action dict carries:

            ``op``            == ``"follow_up"``
            ``session_key``   the session whose bound capability to wield
            ``kind``          the capability kind (e.g. ``"discord.interaction_token"``)
            ``content``       the message content to send via that capability
            ``metadata?``     optional extras

        The connector resolves the real capability (``resolveOutboundCapability``
        on its side), enforces the tenant match (tenant B can never wield tenant
        A's capability), and egresses. Returns ``{success, message_id?, error?}``;
        ``success`` is False when the capability is absent/expired or the tenant
        doesn't match — the gateway then has nothing to retry with (by design: a
        leaked gateway holds zero capability material).
        """
        ...

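Because `RelayTransport` is a `runtime_checkable` `Protocol`, any object with the right method names satisfies it structurally; no inheritance is needed, which is what lets an in-memory stub and a WebSocket client be swapped behind the adapter. A minimal sketch of that mechanism, using a hypothetical two-method `MiniTransport` rather than the full contract:

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class MiniTransport(Protocol):
    """Hypothetical cut-down protocol: lifecycle methods only."""

    async def connect(self) -> bool: ...

    async def disconnect(self) -> None: ...


class InMemoryTransport:
    """Satisfies MiniTransport without inheriting from it."""

    def __init__(self) -> None:
        self.connected = False

    async def connect(self) -> bool:
        self.connected = True
        return True

    async def disconnect(self) -> None:
        self.connected = False


class NotATransport:
    """Missing disconnect(), so the structural check fails."""

    async def connect(self) -> bool:
        return True


print(isinstance(InMemoryTransport(), MiniTransport))  # True
print(isinstance(NotATransport(), MiniTransport))      # False
```

One caveat worth keeping in mind: the runtime `isinstance` check only verifies that the members exist, not their signatures or whether they are coroutines, so it is a smoke test rather than a substitute for a type checker.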

@@ -0,0 +1,267 @@
"""Production WebSocket RelayTransport — the gateway's live link to the connector.

The gateway dials OUT to the connector's relay endpoint over a WebSocket and
speaks the newline-delimited JSON frame protocol defined in the connector repo
(``gateway-gateway`` ``src/relay/protocol.ts``) and mirrored in
``docs/relay-connector-contract.md``:

    gateway -> connector : hello, outbound, interrupt
    connector -> gateway : descriptor, inbound, outbound_result, interrupt_inbound

Frames:

    hello             {type, platform, botId}
    descriptor        {type, descriptor}               (handshake reply)
    inbound           {type, event, bufferId?}         (a normalized MessageEvent)
    outbound          {type, requestId, action}        (send/edit/typing/follow_up)
    outbound_result   {type, requestId, result}
    interrupt         {type, session_key, reason?}     (gateway egresses /stop)
    interrupt_inbound {type, session_key, chat_id}     (connector -> owning gateway)

This is the concrete transport behind the ``RelayTransport`` Protocol; the
``RelayAdapter`` delegates all wire I/O to it. Outbound calls block on a
per-request future keyed by ``requestId`` until the matching ``outbound_result``
arrives. A background reader task pumps inbound frames to the registered handler
and resolves pending outbound futures.

EXPERIMENTAL: the frame schema may change without a deprecation cycle until at
least two Class-1 platforms validate it.
"""

from __future__ import annotations

import asyncio
import json
import logging
import uuid
from typing import Any, Dict, Optional

from gateway.platforms.base import MessageEvent, MessageType
from gateway.session import SessionSource
from gateway.relay.descriptor import CapabilityDescriptor
from gateway.relay.transport import InboundHandler

logger = logging.getLogger(__name__)

try:  # lazy/optional dep — mirrors gateway/platforms/feishu.py
    import websockets
except ImportError:  # pragma: no cover - exercised only when the extra is absent
    websockets = None  # type: ignore[assignment]

WEBSOCKETS_AVAILABLE = websockets is not None

# How long to wait for the handshake descriptor and for each outbound result.
_HANDSHAKE_TIMEOUT_S = 30.0
_OUTBOUND_TIMEOUT_S = 30.0


def _event_from_wire(raw: Dict[str, Any]) -> MessageEvent:
    """Rebuild a MessageEvent from the connector's normalized inbound payload.

    The connector emits SessionSource as the snake_case wire form (§3); map it
    back onto the gateway dataclasses. Unknown message types fall back to TEXT.
    """
    src = raw.get("source", {}) or {}
    from gateway.config import Platform

    platform = src.get("platform", "relay")
    try:
        platform_enum = Platform(platform)
    except ValueError:
        platform_enum = Platform.RELAY
    source = SessionSource(
        platform=platform_enum,
        chat_id=src.get("chat_id", ""),
        chat_type=src.get("chat_type", "dm"),
        chat_name=src.get("chat_name"),
        user_id=src.get("user_id"),
        user_name=src.get("user_name"),
        thread_id=src.get("thread_id"),
        chat_topic=src.get("chat_topic"),
        user_id_alt=src.get("user_id_alt"),
        chat_id_alt=src.get("chat_id_alt"),
        guild_id=src.get("guild_id"),
        parent_chat_id=src.get("parent_chat_id"),
        message_id=src.get("message_id"),
    )
    try:
        msg_type = MessageType(raw.get("message_type", "text"))
    except ValueError:
        msg_type = MessageType.TEXT
    return MessageEvent(
        text=raw.get("text", ""),
        message_type=msg_type,
        source=source,
        message_id=raw.get("message_id"),
        reply_to_message_id=raw.get("reply_to_message_id"),
        media_urls=raw.get("media_urls") or [],
    )


class WebSocketRelayTransport:
    """RelayTransport over a WebSocket connection the gateway dials to the connector."""

    def __init__(
        self,
        url: str,
        platform: str,
        bot_id: str,
        *,
        connect_timeout_s: float = _HANDSHAKE_TIMEOUT_S,
        outbound_timeout_s: float = _OUTBOUND_TIMEOUT_S,
    ) -> None:
        if not WEBSOCKETS_AVAILABLE:
            raise RuntimeError(
                "WebSocketRelayTransport requires the 'websockets' package "
                "(install the messaging extra)."
            )
        self._url = url
        self._platform = platform
        self._bot_id = bot_id
        self._connect_timeout_s = connect_timeout_s
        self._outbound_timeout_s = outbound_timeout_s
        self._ws: Any = None
        self._reader: Optional[asyncio.Task[None]] = None
        self._inbound: Optional[InboundHandler] = None
        self._descriptor: Optional[CapabilityDescriptor] = None
        self._descriptor_ready: asyncio.Future[CapabilityDescriptor] | None = None
        # requestId -> future awaiting the matching outbound_result.
        self._pending: Dict[str, asyncio.Future[Dict[str, Any]]] = {}
        self._closing = False

    # ── lifecycle ────────────────────────────────────────────────────────
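    async def connect(self) -> bool:
        loop = asyncio.get_running_loop()
        # Reset teardown state so a reconnect after disconnect() logs read errors again.
        self._closing = False
        self._descriptor_ready = loop.create_future()
        try:
            self._ws = await websockets.connect(self._url)  # type: ignore[union-attr]
        except Exception as exc:  # noqa: BLE001 - a failed dial = a failed connect
            # Honor the RelayTransport contract: report failure as False
            # instead of raising into RelayAdapter.connect().
            logger.warning("relay ws connect failed: %s", exc)
            self._ws = None
            return False
        self._reader = asyncio.create_task(self._read_loop(), name="relay-ws-reader")
        # Send hello; the descriptor arrives via the reader and resolves handshake().
        await self._send({"type": "hello", "platform": self._platform, "botId": self._bot_id})
        return True
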
    async def disconnect(self) -> None:
        self._closing = True
        if self._reader is not None:
            self._reader.cancel()
            try:
                await self._reader
            except (asyncio.CancelledError, Exception):  # noqa: BLE001 - best-effort teardown
                pass
            self._reader = None
        if self._ws is not None:
            try:
                await self._ws.close()
            except Exception:  # noqa: BLE001
                pass
            self._ws = None
        # Fail any in-flight outbound waiters so callers don't hang.
        for fut in self._pending.values():
            if not fut.done():
                fut.set_exception(RuntimeError("relay transport closed"))
        self._pending.clear()

    async def handshake(self) -> CapabilityDescriptor:
        if self._descriptor is not None:
            return self._descriptor
        if self._descriptor_ready is None:
            raise RuntimeError("handshake() called before connect()")
        return await asyncio.wait_for(self._descriptor_ready, timeout=self._connect_timeout_s)

    def set_inbound_handler(self, handler: InboundHandler) -> None:
        self._inbound = handler

    # ── outbound ─────────────────────────────────────────────────────────
    async def send_outbound(self, action: Dict[str, Any]) -> Dict[str, Any]:
        return await self._request_response(action)

    async def send_follow_up(self, action: Dict[str, Any]) -> Dict[str, Any]:
        # follow_up rides the same outbound frame; the connector dispatches by
        # action.op. Kept as a distinct method to satisfy the transport Protocol
        # and to make the A2 call site explicit.
        return await self._request_response(action)

    async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
        result = await self._request_response(
            {"op": "get_chat_info", "chat_id": chat_id}, frame_type="outbound"
        )
        # The connector answers chat-info inside the outbound_result envelope.
        info = result.get("chat_info") or result
        return {"name": info.get("name", chat_id), "type": info.get("type", "dm")}

    async def send_interrupt(self, session_key: str, reason: Optional[str] = None) -> None:
        await self._send({"type": "interrupt", "session_key": session_key, "reason": reason})

    async def _request_response(
        self, action: Dict[str, Any], frame_type: str = "outbound"
    ) -> Dict[str, Any]:
        if self._ws is None:
            return {"success": False, "error": "relay transport not connected"}
        request_id = uuid.uuid4().hex
        loop = asyncio.get_running_loop()
        fut: asyncio.Future[Dict[str, Any]] = loop.create_future()
        self._pending[request_id] = fut
        try:
            await self._send({"type": frame_type, "requestId": request_id, "action": action})
            return await asyncio.wait_for(fut, timeout=self._outbound_timeout_s)
        except asyncio.TimeoutError:
            return {"success": False, "error": "relay outbound timed out"}
        finally:
            self._pending.pop(request_id, None)

    # ── wire I/O ─────────────────────────────────────────────────────────
    async def _send(self, frame: Dict[str, Any]) -> None:
        if self._ws is None:
            raise RuntimeError("relay transport not connected")
        await self._ws.send(json.dumps(frame) + "\n")

    async def _read_loop(self) -> None:
        assert self._ws is not None
        buf = ""
        try:
            async for chunk in self._ws:
                buf += chunk if isinstance(chunk, str) else chunk.decode("utf-8")
                # Newline-delimited frames; keep any trailing partial line.
                *lines, buf = buf.split("\n")
                for line in lines:
                    if line.strip():
                        await self._handle_frame(line)
        except asyncio.CancelledError:
            raise
        except Exception as exc:  # noqa: BLE001 - log + let the task end; reconnection is caller policy
            if not self._closing:
                logger.warning("relay ws read loop ended: %s", exc)

    async def _handle_frame(self, line: str) -> None:
        try:
            frame = json.loads(line)
        except json.JSONDecodeError:
            logger.warning("relay: skipping malformed frame")
            return
        ftype = frame.get("type")
        if ftype == "descriptor":
            descriptor = CapabilityDescriptor.from_json(json.dumps(frame.get("descriptor", {})))
            self._descriptor = descriptor
            if self._descriptor_ready is not None and not self._descriptor_ready.done():
                self._descriptor_ready.set_result(descriptor)
        elif ftype == "inbound":
            if self._inbound is not None:
                event = _event_from_wire(frame.get("event", {}))
                await self._inbound(event)
        elif ftype == "outbound_result":
            fut = self._pending.get(frame.get("requestId", ""))
            if fut is not None and not fut.done():
                fut.set_result(frame.get("result", {}))
        elif ftype == "interrupt_inbound":
            # Bridged into the adapter's interrupt path by the runner wiring.
            handler = getattr(self, "_interrupt_inbound_handler", None)
            if handler is not None:
                await handler(frame.get("session_key", ""), frame.get("chat_id", ""))
        else:
            # hello/outbound/interrupt are gateway->connector; ignore if echoed.
            pass

    def set_interrupt_inbound_handler(self, handler: Any) -> None:
        """Register the callback for connector->gateway interrupt_inbound frames."""
        self._interrupt_inbound_handler = handler

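The request/response correlation used by the WebSocket transport (one future per `requestId`, resolved when the matching `outbound_result` arrives, with a timeout fallback) can be exercised without a socket. The sketch below is a simplified, hypothetical `Correlator` that records frames in a list instead of sending them; it illustrates the pattern and is not the transport itself.

```python
import asyncio
import uuid
from typing import Any, Dict, List


class Correlator:
    """Hypothetical helper: match replies to requests by requestId."""

    def __init__(self) -> None:
        self.pending: Dict[str, "asyncio.Future[Dict[str, Any]]"] = {}
        self.sent: List[Dict[str, Any]] = []  # stands in for the socket

    async def request(self, action: Dict[str, Any], timeout: float = 1.0) -> Dict[str, Any]:
        request_id = uuid.uuid4().hex
        fut = asyncio.get_running_loop().create_future()
        self.pending[request_id] = fut
        self.sent.append({"type": "outbound", "requestId": request_id, "action": action})
        try:
            return await asyncio.wait_for(fut, timeout=timeout)
        except asyncio.TimeoutError:
            return {"success": False, "error": "timed out"}
        finally:
            # Always drop the waiter so late replies cannot leak entries.
            self.pending.pop(request_id, None)

    def on_result(self, frame: Dict[str, Any]) -> None:
        fut = self.pending.get(frame.get("requestId", ""))
        if fut is not None and not fut.done():
            fut.set_result(frame.get("result", {}))


async def demo() -> Dict[str, Any]:
    corr = Correlator()
    task = asyncio.create_task(corr.request({"op": "send", "content": "hi"}))
    await asyncio.sleep(0)  # let the request register and record its frame
    request_id = corr.sent[0]["requestId"]
    corr.on_result(
        {"type": "outbound_result", "requestId": request_id,
         "result": {"success": True, "message_id": "m1"}}
    )
    return await task
```

Running `asyncio.run(demo())` resolves the pending future with the result dict. A reply whose `requestId` is unknown (or already timed out) is simply ignored, mirroring the `fut is not None and not fut.done()` guard in the transport.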

@@ -4614,6 +4614,21 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
"plugin discovery failed at gateway startup", exc_info=True,
)
        # Register the generic relay adapter when a connector relay URL is
        # configured (GATEWAY_RELAY_URL / gateway.relay_url). No URL -> no-op, so
        # direct/single-tenant deployments are unaffected. When configured, the
        # adapter dials the connector over a WebSocket, negotiates its capability
        # descriptor at handshake, and bridges inbound/outbound like any platform.
        try:
            from gateway.relay import register_relay_adapter, relay_url

            if register_relay_adapter():
                logger.info("relay adapter registered (connector at %s)", relay_url())
        except Exception:
            logger.warning(
                "relay adapter registration failed at gateway startup", exc_info=True,
            )

        # Register declarative shell hooks from cli-config.yaml. Gateway
        # has no TTY, so consent has to come from one of the three opt-in
        # channels (--accept-hooks on launch, HERMES_ACCEPT_HOOKS env var,


@@ -0,0 +1,75 @@
"""Test-only in-memory stub connector implementing RelayTransport.

MUST stay under tests/ — never under plugins/ or gateway/ (a CI guard in
test_no_stub_leak.py asserts this). It lets Phase 1 prove the gateway side of
the relay end-to-end with zero dependency on the real (Node) connector.

The stub:
  - hands back a fixed CapabilityDescriptor at handshake,
  - lets a test push synthetic inbound MessageEvents (push_inbound),
  - records every outbound action (sent/interrupts) for assertions,
  - answers get_chat_info from a small fixture map.
"""

from __future__ import annotations

from typing import Any, Dict, List, Optional

from gateway.platforms.base import MessageEvent
from gateway.relay.descriptor import CapabilityDescriptor
from gateway.relay.transport import InboundHandler


class StubConnector:
    """In-memory RelayTransport for tests."""

    def __init__(self, descriptor: CapabilityDescriptor) -> None:
        self._descriptor = descriptor
        self._inbound: Optional[InboundHandler] = None
        self.connected = False
        self.sent: List[Dict[str, Any]] = []
        self.interrupts: List[Dict[str, Any]] = []
        self.follow_ups: List[Dict[str, Any]] = []
        self.chat_info: Dict[str, Dict[str, Any]] = {}
        # Canned result for the next send_outbound (override per-test).
        self.next_send_result: Dict[str, Any] = {"success": True, "message_id": "m1"}
        # Canned result for the next send_follow_up (override per-test). Default
        # mimics a resolved capability egress; set success=False to simulate an
        # absent/expired capability or a tenant mismatch on the connector side.
        self.next_follow_up_result: Dict[str, Any] = {"success": True, "message_id": "f1"}

    async def connect(self) -> bool:
        self.connected = True
        return True

    async def disconnect(self) -> None:
        self.connected = False

    async def handshake(self) -> CapabilityDescriptor:
        return self._descriptor

    def set_inbound_handler(self, handler: InboundHandler) -> None:
        self._inbound = handler

    async def send_outbound(self, action: Dict[str, Any]) -> Dict[str, Any]:
        self.sent.append(action)
        if action.get("op") == "send":
            return dict(self.next_send_result)
        return {"success": True}

    async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
        return self.chat_info.get(chat_id, {"name": chat_id, "type": "dm"})

    async def send_interrupt(self, session_key: str, reason: Optional[str] = None) -> None:
        self.interrupts.append({"session_key": session_key, "reason": reason})

    async def send_follow_up(self, action: Dict[str, Any]) -> Dict[str, Any]:
        self.follow_ups.append(action)
        return dict(self.next_follow_up_result)

    # ── test driver ──────────────────────────────────────────────────────
    async def push_inbound(self, event: MessageEvent) -> None:
        """Simulate the connector delivering a normalized inbound event."""
        if self._inbound is None:
            raise RuntimeError("no inbound handler registered (call adapter.connect first)")
        await self._inbound(event)


@@ -0,0 +1,184 @@
"""Cross-repo contract conformance: docs/relay-connector-contract.md ⟷ Python.
The contract doc is the formal interface the connector repo
(NousResearch/gateway-gateway) implements against. The connector's TypeScript
structs are hand-mirrored from the doc, so if the Python source of truth drifts
from the doc, the two repos silently diverge and the handshake / session-keying
breaks only at integration time.
These tests make the doc ⟷ code relationship an enforced invariant:
* Every ``CapabilityDescriptor`` field (§2 table) is documented with the
correct required/optional flag, and the doc lists no fields the dataclass
lacks.
* Every ``SessionSource`` wire key (what ``to_dict()`` actually serializes)
is named in the contract doc's §3 discriminator section, and every
discriminator the doc calls out as a column header exists on the dataclass.
They are invariants, NOT change-detector snapshots: they assert the *relation*
between two artifacts that must move together, not a frozen list of names. Add
a field to the descriptor and the doc, and the test stays green; add it to only
one, and CI fails — which is exactly the lockstep guarantee the plan's
Cross-Repo Coordination Checklist calls for.
"""
from __future__ import annotations
import re
from pathlib import Path
import pytest
from gateway.relay.descriptor import CapabilityDescriptor
from gateway.session import SessionSource
# Repo root: tests/gateway/relay/ -> repo root is parents[3]
_CONTRACT_DOC = (
Path(__file__).resolve().parents[3] / "docs" / "relay-connector-contract.md"
)
def _doc_text() -> str:
assert _CONTRACT_DOC.exists(), (
f"Contract doc missing at {_CONTRACT_DOC}. It is the formal cross-repo "
f"interface (Phase 1, Task 1.5) and must ship with the relay adapter."
)
return _CONTRACT_DOC.read_text(encoding="utf-8")


def _parse_descriptor_table(text: str) -> dict[str, bool]:
    """Parse §2's markdown table → {field_name: required}.

    Rows look like: ``| `field` | type | yes|no | meaning |``. Returns a map of
    field name to whether the Required column says "yes".
    """
    fields: dict[str, bool] = {}
    # Restrict to the §2 section so §3/§4 tables don't bleed in.
    section = text.split("## 2. CapabilityDescriptor", 1)[-1].split("## 3.", 1)[0]
    row_re = re.compile(r"^\|\s*`([a-z_]+)`\s*\|[^|]*\|\s*(yes|no)\s*\|", re.M)
    for name, required in row_re.findall(section):
        fields[name] = required.strip() == "yes"
    return fields


def test_descriptor_fields_match_contract_doc():
    """§2 table ⟷ CapabilityDescriptor dataclass, names + required/optional."""
    documented = _parse_descriptor_table(_doc_text())
    assert documented, "Failed to parse any descriptor fields from the §2 table."

    dc_fields = CapabilityDescriptor.__dataclass_fields__  # type: ignore[attr-defined]
    # A dataclass field is "required" iff it has no default and no default_factory.
    import dataclasses

    code_required = {
        name
        for name, f in dc_fields.items()
        if f.default is dataclasses.MISSING
        and f.default_factory is dataclasses.MISSING  # type: ignore[misc]
    }
    code_names = set(dc_fields.keys())
    doc_names = set(documented.keys())

    missing_from_doc = code_names - doc_names
    assert not missing_from_doc, (
        f"CapabilityDescriptor fields missing from the §2 contract-doc table: "
        f"{sorted(missing_from_doc)}. Document them so the connector mirrors them."
    )
    extra_in_doc = doc_names - code_names
    assert not extra_in_doc, (
        f"Contract-doc §2 table documents fields the dataclass does not have: "
        f"{sorted(extra_in_doc)}. Remove them or add them to descriptor.py."
    )

    # Required/optional must agree, so the connector knows which fields it may omit.
    for name, doc_required in documented.items():
        assert doc_required == (name in code_required), (
            f"Field '{name}': contract doc says required={doc_required}, but the "
            f"dataclass says required={name in code_required}. Reconcile them."
        )


def _session_source_wire_keys() -> set[str]:
    """Keys ``SessionSource.to_dict()`` can emit (the actual wire surface).

    Build a maximally-populated source so conditionally-included keys (the
    ``if self.x:`` branches in ``to_dict``) all appear.
    """
    from gateway.config import Platform

    src = SessionSource(
        platform=Platform.DISCORD,
        chat_id="c",
        chat_name="n",
        chat_type="channel",
        user_id="u",
        user_name="un",
        thread_id="t",
        chat_topic="topic",
        user_id_alt="ua",
        chat_id_alt="ca",
        guild_id="g",
        parent_chat_id="p",
        message_id="m",
    )
    return set(src.to_dict().keys())


def test_session_source_wire_keys_documented_in_contract():
    """Every wire key SessionSource.to_dict() emits is named in the contract doc.

    The doc enumerates discriminators in prose + a per-platform table (§3) rather
    than a strict field table, so this asserts presence-by-name: a wire key the
    connector must populate but which appears nowhere in the doc is a silent gap.
    """
    text = _doc_text()
    # Limit to §3 (the MessageEvent / SessionSource section).
    section = text.split("## 3. Inbound", 1)[-1].split("## 4.", 1)[0]
    wire_keys = _session_source_wire_keys()
    # Presence-by-name is a plain substring check against the §3 text: a key
    # counts as documented if it appears anywhere in the section, including as
    # part of a longer name (``user_id`` is satisfied by ``user_id_alt``).
    undocumented = sorted(k for k in wire_keys if k not in section)
    assert not undocumented, (
        f"SessionSource wire keys absent from the §3 contract-doc section: "
        f"{undocumented}. The connector normalizes events into these keys; if the "
        f"doc doesn't name them the connector author can't know to populate them. "
        f"Document them (prose or the discriminator table)."
    )


def test_internal_only_session_fields_stay_off_the_wire():
    """Guard the inverse: fields deliberately NOT serialized must not leak.

    ``is_bot`` is an internal author-classification flag that today is NOT in
    ``to_dict()`` (so the connector's TS contract correctly omits it). If someone
    adds it to the wire without updating the contract doc + connector, this flips
    and forces the conversation. This documents the intentional omission.
    """
    wire_keys = _session_source_wire_keys()
    assert "is_bot" not in wire_keys, (
        "is_bot is now serialized by SessionSource.to_dict(). If this is "
        "intentional, add it to docs/relay-connector-contract.md §3 and the "
        "connector's SessionSource interface, then update this guard."
    )


@pytest.mark.parametrize("discriminator", ["chat_id", "chat_type", "user_id", "thread_id", "guild_id"])
def test_discord_telegram_discriminator_columns_present(discriminator):
    """§3's per-platform table headers must exist as SessionSource fields.

    These five columns drive build_session_key() and are the #1 High-severity
    risk surface (Discord guild_id collision). If the doc advertises a
    discriminator column the dataclass can't carry, the connector has nowhere to
    put it.
    """
    assert discriminator in SessionSource.__dataclass_fields__, (  # type: ignore[attr-defined]
        f"Contract doc §3 lists '{discriminator}' as a session discriminator, "
        f"but SessionSource has no such field."
    )
    # And it must be reachable on the wire (chat_type is always emitted; the rest
    # are conditional but still possible keys).
    assert discriminator in _session_source_wire_keys(), (
        f"Discriminator '{discriminator}' never appears in SessionSource.to_dict() "
        f"output, so the connector cannot transmit it to the gateway."
    )
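
Reviewer sketch (not part of the diff): the doc-versus-dataclass check above comes down to parsing a markdown table into a `{field: required}` map and comparing it with the dataclass fields that have no default. The snippet below reproduces that on a made-up two-row table. `SAMPLE_DOC`, `SketchDescriptor`, `parse_required` and `required_fields` are hypothetical names for illustration only; the real contract doc and `CapabilityDescriptor` are not shown in this diff.

```python
import dataclasses
import re

# Hypothetical stand-in for the contract doc; only the table shape matters.
SAMPLE_DOC = """\
## 2. CapabilityDescriptor

| Field | Type | Required | Meaning |
|---|---|---|---|
| `platform` | string | yes | platform slug |
| `emoji` | string | no | display emoji |

## 3. Inbound
"""


@dataclasses.dataclass(frozen=True)
class SketchDescriptor:  # hypothetical, not gateway.relay.descriptor
    platform: str
    emoji: str = "?"


def parse_required(text: str) -> dict[str, bool]:
    """Map each backticked field in the section 2 table to its Required flag."""
    section = text.split("## 2. CapabilityDescriptor", 1)[-1].split("## 3.", 1)[0]
    row_re = re.compile(r"^\|\s*`([a-z_]+)`\s*\|[^|]*\|\s*(yes|no)\s*\|", re.M)
    return {name: req == "yes" for name, req in row_re.findall(section)}


def required_fields(cls) -> set[str]:
    """Dataclass fields with neither a default nor a default_factory."""
    return {
        f.name
        for f in dataclasses.fields(cls)
        if f.default is dataclasses.MISSING
        and f.default_factory is dataclasses.MISSING
    }


documented = parse_required(SAMPLE_DOC)
code_required = required_fields(SketchDescriptor)
```

The header and separator rows are skipped because neither contains a backticked name, which is why the regex can run over the whole section without special-casing them.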


@@ -0,0 +1,66 @@
"""Tests for the experimental CapabilityDescriptor (relay Phase 0, Task 0.2)."""

from gateway.relay.descriptor import CONTRACT_VERSION, CapabilityDescriptor


def _telegram_descriptor(**overrides) -> CapabilityDescriptor:
    base = dict(
        contract_version=CONTRACT_VERSION,
        platform="telegram",
        label="Telegram",
        max_message_length=4096,
        supports_draft_streaming=False,
        supports_edit=True,
        supports_threads=True,
        markdown_dialect="markdown_v2",
        len_unit="utf16",
        emoji="\u2708\ufe0f",
        platform_hint="You are on Telegram.",
        pii_safe=False,
    )
    base.update(overrides)
    return CapabilityDescriptor(**base)


def test_descriptor_roundtrips_json():
    d = _telegram_descriptor()
    assert CapabilityDescriptor.from_json(d.to_json()) == d


def test_descriptor_is_frozen():
    d = _telegram_descriptor()
    try:
        d.max_message_length = 1  # type: ignore[misc]
    except Exception as exc:  # FrozenInstanceError
        assert "cannot assign" in str(exc) or "frozen" in str(exc).lower()
    else:  # pragma: no cover
        raise AssertionError("descriptor should be immutable (frozen)")


def test_from_json_ignores_unknown_keys():
    """A newer connector may send fields this gateway doesn't know; those are
    dropped, not fatal (forward-compat during the experimental phase)."""
    d = _telegram_descriptor()
    raw = d.to_json()[:-1] + ', "future_field": "ignored"}'
    restored = CapabilityDescriptor.from_json(raw)
    assert restored == d


def test_from_json_fills_optional_defaults():
    """Optional fields (emoji/platform_hint/pii_safe) fall back to defaults."""
    minimal = (
        '{"contract_version": 1, "platform": "x", "label": "X", '
        '"max_message_length": 2000, "supports_draft_streaming": false, '
        '"supports_edit": false, "supports_threads": false, '
        '"markdown_dialect": "plain", "len_unit": "chars"}'
    )
    d = CapabilityDescriptor.from_json(minimal)
    assert d.pii_safe is False
    assert d.platform_hint == ""
    assert d.emoji == "\U0001f50c"


def test_module_is_marked_experimental():
    import gateway.relay.descriptor as m

    assert "EXPERIMENTAL" in (m.__doc__ or "")
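
Reviewer sketch (not part of the diff): the three behaviours these tests pin down (JSON round-trip, immutability, unknown keys dropped) can be had from a frozen dataclass plus a `from_json` that filters on the declared field names. `SketchDescriptor` below is a hypothetical three-field stand-in; how the real `CapabilityDescriptor.from_json` is written is not visible here, so treat this as one plausible shape rather than the actual implementation.

```python
import dataclasses
import json


@dataclasses.dataclass(frozen=True)
class SketchDescriptor:  # hypothetical stand-in, not the real descriptor
    platform: str
    max_message_length: int
    pii_safe: bool = False  # optional: falls back to the default when absent

    def to_json(self) -> str:
        return json.dumps(dataclasses.asdict(self))

    @classmethod
    def from_json(cls, raw: str) -> "SketchDescriptor":
        data = json.loads(raw)
        known = {f.name for f in dataclasses.fields(cls)}
        # Drop keys this version does not declare (forward compatibility).
        return cls(**{k: v for k, v in data.items() if k in known})


d = SketchDescriptor(platform="x", max_message_length=2000)
# Simulate a newer peer appending a field this version has never heard of.
newer = d.to_json()[:-1] + ', "future_field": "ignored"}'
restored = SketchDescriptor.from_json(newer)
```

Filtering on declared names means a missing required key still raises `TypeError` from the constructor, so only unknown extras are tolerated, not an incomplete payload.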


@@ -0,0 +1,64 @@
"""Descriptor <- PlatformEntry projection (relay Phase 0, Task 0.3).
Proves the CapabilityDescriptor is a projection of the existing PlatformEntry,
not a parallel concept: the entry's label/limit/emoji/hint/pii fields carry
straight through.
"""

from gateway.platform_registry import PlatformEntry
from gateway.relay.descriptor import CONTRACT_VERSION, CapabilityDescriptor


def _entry(**overrides) -> PlatformEntry:
    base = dict(
        name="telegram",
        label="Telegram",
        adapter_factory=lambda cfg: None,
        check_fn=lambda: True,
        max_message_length=4096,
        pii_safe=False,
        emoji="\u2708\ufe0f",
        platform_hint="You are on Telegram.",
    )
    base.update(overrides)
    return PlatformEntry(**base)


def test_projection_carries_platform_entry_fields():
    d = CapabilityDescriptor.from_platform_entry(_entry(), len_unit="utf16")
    assert d.contract_version == CONTRACT_VERSION
    assert d.platform == "telegram"
    assert d.label == "Telegram"
    assert d.max_message_length == 4096
    assert d.emoji == "\u2708\ufe0f"
    assert d.platform_hint == "You are on Telegram."
    assert d.pii_safe is False
    assert d.len_unit == "utf16"


def test_zero_max_length_maps_to_4096_default():
    """PlatformEntry.max_message_length == 0 means 'no limit'; the descriptor
    carries a concrete bound matching the stream_consumer default."""
    d = CapabilityDescriptor.from_platform_entry(_entry(max_message_length=0))
    assert d.max_message_length == 4096


def test_runtime_capabilities_supplied_by_caller():
    """PlatformEntry doesn't encode draft/edit/thread/markdown behavior; those
    come from the caller (the connector, reading the live adapter)."""
    d = CapabilityDescriptor.from_platform_entry(
        _entry(),
        supports_draft_streaming=True,
        supports_edit=False,
        supports_threads=True,
        markdown_dialect="discord",
    )
    assert d.supports_draft_streaming is True
    assert d.supports_edit is False
    assert d.supports_threads is True
    assert d.markdown_dialect == "discord"


def test_projection_roundtrips_through_json():
    d = CapabilityDescriptor.from_platform_entry(_entry(), len_unit="utf16")
    assert CapabilityDescriptor.from_json(d.to_json()) == d


@@ -0,0 +1,44 @@
"""CI guard: the test-only StubConnector must never leak into production paths.
The relay stub connector lives under tests/ and exists only to prove the
gateway side of the relay without the real (Node) connector. If it ever appears
under gateway/ or plugins/, that's a production leak — fail loudly.
"""

from __future__ import annotations

import pathlib
import re

_REPO_ROOT = pathlib.Path(__file__).resolve().parents[3]
_FORBIDDEN_DIRS = ("gateway", "plugins")

# Match actual code leaks (imports / class definitions), not prose mentions in
# docstrings/comments. A production file that *imports* the stub or *defines*
# StubConnector is a real leak; a docstring that references the stub's path as
# documentation is not.
_LEAK_PATTERNS = (
    re.compile(r"^\s*(?:from|import)\s+.*stub_connector", re.MULTILINE),
    re.compile(r"^\s*(?:from|import)\s+.*\bStubConnector\b", re.MULTILINE),
    re.compile(r"^\s*class\s+StubConnector\b", re.MULTILINE),
)


def test_stub_connector_does_not_leak_into_production_paths():
    offenders: list[str] = []
    for top in _FORBIDDEN_DIRS:
        base = _REPO_ROOT / top
        if not base.is_dir():
            continue
        for path in base.rglob("*.py"):
            try:
                text = path.read_text(encoding="utf-8", errors="ignore")
            except OSError:  # pragma: no cover
                continue
            for pat in _LEAK_PATTERNS:
                if pat.search(text):
                    offenders.append(
                        f"{path.relative_to(_REPO_ROOT)} matches {pat.pattern!r}"
                    )
    assert not offenders, (
        "relay test stub leaked into production paths:\n " + "\n ".join(offenders)
    )
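
Reviewer sketch (not part of the diff): a quick way to see what the three leak patterns do and do not catch is to run them over small source strings. The patterns are copied from the guard above; the helper `leaks` and the three sample strings are made up for illustration.

```python
import re

# Same three patterns as the guard above.
LEAK_PATTERNS = (
    re.compile(r"^\s*(?:from|import)\s+.*stub_connector", re.MULTILINE),
    re.compile(r"^\s*(?:from|import)\s+.*\bStubConnector\b", re.MULTILINE),
    re.compile(r"^\s*class\s+StubConnector\b", re.MULTILINE),
)


def leaks(source: str) -> bool:
    """True if any line of `source` looks like an import/definition of the stub."""
    return any(p.search(source) for p in LEAK_PATTERNS)


# A docstring that only mentions the stub's path: should not be flagged.
prose_only = '"""See tests/gateway/relay/stub_connector.py for the stub."""\n'
# A real import and a real class definition: both should be flagged.
real_import = "from tests.gateway.relay.stub_connector import StubConnector\n"
real_class = "class StubConnector:\n    pass\n"

results = (leaks(prose_only), leaks(real_import), leaks(real_class))
```

One caveat worth knowing: the patterns are line-anchored, not syntax-aware, so a docstring line that happens to begin with "from" or "import" and mentions `stub_connector` would still match.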


@@ -0,0 +1,77 @@
"""RelayAdapter capability-advertisement tests (relay Phase 1, Task 1.1)."""

import pytest

from gateway.config import Platform, PlatformConfig
from gateway.relay.adapter import RelayAdapter
from gateway.relay.descriptor import CONTRACT_VERSION, CapabilityDescriptor


def make_desc(**kw) -> CapabilityDescriptor:
    base = dict(
        contract_version=CONTRACT_VERSION,
        platform="telegram",
        label="Telegram",
        max_message_length=4096,
        supports_draft_streaming=False,
        supports_edit=True,
        supports_threads=True,
        markdown_dialect="markdown_v2",
        len_unit="utf16",
        emoji="\u2708\ufe0f",
        platform_hint="",
        pii_safe=False,
    )
    base.update(kw)
    return CapabilityDescriptor(**base)


def _adapter(**desc_kw) -> RelayAdapter:
    return RelayAdapter(PlatformConfig(), make_desc(**desc_kw))


def test_relay_platform_member_exists():
    assert Platform("relay") is Platform.RELAY


def test_advertises_descriptor_max_length():
    a = _adapter(max_message_length=2000)
    assert a.MAX_MESSAGE_LENGTH == 2000


def test_supports_draft_streaming_follows_descriptor():
    assert _adapter(supports_draft_streaming=False).supports_draft_streaming() is False
    assert _adapter(supports_draft_streaming=True).supports_draft_streaming() is True


def test_len_fn_utf16_counts_code_units():
    a = _adapter(len_unit="utf16")
    # An astral-plane emoji is two UTF-16 code units.
    assert a.message_len_fn("\U0001f600") == 2


def test_len_fn_chars_uses_builtin_len():
    a = _adapter(len_unit="chars")
    assert a.message_len_fn("\U0001f600") == 1


def test_is_a_base_platform_adapter():
    # stream_consumer's isinstance(adapter, BasePlatformAdapter) guard must pass.
    from gateway.platforms.base import BasePlatformAdapter

    assert isinstance(_adapter(), BasePlatformAdapter)


@pytest.mark.asyncio
async def test_connect_without_transport_raises():
    a = _adapter()
    with pytest.raises(RuntimeError, match="no transport"):
        await a.connect()


@pytest.mark.asyncio
async def test_send_without_transport_returns_failure():
    a = _adapter()
    result = await a.send("chat1", "hello")
    assert result.success is False
    assert result.error == "no transport"
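
Reviewer sketch (not part of the diff): the `utf16` versus `chars` distinction the two length tests rely on is easy to reproduce with the standard library. Encoding to UTF-16 without a byte-order mark and halving the byte count gives the number of code units. `utf16_len` and `pick_len_fn` are hypothetical helper names; the adapter's real `message_len_fn` may be implemented differently.

```python
from typing import Callable


def utf16_len(text: str) -> int:
    """Count UTF-16 code units (astral-plane characters count as two)."""
    # "utf-16-le" emits no BOM, so every code unit is exactly two bytes.
    return len(text.encode("utf-16-le")) // 2


def pick_len_fn(len_unit: str) -> Callable[[str], int]:
    """Hypothetical selector: map a descriptor's len_unit to a length function."""
    return utf16_len if len_unit == "utf16" else len


grinning = "\U0001f600"  # one code point, two UTF-16 code units
units = pick_len_fn("utf16")(grinning)
chars = pick_len_fn("chars")(grinning)
```

The difference matters for splitting long replies: a limit expressed in UTF-16 units is reached sooner than `len()` suggests whenever the text contains emoji or other astral-plane characters.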


@@ -0,0 +1,117 @@
"""A2 outbound capability action: the token-less ``follow_up`` op.
Proves the gateway can act on a shared-identity capability (e.g. a Discord
interaction follow-up token) WITHOUT ever holding the credential: it names the
session it is in plus the capability ``kind``, and the connector resolves the
real value from its vault and egresses. See gateway/relay/transport.py
(send_follow_up) and docs/relay-connector-contract.md §4.
The gateway side is what's exercised here (against the stub connector); the
connector's resolve + tenant-match enforcement lives in the connector repo
(resolveOutboundCapability). The key gateway-side guarantees:
- the wire action carries NO token (only session_key + kind + content),
- success/failure surfaces from the connector's resolve result,
- a failed resolve (absent/expired/tenant mismatch) returns success=False
with nothing for the gateway to retry with.
"""

from __future__ import annotations

import pytest

from gateway.config import PlatformConfig
from gateway.relay.adapter import RelayAdapter
from gateway.relay.descriptor import CONTRACT_VERSION, CapabilityDescriptor
from tests.gateway.relay.stub_connector import StubConnector


def _discord_descriptor() -> CapabilityDescriptor:
    return CapabilityDescriptor(
        contract_version=CONTRACT_VERSION,
        platform="discord",
        label="Discord",
        max_message_length=2000,
        supports_draft_streaming=False,
        supports_edit=True,
        supports_threads=True,
        markdown_dialect="discord",
        len_unit="chars",
    )


@pytest.fixture
def wired():
    stub = StubConnector(_discord_descriptor())
    adapter = RelayAdapter(PlatformConfig(), _discord_descriptor(), transport=stub)
    return adapter, stub


@pytest.mark.asyncio
async def test_follow_up_round_trips_without_a_token(wired):
    adapter, stub = wired
    await adapter.connect()
    stub.next_follow_up_result = {"success": True, "message_id": "fu-7"}

    result = await adapter.send_follow_up(
        session_key="agent:main:discord:group:chanA:userX",
        kind="discord.interaction_token",
        content="here is your follow-up",
    )

    assert result.success is True
    assert result.message_id == "fu-7"
    assert len(stub.follow_ups) == 1
    action = stub.follow_ups[0]
    assert action["op"] == "follow_up"
    assert action["session_key"] == "agent:main:discord:group:chanA:userX"
    assert action["kind"] == "discord.interaction_token"
    assert action["content"] == "here is your follow-up"


@pytest.mark.asyncio
async def test_follow_up_wire_action_carries_no_credential(wired):
    """The action dict must carry only session refs, no credential VALUE.

    Note the capability ``kind`` legitimately names the credential type
    (e.g. ``"discord.interaction_token"``); that's a reference, not the secret.
    The guarantee is structural: the action has exactly the token-less semantic
    fields, and no field holds an actual credential value.
    """
    adapter, stub = wired
    await adapter.connect()
    await adapter.send_follow_up(
        session_key="sess-1", kind="discord.interaction_token", content="x", metadata={"a": 1}
    )
    action = stub.follow_ups[0]
    # Exactly the token-less semantic fields (+ metadata); no value/secret field.
    assert set(action.keys()) == {"op", "session_key", "kind", "content", "metadata"}
    # No field NAMES a credential carrier (the kind string is a type ref, allowed).
    assert "value" not in action
    assert "token" not in action
    assert "secret" not in action
    assert "credential" not in action


@pytest.mark.asyncio
async def test_follow_up_failure_surfaces_when_capability_unresolvable(wired):
    """Connector couldn't resolve (absent/expired/tenant mismatch) -> success=False."""
    adapter, stub = wired
    await adapter.connect()
    stub.next_follow_up_result = {"success": False, "error": "capability absent or tenant mismatch"}

    result = await adapter.send_follow_up(
        session_key="sess-1", kind="discord.interaction_token", content="x"
    )

    assert result.success is False
    assert result.message_id is None
    assert "tenant mismatch" in (result.error or "")


@pytest.mark.asyncio
async def test_follow_up_without_transport_fails_cleanly():
    adapter = RelayAdapter(PlatformConfig(), _discord_descriptor(), transport=None)
    result = await adapter.send_follow_up(session_key="s", kind="k", content="c")
    assert result.success is False
    assert result.error == "no transport"
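
Reviewer sketch (not part of the diff): the token-less guarantee is structural, so it can be illustrated with two small functions. The gateway side builds an action that only names the session and the capability kind; the connector side looks the credential up in its own vault. Everything here is hypothetical: `build_follow_up_action`, `resolve_follow_up`, the vault keyed by `(session_key, kind)`, and the choice to omit `metadata` when it is `None` are assumptions, not the real `send_follow_up` or `resolveOutboundCapability`.

```python
from typing import Any, Optional


def build_follow_up_action(
    session_key: str,
    kind: str,
    content: str,
    metadata: Optional[dict] = None,
) -> dict[str, Any]:
    """Gateway side (hypothetical): reference the capability, never its value."""
    action: dict[str, Any] = {
        "op": "follow_up",
        "session_key": session_key,
        "kind": kind,
        "content": content,
    }
    if metadata is not None:  # assumption: metadata is omitted when absent
        action["metadata"] = metadata
    return action


def resolve_follow_up(action: dict[str, Any], vault: dict) -> dict[str, Any]:
    """Connector side (hypothetical): the credential lives only in the vault."""
    token = vault.get((action["session_key"], action["kind"]))
    if token is None:
        return {"success": False, "error": "capability absent or tenant mismatch"}
    # A real connector would egress to the platform with `token` here.
    return {"success": True, "message_id": "fu-1"}


vault = {("sess-1", "discord.interaction_token"): "s3cr3t-held-by-connector"}
action = build_follow_up_action("sess-1", "discord.interaction_token", "x", {"a": 1})
ok = resolve_follow_up(action, vault)
missing = resolve_follow_up(
    build_follow_up_action("sess-2", "discord.interaction_token", "x"), vault
)
```

Because the gateway never sees the vault entry, a failed resolve leaves it with nothing to retry with, which is the property the failure test checks.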


@@ -0,0 +1,69 @@
"""Relay /stop interrupt routing (relay Phase 1, Task 1.4).
Proves a connector-delivered mid-turn interrupt reaches the existing per-session
interrupt mechanism and cancels exactly the targeted session_key's turn — never
a sibling's. Mirrors the isolation discipline of test_stop_thread_sibling.py.
"""

from __future__ import annotations

import asyncio

import pytest

from gateway.config import PlatformConfig
from gateway.relay.adapter import RelayAdapter
from gateway.relay.descriptor import CONTRACT_VERSION, CapabilityDescriptor
from tests.gateway.relay.stub_connector import StubConnector


def _desc() -> CapabilityDescriptor:
    return CapabilityDescriptor(
        contract_version=CONTRACT_VERSION,
        platform="discord",
        label="Discord",
        max_message_length=2000,
        supports_draft_streaming=False,
        supports_edit=True,
        supports_threads=True,
        markdown_dialect="discord",
        len_unit="chars",
    )


@pytest.fixture
def adapter():
    return RelayAdapter(PlatformConfig(), _desc(), transport=StubConnector(_desc()))


@pytest.mark.asyncio
async def test_interrupt_sets_only_target_session_event(adapter):
    key_a = "agent:main:discord:group:chanA:userX"
    key_b = "agent:main:discord:group:chanB:userY"
    ev_a = asyncio.Event()
    ev_b = asyncio.Event()
    adapter._active_sessions[key_a] = ev_a
    adapter._active_sessions[key_b] = ev_b

    await adapter.on_interrupt(key_a, chat_id="chanA")

    assert ev_a.is_set() is True, "target session's interrupt Event must be set"
    assert ev_b.is_set() is False, "sibling session must be untouched"


@pytest.mark.asyncio
async def test_interrupt_unknown_session_is_noop(adapter):
    # No active session for this key; must not raise.
    await adapter.on_interrupt("agent:main:discord:group:nope:userZ", chat_id="nope")


@pytest.mark.asyncio
async def test_outbound_interrupt_reaches_connector(adapter):
    """The gateway-side /stop egress: send_interrupt is carried to the connector
    so it can forward down the socket owning the session_key."""
    stub = adapter._transport
    await stub.send_interrupt("agent:main:discord:group:chanA:userX", reason="stop")
    assert stub.interrupts == [
        {"session_key": "agent:main:discord:group:chanA:userX", "reason": "stop"}
    ]
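
Reviewer sketch (not part of the diff): the isolation property these tests assert follows naturally from keeping one `asyncio.Event` per session key and setting only the one that is looked up. `InterruptRegistry` below is a hypothetical miniature of that idea; the adapter's real `_active_sessions` handling and `on_interrupt` signature (which also takes `chat_id`) are only partly visible in this diff.

```python
import asyncio


class InterruptRegistry:  # hypothetical, not RelayAdapter
    def __init__(self) -> None:
        self.active: dict[str, asyncio.Event] = {}

    async def on_interrupt(self, session_key: str) -> bool:
        """Set the target session's event; unknown keys are a quiet no-op."""
        event = self.active.get(session_key)
        if event is None:
            return False
        event.set()
        return True


async def demo() -> tuple[bool, bool, bool, bool]:
    reg = InterruptRegistry()
    reg.active["session-a"] = asyncio.Event()
    reg.active["session-b"] = asyncio.Event()
    hit = await reg.on_interrupt("session-a")
    miss = await reg.on_interrupt("session-unknown")
    return (
        hit,
        miss,
        reg.active["session-a"].is_set(),
        reg.active["session-b"].is_set(),
    )


result = asyncio.run(demo())
```

Keying by the full session key rather than by chat means a `/stop` in one thread or channel cannot cancel a sibling's turn, even when both share a chat.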


@@ -0,0 +1,66 @@
"""RelayAdapter registration via the platform registry.
The relay platform is registered when a connector relay URL is configured
(``GATEWAY_RELAY_URL`` env or ``gateway.relay_url`` in config.yaml) — the same
config-driven shape as ``gateway.proxy_url``, not a separate feature flag. With
no URL configured, registration is a no-op so direct/single-tenant deployments
are unaffected. ``force=True`` registers a transport-less adapter for tests.
"""

from __future__ import annotations

import pytest

from gateway.config import PlatformConfig
from gateway.platform_registry import platform_registry
from gateway.relay import register_relay_adapter, relay_url
from gateway.relay.adapter import RelayAdapter


@pytest.fixture(autouse=True)
def _clean_registry(monkeypatch):
    """Each test starts/ends with no 'relay' entry and a clean relay env."""
    monkeypatch.delenv("GATEWAY_RELAY_URL", raising=False)
    monkeypatch.delenv("GATEWAY_RELAY_PLATFORM", raising=False)
    monkeypatch.delenv("GATEWAY_RELAY_BOT_ID", raising=False)
    platform_registry.unregister("relay")
    yield
    platform_registry.unregister("relay")


def test_off_when_no_url_configured(monkeypatch):
    # The env var is already cleared by the fixture; patch relay_url() to None so
    # a gateway.relay_url in a local config.yaml cannot supply a URL either.
    monkeypatch.setattr("gateway.relay.relay_url", lambda: None)
    assert register_relay_adapter() is False
    assert platform_registry.is_registered("relay") is False


def test_registers_when_url_configured(monkeypatch):
    monkeypatch.setenv("GATEWAY_RELAY_URL", "wss://connector.example/relay")
    assert relay_url() == "wss://connector.example/relay"
    assert register_relay_adapter() is True
    assert platform_registry.is_registered("relay") is True


def test_explicit_url_arg_registers():
    assert register_relay_adapter(url="wss://connector.example/relay") is True
    assert platform_registry.is_registered("relay") is True


def test_force_registers_without_url():
    assert register_relay_adapter(force=True) is True
    assert platform_registry.is_registered("relay") is True


def test_trailing_slash_stripped(monkeypatch):
    monkeypatch.setenv("GATEWAY_RELAY_URL", "wss://connector.example/relay/")
    assert relay_url() == "wss://connector.example/relay"


def test_create_adapter_yields_relay_adapter():
    # force=True builds a transport-less adapter (no live dial in unit tests).
    register_relay_adapter(force=True)
    adapter = platform_registry.create_adapter("relay", PlatformConfig())
    assert isinstance(adapter, RelayAdapter)
    # Placeholder descriptor until handshake negotiates the real one.
    assert adapter.descriptor.platform == "relay"
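
Reviewer sketch (not part of the diff): the config-driven gate amounts to "resolve a URL from env or config, normalise it, and register only if one was found". The function below shows one way that resolution could look. `sketch_relay_url`, its parameters, and the env-before-config precedence are assumptions for illustration; the real `relay_url()` in `gateway/relay/__init__.py` takes no arguments and its body is not shown in this diff.

```python
import os
from typing import Mapping, Optional


def sketch_relay_url(
    env: Mapping[str, str] = os.environ,
    config: Optional[dict] = None,
) -> Optional[str]:
    """Hypothetical resolver: GATEWAY_RELAY_URL first, then gateway.relay_url."""
    gateway_cfg = (config or {}).get("gateway") or {}
    raw = env.get("GATEWAY_RELAY_URL") or gateway_cfg.get("relay_url")
    if not raw:
        return None  # nothing configured: registration would be a no-op
    # Strip surrounding whitespace and any trailing slash for a stable base URL.
    return raw.strip().rstrip("/") or None


from_env = sketch_relay_url({"GATEWAY_RELAY_URL": "wss://connector.example/relay/"})
from_cfg = sketch_relay_url({}, {"gateway": {"relay_url": "wss://connector.example/relay"}})
unset = sketch_relay_url({})
```

Passing `env` and `config` in explicitly is only a testing convenience for the sketch; it avoids the monkeypatching the real tests need.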


@@ -0,0 +1,122 @@
"""End-to-end relay round-trip against the in-memory stub connector.
Proves the gateway side of the relay works with no real connector:
- connect() registers the inbound handler,
- a connector-delivered MessageEvent reaches the adapter's message path,
- SessionSource discriminators (guild_id) drive build_session_key isolation,
- an outbound send round-trips through the transport.
These target the transport contract + session-key derivation (Task 1.2's gate),
not the full agent turn — handle_message is patched to capture the event.
"""

from __future__ import annotations

import pytest

from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import MessageEvent, MessageType
from gateway.session import SessionSource, build_session_key
from gateway.relay.adapter import RelayAdapter
from gateway.relay.descriptor import CONTRACT_VERSION, CapabilityDescriptor
from tests.gateway.relay.stub_connector import StubConnector


def _discord_descriptor() -> CapabilityDescriptor:
    return CapabilityDescriptor(
        contract_version=CONTRACT_VERSION,
        platform="discord",
        label="Discord",
        max_message_length=2000,
        supports_draft_streaming=False,
        supports_edit=True,
        supports_threads=True,
        markdown_dialect="discord",
        len_unit="chars",
        emoji="\U0001f47e",
        platform_hint="You are on Discord.",
        pii_safe=False,
    )


def _discord_event(guild_id: str, channel_id: str, user_id: str, text: str) -> MessageEvent:
    """Synthetic inbound the connector would build from a discord.js message."""
    source = SessionSource(
        platform=Platform.DISCORD,
        chat_id=channel_id,
        chat_type="group",
        user_id=user_id,
        guild_id=guild_id,
    )
    return MessageEvent(text=text, message_type=MessageType.TEXT, source=source)


@pytest.fixture
def wired():
    stub = StubConnector(_discord_descriptor())
    adapter = RelayAdapter(PlatformConfig(), _discord_descriptor(), transport=stub)
    return adapter, stub


@pytest.mark.asyncio
async def test_connect_registers_inbound_handler(wired):
    adapter, stub = wired
    assert stub._inbound is None
    ok = await adapter.connect()
    assert ok is True
    assert stub.connected is True
    assert stub._inbound is not None


@pytest.mark.asyncio
async def test_inbound_event_reaches_adapter(wired, monkeypatch):
    adapter, stub = wired
    captured = []
    monkeypatch.setattr(adapter, "handle_message", lambda ev: _async_capture(captured, ev))
    await adapter.connect()

    ev = _discord_event("guildA", "chan1", "userX", "hello")
    await stub.push_inbound(ev)

    assert len(captured) == 1
    assert captured[0].text == "hello"
    assert captured[0].source.guild_id == "guildA"


@pytest.mark.asyncio
async def test_two_guilds_isolate_into_distinct_session_keys(wired):
    adapter, _ = wired
    ev_a = _discord_event("guildA", "chan1", "userX", "hi from A")
    ev_b = _discord_event("guildB", "chan2", "userX", "hi from B")
    key_a = build_session_key(ev_a.source)
    key_b = build_session_key(ev_b.source)
    assert key_a != key_b
    # Same guild + channel + user collapses to one session.
    ev_a2 = _discord_event("guildA", "chan1", "userX", "again")
    assert build_session_key(ev_a2.source) == key_a


@pytest.mark.asyncio
async def test_outbound_send_round_trips(wired):
    adapter, stub = wired
    await adapter.connect()
    stub.next_send_result = {"success": True, "message_id": "msg-42"}

    result = await adapter.send("chan1", "a reply", metadata={"k": "v"})

    assert result.success is True
    assert result.message_id == "msg-42"
    assert len(stub.sent) == 1
    assert stub.sent[0]["op"] == "send"
    assert stub.sent[0]["chat_id"] == "chan1"
    assert stub.sent[0]["content"] == "a reply"


@pytest.mark.asyncio
async def test_get_chat_info_proxied_to_connector(wired):
    adapter, stub = wired
    stub.chat_info["chan1"] = {"name": "general", "type": "group"}
    info = await adapter.get_chat_info("chan1")
    assert info == {"name": "general", "type": "group"}


async def _async_capture(sink, event):
    sink.append(event)
    return None


@@ -0,0 +1,167 @@
"""End-to-end relay round-trip for Telegram against the in-memory stub.
Companion to ``test_relay_roundtrip.py`` (Discord). Proves the relay generalizes
beyond Discord — the Phase 1 exit gate requires *both* Telegram and Discord
descriptors to round-trip and their inbound ``MessageEvent``s to drive
``build_session_key()`` correctly.
Telegram's discriminator profile differs from Discord's, which is the point:
- No ``guild_id``; isolation between chats comes from ``chat_id`` alone.
- Forum topics live inside ONE ``chat_id`` and isolate by ``thread_id`` (the
Telegram analog of Discord's per-guild isolation).
- Forum/thread sessions are shared across participants by default
(``thread_sessions_per_user=False``) — user_id is NOT appended in a thread.
- ``len_unit="utf16"`` (Telegram counts UTF-16 code units) and
``markdown_dialect="markdown_v2"`` — distinct from Discord's chars/discord.
If the descriptor or session-keying only worked for Discord, these fail.
"""

from __future__ import annotations

import pytest

from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import MessageEvent, MessageType
from gateway.session import SessionSource, build_session_key
from gateway.relay.adapter import RelayAdapter
from gateway.relay.descriptor import CONTRACT_VERSION, CapabilityDescriptor
from tests.gateway.relay.stub_connector import StubConnector


def _telegram_descriptor() -> CapabilityDescriptor:
    return CapabilityDescriptor(
        contract_version=CONTRACT_VERSION,
        platform="telegram",
        label="Telegram",
        max_message_length=4096,
        supports_draft_streaming=True,  # Telegram DMs support sendMessageDraft
        supports_edit=True,
        supports_threads=True,  # forum topics
        markdown_dialect="markdown_v2",
        len_unit="utf16",
        emoji="\u2708\ufe0f",
        platform_hint="You are on Telegram.",
        pii_safe=False,
    )


def _tg_group_event(chat_id: str, user_id: str, text: str, thread_id: str | None = None) -> MessageEvent:
    """Synthetic inbound the connector would build from a Telegram update.

    A plain group message has no thread_id; a forum-topic message carries the
    topic id as thread_id (no guild_id; Telegram has no guild concept).
    """
    source = SessionSource(
        platform=Platform.TELEGRAM,
        chat_id=chat_id,
        chat_type="forum" if thread_id else "group",
        user_id=user_id,
        thread_id=thread_id,
    )
    return MessageEvent(text=text, message_type=MessageType.TEXT, source=source)


def _tg_dm_event(chat_id: str, user_id: str, text: str) -> MessageEvent:
    source = SessionSource(
        platform=Platform.TELEGRAM,
        chat_id=chat_id,
        chat_type="dm",
        user_id=user_id,
    )
    return MessageEvent(text=text, message_type=MessageType.TEXT, source=source)


@pytest.fixture
def wired():
    desc = _telegram_descriptor()
    stub = StubConnector(desc)
    adapter = RelayAdapter(PlatformConfig(), desc, transport=stub)
    return adapter, stub


@pytest.mark.asyncio
async def test_telegram_descriptor_round_trips_through_stub(wired):
    """The connector's handshake descriptor for Telegram survives JSON + the
    adapter configures itself from it (utf16 length unit, 4096 limit)."""
    adapter, stub = wired
    desc = _telegram_descriptor()
    assert CapabilityDescriptor.from_json(desc.to_json()) == desc
    # Adapter reflects the descriptor's capability profile.
    assert adapter.MAX_MESSAGE_LENGTH == 4096
    assert adapter.supports_draft_streaming() is True
    # utf16 length unit selects a non-default len fn (Telegram counts UTF-16).
    assert adapter.message_len_fn is not len


@pytest.mark.asyncio
async def test_inbound_telegram_event_reaches_adapter(wired, monkeypatch):
    adapter, stub = wired
    captured: list[MessageEvent] = []
    monkeypatch.setattr(adapter, "handle_message", lambda ev: _async_capture(captured, ev))
    await adapter.connect()

    await stub.push_inbound(_tg_group_event("chat-100", "userX", "hello"))

    assert len(captured) == 1
    assert captured[0].text == "hello"
    assert captured[0].source.platform == Platform.TELEGRAM
    assert captured[0].source.guild_id is None  # Telegram has no guild


@pytest.mark.asyncio
async def test_two_telegram_chats_isolate_by_chat_id(wired):
    """No guild_id on Telegram: two distinct chats must still isolate, keyed
    on chat_id alone (the Discord-guild role is played by chat_id here)."""
    ev_a = _tg_group_event("chat-A", "userX", "hi A")
    ev_b = _tg_group_event("chat-B", "userX", "hi B")
    key_a = build_session_key(ev_a.source)
    key_b = build_session_key(ev_b.source)
    assert key_a != key_b
    # Same chat + same user collapses to one session.
    ev_a2 = _tg_group_event("chat-A", "userX", "again")
    assert build_session_key(ev_a2.source) == key_a


@pytest.mark.asyncio
async def test_forum_topics_isolate_by_thread_id_within_one_chat(wired):
    """Telegram forum topics share a single chat_id and isolate by thread_id,
    the Telegram analog of Discord per-guild isolation. Two topics in the same
    forum must NOT collide, and (threads shared across participants by default)
    a second user in the same topic shares the session."""
    topic1 = _tg_group_event("forum-1", "userX", "in topic 1", thread_id="t-1")
    topic2 = _tg_group_event("forum-1", "userX", "in topic 2", thread_id="t-2")
    k1 = build_session_key(topic1.source)
    k2 = build_session_key(topic2.source)
    assert k1 != k2, "two forum topics in one chat must not share a session"

    # Same chat, no topic → distinct from any topic session.
    plain = _tg_group_event("forum-1", "userX", "no topic")
    assert build_session_key(plain.source) not in {k1, k2}

    # Threads are shared across participants by default: a different user in the
    # same topic lands on the SAME session key (user_id not appended in threads).
    topic1_other_user = _tg_group_event("forum-1", "userY", "me too", thread_id="t-1")
    assert build_session_key(topic1_other_user.source) == k1


@pytest.mark.asyncio
async def test_telegram_dm_isolates_by_chat_id(wired):
    dm_a = _tg_dm_event("dm-111", "userX", "hey")
    dm_b = _tg_dm_event("dm-222", "userY", "yo")
    assert build_session_key(dm_a.source) != build_session_key(dm_b.source)
    assert build_session_key(dm_a.source).startswith("agent:main:telegram:dm:")


@pytest.mark.asyncio
async def test_outbound_send_round_trips_telegram(wired):
    adapter, stub = wired
    await adapter.connect()
    stub.next_send_result = {"success": True, "message_id": "tg-77"}

    result = await adapter.send("chat-100", "a reply")

    assert result.success is True
    assert result.message_id == "tg-77"
    assert stub.sent[0]["op"] == "send"
    assert stub.sent[0]["chat_id"] == "chat-100"


async def _async_capture(sink, event):
    sink.append(event)
    return None
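
Reviewer sketch (not part of the diff): the keying rules the Telegram tests describe can be summarised in a few lines. A thread id, when present, replaces the per-user suffix unless per-user thread sessions are turned on; DMs key on the chat alone. `sketch_session_key` is a hypothetical toy, not `gateway.session.build_session_key`: the segment order, the handling of `guild_id` (omitted here) and the `thread_sessions_per_user` parameter placement are assumptions that merely satisfy the behaviours asserted above.

```python
from typing import Optional


def sketch_session_key(
    platform: str,
    chat_type: str,
    chat_id: str,
    user_id: Optional[str] = None,
    thread_id: Optional[str] = None,
    thread_sessions_per_user: bool = False,
) -> str:
    """Hypothetical key builder mirroring the rules the tests describe."""
    parts = ["agent", "main", platform, chat_type, chat_id]
    if thread_id:
        # Threads/topics are shared across participants by default.
        parts.append(thread_id)
        if thread_sessions_per_user and user_id:
            parts.append(user_id)
    elif chat_type != "dm" and user_id:
        # Plain group chats isolate per user; DMs key on the chat alone.
        parts.append(user_id)
    return ":".join(parts)


k1 = sketch_session_key("telegram", "forum", "forum-1", "userX", "t-1")
k2 = sketch_session_key("telegram", "forum", "forum-1", "userX", "t-2")
k1_other_user = sketch_session_key("telegram", "forum", "forum-1", "userY", "t-1")
plain = sketch_session_key("telegram", "group", "forum-1", "userX")
dm = sketch_session_key("telegram", "dm", "dm-111", "userX")
```

The point of the sketch is the branching, not the exact string: whichever discriminator a platform uses for isolation has to end up in the key, or two conversations collapse into one session.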


@@ -0,0 +1,91 @@
"""Invariant: the relay path sheds platform crypto — it re-validates nothing.
Under the A2 trust model (see docs/relay-connector-contract.md §6), the
*connector* is the sole crypto/identity boundary: it verifies/decrypts every
inbound platform payload at the edge (it holds the tenant secrets), normalizes
it to a tenant-scoped ``MessageEvent``, and forwards only the sanitized event.
The gateway re-validates nothing — it cannot, without being handed the shared
signing secret, which would itself be the leak on a shared bot.
The relay package therefore MUST NOT import or call platform signature/crypto
verification (Discord ed25519, Twilio HMAC, WeCom BizMsgCrypt, generic webhook
signature checks). Those live in the *direct* platform adapters
(``gateway/platforms/*``) which serve non-relay deployments; the relay receives
already-trusted events. This test fails if someone bolts re-validation onto the
relay path, re-coupling the gateway to platform secrets it must never hold.
It is an invariant (asserts the *relation* "relay imports no crypto"), not a
change-detector snapshot of a frozen import list.
"""

from __future__ import annotations

import ast
import re
from pathlib import Path

# This file lives in tests/gateway/relay/, so parents[3] is the repo root and the
# relay package is gateway/relay beneath it.
_REPO_ROOT = Path(__file__).resolve().parents[3]
_RELAY_PKG = _REPO_ROOT / "gateway" / "relay"

# Modules / symbols that mean "platform crypto re-validation". If the relay path
# imports any of these it has re-coupled the gateway to a platform secret.
_FORBIDDEN_MODULE_TOKENS = (
    "wecom_crypto",
    "wecom_callback",
    "webhook",  # gateway.platforms.webhook holds signature verification
)
_FORBIDDEN_SYMBOL_RE = re.compile(
    r"(ed25519|verify_key|verifykey|verify_signature|verify_ed25519|"
    r"verify_webhook|bizmsg|hmac|x[-_]signature)",
    re.IGNORECASE,
)


def _relay_py_files() -> list[Path]:
    assert _RELAY_PKG.is_dir(), f"relay package missing at {_RELAY_PKG}"
    return sorted(_RELAY_PKG.glob("*.py"))
def test_relay_package_imports_no_platform_crypto():
    """No module in gateway/relay imports a platform-crypto / verification module."""
    offenders: list[str] = []
    for path in _relay_py_files():
        tree = ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
        for node in ast.walk(tree):
            mods: list[str] = []
            if isinstance(node, ast.Import):
                mods = [alias.name for alias in node.names]
            elif isinstance(node, ast.ImportFrom):
                mods = [node.module or ""]
                mods += [f"{node.module or ''}.{a.name}" for a in node.names]
            for mod in mods:
                if any(tok in mod for tok in _FORBIDDEN_MODULE_TOKENS):
                    offenders.append(f"{path.name}: imports '{mod}'")
    assert not offenders, (
        "The relay path must re-validate NOTHING (A2: connector is the sole "
        "crypto boundary). Found platform-crypto imports in the relay package:\n "
        + "\n ".join(offenders)
        + "\nMove verification to the connector edge; the gateway trusts the "
        "normalized MessageEvent. See docs/relay-connector-contract.md §6."
    )


def test_relay_package_calls_no_signature_verification():
    """No relay module references a signature/crypto-verification symbol by name."""
    offenders: list[str] = []
    for path in _relay_py_files():
        for lineno, line in enumerate(path.read_text(encoding="utf-8").splitlines(), 1):
            # Skip comment-only lines. Docstrings and trailing inline comments
            # are still scanned, so a forbidden name there is flagged too.
            stripped = line.strip()
            if stripped.startswith("#"):
                continue
            m = _FORBIDDEN_SYMBOL_RE.search(line)
            if m:
                offenders.append(f"{path.name}:{lineno}: '{m.group(0)}' in: {stripped[:80]}")
    assert not offenders, (
        "The relay path must not perform platform signature/crypto verification "
        "(A2). Found verification-symbol references:\n "
        + "\n ".join(offenders)
        + "\nThe connector verifies at the edge; the gateway re-validates nothing."
    )

@@ -0,0 +1,179 @@
"""WebSocketRelayTransport against a real in-process WebSocket server.

Exercises the production transport over an actual ``websockets`` server (no
mock socket): handshake (hello -> descriptor), inbound frame -> handler,
outbound request/response correlation, and follow_up routing. Proves the wire
framing (newline-delimited JSON) and the request/response future plumbing work
end to end on a live socket.

Skipped cleanly if the optional ``websockets`` dependency is absent.
"""

from __future__ import annotations

import asyncio
import json

import pytest
import pytest_asyncio

from gateway.relay.ws_transport import WebSocketRelayTransport, WEBSOCKETS_AVAILABLE

pytestmark = pytest.mark.skipif(not WEBSOCKETS_AVAILABLE, reason="websockets not installed")

if WEBSOCKETS_AVAILABLE:
    import websockets

DESCRIPTOR = {
    "contract_version": 1,
    "platform": "discord",
    "label": "Discord",
    "max_message_length": 2000,
    "supports_draft_streaming": False,
    "supports_edit": True,
    "supports_threads": True,
    "markdown_dialect": "discord",
    "len_unit": "chars",
}


class _StubConnectorServer:
    """Minimal connector: answers hello with a descriptor, echoes outbound."""

    def __init__(self):
        self.received: list[dict] = []
        self._server = None
        self.url = ""
        # Push channel: tests set this to a list of frame dicts to deliver inbound.
        self._to_push: list[dict] = []

    async def start(self):
        self._server = await websockets.serve(self._handle, "127.0.0.1", 0)
        sock = next(iter(self._server.sockets))
        port = sock.getsockname()[1]
        self.url = f"ws://127.0.0.1:{port}"

    async def stop(self):
        if self._server is not None:
            self._server.close()
            await self._server.wait_closed()

    async def _handle(self, ws):
        async for raw in ws:
            for line in str(raw).split("\n"):
                if not line.strip():
                    continue
                frame = json.loads(line)
                self.received.append(frame)
                await self._on_frame(ws, frame)

    async def _on_frame(self, ws, frame):
        ftype = frame.get("type")
        if ftype == "hello":
            await ws.send(json.dumps({"type": "descriptor", "descriptor": DESCRIPTOR}) + "\n")
            # Deliver any queued inbound frames right after handshake.
            for f in self._to_push:
                await ws.send(json.dumps(f) + "\n")
        elif ftype == "outbound":
            action = frame.get("action", {})
            # Echo a successful result correlated by requestId.
            result = {"success": True, "message_id": f"srv-{action.get('op')}"}
            await ws.send(
                json.dumps({"type": "outbound_result", "requestId": frame["requestId"], "result": result})
                + "\n"
            )


@pytest_asyncio.fixture
async def server():
    srv = _StubConnectorServer()
    await srv.start()
    yield srv
    await srv.stop()


@pytest.mark.asyncio
async def test_handshake_negotiates_descriptor(server):
    t = WebSocketRelayTransport(server.url, "discord", "appShared")
    await t.connect()
    try:
        desc = await t.handshake()
        assert desc.platform == "discord"
        assert desc.max_message_length == 2000
        # The hello carried the platform + botId.
        hello = next(f for f in server.received if f["type"] == "hello")
        assert hello["platform"] == "discord"
        assert hello["botId"] == "appShared"
    finally:
        await t.disconnect()


@pytest.mark.asyncio
async def test_inbound_frame_reaches_handler(server):
    server._to_push = [
        {
            "type": "inbound",
            "event": {
                "text": "hello from connector",
                "message_type": "text",
                "source": {"platform": "discord", "chat_id": "chan1", "chat_type": "group", "guild_id": "guildA"},
            },
            "bufferId": "buf-1",
        }
    ]
    received = []
    t = WebSocketRelayTransport(server.url, "discord", "appShared")
    t.set_inbound_handler(lambda ev: received.append(ev) or asyncio.sleep(0))
    await t.connect()
    try:
        await t.handshake()
        # Give the reader a tick to deliver the pushed inbound frame.
        await asyncio.sleep(0.05)
        assert len(received) == 1
        assert received[0].text == "hello from connector"
        assert received[0].source.guild_id == "guildA"
    finally:
        await t.disconnect()


@pytest.mark.asyncio
async def test_outbound_round_trips_with_correlation(server):
    t = WebSocketRelayTransport(server.url, "discord", "appShared")
    await t.connect()
    try:
        await t.handshake()
        result = await t.send_outbound({"op": "send", "chat_id": "chan1", "content": "hi"})
        assert result["success"] is True
        assert result["message_id"] == "srv-send"
    finally:
        await t.disconnect()


@pytest.mark.asyncio
async def test_follow_up_round_trips(server):
    t = WebSocketRelayTransport(server.url, "discord", "appShared")
    await t.connect()
    try:
        await t.handshake()
        result = await t.send_follow_up(
            {"op": "follow_up", "session_key": "s1", "kind": "discord.interaction_token", "content": "fu"}
        )
        assert result["success"] is True
        assert result["message_id"] == "srv-follow_up"
        # The follow_up rode an outbound frame the connector saw.
        outbound = [f for f in server.received if f["type"] == "outbound"]
        assert any(f["action"]["op"] == "follow_up" for f in outbound)
    finally:
        await t.disconnect()


@pytest.mark.asyncio
async def test_disconnect_fails_pending_waiters_cleanly(server):
    t = WebSocketRelayTransport(server.url, "discord", "appShared", outbound_timeout_s=5)
    await t.connect()
    await t.handshake()
    await t.disconnect()
    # After disconnect, an outbound returns a structured failure rather than hanging.
    result = await t.send_outbound({"op": "send", "chat_id": "c", "content": "x"})
    assert result["success"] is False
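
The framing and correlation that this file exercises can be sketched in isolation. This is only an illustration under stated assumptions, not the code in `gateway/relay/ws_transport.py`: the `PendingRequests` helper, the `req-N` id scheme, and the `error` key are hypothetical, while the frame shapes (`outbound_result`, `requestId`, `result`) follow the stub server above.

```python
import asyncio
import json


def encode_frame(frame: dict) -> str:
    """Serialize one frame as a single newline-terminated JSON line."""
    return json.dumps(frame) + "\n"


def decode_frames(raw: str) -> list[dict]:
    """Split one websocket message into frames, skipping blank lines."""
    return [json.loads(line) for line in raw.split("\n") if line.strip()]


class PendingRequests:
    """Hypothetical helper: maps a requestId to the Future awaiting its result."""

    def __init__(self) -> None:
        self._pending: dict[str, asyncio.Future] = {}
        self._next_id = 0

    def register(self) -> tuple[str, asyncio.Future]:
        # Must be called from inside a running event loop.
        self._next_id += 1
        request_id = f"req-{self._next_id}"  # assumed id scheme
        fut = asyncio.get_running_loop().create_future()
        self._pending[request_id] = fut
        return request_id, fut

    def resolve(self, frame: dict) -> bool:
        """Complete the waiter matching this result frame, if there is one."""
        fut = self._pending.pop(frame.get("requestId", ""), None)
        if fut is None or fut.done():
            return False
        fut.set_result(frame.get("result", {}))
        return True

    def fail_all(self, reason: str) -> None:
        """On disconnect, hand every waiter a structured failure."""
        for fut in self._pending.values():
            if not fut.done():
                fut.set_result({"success": False, "error": reason})
        self._pending.clear()


async def demo() -> dict:
    pending = PendingRequests()
    request_id, fut = pending.register()
    wire = encode_frame(
        {
            "type": "outbound_result",
            "requestId": request_id,
            "result": {"success": True, "message_id": "srv-send"},
        }
    )
    for frame in decode_frames(wire):
        pending.resolve(frame)
    return await fut
```

Resolving waiters with a failure dict in `fail_all`, rather than raising, mirrors the structured `{"success": False}` result that the disconnect test asserts on; whether the real transport does the same internally is not shown in this diff.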

@@ -98,6 +98,8 @@ def test_checker_returns_true_when_configured(platform, checker, monkeypatch):
        mock_config.extra = {"app_id": "app", "app_secret": "sec"}
    elif platform == Platform.DINGTALK:
        mock_config.extra = {"client_id": "id", "client_secret": "sec"}
    elif platform == Platform.RELAY:
        mock_config.extra = {"relay_url": "wss://connector.example/relay"}
    else:
        pytest.skip(f"No synthetic config defined for {platform.value}")

@@ -0,0 +1,99 @@
"""Phase 0 regression harness for the relay/connector work.

Locks the *behavioral contract* that the future ``RelayAdapter`` must reproduce:
the gateway's ``stream_consumer`` and ``BasePlatformAdapter`` read per-platform
capabilities through a small, stable surface. A relay adapter that exposes the
same surface (``MAX_MESSAGE_LENGTH`` attribute, ``message_len_fn`` property,
``supports_draft_streaming`` probe, and only the abstract methods) slots into
the existing consumer with no consumer changes.

These are deliberately *behavioral* (construct an adapter, drive the code,
assert the observable outcome) rather than source-string snapshots, per the
repo's "don't write change-detector tests" rule. They pass on ``main`` before
any ``RelayAdapter`` exists — they describe the contract, not the relay.
"""

import inspect

from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import BasePlatformAdapter


class _MinAdapter(BasePlatformAdapter):
    """Smallest concrete adapter: implements exactly the abstract methods."""

    async def connect(self):  # pragma: no cover - not called
        return True

    async def disconnect(self):  # pragma: no cover - not called
        return None

    async def send(self, *args, **kwargs):  # pragma: no cover - not called
        return None

    async def get_chat_info(self, chat_id):  # pragma: no cover - not called
        return {}


def _make() -> BasePlatformAdapter:
    return _MinAdapter(PlatformConfig(), Platform.LOCAL)


def test_abstract_methods_are_the_known_set():
    """The relay adapter must implement exactly this set of abstract methods.

    Everything else on BasePlatformAdapter has a default, so ONE generic
    RelayAdapter overriding the right subset is feasible without per-platform
    gateway classes. If a new abstractmethod is added here, the relay design
    (and the cross-repo contract) must be revisited — hence the lock.

    NOTE: this is four methods, not three — ``get_chat_info`` is abstract too
    (defined far below the connect/disconnect/send cluster in base.py). The
    RelayAdapter must implement it (proxying a chat-info lookup to the
    connector, or returning a descriptor-derived stub).
    """
    abstract = {
        name
        for name, member in inspect.getmembers(BasePlatformAdapter)
        if getattr(member, "__isabstractmethod__", False)
    }
    assert abstract == {"connect", "disconnect", "send", "get_chat_info"}


def test_message_len_fn_defaults_to_len():
    """message_len_fn is the per-platform length-unit hook (Telegram overrides
    it for UTF-16). The default is plain ``len``; the relay adapter will
    override it from its negotiated descriptor's ``len_unit``."""
    inst = _make()
    assert inst.message_len_fn("hello") == 5


def test_supports_draft_streaming_defaults_false():
    """Draft streaming is opt-in per platform; the consumer falls back to the
    edit-based path when False. The relay adapter flips this from its
    descriptor's ``supports_draft_streaming`` flag."""
    inst = _make()
    assert inst.supports_draft_streaming() is False


def test_stream_consumer_reads_max_message_length_by_attribute():
    """The consumer resolves the per-platform char limit by reading the
    adapter's ``MAX_MESSAGE_LENGTH`` attribute (defaulting to 4096 when
    absent). The relay adapter exposes this as an attribute set from its
    descriptor — so a relay adapter that sets the attribute is chunked
    correctly with no consumer change.
    """
    from gateway import stream_consumer

    class _NoLimit:
        pass

    class _WithLimit:
        MAX_MESSAGE_LENGTH = 1234

    assert getattr(_NoLimit(), "MAX_MESSAGE_LENGTH", 4096) == 4096
    assert getattr(_WithLimit(), "MAX_MESSAGE_LENGTH", 4096) == 1234
    # The consumer depends on BasePlatformAdapter for the message_len_fn
    # isinstance guard (import-level contract the relay adapter satisfies by
    # subclassing BasePlatformAdapter).
    assert stream_consumer._BasePlatformAdapter is BasePlatformAdapter
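
For context on the `message_len_fn` hook these tests lock down, here is a minimal sketch of how a length function might be chosen from a descriptor's `len_unit`. It is an assumption-laden illustration rather than the `RelayAdapter` implementation: `len_fn_for_unit` and `utf16_len` are hypothetical names, and `"utf16"` is an assumed unit value (only `"chars"` appears in this diff).

```python
from typing import Callable


def utf16_len(text: str) -> int:
    """Length in UTF-16 code units; characters outside the BMP count as two."""
    return len(text.encode("utf-16-le")) // 2


def len_fn_for_unit(len_unit: str) -> Callable[[str], int]:
    """Pick a length function for a descriptor's len_unit (hypothetical helper)."""
    # "utf16" is an assumed value; any other unit falls back to plain len,
    # matching the BasePlatformAdapter default asserted in the tests above.
    if len_unit == "utf16":
        return utf16_len
    return len
```

With this sketch, `len_fn_for_unit("chars")("hello")` is 5, and a string holding one ASCII letter plus one emoji outside the Basic Multilingual Plane measures 2 under `len` but 3 under `utf16_len`.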