mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-12 05:09:01 +08:00
Compare commits
19 Commits
fix/plugin
...
feat/gatew
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
19399a7e32 | ||
|
|
b075c1ec91 | ||
|
|
f325dc71e5 | ||
|
|
ce120f0473 | ||
|
|
6dd4caf378 | ||
|
|
d603371644 | ||
|
|
4f59b4c657 | ||
|
|
0d585df15d | ||
|
|
ada43042f8 | ||
|
|
706b359cf8 | ||
|
|
5869d594ab | ||
|
|
1b3491e8b5 | ||
|
|
96e138ed24 | ||
|
|
75a12474ec | ||
|
|
6729118a4a | ||
|
|
eaf1721b9f | ||
|
|
593fba5f5d | ||
|
|
2b09c95c33 | ||
|
|
812a2977bd |
213
docs/relay-connector-contract.md
Normal file
213
docs/relay-connector-contract.md
Normal file
@@ -0,0 +1,213 @@
|
||||
# Relay ↔ Connector Contract (v1, EXPERIMENTAL)
|
||||
|
||||
> **Status:** EXPERIMENTAL. This contract MAY CHANGE without a deprecation
|
||||
> cycle until at least two real Class-1 platforms (Discord + Telegram) have
|
||||
> validated it. Evolution during the experimental phase is **additive-only**,
|
||||
> gated by `contract_version`. A breaking change updates both repos in lockstep.
|
||||
|
||||
This document is the formal interface between the **Hermes gateway** (Python,
|
||||
`gateway/relay/`) and the **connector** (Node/TypeScript,
|
||||
`NousResearch/gateway-gateway`). The connector implementer's first action is to
|
||||
read this file.
|
||||
|
||||
The gateway runs a generic `RelayAdapter` that dials **out** to the connector,
|
||||
receives a `CapabilityDescriptor` at handshake, then exchanges normalized
|
||||
`MessageEvent`s (inbound) and actions (outbound) over a per-turn bidirectional
|
||||
WebSocket. The gateway never learns which concrete platform is fronting it; the
|
||||
connector owns all platform-specific socket/identity logic.
|
||||
|
||||
---
|
||||
|
||||
## 1. Handshake
|
||||
|
||||
1. Gateway opens the transport (`connect`).
|
||||
2. Gateway calls `handshake()`; connector returns a `CapabilityDescriptor`
|
||||
(section 2) describing the platform this adapter instance fronts.
|
||||
3. Gateway configures the adapter from the descriptor (char limit, length unit,
|
||||
draft/edit/thread/markdown capabilities) and registers an inbound handler.
|
||||
4. Connector then streams inbound events and accepts outbound actions.
|
||||
|
||||
`contract_version` (currently `1`) is carried in the descriptor. The gateway
|
||||
ignores unknown descriptor fields (forward-compat) and fills missing optional
|
||||
fields from defaults.
|
||||
|
||||
---
|
||||
|
||||
## 2. CapabilityDescriptor (handshake payload)
|
||||
|
||||
JSON object. Source of truth: `gateway/relay/descriptor.py`.
|
||||
|
||||
| Field | Type | Required | Meaning |
|
||||
| --- | --- | --- | --- |
|
||||
| `contract_version` | int | yes | Contract version (additive-only within a version). |
|
||||
| `platform` | string | yes | Platform name (e.g. `"discord"`, `"telegram"`). |
|
||||
| `label` | string | yes | Human-readable label. |
|
||||
| `max_message_length` | int | yes | Char limit; gateway exposes as `MAX_MESSAGE_LENGTH`. 0 → treat as 4096. |
|
||||
| `supports_draft_streaming` | bool | yes | Native draft-streaming preview support. |
|
||||
| `supports_edit` | bool | yes | Edit-based streaming possible; if false, consumer degrades to one-message-per-segment. |
|
||||
| `supports_threads` | bool | yes | `create_handoff_thread` capability. |
|
||||
| `markdown_dialect` | string | yes | `"plain"`, `"markdown_v2"`, `"discord"`, … (drives `supports_code_blocks`). |
|
||||
| `len_unit` | string | yes | `"chars"` (builtin len) or `"utf16"` (Telegram UTF-16 code units). |
|
||||
| `emoji` | string | no | Display emoji (default 🔌). |
|
||||
| `platform_hint` | string | no | System-prompt platform hint. |
|
||||
| `pii_safe` | bool | no | Redact PII in session descriptions. |
|
||||
|
||||
Most fields are a projection of the gateway's existing `PlatformEntry`; the
|
||||
runtime-only fields (`len_unit`, `supports_*`, `markdown_dialect`) come from the
|
||||
live platform adapter's capability methods.
|
||||
|
||||
---
|
||||
|
||||
## 3. Inbound: `MessageEvent` envelope
|
||||
|
||||
The connector normalizes each platform wire event into a `MessageEvent`
|
||||
(`gateway/platforms/base.py`) and delivers it to the gateway's inbound handler.
|
||||
The gateway keys the session via `build_session_key()` from the embedded
|
||||
`SessionSource` — so populating the right discriminators is the single
|
||||
highest-correctness responsibility of the connector.
|
||||
|
||||
### SessionSource fields (the wire surface)
|
||||
|
||||
Source of truth: `SessionSource.to_dict()` in `gateway/session.py`. These are
|
||||
every key the gateway accepts on the wire. `platform`, `chat_id`, `chat_type`,
|
||||
`user_id`, `user_name`, `thread_id`, `chat_name`, and `chat_topic` are always
|
||||
present (may be `null`); the rest are included only when set.
|
||||
|
||||
| Field | Type | Always sent | Meaning |
|
||||
| --- | --- | --- | --- |
|
||||
| `platform` | string | yes | Platform name (matches the descriptor's `platform`). |
|
||||
| `chat_id` | string | yes | Primary conversation id (channel/chat). Session-key discriminator. |
|
||||
| `chat_type` | string | yes | `dm` / `group` / `channel` / `thread` / `forum`. |
|
||||
| `chat_name` | string\|null | yes | Human-readable chat name. |
|
||||
| `user_id` | string\|null | yes | Message author id. Session-key discriminator. |
|
||||
| `user_name` | string\|null | yes | Author display name. |
|
||||
| `thread_id` | string\|null | yes | Thread/forum-topic id when in a thread. Session-key discriminator. |
|
||||
| `chat_topic` | string\|null | yes | Channel topic/description (Discord, Slack). |
|
||||
| `user_id_alt` | string | no | Platform-specific stable alt id (Signal UUID, Feishu union_id). |
|
||||
| `chat_id_alt` | string | no | Alternate chat id (e.g. Signal group internal id). |
|
||||
| `guild_id` | string | no | Discord guild / Slack workspace / Matrix server scope. **REQUIRED for Discord server isolation.** Session-key discriminator. |
|
||||
| `parent_chat_id` | string | no | Parent channel when `chat_id` refers to a thread. |
|
||||
| `message_id` | string | no | Id of the triggering message (for pin/reply/react). |
|
||||
|
||||
> `is_bot` (author-is-a-bot/webhook classification) exists on the gateway-side
|
||||
> dataclass but is **intentionally NOT on the wire** in v1 — it is not part of
|
||||
> `to_dict()`. Do not add it to the connector's `SessionSource` until it is
|
||||
> first added here and to `to_dict()` (additive bump).
|
||||
|
||||
### SessionSource discriminators per platform
|
||||
|
||||
| Platform | chat_id | chat_type | user_id | thread_id | guild_id |
|
||||
| --- | --- | --- | --- | --- | --- |
|
||||
| **Discord** | channel id | `dm`/`group`/`thread` | author id | thread channel id (threads) | **guild id** (REQUIRED for server isolation) |
|
||||
| **Telegram** | chat id | `dm`/`group`/`forum` | from id | forum topic id (forums) | — |
|
||||
|
||||
**Get Discord's `guild_id` wrong and two servers collide into one session.**
|
||||
This is the #1 High-severity risk. The gateway's `build_session_key()` is the
|
||||
conformance oracle: for a given `SessionSource`, the connector's normalization
|
||||
must produce the same key the Python adapter would. (The Phase-1 stub tests
|
||||
assert known-input → known-key.)
|
||||
|
||||
### Bot identity vs tenant (single-bot consolidation, Appendix A)
|
||||
|
||||
The envelope carries the **originating bot identity** as a field **distinct from
|
||||
tenant**. Tenant is resolved from the event's own discriminator (Discord
|
||||
`guild_id`, Telegram `chat_id`, webhook path/subdomain) — **never** from which
|
||||
token/socket/process delivered it. This keeps one shared bot able to front many
|
||||
tenants (Phase 6) without overloading an existing field.
|
||||
|
||||
---
|
||||
|
||||
## 4. Outbound: action set
|
||||
|
||||
The gateway calls the transport with action dicts. Source of truth:
|
||||
`gateway/relay/transport.py` + `gateway/relay/adapter.py`.
|
||||
|
||||
| `op` | Fields | Result |
|
||||
| --- | --- | --- |
|
||||
| `send` | `chat_id`, `content`, `reply_to?`, `metadata?` | `{success: bool, message_id?, error?}` |
|
||||
| `edit` | `chat_id`, `message_id`, `content`, `metadata?` | `{success: bool, error?}` |
|
||||
| `typing` | `chat_id` | `{success: bool}` |
|
||||
| `follow_up` | `session_key`, `kind`, `content`, `metadata?` | `{success: bool, message_id?, error?}` |
|
||||
|
||||
`get_chat_info(chat_id)` is a separate proxied call returning at least
|
||||
`{name, type}`. Media actions follow the same envelope shape (deferred to a
|
||||
later contract revision; additive).
|
||||
|
||||
**`follow_up` (A2 capability action).** Some inbound payloads carry a credential
|
||||
that acts on the **shared** bot identity (e.g. a Discord interaction follow-up
|
||||
token). Per §6 the connector strips that at the edge and binds it in its
|
||||
capability vault keyed by the session; it **never reaches the gateway**. To use
|
||||
it, the gateway issues `follow_up` naming the **session it is already in**
|
||||
(`session_key`) plus the capability `kind` (e.g. `discord.interaction_token`) —
|
||||
**never a token**. The connector resolves the real value from its vault,
|
||||
enforces the tenant match (tenant B can never wield tenant A's capability), and
|
||||
egresses. `success: false` when the capability is absent/expired or the tenant
|
||||
doesn't match — the gateway has nothing to retry with, by design (a leaked
|
||||
gateway holds zero capability material). Source of truth:
|
||||
`gateway/relay/transport.py` (`send_follow_up`) + `gateway/relay/adapter.py`.
|
||||
|
||||
---
|
||||
|
||||
## 5. Interrupt (`/stop`) routing
|
||||
|
||||
- **Gateway → connector:** `send_interrupt(session_key, reason?)` egresses a
|
||||
mid-turn `/stop`. The connector MUST forward it down the socket owned by the
|
||||
gateway instance running that `session_key` (the routing invariant).
|
||||
- **Connector → gateway:** an inbound interrupt for a `session_key` is bridged
|
||||
by the adapter's `on_interrupt(session_key, chat_id)` into the existing
|
||||
per-session interrupt mechanism, cancelling exactly that turn (siblings
|
||||
untouched).
|
||||
|
||||
The interrupt rides the same per-turn bidirectional socket as inbound/outbound.
|
||||
|
||||
---
|
||||
|
||||
## 6. Trust boundary & signed-body handling (A2)
|
||||
|
||||
**The connector is the sole crypto/identity boundary. The gateway re-validates
|
||||
nothing.**
|
||||
|
||||
Webhook signatures (Discord ed25519, Twilio HMAC, WeCom BizMsgCrypt) are
|
||||
computed over exact raw bytes, and some payloads are *encrypted* with a shared
|
||||
secret. The connector fronts a **shared** bot for many tenants and holds every
|
||||
tenant's platform secrets, so it:
|
||||
|
||||
- **verifies / decrypts at the edge** (the only place the secrets live),
|
||||
- **normalizes** the payload into a tenant-scoped `MessageEvent` (§3),
|
||||
- **strips any shared-identity capability** out of the payload and binds it in
|
||||
its capability vault, keyed by the session (see §4 `follow_up`),
|
||||
- **forwards only the sanitized `MessageEvent`** — never the raw signed body.
|
||||
|
||||
The gateway therefore performs **no** platform signature/crypto verification on
|
||||
the relay path; it trusts the normalized event. This is an enforced invariant on
|
||||
the gateway side (`tests/gateway/relay/test_relay_sheds_crypto.py`: the relay
|
||||
package imports/calls no platform-crypto).
|
||||
|
||||
**Why not "forward the signed body byte-for-byte so the gateway re-validates"?**
|
||||
That earlier model is incoherent under an untrusted, disposable tenant gateway:
|
||||
|
||||
- Re-validating Twilio HMAC / WeCom crypto would require handing the gateway the
|
||||
**shared signing secret** — which is itself the leak, and on a shared bot it's
|
||||
a *cross-tenant* leak.
|
||||
- WeCom payloads are encrypted with the shared secret; the connector must decrypt
|
||||
at the edge just to route, so forwarding ciphertext would again require giving
|
||||
the gateway the secret.
|
||||
- A Discord interaction token lives **inside** the signed JSON body — you cannot
|
||||
both preserve the bytes and strip the credential; they are the same bytes.
|
||||
|
||||
So byte-preservation is abandoned deliberately: the connector re-serializes the
|
||||
sanitized event and the gateway trusts it. This also unifies the passthrough and
|
||||
relay planes — both are "verify at the edge → emit a normalized event," differing
|
||||
only in transport. See `docs/capability-trust-boundary.md` (connector repo:
|
||||
`gateway-gateway`) for the full A2 rationale and the connector-side vault.
|
||||
|
||||
---
|
||||
|
||||
## 7. Versioning policy
|
||||
|
||||
- `contract_version` is an int; bump **only** for additive changes during the
|
||||
experimental phase (new optional fields, new `op`s).
|
||||
- A breaking change (renamed/removed field, changed semantics) requires a
|
||||
coordinated update of both repos and a version bump.
|
||||
- The connector's first PR references the commit SHA of this file it implements
|
||||
against.
|
||||
@@ -163,6 +163,7 @@ class Platform(Enum):
|
||||
BLUEBUBBLES = "bluebubbles"
|
||||
QQBOT = "qqbot"
|
||||
YUANBAO = "yuanbao"
|
||||
RELAY = "relay" # generic relay adapter fronted by the connector (EXPERIMENTAL)
|
||||
@classmethod
|
||||
def _missing_(cls, value):
|
||||
"""Accept unknown platform names only for known plugin adapters.
|
||||
@@ -488,6 +489,13 @@ _PLATFORM_CONNECTED_CHECKERS: dict[Platform, Callable[[PlatformConfig], bool]] =
|
||||
(cfg.extra.get("client_id") or os.getenv("DINGTALK_CLIENT_ID"))
|
||||
and (cfg.extra.get("client_secret") or os.getenv("DINGTALK_CLIENT_SECRET"))
|
||||
),
|
||||
# Relay dials OUT to a connector; it is "connected" once an endpoint URL is
|
||||
# configured (extra["relay_url"] or extra["url"]). The capability descriptor
|
||||
# is negotiated at handshake time, so the URL is the only config-level
|
||||
# signal in the experimental phase. EXPERIMENTAL — may change.
|
||||
Platform.RELAY: lambda cfg: bool(
|
||||
cfg.extra.get("relay_url") or cfg.extra.get("url")
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
|
||||
112
gateway/relay/__init__.py
Normal file
112
gateway/relay/__init__.py
Normal file
@@ -0,0 +1,112 @@
|
||||
"""Relay/connector support package for the Hermes gateway.
|
||||
|
||||
EXPERIMENTAL. This package implements the gateway side of the "Gateway Gateway"
|
||||
relay design: a generic ``RelayAdapter`` plus the wire-serializable
|
||||
``CapabilityDescriptor`` the connector hands it at handshake time, and the
|
||||
production ``WebSocketRelayTransport`` that dials the connector. The public API
|
||||
(module names, descriptor field set, transport protocol) MAY CHANGE without a
|
||||
deprecation cycle until at least two real Class-1 platforms (Discord + Telegram)
|
||||
have shaken out the schema.
|
||||
|
||||
See ``docs/relay-connector-contract.md`` for the formal cross-repo interface.
|
||||
|
||||
Activation is driven by configuration, not a separate feature flag: the relay
|
||||
platform is registered when a connector relay URL is configured
|
||||
(``GATEWAY_RELAY_URL`` env or ``gateway.relay_url`` in config.yaml). Deployments
|
||||
that don't set it are unaffected — exactly the same shape as ``gateway.proxy_url``.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from typing import Optional
|
||||
|
||||
|
||||
def relay_url() -> Optional[str]:
|
||||
"""The connector relay endpoint URL, or None when relay is not configured.
|
||||
|
||||
Checks ``GATEWAY_RELAY_URL`` (convenient for Docker) first, then
|
||||
``gateway.relay_url`` in config.yaml. A non-empty value activates the relay
|
||||
platform; absence means a normal direct/single-tenant gateway.
|
||||
"""
|
||||
url = os.environ.get("GATEWAY_RELAY_URL", "").strip()
|
||||
if url:
|
||||
return url.rstrip("/")
|
||||
try:
|
||||
from gateway.run import _load_gateway_config # late import to avoid cycle
|
||||
|
||||
cfg = _load_gateway_config()
|
||||
url = (cfg.get("gateway") or {}).get("relay_url", "").strip()
|
||||
if url:
|
||||
return url.rstrip("/")
|
||||
except Exception: # noqa: BLE001 - config absence/parse must never crash registration
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
def relay_platform_identity() -> tuple[str, str]:
|
||||
"""Platform + bot id this gateway fronts over the relay (for the handshake hello).
|
||||
|
||||
Defaults to ``("relay", "")``; overridable via ``GATEWAY_RELAY_PLATFORM`` /
|
||||
``GATEWAY_RELAY_BOT_ID`` so one connector can front several platforms.
|
||||
"""
|
||||
platform = os.environ.get("GATEWAY_RELAY_PLATFORM", "relay").strip() or "relay"
|
||||
bot_id = os.environ.get("GATEWAY_RELAY_BOT_ID", "").strip()
|
||||
return platform, bot_id
|
||||
|
||||
|
||||
def register_relay_adapter(force: bool = False, url: Optional[str] = None) -> bool:
|
||||
"""Register the generic ``relay`` platform via the platform registry.
|
||||
|
||||
Registers when a relay URL is configured (or ``force=True`` for tests, which
|
||||
builds a transport-less adapter — the unit-test posture). Returns True if
|
||||
registration happened. Additive: uses the same registry path as plugin
|
||||
adapters, so no core dispatch changes are needed.
|
||||
|
||||
When a URL is present the factory builds a live ``WebSocketRelayTransport``;
|
||||
the ``RelayAdapter`` negotiates the real ``CapabilityDescriptor`` at
|
||||
``connect()`` time via ``transport.handshake()``.
|
||||
"""
|
||||
resolved_url = url if url is not None else relay_url()
|
||||
if not (force or resolved_url):
|
||||
return False
|
||||
|
||||
from gateway.platform_registry import PlatformEntry, platform_registry
|
||||
from gateway.relay.adapter import RelayAdapter
|
||||
from gateway.relay.descriptor import CONTRACT_VERSION, CapabilityDescriptor
|
||||
|
||||
platform, bot_id = relay_platform_identity()
|
||||
|
||||
def _factory(config):
|
||||
# Placeholder descriptor; replaced by the negotiated one at connect time
|
||||
# when a transport is present. With no URL (force/test) the adapter is
|
||||
# transport-less and keeps the placeholder.
|
||||
placeholder = CapabilityDescriptor(
|
||||
contract_version=CONTRACT_VERSION,
|
||||
platform=platform,
|
||||
label="Relay",
|
||||
max_message_length=4096,
|
||||
supports_draft_streaming=False,
|
||||
supports_edit=True,
|
||||
supports_threads=False,
|
||||
markdown_dialect="plain",
|
||||
len_unit="chars",
|
||||
)
|
||||
transport = None
|
||||
if resolved_url:
|
||||
from gateway.relay.ws_transport import WebSocketRelayTransport
|
||||
|
||||
transport = WebSocketRelayTransport(resolved_url, platform, bot_id)
|
||||
return RelayAdapter(config, placeholder, transport=transport)
|
||||
|
||||
platform_registry.register(
|
||||
PlatformEntry(
|
||||
name="relay",
|
||||
label="Relay",
|
||||
adapter_factory=_factory,
|
||||
check_fn=lambda: True,
|
||||
source="builtin",
|
||||
emoji="\U0001f50c",
|
||||
)
|
||||
)
|
||||
return True
|
||||
178
gateway/relay/adapter.py
Normal file
178
gateway/relay/adapter.py
Normal file
@@ -0,0 +1,178 @@
|
||||
"""RelayAdapter — one generic gateway adapter fronted by the connector. EXPERIMENTAL.
|
||||
|
||||
A single ``BasePlatformAdapter`` subclass that, at handshake, receives a
|
||||
``CapabilityDescriptor`` from the connector telling it which platform it is
|
||||
fronting and which capabilities to advertise to the ``GatewayStreamConsumer``.
|
||||
It implements the four abstract methods (``connect`` / ``disconnect`` / ``send``
|
||||
/ ``get_chat_info``) plus the capability surface (``MAX_MESSAGE_LENGTH``,
|
||||
``message_len_fn``, ``supports_draft_streaming``) by delegating wire I/O to an
|
||||
injected transport and reading capabilities off the descriptor.
|
||||
|
||||
There is NO per-platform gateway code: the connector is the only side that knows
|
||||
"this chat_id maps to a Discord channel, send it via the Discord websocket."
|
||||
The gateway sees an ordinary ``MessageEvent`` in and calls ``adapter.send`` out.
|
||||
|
||||
EXPERIMENTAL: the transport protocol and descriptor schema may change without a
|
||||
deprecation cycle until >=2 Class-1 platforms validate them.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import Any, Callable, Dict, Optional
|
||||
|
||||
from gateway.config import Platform, PlatformConfig
|
||||
from gateway.platforms.base import BasePlatformAdapter, SendResult
|
||||
from gateway.relay.descriptor import CapabilityDescriptor
|
||||
from gateway.relay.transport import RelayTransport
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _utf16_len(text: str) -> int:
|
||||
"""Count UTF-16 code units (Telegram's length unit)."""
|
||||
return len(text.encode("utf-16-le")) // 2
|
||||
|
||||
|
||||
# Table-driven length-unit selection from the descriptor's ``len_unit``.
|
||||
_LEN_FNS: Dict[str, Callable[[str], int]] = {
|
||||
"chars": len,
|
||||
"utf16": _utf16_len,
|
||||
}
|
||||
|
||||
|
||||
class RelayAdapter(BasePlatformAdapter):
|
||||
"""Generic relay adapter advertising a connector-negotiated capability profile."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
config: PlatformConfig,
|
||||
descriptor: CapabilityDescriptor,
|
||||
transport: Optional[RelayTransport] = None,
|
||||
) -> None:
|
||||
# The relay adapter fronts many platforms but presents as a single
|
||||
# logical platform to the runner; Platform.RELAY identifies it.
|
||||
super().__init__(config, Platform.RELAY)
|
||||
self.descriptor = descriptor
|
||||
self._transport = transport
|
||||
# Capability surface read by stream_consumer (getattr(..., 4096)).
|
||||
self.MAX_MESSAGE_LENGTH = descriptor.max_message_length
|
||||
self.supports_code_blocks = descriptor.markdown_dialect not in ("", "plain")
|
||||
|
||||
# ── capability surface (from descriptor) ─────────────────────────────
|
||||
@property
|
||||
def message_len_fn(self) -> Callable[[str], int]:
|
||||
return _LEN_FNS.get(self.descriptor.len_unit, len)
|
||||
|
||||
def supports_draft_streaming(
|
||||
self,
|
||||
chat_type: Optional[str] = None,
|
||||
metadata: Optional[Dict[str, Any]] = None,
|
||||
) -> bool:
|
||||
return self.descriptor.supports_draft_streaming
|
||||
|
||||
# ── abstract methods (delegated to the transport) ────────────────────
|
||||
async def connect(self) -> bool:
|
||||
if self._transport is None:
|
||||
raise RuntimeError("RelayAdapter has no transport configured")
|
||||
self._transport.set_inbound_handler(self._on_inbound)
|
||||
ok = await self._transport.connect()
|
||||
if not ok:
|
||||
return False
|
||||
# Negotiate the real capability descriptor from the connector and adopt
|
||||
# it — the placeholder passed at construction is replaced by what the
|
||||
# connector advertises for the platform this gateway actually fronts.
|
||||
try:
|
||||
descriptor = await self._transport.handshake()
|
||||
except Exception as exc: # noqa: BLE001 - a failed handshake = a failed connect
|
||||
logger.warning("relay handshake failed: %s", exc)
|
||||
return False
|
||||
self._apply_descriptor(descriptor)
|
||||
return True
|
||||
|
||||
def _apply_descriptor(self, descriptor: CapabilityDescriptor) -> None:
|
||||
"""Adopt a (re)negotiated descriptor into the live capability surface."""
|
||||
self.descriptor = descriptor
|
||||
self.MAX_MESSAGE_LENGTH = descriptor.max_message_length
|
||||
self.supports_code_blocks = descriptor.markdown_dialect not in ("", "plain")
|
||||
|
||||
async def _on_inbound(self, event) -> None:
|
||||
"""Bridge a connector-delivered MessageEvent into the normal adapter path."""
|
||||
await self.handle_message(event)
|
||||
|
||||
async def on_interrupt(self, session_key: str, chat_id: str) -> None:
|
||||
"""Bridge a connector-delivered /stop into the adapter's interrupt path.
|
||||
|
||||
The connector forwards a mid-turn interrupt down the socket owned by
|
||||
the gateway instance running ``session_key``; this routes it to the
|
||||
existing per-session interrupt mechanism (sets the
|
||||
``_active_sessions[session_key]`` Event and clears typing), cancelling
|
||||
the right turn without touching sibling sessions.
|
||||
"""
|
||||
await self.interrupt_session_activity(session_key, chat_id)
|
||||
|
||||
async def disconnect(self) -> None:
|
||||
if self._transport is not None:
|
||||
await self._transport.disconnect()
|
||||
|
||||
async def send(
|
||||
self,
|
||||
chat_id: str,
|
||||
content: str,
|
||||
reply_to: Optional[str] = None,
|
||||
metadata: Optional[Dict[str, Any]] = None,
|
||||
) -> SendResult:
|
||||
if self._transport is None:
|
||||
return SendResult(success=False, error="no transport")
|
||||
result = await self._transport.send_outbound(
|
||||
{
|
||||
"op": "send",
|
||||
"chat_id": chat_id,
|
||||
"content": content,
|
||||
"reply_to": reply_to,
|
||||
"metadata": metadata or {},
|
||||
}
|
||||
)
|
||||
return SendResult(
|
||||
success=bool(result.get("success")),
|
||||
message_id=result.get("message_id"),
|
||||
error=result.get("error"),
|
||||
)
|
||||
|
||||
async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
|
||||
# Proxied to the connector (it owns the platform connection / cache).
|
||||
if self._transport is None:
|
||||
return {"name": chat_id, "type": "dm"}
|
||||
return await self._transport.get_chat_info(chat_id)
|
||||
|
||||
async def send_follow_up(
|
||||
self,
|
||||
session_key: str,
|
||||
kind: str,
|
||||
content: str,
|
||||
metadata: Optional[Dict[str, Any]] = None,
|
||||
) -> SendResult:
|
||||
"""Send via a shared-identity capability bound to a session (A2 outbound).
|
||||
|
||||
The gateway never holds the credential: it names the session it is
|
||||
already in plus the capability ``kind``, and the connector resolves the
|
||||
real value from its vault and egresses (enforcing the tenant match). Used
|
||||
e.g. to post a Discord interaction follow-up as the shared bot without
|
||||
the token ever reaching the gateway. See RelayTransport.send_follow_up.
|
||||
"""
|
||||
if self._transport is None:
|
||||
return SendResult(success=False, error="no transport")
|
||||
result = await self._transport.send_follow_up(
|
||||
{
|
||||
"op": "follow_up",
|
||||
"session_key": session_key,
|
||||
"kind": kind,
|
||||
"content": content,
|
||||
"metadata": metadata or {},
|
||||
}
|
||||
)
|
||||
return SendResult(
|
||||
success=bool(result.get("success")),
|
||||
message_id=result.get("message_id"),
|
||||
error=result.get("error"),
|
||||
)
|
||||
118
gateway/relay/descriptor.py
Normal file
118
gateway/relay/descriptor.py
Normal file
@@ -0,0 +1,118 @@
|
||||
"""CapabilityDescriptor — the relay handshake payload. EXPERIMENTAL.
|
||||
|
||||
The connector hands a ``CapabilityDescriptor`` to the gateway's ``RelayAdapter``
|
||||
at handshake time; it tells the adapter which platform it is fronting and which
|
||||
capabilities to advertise to the ``GatewayStreamConsumer`` (char limit,
|
||||
draft-streaming, edit/threading support, markdown dialect, length unit). It is
|
||||
the linchpin of the generalization: one gateway adapter serves Discord,
|
||||
Telegram, Matrix, Signal, ... without per-platform branching.
|
||||
|
||||
EXPERIMENTAL: this schema MAY CHANGE without a deprecation cycle until at least
|
||||
two real Class-1 platforms have validated it. Evolution during the experimental
|
||||
phase is additive-only, gated by ``contract_version`` (see
|
||||
docs/relay-connector-contract.md).
|
||||
|
||||
Field origins (most are a wire-serializable projection of ``PlatformEntry`` plus
|
||||
the per-instance capability methods on ``BasePlatformAdapter``):
|
||||
|
||||
- ``max_message_length`` -> ``PlatformEntry.max_message_length`` / adapter
|
||||
``MAX_MESSAGE_LENGTH`` attribute (read by stream_consumer).
|
||||
- ``len_unit`` -> selects which ``message_len_fn`` the adapter installs
|
||||
("chars" = builtin len; "utf16" = Telegram-style UTF-16 code-unit counting).
|
||||
- ``supports_draft_streaming`` -> adapter ``supports_draft_streaming()`` probe.
|
||||
- ``supports_edit`` -> whether edit-based streaming is possible (Discord/
|
||||
Telegram yes; Signal/SMS no -> consumer degrades to one-message-per-segment).
|
||||
- ``supports_threads`` -> ``create_handoff_thread`` capability flag.
|
||||
- ``markdown_dialect`` -> presentation hint (e.g. "markdown_v2", "discord").
|
||||
- ``emoji`` / ``platform_hint`` / ``pii_safe`` -> ``PlatformEntry`` fields of the
|
||||
same name.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from dataclasses import asdict, dataclass
|
||||
|
||||
# Bump additively (never reinterpret an existing field) during the experimental
|
||||
# phase; a breaking change requires updating both repos in lockstep.
|
||||
CONTRACT_VERSION = 1
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class CapabilityDescriptor:
|
||||
"""Immutable capability descriptor negotiated at relay handshake.
|
||||
|
||||
Frozen so a descriptor cannot be mutated after handshake — the adapter
|
||||
advertises a fixed capability profile for the life of the connection.
|
||||
"""
|
||||
|
||||
contract_version: int
|
||||
platform: str
|
||||
label: str
|
||||
max_message_length: int
|
||||
supports_draft_streaming: bool
|
||||
supports_edit: bool
|
||||
supports_threads: bool
|
||||
markdown_dialect: str
|
||||
len_unit: str # "chars" | "utf16"
|
||||
emoji: str = "\U0001f50c" # 🔌 default (matches PlatformEntry default)
|
||||
platform_hint: str = ""
|
||||
pii_safe: bool = False
|
||||
|
||||
def to_json(self) -> str:
|
||||
"""Serialize to a compact, stable JSON string for the handshake frame."""
|
||||
return json.dumps(asdict(self), sort_keys=True, ensure_ascii=False)
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, data: str) -> "CapabilityDescriptor":
|
||||
"""Deserialize from a handshake JSON string.
|
||||
|
||||
Unknown keys are ignored (forward-compat: a newer connector may send
|
||||
fields this gateway does not know yet); missing optional keys fall back
|
||||
to dataclass defaults.
|
||||
"""
|
||||
raw = json.loads(data)
|
||||
known = {f for f in cls.__dataclass_fields__} # type: ignore[attr-defined]
|
||||
filtered = {k: v for k, v in raw.items() if k in known}
|
||||
return cls(**filtered)
|
||||
|
||||
@classmethod
|
||||
def from_platform_entry(
|
||||
cls,
|
||||
entry,
|
||||
*,
|
||||
len_unit: str = "chars",
|
||||
supports_draft_streaming: bool = False,
|
||||
supports_edit: bool = True,
|
||||
supports_threads: bool = False,
|
||||
markdown_dialect: str = "plain",
|
||||
) -> "CapabilityDescriptor":
|
||||
"""Project a ``gateway.platform_registry.PlatformEntry`` into a descriptor.
|
||||
|
||||
Demonstrates the descriptor is a *subset/projection* of what
|
||||
``PlatformEntry`` already encodes, not a parallel concept: ``label``,
|
||||
``max_message_length``, ``emoji``, ``platform_hint``, ``pii_safe`` and
|
||||
the platform name come straight off the entry. The runtime capability
|
||||
bits that ``PlatformEntry`` does NOT encode (length unit, draft/edit/
|
||||
thread/markdown behavior) are supplied by the caller — in production
|
||||
the connector fills these from the live adapter's capability methods.
|
||||
|
||||
``max_message_length`` of 0 on a ``PlatformEntry`` means "no limit";
|
||||
we map that to the stream_consumer default of 4096 so the descriptor
|
||||
always carries a concrete chunking bound.
|
||||
"""
|
||||
max_len = getattr(entry, "max_message_length", 0) or 4096
|
||||
return cls(
|
||||
contract_version=CONTRACT_VERSION,
|
||||
platform=entry.name,
|
||||
label=entry.label,
|
||||
max_message_length=max_len,
|
||||
supports_draft_streaming=supports_draft_streaming,
|
||||
supports_edit=supports_edit,
|
||||
supports_threads=supports_threads,
|
||||
markdown_dialect=markdown_dialect,
|
||||
len_unit=len_unit,
|
||||
emoji=getattr(entry, "emoji", "\U0001f50c"),
|
||||
platform_hint=getattr(entry, "platform_hint", ""),
|
||||
pii_safe=getattr(entry, "pii_safe", False),
|
||||
)
|
||||
101
gateway/relay/transport.py
Normal file
101
gateway/relay/transport.py
Normal file
@@ -0,0 +1,101 @@
|
||||
"""Relay transport protocol — the gateway<->connector wire contract. EXPERIMENTAL.
|
||||
|
||||
The ``RelayAdapter`` (gateway side) delegates all wire I/O to a ``RelayTransport``.
|
||||
The gateway dials OUT to the connector, so a production transport is a WebSocket
|
||||
client; in tests it is an in-memory stub (``tests/gateway/relay/stub_connector.py``).
|
||||
|
||||
This module defines the protocol surface only — no concrete transport. The
|
||||
contract has four concerns:
|
||||
|
||||
1. Lifecycle: ``connect`` / ``disconnect``.
|
||||
2. Handshake: ``handshake`` returns the ``CapabilityDescriptor`` the connector
|
||||
advertises for the platform this adapter fronts.
|
||||
3. Inbound: ``set_inbound_handler`` registers a callback the transport invokes
|
||||
with each normalized ``MessageEvent`` the connector delivers.
|
||||
4. Outbound: ``send_outbound`` carries send/edit/typing actions back to the
|
||||
connector; ``get_chat_info`` proxies a chat-info lookup; ``send_interrupt``
|
||||
routes a mid-turn /stop down the socket that owns the session_key.
|
||||
|
||||
EXPERIMENTAL: may change without a deprecation cycle until >=2 Class-1 platforms
|
||||
validate it. See docs/relay-connector-contract.md.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any, Awaitable, Callable, Dict, Optional, Protocol, runtime_checkable
|
||||
|
||||
from gateway.platforms.base import MessageEvent
|
||||
from gateway.relay.descriptor import CapabilityDescriptor
|
||||
|
||||
# Callback the transport invokes for each inbound normalized event.
|
||||
InboundHandler = Callable[[MessageEvent], Awaitable[None]]
|
||||
|
||||
|
||||
@runtime_checkable
|
||||
class RelayTransport(Protocol):
|
||||
"""Full gateway<->connector transport contract."""
|
||||
|
||||
async def connect(self) -> bool:
|
||||
"""Open the connection to the connector; return True on success."""
|
||||
...
|
||||
|
||||
async def disconnect(self) -> None:
|
||||
"""Close the connection."""
|
||||
...
|
||||
|
||||
async def handshake(self) -> CapabilityDescriptor:
|
||||
"""Return the capability descriptor the connector advertises."""
|
||||
...
|
||||
|
||||
def set_inbound_handler(self, handler: InboundHandler) -> None:
|
||||
"""Register the callback invoked with each inbound MessageEvent."""
|
||||
...
|
||||
|
||||
async def send_outbound(self, action: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Carry an outbound action (send/edit/typing) to the connector.
|
||||
|
||||
Returns a result dict; for ``op == "send"`` it carries
|
||||
``success`` and optionally ``message_id`` / ``error``.
|
||||
"""
|
||||
...
|
||||
|
||||
async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
|
||||
"""Proxy a chat-info lookup to the connector."""
|
||||
...
|
||||
|
||||
async def send_interrupt(self, session_key: str, reason: Optional[str] = None) -> None:
|
||||
"""Route a mid-turn /stop to the connector for ``session_key``.
|
||||
|
||||
The connector forwards it down the socket owned by the gateway
|
||||
instance running that session (the /stop routing invariant). On the
|
||||
gateway side this is the OUTBOUND direction; the actual task
|
||||
cancellation happens when the connector echoes an interrupt inbound
|
||||
(handled in Task 1.4).
|
||||
"""
|
||||
...
|
||||
|
||||
async def send_follow_up(self, action: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Act on a shared-identity capability bound to a session (A2 outbound).
|
||||
|
||||
Some platforms hand the connector a credential that acts on the SHARED
|
||||
bot identity (e.g. a Discord interaction follow-up token, valid ~15min).
|
||||
Under A2 that credential NEVER reaches the gateway — the connector
|
||||
stripped it at the edge and bound it in its capability vault keyed by
|
||||
the session. To use it, the gateway issues a SEMANTIC action against the
|
||||
session it is already in; it never names or holds a token.
|
||||
|
||||
The action dict carries:
|
||||
``op`` == ``"follow_up"``
|
||||
``session_key`` the session whose bound capability to wield
|
||||
``kind`` the capability kind (e.g. ``"discord.interaction_token"``)
|
||||
``content`` the message content to send via that capability
|
||||
``metadata?`` optional extras
|
||||
|
||||
The connector resolves the real capability (``resolveOutboundCapability``
|
||||
on its side), enforces the tenant match (tenant B can never wield tenant
|
||||
A's capability), and egresses. Returns ``{success, message_id?, error?}``;
|
||||
``success`` is False when the capability is absent/expired or the tenant
|
||||
doesn't match — the gateway then has nothing to retry with (by design: a
|
||||
leaked gateway holds zero capability material).
|
||||
"""
|
||||
...
|
||||
267
gateway/relay/ws_transport.py
Normal file
267
gateway/relay/ws_transport.py
Normal file
@@ -0,0 +1,267 @@
|
||||
"""Production WebSocket RelayTransport — the gateway's live link to the connector.
|
||||
|
||||
The gateway dials OUT to the connector's relay endpoint over a WebSocket and
|
||||
speaks the newline-delimited JSON frame protocol defined in the connector repo
|
||||
(``gateway-gateway`` ``src/relay/protocol.ts``) and mirrored in
|
||||
``docs/relay-connector-contract.md``:
|
||||
|
||||
gateway -> connector : hello, outbound, interrupt
|
||||
connector -> gateway : descriptor, inbound, outbound_result, interrupt_inbound
|
||||
|
||||
Frames:
|
||||
hello {type, platform, botId}
|
||||
descriptor {type, descriptor} (handshake reply)
|
||||
inbound {type, event, bufferId?} (a normalized MessageEvent)
|
||||
outbound {type, requestId, action} (send/edit/typing/follow_up)
|
||||
outbound_result {type, requestId, result}
|
||||
interrupt {type, session_key, reason?} (gateway egresses /stop)
|
||||
interrupt_inbound{type, session_key, chat_id} (connector -> owning gateway)
|
||||
|
||||
This is the concrete transport behind the ``RelayTransport`` Protocol; the
|
||||
``RelayAdapter`` delegates all wire I/O to it. Outbound calls block on a
|
||||
per-request future keyed by ``requestId`` until the matching ``outbound_result``
|
||||
arrives. A background reader task pumps inbound frames to the registered handler
|
||||
and resolves pending outbound futures.
|
||||
|
||||
EXPERIMENTAL: the frame schema may change without a deprecation cycle until at
|
||||
least two Class-1 platforms validate it.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import uuid
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
from gateway.platforms.base import MessageEvent, MessageType
|
||||
from gateway.session import SessionSource
|
||||
from gateway.relay.descriptor import CapabilityDescriptor
|
||||
from gateway.relay.transport import InboundHandler
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
try: # lazy/optional dep — mirrors gateway/platforms/feishu.py
|
||||
import websockets
|
||||
except ImportError: # pragma: no cover - exercised only when the extra is absent
|
||||
websockets = None # type: ignore[assignment]
|
||||
|
||||
WEBSOCKETS_AVAILABLE = websockets is not None
|
||||
|
||||
# How long to wait for the handshake descriptor and for each outbound result.
|
||||
_HANDSHAKE_TIMEOUT_S = 30.0
|
||||
_OUTBOUND_TIMEOUT_S = 30.0
|
||||
|
||||
|
||||
def _event_from_wire(raw: Dict[str, Any]) -> MessageEvent:
|
||||
"""Rebuild a MessageEvent from the connector's normalized inbound payload.
|
||||
|
||||
The connector emits SessionSource as the snake_case wire form (§3); map it
|
||||
back onto the gateway dataclasses. Unknown message types fall back to TEXT.
|
||||
"""
|
||||
src = raw.get("source", {}) or {}
|
||||
from gateway.config import Platform
|
||||
|
||||
platform = src.get("platform", "relay")
|
||||
try:
|
||||
platform_enum = Platform(platform)
|
||||
except ValueError:
|
||||
platform_enum = Platform.RELAY
|
||||
|
||||
source = SessionSource(
|
||||
platform=platform_enum,
|
||||
chat_id=src.get("chat_id", ""),
|
||||
chat_type=src.get("chat_type", "dm"),
|
||||
chat_name=src.get("chat_name"),
|
||||
user_id=src.get("user_id"),
|
||||
user_name=src.get("user_name"),
|
||||
thread_id=src.get("thread_id"),
|
||||
chat_topic=src.get("chat_topic"),
|
||||
user_id_alt=src.get("user_id_alt"),
|
||||
chat_id_alt=src.get("chat_id_alt"),
|
||||
guild_id=src.get("guild_id"),
|
||||
parent_chat_id=src.get("parent_chat_id"),
|
||||
message_id=src.get("message_id"),
|
||||
)
|
||||
try:
|
||||
msg_type = MessageType(raw.get("message_type", "text"))
|
||||
except ValueError:
|
||||
msg_type = MessageType.TEXT
|
||||
|
||||
return MessageEvent(
|
||||
text=raw.get("text", ""),
|
||||
message_type=msg_type,
|
||||
source=source,
|
||||
message_id=raw.get("message_id"),
|
||||
reply_to_message_id=raw.get("reply_to_message_id"),
|
||||
media_urls=raw.get("media_urls") or [],
|
||||
)
|
||||
|
||||
|
||||
class WebSocketRelayTransport:
|
||||
"""RelayTransport over a WebSocket connection the gateway dials to the connector."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
url: str,
|
||||
platform: str,
|
||||
bot_id: str,
|
||||
*,
|
||||
connect_timeout_s: float = _HANDSHAKE_TIMEOUT_S,
|
||||
outbound_timeout_s: float = _OUTBOUND_TIMEOUT_S,
|
||||
) -> None:
|
||||
if not WEBSOCKETS_AVAILABLE:
|
||||
raise RuntimeError(
|
||||
"WebSocketRelayTransport requires the 'websockets' package "
|
||||
"(install the messaging extra)."
|
||||
)
|
||||
self._url = url
|
||||
self._platform = platform
|
||||
self._bot_id = bot_id
|
||||
self._connect_timeout_s = connect_timeout_s
|
||||
self._outbound_timeout_s = outbound_timeout_s
|
||||
|
||||
self._ws: Any = None
|
||||
self._reader: Optional[asyncio.Task[None]] = None
|
||||
self._inbound: Optional[InboundHandler] = None
|
||||
self._descriptor: Optional[CapabilityDescriptor] = None
|
||||
self._descriptor_ready: asyncio.Future[CapabilityDescriptor] | None = None
|
||||
# requestId -> future awaiting the matching outbound_result.
|
||||
self._pending: Dict[str, asyncio.Future[Dict[str, Any]]] = {}
|
||||
self._closing = False
|
||||
|
||||
# ── lifecycle ────────────────────────────────────────────────────────
|
||||
async def connect(self) -> bool:
|
||||
loop = asyncio.get_running_loop()
|
||||
self._descriptor_ready = loop.create_future()
|
||||
self._ws = await websockets.connect(self._url) # type: ignore[union-attr]
|
||||
self._reader = asyncio.create_task(self._read_loop(), name="relay-ws-reader")
|
||||
# Send hello; the descriptor arrives via the reader and resolves handshake().
|
||||
await self._send({"type": "hello", "platform": self._platform, "botId": self._bot_id})
|
||||
return True
|
||||
|
||||
async def disconnect(self) -> None:
|
||||
self._closing = True
|
||||
if self._reader is not None:
|
||||
self._reader.cancel()
|
||||
try:
|
||||
await self._reader
|
||||
except (asyncio.CancelledError, Exception): # noqa: BLE001 - best-effort teardown
|
||||
pass
|
||||
self._reader = None
|
||||
if self._ws is not None:
|
||||
try:
|
||||
await self._ws.close()
|
||||
except Exception: # noqa: BLE001
|
||||
pass
|
||||
self._ws = None
|
||||
# Fail any in-flight outbound waiters so callers don't hang.
|
||||
for fut in self._pending.values():
|
||||
if not fut.done():
|
||||
fut.set_exception(RuntimeError("relay transport closed"))
|
||||
self._pending.clear()
|
||||
|
||||
async def handshake(self) -> CapabilityDescriptor:
|
||||
if self._descriptor is not None:
|
||||
return self._descriptor
|
||||
if self._descriptor_ready is None:
|
||||
raise RuntimeError("handshake() called before connect()")
|
||||
return await asyncio.wait_for(self._descriptor_ready, timeout=self._connect_timeout_s)
|
||||
|
||||
def set_inbound_handler(self, handler: InboundHandler) -> None:
|
||||
self._inbound = handler
|
||||
|
||||
# ── outbound ─────────────────────────────────────────────────────────
|
||||
async def send_outbound(self, action: Dict[str, Any]) -> Dict[str, Any]:
|
||||
return await self._request_response(action)
|
||||
|
||||
async def send_follow_up(self, action: Dict[str, Any]) -> Dict[str, Any]:
|
||||
# follow_up rides the same outbound frame; the connector dispatches by
|
||||
# action.op. Kept as a distinct method to satisfy the transport Protocol
|
||||
# and to make the A2 call site explicit.
|
||||
return await self._request_response(action)
|
||||
|
||||
async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
|
||||
result = await self._request_response(
|
||||
{"op": "get_chat_info", "chat_id": chat_id}, frame_type="outbound"
|
||||
)
|
||||
# The connector answers chat-info inside the outbound_result envelope.
|
||||
info = result.get("chat_info") or result
|
||||
return {"name": info.get("name", chat_id), "type": info.get("type", "dm")}
|
||||
|
||||
async def send_interrupt(self, session_key: str, reason: Optional[str] = None) -> None:
|
||||
await self._send({"type": "interrupt", "session_key": session_key, "reason": reason})
|
||||
|
||||
async def _request_response(
|
||||
self, action: Dict[str, Any], frame_type: str = "outbound"
|
||||
) -> Dict[str, Any]:
|
||||
if self._ws is None:
|
||||
return {"success": False, "error": "relay transport not connected"}
|
||||
request_id = uuid.uuid4().hex
|
||||
loop = asyncio.get_running_loop()
|
||||
fut: asyncio.Future[Dict[str, Any]] = loop.create_future()
|
||||
self._pending[request_id] = fut
|
||||
try:
|
||||
await self._send({"type": frame_type, "requestId": request_id, "action": action})
|
||||
return await asyncio.wait_for(fut, timeout=self._outbound_timeout_s)
|
||||
except asyncio.TimeoutError:
|
||||
return {"success": False, "error": "relay outbound timed out"}
|
||||
finally:
|
||||
self._pending.pop(request_id, None)
|
||||
|
||||
# ── wire I/O ─────────────────────────────────────────────────────────
|
||||
async def _send(self, frame: Dict[str, Any]) -> None:
|
||||
if self._ws is None:
|
||||
raise RuntimeError("relay transport not connected")
|
||||
await self._ws.send(json.dumps(frame) + "\n")
|
||||
|
||||
async def _read_loop(self) -> None:
|
||||
assert self._ws is not None
|
||||
buf = ""
|
||||
try:
|
||||
async for chunk in self._ws:
|
||||
buf += chunk if isinstance(chunk, str) else chunk.decode("utf-8")
|
||||
# Newline-delimited frames; keep any trailing partial line.
|
||||
*lines, buf = buf.split("\n")
|
||||
for line in lines:
|
||||
if line.strip():
|
||||
await self._handle_frame(line)
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
except Exception as exc: # noqa: BLE001 - log + let the task end; reconnection is caller policy
|
||||
if not self._closing:
|
||||
logger.warning("relay ws read loop ended: %s", exc)
|
||||
|
||||
async def _handle_frame(self, line: str) -> None:
|
||||
try:
|
||||
frame = json.loads(line)
|
||||
except json.JSONDecodeError:
|
||||
logger.warning("relay: skipping malformed frame")
|
||||
return
|
||||
ftype = frame.get("type")
|
||||
if ftype == "descriptor":
|
||||
descriptor = CapabilityDescriptor.from_json(json.dumps(frame.get("descriptor", {})))
|
||||
self._descriptor = descriptor
|
||||
if self._descriptor_ready is not None and not self._descriptor_ready.done():
|
||||
self._descriptor_ready.set_result(descriptor)
|
||||
elif ftype == "inbound":
|
||||
if self._inbound is not None:
|
||||
event = _event_from_wire(frame.get("event", {}))
|
||||
await self._inbound(event)
|
||||
elif ftype == "outbound_result":
|
||||
fut = self._pending.get(frame.get("requestId", ""))
|
||||
if fut is not None and not fut.done():
|
||||
fut.set_result(frame.get("result", {}))
|
||||
elif ftype == "interrupt_inbound":
|
||||
# Bridged into the adapter's interrupt path by the runner wiring.
|
||||
handler = getattr(self, "_interrupt_inbound_handler", None)
|
||||
if handler is not None:
|
||||
await handler(frame.get("session_key", ""), frame.get("chat_id", ""))
|
||||
else:
|
||||
# hello/outbound/interrupt are gateway->connector; ignore if echoed.
|
||||
pass
|
||||
|
||||
def set_interrupt_inbound_handler(self, handler: Any) -> None:
|
||||
"""Register the callback for connector->gateway interrupt_inbound frames."""
|
||||
self._interrupt_inbound_handler = handler
|
||||
@@ -4614,6 +4614,21 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
||||
"plugin discovery failed at gateway startup", exc_info=True,
|
||||
)
|
||||
|
||||
# Register the generic relay adapter when a connector relay URL is
|
||||
# configured (GATEWAY_RELAY_URL / gateway.relay_url). No URL -> no-op, so
|
||||
# direct/single-tenant deployments are unaffected. When configured, the
|
||||
# adapter dials the connector over a WebSocket, negotiates its capability
|
||||
# descriptor at handshake, and bridges inbound/outbound like any platform.
|
||||
try:
|
||||
from gateway.relay import register_relay_adapter, relay_url
|
||||
|
||||
if register_relay_adapter():
|
||||
logger.info("relay adapter registered (connector at %s)", relay_url())
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"relay adapter registration failed at gateway startup", exc_info=True,
|
||||
)
|
||||
|
||||
# Register declarative shell hooks from cli-config.yaml. Gateway
|
||||
# has no TTY, so consent has to come from one of the three opt-in
|
||||
# channels (--accept-hooks on launch, HERMES_ACCEPT_HOOKS env var,
|
||||
|
||||
0
tests/gateway/relay/__init__.py
Normal file
0
tests/gateway/relay/__init__.py
Normal file
75
tests/gateway/relay/stub_connector.py
Normal file
75
tests/gateway/relay/stub_connector.py
Normal file
@@ -0,0 +1,75 @@
|
||||
"""Test-only in-memory stub connector implementing RelayTransport.
|
||||
|
||||
MUST stay under tests/ — never under plugins/ or gateway/ (a CI guard in
|
||||
test_no_stub_leak.py asserts this). It lets Phase 1 prove the gateway side of
|
||||
the relay end-to-end with zero dependency on the real (Node) connector.
|
||||
|
||||
The stub:
|
||||
- hands back a fixed CapabilityDescriptor at handshake,
|
||||
- lets a test push synthetic inbound MessageEvents (push_inbound),
|
||||
- records every outbound action (sent/interrupts) for assertions,
|
||||
- answers get_chat_info from a small fixture map.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from gateway.platforms.base import MessageEvent
|
||||
from gateway.relay.descriptor import CapabilityDescriptor
|
||||
from gateway.relay.transport import InboundHandler
|
||||
|
||||
|
||||
class StubConnector:
|
||||
"""In-memory RelayTransport for tests."""
|
||||
|
||||
def __init__(self, descriptor: CapabilityDescriptor) -> None:
|
||||
self._descriptor = descriptor
|
||||
self._inbound: Optional[InboundHandler] = None
|
||||
self.connected = False
|
||||
self.sent: List[Dict[str, Any]] = []
|
||||
self.interrupts: List[Dict[str, Any]] = []
|
||||
self.follow_ups: List[Dict[str, Any]] = []
|
||||
self.chat_info: Dict[str, Dict[str, Any]] = {}
|
||||
# Canned result for the next send_outbound (override per-test).
|
||||
self.next_send_result: Dict[str, Any] = {"success": True, "message_id": "m1"}
|
||||
# Canned result for the next send_follow_up (override per-test). Default
|
||||
# mimics a resolved capability egress; set success=False to simulate an
|
||||
# absent/expired capability or a tenant mismatch on the connector side.
|
||||
self.next_follow_up_result: Dict[str, Any] = {"success": True, "message_id": "f1"}
|
||||
|
||||
async def connect(self) -> bool:
|
||||
self.connected = True
|
||||
return True
|
||||
|
||||
async def disconnect(self) -> None:
|
||||
self.connected = False
|
||||
|
||||
async def handshake(self) -> CapabilityDescriptor:
|
||||
return self._descriptor
|
||||
|
||||
def set_inbound_handler(self, handler: InboundHandler) -> None:
|
||||
self._inbound = handler
|
||||
|
||||
async def send_outbound(self, action: Dict[str, Any]) -> Dict[str, Any]:
|
||||
self.sent.append(action)
|
||||
if action.get("op") == "send":
|
||||
return dict(self.next_send_result)
|
||||
return {"success": True}
|
||||
|
||||
async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
|
||||
return self.chat_info.get(chat_id, {"name": chat_id, "type": "dm"})
|
||||
|
||||
async def send_interrupt(self, session_key: str, reason: Optional[str] = None) -> None:
|
||||
self.interrupts.append({"session_key": session_key, "reason": reason})
|
||||
|
||||
async def send_follow_up(self, action: Dict[str, Any]) -> Dict[str, Any]:
|
||||
self.follow_ups.append(action)
|
||||
return dict(self.next_follow_up_result)
|
||||
|
||||
# ── test driver ──────────────────────────────────────────────────────
|
||||
async def push_inbound(self, event: MessageEvent) -> None:
|
||||
"""Simulate the connector delivering a normalized inbound event."""
|
||||
if self._inbound is None:
|
||||
raise RuntimeError("no inbound handler registered (call adapter.connect first)")
|
||||
await self._inbound(event)
|
||||
184
tests/gateway/relay/test_contract_doc_conformance.py
Normal file
184
tests/gateway/relay/test_contract_doc_conformance.py
Normal file
@@ -0,0 +1,184 @@
|
||||
"""Cross-repo contract conformance: docs/relay-connector-contract.md ⟷ Python.
|
||||
|
||||
The contract doc is the formal interface the connector repo
|
||||
(NousResearch/gateway-gateway) implements against. The connector's TypeScript
|
||||
structs are hand-mirrored from the doc, so if the Python source of truth drifts
|
||||
from the doc, the two repos silently diverge and the handshake / session-keying
|
||||
breaks only at integration time.
|
||||
|
||||
These tests make the doc ⟷ code relationship an enforced invariant:
|
||||
|
||||
* Every ``CapabilityDescriptor`` field (§2 table) is documented with the
|
||||
correct required/optional flag, and the doc lists no fields the dataclass
|
||||
lacks.
|
||||
* Every ``SessionSource`` wire key (what ``to_dict()`` actually serializes)
|
||||
is named in the contract doc's §3 discriminator section, and every
|
||||
discriminator the doc calls out as a column header exists on the dataclass.
|
||||
|
||||
They are invariants, NOT change-detector snapshots: they assert the *relation*
|
||||
between two artifacts that must move together, not a frozen list of names. Add
|
||||
a field to the descriptor and the doc, and the test stays green; add it to only
|
||||
one, and CI fails — which is exactly the lockstep guarantee the plan's
|
||||
Cross-Repo Coordination Checklist calls for.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from gateway.relay.descriptor import CapabilityDescriptor
|
||||
from gateway.session import SessionSource
|
||||
|
||||
# Repo root: tests/gateway/relay/ -> repo root is parents[3]
|
||||
_CONTRACT_DOC = (
|
||||
Path(__file__).resolve().parents[3] / "docs" / "relay-connector-contract.md"
|
||||
)
|
||||
|
||||
|
||||
def _doc_text() -> str:
|
||||
assert _CONTRACT_DOC.exists(), (
|
||||
f"Contract doc missing at {_CONTRACT_DOC}. It is the formal cross-repo "
|
||||
f"interface (Phase 1, Task 1.5) and must ship with the relay adapter."
|
||||
)
|
||||
return _CONTRACT_DOC.read_text(encoding="utf-8")
|
||||
|
||||
|
||||
def _parse_descriptor_table(text: str) -> dict[str, bool]:
|
||||
"""Parse §2's markdown table → {field_name: required}.
|
||||
|
||||
Rows look like: ``| `field` | type | yes|no | meaning |``. Returns a map of
|
||||
field name to whether the Required column says "yes".
|
||||
"""
|
||||
fields: dict[str, bool] = {}
|
||||
# Restrict to the §2 section so §3/§4 tables don't bleed in.
|
||||
section = text.split("## 2. CapabilityDescriptor", 1)[-1].split("## 3.", 1)[0]
|
||||
row_re = re.compile(r"^\|\s*`([a-z_]+)`\s*\|[^|]*\|\s*(yes|no)\s*\|", re.M)
|
||||
for name, required in row_re.findall(section):
|
||||
fields[name] = required.strip() == "yes"
|
||||
return fields
|
||||
|
||||
|
||||
def test_descriptor_fields_match_contract_doc():
|
||||
"""§2 table ⟷ CapabilityDescriptor dataclass, names + required/optional."""
|
||||
documented = _parse_descriptor_table(_doc_text())
|
||||
assert documented, "Failed to parse any descriptor fields from the §2 table."
|
||||
|
||||
dc_fields = CapabilityDescriptor.__dataclass_fields__ # type: ignore[attr-defined]
|
||||
# A dataclass field is "required" iff it has no default and no default_factory.
|
||||
import dataclasses
|
||||
|
||||
code_required = {
|
||||
name
|
||||
for name, f in dc_fields.items()
|
||||
if f.default is dataclasses.MISSING
|
||||
and f.default_factory is dataclasses.MISSING # type: ignore[misc]
|
||||
}
|
||||
code_names = set(dc_fields.keys())
|
||||
doc_names = set(documented.keys())
|
||||
|
||||
missing_from_doc = code_names - doc_names
|
||||
assert not missing_from_doc, (
|
||||
f"CapabilityDescriptor fields missing from the §2 contract-doc table: "
|
||||
f"{sorted(missing_from_doc)}. Document them so the connector mirrors them."
|
||||
)
|
||||
extra_in_doc = doc_names - code_names
|
||||
assert not extra_in_doc, (
|
||||
f"Contract-doc §2 table documents fields the dataclass does not have: "
|
||||
f"{sorted(extra_in_doc)}. Remove them or add them to descriptor.py."
|
||||
)
|
||||
|
||||
# Required/optional must agree, so the connector knows which fields it may omit.
|
||||
for name, doc_required in documented.items():
|
||||
assert doc_required == (name in code_required), (
|
||||
f"Field '{name}': contract doc says required={doc_required}, but the "
|
||||
f"dataclass says required={name in code_required}. Reconcile them."
|
||||
)
|
||||
|
||||
|
||||
def _session_source_wire_keys() -> set[str]:
|
||||
"""Keys ``SessionSource.to_dict()`` can emit (the actual wire surface).
|
||||
|
||||
Build a maximally-populated source so conditionally-included keys (the
|
||||
``if self.x:`` branches in ``to_dict``) all appear.
|
||||
"""
|
||||
from gateway.config import Platform
|
||||
|
||||
src = SessionSource(
|
||||
platform=Platform.DISCORD,
|
||||
chat_id="c",
|
||||
chat_name="n",
|
||||
chat_type="channel",
|
||||
user_id="u",
|
||||
user_name="un",
|
||||
thread_id="t",
|
||||
chat_topic="topic",
|
||||
user_id_alt="ua",
|
||||
chat_id_alt="ca",
|
||||
guild_id="g",
|
||||
parent_chat_id="p",
|
||||
message_id="m",
|
||||
)
|
||||
return set(src.to_dict().keys())
|
||||
|
||||
|
||||
def test_session_source_wire_keys_documented_in_contract():
|
||||
"""Every wire key SessionSource.to_dict() emits is named in the contract doc.
|
||||
|
||||
The doc enumerates discriminators in prose + a per-platform table (§3) rather
|
||||
than a strict field table, so this asserts presence-by-name: a wire key the
|
||||
connector must populate but which appears nowhere in the doc is a silent gap.
|
||||
"""
|
||||
text = _doc_text()
|
||||
# Limit to §3 (the MessageEvent / SessionSource section).
|
||||
section = text.split("## 3. Inbound", 1)[-1].split("## 4.", 1)[0]
|
||||
wire_keys = _session_source_wire_keys()
|
||||
|
||||
# Keys that are self-evidently covered by the §3 narrative/table.
|
||||
# We assert each wire key appears as a backticked token or table cell.
|
||||
undocumented = sorted(k for k in wire_keys if k not in section)
|
||||
assert not undocumented, (
|
||||
f"SessionSource wire keys absent from the §3 contract-doc section: "
|
||||
f"{undocumented}. The connector normalizes events into these keys; if the "
|
||||
f"doc doesn't name them the connector author can't know to populate them. "
|
||||
f"Document them (prose or the discriminator table)."
|
||||
)
|
||||
|
||||
|
||||
def test_internal_only_session_fields_stay_off_the_wire():
|
||||
"""Guard the inverse: fields deliberately NOT serialized must not leak.
|
||||
|
||||
``is_bot`` is an internal author-classification flag that today is NOT in
|
||||
``to_dict()`` (so the connector's TS contract correctly omits it). If someone
|
||||
adds it to the wire without updating the contract doc + connector, this flips
|
||||
and forces the conversation. This documents the intentional omission.
|
||||
"""
|
||||
wire_keys = _session_source_wire_keys()
|
||||
assert "is_bot" not in wire_keys, (
|
||||
"is_bot is now serialized by SessionSource.to_dict(). If this is "
|
||||
"intentional, add it to docs/relay-connector-contract.md §3 and the "
|
||||
"connector's SessionSource interface, then update this guard."
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("discriminator", ["chat_id", "chat_type", "user_id", "thread_id", "guild_id"])
|
||||
def test_discord_telegram_discriminator_columns_present(discriminator):
|
||||
"""§3's per-platform table headers must exist as SessionSource fields.
|
||||
|
||||
These five columns drive build_session_key() and are the #1 High-severity
|
||||
risk surface (Discord guild_id collision). If the doc advertises a
|
||||
discriminator column the dataclass can't carry, the connector has nowhere to
|
||||
put it.
|
||||
"""
|
||||
assert discriminator in SessionSource.__dataclass_fields__, ( # type: ignore[attr-defined]
|
||||
f"Contract doc §3 lists '{discriminator}' as a session discriminator, "
|
||||
f"but SessionSource has no such field."
|
||||
)
|
||||
# And it must be reachable on the wire (chat_type is always emitted; the rest
|
||||
# are conditional but still possible keys).
|
||||
assert discriminator in _session_source_wire_keys(), (
|
||||
f"Discriminator '{discriminator}' never appears in SessionSource.to_dict() "
|
||||
f"output — the connector cannot transmit it to the gateway."
|
||||
)
|
||||
66
tests/gateway/relay/test_descriptor.py
Normal file
66
tests/gateway/relay/test_descriptor.py
Normal file
@@ -0,0 +1,66 @@
|
||||
"""Tests for the experimental CapabilityDescriptor (relay Phase 0, Task 0.2)."""
|
||||
|
||||
from gateway.relay.descriptor import CONTRACT_VERSION, CapabilityDescriptor
|
||||
|
||||
|
||||
def _telegram_descriptor(**overrides) -> CapabilityDescriptor:
|
||||
base = dict(
|
||||
contract_version=CONTRACT_VERSION,
|
||||
platform="telegram",
|
||||
label="Telegram",
|
||||
max_message_length=4096,
|
||||
supports_draft_streaming=False,
|
||||
supports_edit=True,
|
||||
supports_threads=True,
|
||||
markdown_dialect="markdown_v2",
|
||||
len_unit="utf16",
|
||||
emoji="\u2708\ufe0f",
|
||||
platform_hint="You are on Telegram.",
|
||||
pii_safe=False,
|
||||
)
|
||||
base.update(overrides)
|
||||
return CapabilityDescriptor(**base)
|
||||
|
||||
|
||||
def test_descriptor_roundtrips_json():
|
||||
d = _telegram_descriptor()
|
||||
assert CapabilityDescriptor.from_json(d.to_json()) == d
|
||||
|
||||
|
||||
def test_descriptor_is_frozen():
|
||||
d = _telegram_descriptor()
|
||||
try:
|
||||
d.max_message_length = 1 # type: ignore[misc]
|
||||
except Exception as exc: # FrozenInstanceError
|
||||
assert "cannot assign" in str(exc) or "frozen" in str(exc).lower()
|
||||
else: # pragma: no cover
|
||||
raise AssertionError("descriptor should be immutable (frozen)")
|
||||
|
||||
|
||||
def test_from_json_ignores_unknown_keys():
|
||||
"""A newer connector may send fields this gateway doesn't know — those are
|
||||
dropped, not fatal (forward-compat during the experimental phase)."""
|
||||
d = _telegram_descriptor()
|
||||
raw = d.to_json()[:-1] + ', "future_field": "ignored"}'
|
||||
restored = CapabilityDescriptor.from_json(raw)
|
||||
assert restored == d
|
||||
|
||||
|
||||
def test_from_json_fills_optional_defaults():
|
||||
"""Optional fields (emoji/platform_hint/pii_safe) fall back to defaults."""
|
||||
minimal = (
|
||||
'{"contract_version": 1, "platform": "x", "label": "X", '
|
||||
'"max_message_length": 2000, "supports_draft_streaming": false, '
|
||||
'"supports_edit": false, "supports_threads": false, '
|
||||
'"markdown_dialect": "plain", "len_unit": "chars"}'
|
||||
)
|
||||
d = CapabilityDescriptor.from_json(minimal)
|
||||
assert d.pii_safe is False
|
||||
assert d.platform_hint == ""
|
||||
assert d.emoji == "\U0001f50c"
|
||||
|
||||
|
||||
def test_module_is_marked_experimental():
|
||||
import gateway.relay.descriptor as m
|
||||
|
||||
assert "EXPERIMENTAL" in (m.__doc__ or "")
|
||||
64
tests/gateway/relay/test_descriptor_from_entry.py
Normal file
64
tests/gateway/relay/test_descriptor_from_entry.py
Normal file
@@ -0,0 +1,64 @@
|
||||
"""Descriptor <- PlatformEntry projection (relay Phase 0, Task 0.3).
|
||||
|
||||
Proves the CapabilityDescriptor is a projection of the existing PlatformEntry,
|
||||
not a parallel concept: the entry's label/limit/emoji/hint/pii fields carry
|
||||
straight through.
|
||||
"""
|
||||
|
||||
from gateway.platform_registry import PlatformEntry
|
||||
from gateway.relay.descriptor import CONTRACT_VERSION, CapabilityDescriptor
|
||||
|
||||
|
||||
def _entry(**overrides) -> PlatformEntry:
|
||||
base = dict(
|
||||
name="telegram",
|
||||
label="Telegram",
|
||||
adapter_factory=lambda cfg: None,
|
||||
check_fn=lambda: True,
|
||||
max_message_length=4096,
|
||||
pii_safe=False,
|
||||
emoji="\u2708\ufe0f",
|
||||
platform_hint="You are on Telegram.",
|
||||
)
|
||||
base.update(overrides)
|
||||
return PlatformEntry(**base)
|
||||
|
||||
|
||||
def test_projection_carries_platform_entry_fields():
|
||||
d = CapabilityDescriptor.from_platform_entry(_entry(), len_unit="utf16")
|
||||
assert d.contract_version == CONTRACT_VERSION
|
||||
assert d.platform == "telegram"
|
||||
assert d.label == "Telegram"
|
||||
assert d.max_message_length == 4096
|
||||
assert d.emoji == "\u2708\ufe0f"
|
||||
assert d.platform_hint == "You are on Telegram."
|
||||
assert d.pii_safe is False
|
||||
assert d.len_unit == "utf16"
|
||||
|
||||
|
||||
def test_zero_max_length_maps_to_4096_default():
|
||||
"""PlatformEntry.max_message_length == 0 means 'no limit'; the descriptor
|
||||
carries a concrete bound matching the stream_consumer default."""
|
||||
d = CapabilityDescriptor.from_platform_entry(_entry(max_message_length=0))
|
||||
assert d.max_message_length == 4096
|
||||
|
||||
|
||||
def test_runtime_capabilities_supplied_by_caller():
|
||||
"""PlatformEntry doesn't encode draft/edit/thread/markdown behavior — those
|
||||
come from the caller (the connector, reading the live adapter)."""
|
||||
d = CapabilityDescriptor.from_platform_entry(
|
||||
_entry(),
|
||||
supports_draft_streaming=True,
|
||||
supports_edit=False,
|
||||
supports_threads=True,
|
||||
markdown_dialect="discord",
|
||||
)
|
||||
assert d.supports_draft_streaming is True
|
||||
assert d.supports_edit is False
|
||||
assert d.supports_threads is True
|
||||
assert d.markdown_dialect == "discord"
|
||||
|
||||
|
||||
def test_projection_roundtrips_through_json():
|
||||
d = CapabilityDescriptor.from_platform_entry(_entry(), len_unit="utf16")
|
||||
assert CapabilityDescriptor.from_json(d.to_json()) == d
|
||||
44
tests/gateway/relay/test_no_stub_leak.py
Normal file
44
tests/gateway/relay/test_no_stub_leak.py
Normal file
@@ -0,0 +1,44 @@
|
||||
"""CI guard: the test-only StubConnector must never leak into production paths.
|
||||
|
||||
The relay stub connector lives under tests/ and exists only to prove the
|
||||
gateway side of the relay without the real (Node) connector. If it ever appears
|
||||
under gateway/ or plugins/, that's a production leak — fail loudly.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pathlib
|
||||
import re
|
||||
|
||||
_REPO_ROOT = pathlib.Path(__file__).resolve().parents[3]
|
||||
_FORBIDDEN_DIRS = ("gateway", "plugins")
|
||||
# Match actual code leaks (imports / class definitions), not prose mentions in
|
||||
# docstrings/comments. A production file that *imports* the stub or *defines*
|
||||
# StubConnector is a real leak; a docstring that references the stub's path as
|
||||
# documentation is not.
|
||||
_LEAK_PATTERNS = (
|
||||
re.compile(r"^\s*(?:from|import)\s+.*stub_connector", re.MULTILINE),
|
||||
re.compile(r"^\s*(?:from|import)\s+.*\bStubConnector\b", re.MULTILINE),
|
||||
re.compile(r"^\s*class\s+StubConnector\b", re.MULTILINE),
|
||||
)
|
||||
|
||||
|
||||
def test_stub_connector_does_not_leak_into_production_paths():
|
||||
offenders: list[str] = []
|
||||
for top in _FORBIDDEN_DIRS:
|
||||
base = _REPO_ROOT / top
|
||||
if not base.is_dir():
|
||||
continue
|
||||
for path in base.rglob("*.py"):
|
||||
try:
|
||||
text = path.read_text(encoding="utf-8", errors="ignore")
|
||||
except OSError: # pragma: no cover
|
||||
continue
|
||||
for pat in _LEAK_PATTERNS:
|
||||
if pat.search(text):
|
||||
offenders.append(
|
||||
f"{path.relative_to(_REPO_ROOT)} matches {pat.pattern!r}"
|
||||
)
|
||||
assert not offenders, (
|
||||
"relay test stub leaked into production paths:\n " + "\n ".join(offenders)
|
||||
)
|
||||
77
tests/gateway/relay/test_relay_adapter.py
Normal file
77
tests/gateway/relay/test_relay_adapter.py
Normal file
@@ -0,0 +1,77 @@
|
||||
"""RelayAdapter capability-advertisement tests (relay Phase 1, Task 1.1)."""
|
||||
|
||||
import pytest
|
||||
|
||||
from gateway.config import Platform, PlatformConfig
|
||||
from gateway.relay.adapter import RelayAdapter
|
||||
from gateway.relay.descriptor import CONTRACT_VERSION, CapabilityDescriptor
|
||||
|
||||
|
||||
def make_desc(**kw) -> CapabilityDescriptor:
|
||||
base = dict(
|
||||
contract_version=CONTRACT_VERSION,
|
||||
platform="telegram",
|
||||
label="Telegram",
|
||||
max_message_length=4096,
|
||||
supports_draft_streaming=False,
|
||||
supports_edit=True,
|
||||
supports_threads=True,
|
||||
markdown_dialect="markdown_v2",
|
||||
len_unit="utf16",
|
||||
emoji="\u2708\ufe0f",
|
||||
platform_hint="",
|
||||
pii_safe=False,
|
||||
)
|
||||
base.update(kw)
|
||||
return CapabilityDescriptor(**base)
|
||||
|
||||
|
||||
def _adapter(**desc_kw) -> RelayAdapter:
|
||||
return RelayAdapter(PlatformConfig(), make_desc(**desc_kw))
|
||||
|
||||
|
||||
def test_relay_platform_member_exists():
|
||||
assert Platform("relay") is Platform.RELAY
|
||||
|
||||
|
||||
def test_advertises_descriptor_max_length():
|
||||
a = _adapter(max_message_length=2000)
|
||||
assert a.MAX_MESSAGE_LENGTH == 2000
|
||||
|
||||
|
||||
def test_supports_draft_streaming_follows_descriptor():
|
||||
assert _adapter(supports_draft_streaming=False).supports_draft_streaming() is False
|
||||
assert _adapter(supports_draft_streaming=True).supports_draft_streaming() is True
|
||||
|
||||
|
||||
def test_len_fn_utf16_counts_code_units():
|
||||
a = _adapter(len_unit="utf16")
|
||||
# An astral-plane emoji is two UTF-16 code units.
|
||||
assert a.message_len_fn("\U0001f600") == 2
|
||||
|
||||
|
||||
def test_len_fn_chars_uses_builtin_len():
|
||||
a = _adapter(len_unit="chars")
|
||||
assert a.message_len_fn("\U0001f600") == 1
|
||||
|
||||
|
||||
def test_is_a_base_platform_adapter():
|
||||
# stream_consumer's isinstance(adapter, BasePlatformAdapter) guard must pass.
|
||||
from gateway.platforms.base import BasePlatformAdapter
|
||||
|
||||
assert isinstance(_adapter(), BasePlatformAdapter)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_connect_without_transport_raises():
|
||||
a = _adapter()
|
||||
with pytest.raises(RuntimeError, match="no transport"):
|
||||
await a.connect()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_without_transport_returns_failure():
|
||||
a = _adapter()
|
||||
result = await a.send("chat1", "hello")
|
||||
assert result.success is False
|
||||
assert result.error == "no transport"
|
||||
117
tests/gateway/relay/test_relay_follow_up.py
Normal file
117
tests/gateway/relay/test_relay_follow_up.py
Normal file
@@ -0,0 +1,117 @@
|
||||
"""A2 outbound capability action: the token-less ``follow_up`` op.
|
||||
|
||||
Proves the gateway can act on a shared-identity capability (e.g. a Discord
|
||||
interaction follow-up token) WITHOUT ever holding the credential: it names the
|
||||
session it is in plus the capability ``kind``, and the connector resolves the
|
||||
real value from its vault and egresses. See gateway/relay/transport.py
|
||||
(send_follow_up) and docs/relay-connector-contract.md §4.
|
||||
|
||||
The gateway side is what's exercised here (against the stub connector); the
|
||||
connector's resolve + tenant-match enforcement lives in the connector repo
|
||||
(resolveOutboundCapability). The key gateway-side guarantees:
|
||||
- the wire action carries NO token (only session_key + kind + content),
|
||||
- success/failure surfaces from the connector's resolve result,
|
||||
- a failed resolve (absent/expired/tenant mismatch) returns success=False
|
||||
with nothing for the gateway to retry with.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from gateway.config import PlatformConfig
|
||||
from gateway.relay.adapter import RelayAdapter
|
||||
from gateway.relay.descriptor import CONTRACT_VERSION, CapabilityDescriptor
|
||||
|
||||
from tests.gateway.relay.stub_connector import StubConnector
|
||||
|
||||
|
||||
def _discord_descriptor() -> CapabilityDescriptor:
|
||||
return CapabilityDescriptor(
|
||||
contract_version=CONTRACT_VERSION,
|
||||
platform="discord",
|
||||
label="Discord",
|
||||
max_message_length=2000,
|
||||
supports_draft_streaming=False,
|
||||
supports_edit=True,
|
||||
supports_threads=True,
|
||||
markdown_dialect="discord",
|
||||
len_unit="chars",
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def wired():
|
||||
stub = StubConnector(_discord_descriptor())
|
||||
adapter = RelayAdapter(PlatformConfig(), _discord_descriptor(), transport=stub)
|
||||
return adapter, stub
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_follow_up_round_trips_without_a_token(wired):
|
||||
adapter, stub = wired
|
||||
await adapter.connect()
|
||||
stub.next_follow_up_result = {"success": True, "message_id": "fu-7"}
|
||||
|
||||
result = await adapter.send_follow_up(
|
||||
session_key="agent:main:discord:group:chanA:userX",
|
||||
kind="discord.interaction_token",
|
||||
content="here is your follow-up",
|
||||
)
|
||||
|
||||
assert result.success is True
|
||||
assert result.message_id == "fu-7"
|
||||
assert len(stub.follow_ups) == 1
|
||||
action = stub.follow_ups[0]
|
||||
assert action["op"] == "follow_up"
|
||||
assert action["session_key"] == "agent:main:discord:group:chanA:userX"
|
||||
assert action["kind"] == "discord.interaction_token"
|
||||
assert action["content"] == "here is your follow-up"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_follow_up_wire_action_carries_no_credential(wired):
|
||||
"""The action dict must carry only session refs — no credential VALUE.
|
||||
|
||||
Note the capability ``kind`` legitimately names the credential type
|
||||
(e.g. ``"discord.interaction_token"``) — that's a reference, not the secret.
|
||||
The guarantee is structural: the action has exactly the token-less semantic
|
||||
fields, and no field holds an actual credential value.
|
||||
"""
|
||||
adapter, stub = wired
|
||||
await adapter.connect()
|
||||
await adapter.send_follow_up(
|
||||
session_key="sess-1", kind="discord.interaction_token", content="x", metadata={"a": 1}
|
||||
)
|
||||
action = stub.follow_ups[0]
|
||||
# Exactly the token-less semantic fields (+ metadata); no value/secret field.
|
||||
assert set(action.keys()) == {"op", "session_key", "kind", "content", "metadata"}
|
||||
# No field NAMES a credential carrier (the kind string is a type ref, allowed).
|
||||
assert "value" not in action
|
||||
assert "token" not in action
|
||||
assert "secret" not in action
|
||||
assert "credential" not in action
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_follow_up_failure_surfaces_when_capability_unresolvable(wired):
|
||||
"""Connector couldn't resolve (absent/expired/tenant mismatch) -> success=False."""
|
||||
adapter, stub = wired
|
||||
await adapter.connect()
|
||||
stub.next_follow_up_result = {"success": False, "error": "capability absent or tenant mismatch"}
|
||||
|
||||
result = await adapter.send_follow_up(
|
||||
session_key="sess-1", kind="discord.interaction_token", content="x"
|
||||
)
|
||||
|
||||
assert result.success is False
|
||||
assert result.message_id is None
|
||||
assert "tenant mismatch" in (result.error or "")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_follow_up_without_transport_fails_cleanly():
|
||||
adapter = RelayAdapter(PlatformConfig(), _discord_descriptor(), transport=None)
|
||||
result = await adapter.send_follow_up(session_key="s", kind="k", content="c")
|
||||
assert result.success is False
|
||||
assert result.error == "no transport"
|
||||
69
tests/gateway/relay/test_relay_interrupt.py
Normal file
69
tests/gateway/relay/test_relay_interrupt.py
Normal file
@@ -0,0 +1,69 @@
|
||||
"""Relay /stop interrupt routing (relay Phase 1, Task 1.4).
|
||||
|
||||
Proves a connector-delivered mid-turn interrupt reaches the existing per-session
|
||||
interrupt mechanism and cancels exactly the targeted session_key's turn — never
|
||||
a sibling's. Mirrors the isolation discipline of test_stop_thread_sibling.py.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
|
||||
import pytest
|
||||
|
||||
from gateway.config import PlatformConfig
|
||||
from gateway.relay.adapter import RelayAdapter
|
||||
from gateway.relay.descriptor import CONTRACT_VERSION, CapabilityDescriptor
|
||||
|
||||
from tests.gateway.relay.stub_connector import StubConnector
|
||||
|
||||
|
||||
def _desc() -> CapabilityDescriptor:
|
||||
return CapabilityDescriptor(
|
||||
contract_version=CONTRACT_VERSION,
|
||||
platform="discord",
|
||||
label="Discord",
|
||||
max_message_length=2000,
|
||||
supports_draft_streaming=False,
|
||||
supports_edit=True,
|
||||
supports_threads=True,
|
||||
markdown_dialect="discord",
|
||||
len_unit="chars",
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def adapter():
|
||||
return RelayAdapter(PlatformConfig(), _desc(), transport=StubConnector(_desc()))
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_interrupt_sets_only_target_session_event(adapter):
|
||||
key_a = "agent:main:discord:group:chanA:userX"
|
||||
key_b = "agent:main:discord:group:chanB:userY"
|
||||
ev_a = asyncio.Event()
|
||||
ev_b = asyncio.Event()
|
||||
adapter._active_sessions[key_a] = ev_a
|
||||
adapter._active_sessions[key_b] = ev_b
|
||||
|
||||
await adapter.on_interrupt(key_a, chat_id="chanA")
|
||||
|
||||
assert ev_a.is_set() is True, "target session's interrupt Event must be set"
|
||||
assert ev_b.is_set() is False, "sibling session must be untouched"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_interrupt_unknown_session_is_noop(adapter):
|
||||
# No active session for this key — must not raise.
|
||||
await adapter.on_interrupt("agent:main:discord:group:nope:userZ", chat_id="nope")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_outbound_interrupt_reaches_connector(adapter):
|
||||
"""The gateway-side /stop egress: send_interrupt is carried to the connector
|
||||
so it can forward down the socket owning the session_key."""
|
||||
stub = adapter._transport
|
||||
await stub.send_interrupt("agent:main:discord:group:chanA:userX", reason="stop")
|
||||
assert stub.interrupts == [
|
||||
{"session_key": "agent:main:discord:group:chanA:userX", "reason": "stop"}
|
||||
]
|
||||
66
tests/gateway/relay/test_relay_registration.py
Normal file
66
tests/gateway/relay/test_relay_registration.py
Normal file
@@ -0,0 +1,66 @@
|
||||
"""RelayAdapter registration via the platform registry.
|
||||
|
||||
The relay platform is registered when a connector relay URL is configured
|
||||
(``GATEWAY_RELAY_URL`` env or ``gateway.relay_url`` in config.yaml) — the same
|
||||
config-driven shape as ``gateway.proxy_url``, not a separate feature flag. With
|
||||
no URL configured, registration is a no-op so direct/single-tenant deployments
|
||||
are unaffected. ``force=True`` registers a transport-less adapter for tests.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from gateway.config import PlatformConfig
|
||||
from gateway.platform_registry import platform_registry
|
||||
from gateway.relay import register_relay_adapter, relay_url
|
||||
from gateway.relay.adapter import RelayAdapter
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _clean_registry(monkeypatch):
|
||||
"""Each test starts/ends with no 'relay' entry and a clean relay env."""
|
||||
monkeypatch.delenv("GATEWAY_RELAY_URL", raising=False)
|
||||
monkeypatch.delenv("GATEWAY_RELAY_PLATFORM", raising=False)
|
||||
monkeypatch.delenv("GATEWAY_RELAY_BOT_ID", raising=False)
|
||||
platform_registry.unregister("relay")
|
||||
yield
|
||||
platform_registry.unregister("relay")
|
||||
|
||||
|
||||
def test_off_when_no_url_configured(monkeypatch):
|
||||
# No GATEWAY_RELAY_URL and (assuming) no gateway.relay_url in config.
|
||||
monkeypatch.setattr("gateway.relay.relay_url", lambda: None)
|
||||
assert register_relay_adapter() is False
|
||||
assert platform_registry.is_registered("relay") is False
|
||||
|
||||
|
||||
def test_registers_when_url_configured(monkeypatch):
|
||||
monkeypatch.setenv("GATEWAY_RELAY_URL", "wss://connector.example/relay")
|
||||
assert relay_url() == "wss://connector.example/relay"
|
||||
assert register_relay_adapter() is True
|
||||
assert platform_registry.is_registered("relay") is True
|
||||
|
||||
|
||||
def test_explicit_url_arg_registers():
|
||||
assert register_relay_adapter(url="wss://connector.example/relay") is True
|
||||
assert platform_registry.is_registered("relay") is True
|
||||
|
||||
|
||||
def test_force_registers_without_url():
|
||||
assert register_relay_adapter(force=True) is True
|
||||
assert platform_registry.is_registered("relay") is True
|
||||
|
||||
|
||||
def test_trailing_slash_stripped(monkeypatch):
|
||||
monkeypatch.setenv("GATEWAY_RELAY_URL", "wss://connector.example/relay/")
|
||||
assert relay_url() == "wss://connector.example/relay"
|
||||
|
||||
|
||||
def test_create_adapter_yields_relay_adapter():
|
||||
# force=True builds a transport-less adapter (no live dial in unit tests).
|
||||
register_relay_adapter(force=True)
|
||||
adapter = platform_registry.create_adapter("relay", PlatformConfig())
|
||||
assert isinstance(adapter, RelayAdapter)
|
||||
# Placeholder descriptor until handshake negotiates the real one.
|
||||
assert adapter.descriptor.platform == "relay"
|
||||
122
tests/gateway/relay/test_relay_roundtrip.py
Normal file
122
tests/gateway/relay/test_relay_roundtrip.py
Normal file
@@ -0,0 +1,122 @@
|
||||
"""End-to-end relay round-trip against the in-memory stub connector.
|
||||
|
||||
Proves the gateway side of the relay works with no real connector:
|
||||
- connect() registers the inbound handler,
|
||||
- a connector-delivered MessageEvent reaches the adapter's message path,
|
||||
- SessionSource discriminators (guild_id) drive build_session_key isolation,
|
||||
- an outbound send round-trips through the transport.
|
||||
|
||||
These target the transport contract + session-key derivation (Task 1.2's gate),
|
||||
not the full agent turn — handle_message is patched to capture the event.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from gateway.config import Platform, PlatformConfig
|
||||
from gateway.platforms.base import MessageEvent, MessageType
|
||||
from gateway.session import SessionSource, build_session_key
|
||||
from gateway.relay.adapter import RelayAdapter
|
||||
from gateway.relay.descriptor import CONTRACT_VERSION, CapabilityDescriptor
|
||||
|
||||
from tests.gateway.relay.stub_connector import StubConnector
|
||||
|
||||
|
||||
def _discord_descriptor() -> CapabilityDescriptor:
|
||||
return CapabilityDescriptor(
|
||||
contract_version=CONTRACT_VERSION,
|
||||
platform="discord",
|
||||
label="Discord",
|
||||
max_message_length=2000,
|
||||
supports_draft_streaming=False,
|
||||
supports_edit=True,
|
||||
supports_threads=True,
|
||||
markdown_dialect="discord",
|
||||
len_unit="chars",
|
||||
emoji="\U0001f47e",
|
||||
platform_hint="You are on Discord.",
|
||||
pii_safe=False,
|
||||
)
|
||||
|
||||
|
||||
def _discord_event(guild_id: str, channel_id: str, user_id: str, text: str) -> MessageEvent:
|
||||
"""Synthetic inbound the connector would build from a discord.js message."""
|
||||
source = SessionSource(
|
||||
platform=Platform.DISCORD,
|
||||
chat_id=channel_id,
|
||||
chat_type="group",
|
||||
user_id=user_id,
|
||||
guild_id=guild_id,
|
||||
)
|
||||
return MessageEvent(text=text, message_type=MessageType.TEXT, source=source)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def wired():
|
||||
stub = StubConnector(_discord_descriptor())
|
||||
adapter = RelayAdapter(PlatformConfig(), _discord_descriptor(), transport=stub)
|
||||
return adapter, stub
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_connect_registers_inbound_handler(wired):
|
||||
adapter, stub = wired
|
||||
assert stub._inbound is None
|
||||
ok = await adapter.connect()
|
||||
assert ok is True
|
||||
assert stub.connected is True
|
||||
assert stub._inbound is not None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_inbound_event_reaches_adapter(wired, monkeypatch):
|
||||
adapter, stub = wired
|
||||
captured = []
|
||||
monkeypatch.setattr(adapter, "handle_message", lambda ev: _async_capture(captured, ev))
|
||||
await adapter.connect()
|
||||
ev = _discord_event("guildA", "chan1", "userX", "hello")
|
||||
await stub.push_inbound(ev)
|
||||
assert len(captured) == 1
|
||||
assert captured[0].text == "hello"
|
||||
assert captured[0].source.guild_id == "guildA"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_two_guilds_isolate_into_distinct_session_keys(wired):
|
||||
adapter, _ = wired
|
||||
ev_a = _discord_event("guildA", "chan1", "userX", "hi from A")
|
||||
ev_b = _discord_event("guildB", "chan2", "userX", "hi from B")
|
||||
key_a = build_session_key(ev_a.source)
|
||||
key_b = build_session_key(ev_b.source)
|
||||
assert key_a != key_b
|
||||
# Same guild + channel + user collapses to one session.
|
||||
ev_a2 = _discord_event("guildA", "chan1", "userX", "again")
|
||||
assert build_session_key(ev_a2.source) == key_a
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_outbound_send_round_trips(wired):
|
||||
adapter, stub = wired
|
||||
await adapter.connect()
|
||||
stub.next_send_result = {"success": True, "message_id": "msg-42"}
|
||||
result = await adapter.send("chan1", "a reply", metadata={"k": "v"})
|
||||
assert result.success is True
|
||||
assert result.message_id == "msg-42"
|
||||
assert len(stub.sent) == 1
|
||||
assert stub.sent[0]["op"] == "send"
|
||||
assert stub.sent[0]["chat_id"] == "chan1"
|
||||
assert stub.sent[0]["content"] == "a reply"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_chat_info_proxied_to_connector(wired):
|
||||
adapter, stub = wired
|
||||
stub.chat_info["chan1"] = {"name": "general", "type": "group"}
|
||||
info = await adapter.get_chat_info("chan1")
|
||||
assert info == {"name": "general", "type": "group"}
|
||||
|
||||
|
||||
async def _async_capture(sink, event):
|
||||
sink.append(event)
|
||||
return None
|
||||
167
tests/gateway/relay/test_relay_roundtrip_telegram.py
Normal file
167
tests/gateway/relay/test_relay_roundtrip_telegram.py
Normal file
@@ -0,0 +1,167 @@
|
||||
"""End-to-end relay round-trip for Telegram against the in-memory stub.
|
||||
|
||||
Companion to ``test_relay_roundtrip.py`` (Discord). Proves the relay generalizes
|
||||
beyond Discord — the Phase 1 exit gate requires *both* Telegram and Discord
|
||||
descriptors to round-trip and their inbound ``MessageEvent``s to drive
|
||||
``build_session_key()`` correctly.
|
||||
|
||||
Telegram's discriminator profile differs from Discord's, which is the point:
|
||||
- No ``guild_id``; isolation between chats comes from ``chat_id`` alone.
|
||||
- Forum topics live inside ONE ``chat_id`` and isolate by ``thread_id`` (the
|
||||
Telegram analog of Discord's per-guild isolation).
|
||||
- Forum/thread sessions are shared across participants by default
|
||||
(``thread_sessions_per_user=False``) — user_id is NOT appended in a thread.
|
||||
- ``len_unit="utf16"`` (Telegram counts UTF-16 code units) and
|
||||
``markdown_dialect="markdown_v2"`` — distinct from Discord's chars/discord.
|
||||
|
||||
If the descriptor or session-keying only worked for Discord, these fail.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from gateway.config import Platform, PlatformConfig
|
||||
from gateway.platforms.base import MessageEvent, MessageType
|
||||
from gateway.session import SessionSource, build_session_key
|
||||
from gateway.relay.adapter import RelayAdapter
|
||||
from gateway.relay.descriptor import CONTRACT_VERSION, CapabilityDescriptor
|
||||
|
||||
from tests.gateway.relay.stub_connector import StubConnector
|
||||
|
||||
|
||||
def _telegram_descriptor() -> CapabilityDescriptor:
|
||||
return CapabilityDescriptor(
|
||||
contract_version=CONTRACT_VERSION,
|
||||
platform="telegram",
|
||||
label="Telegram",
|
||||
max_message_length=4096,
|
||||
supports_draft_streaming=True, # Telegram DMs support sendMessageDraft
|
||||
supports_edit=True,
|
||||
supports_threads=True, # forum topics
|
||||
markdown_dialect="markdown_v2",
|
||||
len_unit="utf16",
|
||||
emoji="\u2708\ufe0f",
|
||||
platform_hint="You are on Telegram.",
|
||||
pii_safe=False,
|
||||
)
|
||||
|
||||
|
||||
def _tg_group_event(chat_id: str, user_id: str, text: str, thread_id: str | None = None) -> MessageEvent:
|
||||
"""Synthetic inbound the connector would build from a Telegram update.
|
||||
|
||||
A plain group message has no thread_id; a forum-topic message carries the
|
||||
topic id as thread_id (no guild_id — Telegram has no guild concept).
|
||||
"""
|
||||
source = SessionSource(
|
||||
platform=Platform.TELEGRAM,
|
||||
chat_id=chat_id,
|
||||
chat_type="forum" if thread_id else "group",
|
||||
user_id=user_id,
|
||||
thread_id=thread_id,
|
||||
)
|
||||
return MessageEvent(text=text, message_type=MessageType.TEXT, source=source)
|
||||
|
||||
|
||||
def _tg_dm_event(chat_id: str, user_id: str, text: str) -> MessageEvent:
|
||||
source = SessionSource(
|
||||
platform=Platform.TELEGRAM,
|
||||
chat_id=chat_id,
|
||||
chat_type="dm",
|
||||
user_id=user_id,
|
||||
)
|
||||
return MessageEvent(text=text, message_type=MessageType.TEXT, source=source)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def wired():
|
||||
desc = _telegram_descriptor()
|
||||
stub = StubConnector(desc)
|
||||
adapter = RelayAdapter(PlatformConfig(), desc, transport=stub)
|
||||
return adapter, stub
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_telegram_descriptor_round_trips_through_stub(wired):
|
||||
"""The connector's handshake descriptor for Telegram survives JSON + the
|
||||
adapter configures itself from it (utf16 length unit, 4096 limit)."""
|
||||
adapter, stub = wired
|
||||
desc = _telegram_descriptor()
|
||||
assert CapabilityDescriptor.from_json(desc.to_json()) == desc
|
||||
# Adapter reflects the descriptor's capability profile.
|
||||
assert adapter.MAX_MESSAGE_LENGTH == 4096
|
||||
assert adapter.supports_draft_streaming() is True
|
||||
# utf16 length unit selects a non-default len fn (Telegram counts UTF-16).
|
||||
assert adapter.message_len_fn is not len
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_inbound_telegram_event_reaches_adapter(wired, monkeypatch):
|
||||
adapter, stub = wired
|
||||
captured: list[MessageEvent] = []
|
||||
monkeypatch.setattr(adapter, "handle_message", lambda ev: _async_capture(captured, ev))
|
||||
await adapter.connect()
|
||||
await stub.push_inbound(_tg_group_event("chat-100", "userX", "hello"))
|
||||
assert len(captured) == 1
|
||||
assert captured[0].text == "hello"
|
||||
assert captured[0].source.platform == Platform.TELEGRAM
|
||||
assert captured[0].source.guild_id is None # Telegram has no guild
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_two_telegram_chats_isolate_by_chat_id(wired):
|
||||
"""No guild_id on Telegram — two distinct chats must still isolate, keyed
|
||||
on chat_id alone (the Discord-guild role is played by chat_id here)."""
|
||||
ev_a = _tg_group_event("chat-A", "userX", "hi A")
|
||||
ev_b = _tg_group_event("chat-B", "userX", "hi B")
|
||||
key_a = build_session_key(ev_a.source)
|
||||
key_b = build_session_key(ev_b.source)
|
||||
assert key_a != key_b
|
||||
# Same chat + same user collapses to one session.
|
||||
ev_a2 = _tg_group_event("chat-A", "userX", "again")
|
||||
assert build_session_key(ev_a2.source) == key_a
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_forum_topics_isolate_by_thread_id_within_one_chat(wired):
|
||||
"""Telegram forum topics share a single chat_id and isolate by thread_id —
|
||||
the Telegram analog of Discord per-guild isolation. Two topics in the same
|
||||
forum must NOT collide, and (threads shared across participants by default)
|
||||
a second user in the same topic shares the session."""
|
||||
topic1 = _tg_group_event("forum-1", "userX", "in topic 1", thread_id="t-1")
|
||||
topic2 = _tg_group_event("forum-1", "userX", "in topic 2", thread_id="t-2")
|
||||
k1 = build_session_key(topic1.source)
|
||||
k2 = build_session_key(topic2.source)
|
||||
assert k1 != k2, "two forum topics in one chat must not share a session"
|
||||
# Same chat, no topic → distinct from any topic session.
|
||||
plain = _tg_group_event("forum-1", "userX", "no topic")
|
||||
assert build_session_key(plain.source) not in {k1, k2}
|
||||
# Threads are shared across participants by default: a different user in the
|
||||
# same topic lands on the SAME session key (user_id not appended in threads).
|
||||
topic1_other_user = _tg_group_event("forum-1", "userY", "me too", thread_id="t-1")
|
||||
assert build_session_key(topic1_other_user.source) == k1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_telegram_dm_isolates_by_chat_id(wired):
|
||||
dm_a = _tg_dm_event("dm-111", "userX", "hey")
|
||||
dm_b = _tg_dm_event("dm-222", "userY", "yo")
|
||||
assert build_session_key(dm_a.source) != build_session_key(dm_b.source)
|
||||
assert build_session_key(dm_a.source).startswith("agent:main:telegram:dm:")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_outbound_send_round_trips_telegram(wired):
|
||||
adapter, stub = wired
|
||||
await adapter.connect()
|
||||
stub.next_send_result = {"success": True, "message_id": "tg-77"}
|
||||
result = await adapter.send("chat-100", "a reply")
|
||||
assert result.success is True
|
||||
assert result.message_id == "tg-77"
|
||||
assert stub.sent[0]["op"] == "send"
|
||||
assert stub.sent[0]["chat_id"] == "chat-100"
|
||||
|
||||
|
||||
async def _async_capture(sink, event):
|
||||
sink.append(event)
|
||||
return None
|
||||
91
tests/gateway/relay/test_relay_sheds_crypto.py
Normal file
91
tests/gateway/relay/test_relay_sheds_crypto.py
Normal file
@@ -0,0 +1,91 @@
|
||||
"""Invariant: the relay path sheds platform crypto — it re-validates nothing.
|
||||
|
||||
Under the A2 trust model (see docs/relay-connector-contract.md §6), the
|
||||
*connector* is the sole crypto/identity boundary: it verifies/decrypts every
|
||||
inbound platform payload at the edge (it holds the tenant secrets), normalizes
|
||||
it to a tenant-scoped ``MessageEvent``, and forwards only the sanitized event.
|
||||
The gateway re-validates nothing — it cannot, without being handed the shared
|
||||
signing secret, which would itself be the leak on a shared bot.
|
||||
|
||||
The relay package therefore MUST NOT import or call platform signature/crypto
|
||||
verification (Discord ed25519, Twilio HMAC, WeCom BizMsgCrypt, generic webhook
|
||||
signature checks). Those live in the *direct* platform adapters
|
||||
(``gateway/platforms/*``) which serve non-relay deployments; the relay receives
|
||||
already-trusted events. This test fails if someone bolts re-validation onto the
|
||||
relay path, re-coupling the gateway to platform secrets it must never hold.
|
||||
|
||||
It is an invariant (asserts the *relation* "relay imports no crypto"), not a
|
||||
change-detector snapshot of a frozen import list.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import ast
|
||||
import re
|
||||
from pathlib import Path
|
||||
|
||||
# gateway/relay package directory: tests/gateway/relay/ -> repo root parents[3].
|
||||
_REPO_ROOT = Path(__file__).resolve().parents[3]
|
||||
_RELAY_PKG = _REPO_ROOT / "gateway" / "relay"
|
||||
|
||||
# Modules / symbols that mean "platform crypto re-validation". If the relay path
|
||||
# imports any of these it has re-coupled the gateway to a platform secret.
|
||||
_FORBIDDEN_MODULE_TOKENS = (
|
||||
"wecom_crypto",
|
||||
"wecom_callback",
|
||||
"webhook", # gateway.platforms.webhook holds signature verification
|
||||
)
|
||||
_FORBIDDEN_SYMBOL_RE = re.compile(
|
||||
r"(ed25519|verify_key|verifykey|verify_signature|verify_ed25519|"
|
||||
r"verify_webhook|bizmsg|hmac|x[-_]signature)",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
|
||||
|
||||
def _relay_py_files() -> list[Path]:
|
||||
assert _RELAY_PKG.is_dir(), f"relay package missing at {_RELAY_PKG}"
|
||||
return sorted(_RELAY_PKG.glob("*.py"))
|
||||
|
||||
|
||||
def test_relay_package_imports_no_platform_crypto():
|
||||
"""No module in gateway/relay imports a platform-crypto / verification module."""
|
||||
offenders: list[str] = []
|
||||
for path in _relay_py_files():
|
||||
tree = ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
|
||||
for node in ast.walk(tree):
|
||||
mods: list[str] = []
|
||||
if isinstance(node, ast.Import):
|
||||
mods = [alias.name for alias in node.names]
|
||||
elif isinstance(node, ast.ImportFrom):
|
||||
mods = [node.module or ""]
|
||||
mods += [f"{node.module or ''}.{a.name}" for a in node.names]
|
||||
for mod in mods:
|
||||
if any(tok in mod for tok in _FORBIDDEN_MODULE_TOKENS):
|
||||
offenders.append(f"{path.name}: imports '{mod}'")
|
||||
assert not offenders, (
|
||||
"The relay path must re-validate NOTHING (A2: connector is the sole "
|
||||
"crypto boundary). Found platform-crypto imports in the relay package:\n "
|
||||
+ "\n ".join(offenders)
|
||||
+ "\nMove verification to the connector edge; the gateway trusts the "
|
||||
"normalized MessageEvent. See docs/relay-connector-contract.md §6."
|
||||
)
|
||||
|
||||
|
||||
def test_relay_package_calls_no_signature_verification():
|
||||
"""No relay module references a signature/crypto-verification symbol by name."""
|
||||
offenders: list[str] = []
|
||||
for path in _relay_py_files():
|
||||
for lineno, line in enumerate(path.read_text(encoding="utf-8").splitlines(), 1):
|
||||
# Skip comments / docstrings-as-prose: only flag code-like usage.
|
||||
stripped = line.strip()
|
||||
if stripped.startswith("#"):
|
||||
continue
|
||||
m = _FORBIDDEN_SYMBOL_RE.search(line)
|
||||
if m:
|
||||
offenders.append(f"{path.name}:{lineno}: '{m.group(0)}' in: {stripped[:80]}")
|
||||
assert not offenders, (
|
||||
"The relay path must not perform platform signature/crypto verification "
|
||||
"(A2). Found verification-symbol references:\n "
|
||||
+ "\n ".join(offenders)
|
||||
+ "\nThe connector verifies at the edge; the gateway re-validates nothing."
|
||||
)
|
||||
179
tests/gateway/relay/test_ws_transport.py
Normal file
179
tests/gateway/relay/test_ws_transport.py
Normal file
@@ -0,0 +1,179 @@
|
||||
"""WebSocketRelayTransport against a real in-process WebSocket server.
|
||||
|
||||
Exercises the production transport over an actual ``websockets`` server (no
|
||||
mock socket): handshake (hello -> descriptor), inbound frame -> handler,
|
||||
outbound request/response correlation, and follow_up routing. Proves the wire
|
||||
framing (newline-delimited JSON) and the request/response future plumbing work
|
||||
end to end on a live socket.
|
||||
|
||||
Skipped cleanly if the optional ``websockets`` dependency is absent.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
|
||||
import pytest
|
||||
import pytest_asyncio
|
||||
|
||||
from gateway.relay.ws_transport import WebSocketRelayTransport, WEBSOCKETS_AVAILABLE
|
||||
|
||||
pytestmark = pytest.mark.skipif(not WEBSOCKETS_AVAILABLE, reason="websockets not installed")
|
||||
|
||||
if WEBSOCKETS_AVAILABLE:
|
||||
import websockets
|
||||
|
||||
|
||||
DESCRIPTOR = {
|
||||
"contract_version": 1,
|
||||
"platform": "discord",
|
||||
"label": "Discord",
|
||||
"max_message_length": 2000,
|
||||
"supports_draft_streaming": False,
|
||||
"supports_edit": True,
|
||||
"supports_threads": True,
|
||||
"markdown_dialect": "discord",
|
||||
"len_unit": "chars",
|
||||
}
|
||||
|
||||
|
||||
class _StubConnectorServer:
|
||||
"""Minimal connector: answers hello with a descriptor, echoes outbound."""
|
||||
|
||||
def __init__(self):
|
||||
self.received: list[dict] = []
|
||||
self._server = None
|
||||
self.url = ""
|
||||
# Push channel: tests set this to a frame dict to deliver inbound.
|
||||
self._to_push: list[dict] = []
|
||||
|
||||
async def start(self):
|
||||
self._server = await websockets.serve(self._handle, "127.0.0.1", 0)
|
||||
sock = next(iter(self._server.sockets))
|
||||
port = sock.getsockname()[1]
|
||||
self.url = f"ws://127.0.0.1:{port}"
|
||||
|
||||
async def stop(self):
|
||||
if self._server is not None:
|
||||
self._server.close()
|
||||
await self._server.wait_closed()
|
||||
|
||||
async def _handle(self, ws):
|
||||
async for raw in ws:
|
||||
for line in str(raw).split("\n"):
|
||||
if not line.strip():
|
||||
continue
|
||||
frame = json.loads(line)
|
||||
self.received.append(frame)
|
||||
await self._on_frame(ws, frame)
|
||||
|
||||
async def _on_frame(self, ws, frame):
|
||||
ftype = frame.get("type")
|
||||
if ftype == "hello":
|
||||
await ws.send(json.dumps({"type": "descriptor", "descriptor": DESCRIPTOR}) + "\n")
|
||||
# Deliver any queued inbound frames right after handshake.
|
||||
for f in self._to_push:
|
||||
await ws.send(json.dumps(f) + "\n")
|
||||
elif ftype == "outbound":
|
||||
action = frame.get("action", {})
|
||||
# Echo a successful result correlated by requestId.
|
||||
result = {"success": True, "message_id": f"srv-{action.get('op')}"}
|
||||
await ws.send(
|
||||
json.dumps({"type": "outbound_result", "requestId": frame["requestId"], "result": result})
|
||||
+ "\n"
|
||||
)
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def server():
|
||||
srv = _StubConnectorServer()
|
||||
await srv.start()
|
||||
yield srv
|
||||
await srv.stop()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_handshake_negotiates_descriptor(server):
|
||||
t = WebSocketRelayTransport(server.url, "discord", "appShared")
|
||||
await t.connect()
|
||||
try:
|
||||
desc = await t.handshake()
|
||||
assert desc.platform == "discord"
|
||||
assert desc.max_message_length == 2000
|
||||
# The hello carried the platform + botId.
|
||||
hello = next(f for f in server.received if f["type"] == "hello")
|
||||
assert hello["platform"] == "discord"
|
||||
assert hello["botId"] == "appShared"
|
||||
finally:
|
||||
await t.disconnect()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_inbound_frame_reaches_handler(server):
|
||||
server._to_push = [
|
||||
{
|
||||
"type": "inbound",
|
||||
"event": {
|
||||
"text": "hello from connector",
|
||||
"message_type": "text",
|
||||
"source": {"platform": "discord", "chat_id": "chan1", "chat_type": "group", "guild_id": "guildA"},
|
||||
},
|
||||
"bufferId": "buf-1",
|
||||
}
|
||||
]
|
||||
received = []
|
||||
t = WebSocketRelayTransport(server.url, "discord", "appShared")
|
||||
t.set_inbound_handler(lambda ev: received.append(ev) or asyncio.sleep(0))
|
||||
await t.connect()
|
||||
try:
|
||||
await t.handshake()
|
||||
# Give the reader a tick to deliver the pushed inbound frame.
|
||||
await asyncio.sleep(0.05)
|
||||
assert len(received) == 1
|
||||
assert received[0].text == "hello from connector"
|
||||
assert received[0].source.guild_id == "guildA"
|
||||
finally:
|
||||
await t.disconnect()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_outbound_round_trips_with_correlation(server):
|
||||
t = WebSocketRelayTransport(server.url, "discord", "appShared")
|
||||
await t.connect()
|
||||
try:
|
||||
await t.handshake()
|
||||
result = await t.send_outbound({"op": "send", "chat_id": "chan1", "content": "hi"})
|
||||
assert result["success"] is True
|
||||
assert result["message_id"] == "srv-send"
|
||||
finally:
|
||||
await t.disconnect()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_follow_up_round_trips(server):
|
||||
t = WebSocketRelayTransport(server.url, "discord", "appShared")
|
||||
await t.connect()
|
||||
try:
|
||||
await t.handshake()
|
||||
result = await t.send_follow_up(
|
||||
{"op": "follow_up", "session_key": "s1", "kind": "discord.interaction_token", "content": "fu"}
|
||||
)
|
||||
assert result["success"] is True
|
||||
assert result["message_id"] == "srv-follow_up"
|
||||
# The follow_up rode an outbound frame the connector saw.
|
||||
outbound = [f for f in server.received if f["type"] == "outbound"]
|
||||
assert any(f["action"]["op"] == "follow_up" for f in outbound)
|
||||
finally:
|
||||
await t.disconnect()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_disconnect_fails_pending_waiters_cleanly(server):
|
||||
t = WebSocketRelayTransport(server.url, "discord", "appShared", outbound_timeout_s=5)
|
||||
await t.connect()
|
||||
await t.handshake()
|
||||
await t.disconnect()
|
||||
# After disconnect, an outbound returns a structured failure rather than hanging.
|
||||
result = await t.send_outbound({"op": "send", "chat_id": "c", "content": "x"})
|
||||
assert result["success"] is False
|
||||
@@ -98,6 +98,8 @@ def test_checker_returns_true_when_configured(platform, checker, monkeypatch):
|
||||
mock_config.extra = {"app_id": "app", "app_secret": "sec"}
|
||||
elif platform == Platform.DINGTALK:
|
||||
mock_config.extra = {"client_id": "id", "client_secret": "sec"}
|
||||
elif platform == Platform.RELAY:
|
||||
mock_config.extra = {"relay_url": "wss://connector.example/relay"}
|
||||
else:
|
||||
pytest.skip(f"No synthetic config defined for {platform.value}")
|
||||
|
||||
|
||||
99
tests/gateway/test_relay_capability_surface.py
Normal file
99
tests/gateway/test_relay_capability_surface.py
Normal file
@@ -0,0 +1,99 @@
|
||||
"""Phase 0 regression harness for the relay/connector work.
|
||||
|
||||
Locks the *behavioral contract* that the future ``RelayAdapter`` must reproduce:
|
||||
the gateway's ``stream_consumer`` and ``BasePlatformAdapter`` read per-platform
|
||||
capabilities through a small, stable surface. A relay adapter that exposes the
|
||||
same surface (``MAX_MESSAGE_LENGTH`` attribute, ``message_len_fn`` property,
|
||||
``supports_draft_streaming`` probe, and only the abstract methods) slots into
|
||||
the existing consumer with no consumer changes.
|
||||
|
||||
These are deliberately *behavioral* (construct an adapter, drive the code,
|
||||
assert the observable outcome) rather than source-string snapshots, per the
|
||||
repo's "don't write change-detector tests" rule. They pass on ``main`` before
|
||||
any ``RelayAdapter`` exists — they describe the contract, not the relay.
|
||||
"""
|
||||
|
||||
import inspect
|
||||
|
||||
from gateway.config import Platform, PlatformConfig
|
||||
from gateway.platforms.base import BasePlatformAdapter
|
||||
|
||||
|
||||
class _MinAdapter(BasePlatformAdapter):
|
||||
"""Smallest concrete adapter: implements exactly the abstract methods."""
|
||||
|
||||
async def connect(self): # pragma: no cover - not called
|
||||
return True
|
||||
|
||||
async def disconnect(self): # pragma: no cover - not called
|
||||
return None
|
||||
|
||||
async def send(self, *args, **kwargs): # pragma: no cover - not called
|
||||
return None
|
||||
|
||||
async def get_chat_info(self, chat_id): # pragma: no cover - not called
|
||||
return {}
|
||||
|
||||
|
||||
def _make() -> BasePlatformAdapter:
|
||||
return _MinAdapter(PlatformConfig(), Platform.LOCAL)
|
||||
|
||||
|
||||
def test_abstract_methods_are_the_known_set():
|
||||
"""The relay adapter must implement exactly this set of abstract methods.
|
||||
|
||||
Everything else on BasePlatformAdapter has a default, so ONE generic
|
||||
RelayAdapter overriding the right subset is feasible without per-platform
|
||||
gateway classes. If a new abstractmethod is added here, the relay design
|
||||
(and the cross-repo contract) must be revisited — hence the lock.
|
||||
|
||||
NOTE: this is four methods, not three — ``get_chat_info`` is abstract too
|
||||
(defined far below the connect/disconnect/send cluster in base.py). The
|
||||
RelayAdapter must implement it (proxying a chat-info lookup to the
|
||||
connector, or returning a descriptor-derived stub).
|
||||
"""
|
||||
abstract = {
|
||||
name
|
||||
for name, member in inspect.getmembers(BasePlatformAdapter)
|
||||
if getattr(member, "__isabstractmethod__", False)
|
||||
}
|
||||
assert abstract == {"connect", "disconnect", "send", "get_chat_info"}
|
||||
|
||||
|
||||
def test_message_len_fn_defaults_to_len():
|
||||
"""message_len_fn is the per-platform length-unit hook (Telegram overrides
|
||||
it for UTF-16). The default is plain ``len``; the relay adapter will
|
||||
override it from its negotiated descriptor's ``len_unit``."""
|
||||
inst = _make()
|
||||
assert inst.message_len_fn("hello") == 5
|
||||
|
||||
|
||||
def test_supports_draft_streaming_defaults_false():
|
||||
"""Draft streaming is opt-in per platform; the consumer falls back to the
|
||||
edit-based path when False. The relay adapter flips this from its
|
||||
descriptor's ``supports_draft_streaming`` flag."""
|
||||
inst = _make()
|
||||
assert inst.supports_draft_streaming() is False
|
||||
|
||||
|
||||
def test_stream_consumer_reads_max_message_length_by_attribute():
|
||||
"""The consumer resolves the per-platform char limit by reading the
|
||||
adapter's ``MAX_MESSAGE_LENGTH`` attribute (defaulting to 4096 when
|
||||
absent). The relay adapter exposes this as an attribute set from its
|
||||
descriptor — so a relay adapter that sets the attribute is chunked
|
||||
correctly with no consumer change.
|
||||
"""
|
||||
from gateway import stream_consumer
|
||||
|
||||
class _NoLimit:
|
||||
pass
|
||||
|
||||
class _WithLimit:
|
||||
MAX_MESSAGE_LENGTH = 1234
|
||||
|
||||
assert getattr(_NoLimit(), "MAX_MESSAGE_LENGTH", 4096) == 4096
|
||||
assert getattr(_WithLimit(), "MAX_MESSAGE_LENGTH", 4096) == 1234
|
||||
# The consumer depends on BasePlatformAdapter for the message_len_fn
|
||||
# isinstance guard (import-level contract the relay adapter satisfies by
|
||||
# subclassing BasePlatformAdapter).
|
||||
assert stream_consumer._BasePlatformAdapter is BasePlatformAdapter
|
||||
Reference in New Issue
Block a user