Compare commits


19 Commits

Author SHA1 Message Date
Ben
19399a7e32 test(gateway): live ws-transport round-trip + config-driven registration
- test_ws_transport.py: drives WebSocketRelayTransport against a REAL in-process
  websockets server (not a mock socket): handshake (hello->descriptor), inbound
  frame -> handler, outbound request/response correlation, follow_up routing,
  and clean disconnect failing pending waiters. Skips if websockets is absent.
- test_relay_registration.py: rewritten for the config-driven gate — registers
  when GATEWAY_RELAY_URL is set / an explicit url is passed / force=True; no-op
  without a URL; trailing slash stripped; adapter constructs through the registry.

Full relay suite: 57 passed.
2026-06-10 20:28:47 +10:00
Ben
b075c1ec91 feat(gateway): register relay adapter from config; drop HERMES_GATEWAY_RELAY gate
Wire the relay adapter into gateway startup and make activation config-driven
instead of a dark-launch flag.

- gateway/relay/__init__.py: replace relay_enabled()/HERMES_GATEWAY_RELAY with
  relay_url() (GATEWAY_RELAY_URL env or gateway.relay_url in config.yaml) — the
  same shape as gateway.proxy_url. register_relay_adapter() registers when a URL
  is configured and builds a live WebSocketRelayTransport; with no URL it's a
  no-op (direct/single-tenant deployments unaffected). force=True keeps the
  transport-less adapter for unit tests. relay_platform_identity() reads the
  hello platform/botId from GATEWAY_RELAY_PLATFORM/GATEWAY_RELAY_BOT_ID.
- gateway/run.py: call register_relay_adapter() during GatewayRunner.start(),
  right after plugin discovery, so a configured connector relay is registered
  on every boot. Failures are logged, never block startup.

This removes the dark-launch posture: the relay is on whenever it's configured,
shipping the production end state rather than hiding it behind a flag.
2026-06-10 20:28:39 +10:00
Ben
f325dc71e5 feat(gateway): production WebSocketRelayTransport + descriptor negotiation
Adds the concrete transport behind the RelayTransport Protocol — the missing
'later-phase work' the relay scaffold deferred. The gateway dials OUT to the
connector over a WebSocket and speaks the newline-delimited JSON frame protocol
(docs/relay-connector-contract.md; connector src/relay/protocol.ts):

- connect(): opens the ws, sends hello{platform,botId}, starts a background
  read loop, and resolves handshake() when the connector's descriptor frame
  arrives.
- inbound frames -> the registered InboundHandler (rebuilt into a MessageEvent
  via _event_from_wire, mapping the snake_case SessionSource wire form back
  onto the gateway dataclasses).
- send_outbound / send_follow_up / get_chat_info: request/response correlated
  by a uuid requestId against a per-request future, with a timeout so a caller
  never hangs; send_interrupt is fire-and-forget.
- disconnect(): cancels the reader, closes the ws, and fails any in-flight
  outbound waiters with a structured error.

RelayAdapter.connect() now negotiates the real CapabilityDescriptor from the
transport and adopts it (_apply_descriptor updates MAX_MESSAGE_LENGTH +
markdown surface), replacing the construction-time placeholder. Lazy
'import websockets' mirrors gateway/platforms/feishu.py; WEBSOCKETS_AVAILABLE
gates construction.
2026-06-10 20:28:27 +10:00
Ben
ce120f0473 docs(gateway): rewrite contract §6 to the A2 trust-boundary model
The contract's §6 still said the connector 'forwards the signed body
byte-for-byte so the gateway's existing crypto validates against unmodified
bytes.' That model is incoherent under an untrusted, disposable tenant
gateway on a shared bot:

- re-validating Twilio HMAC / WeCom crypto needs the shared signing secret
  (handing it over IS the cross-tenant leak),
- WeCom payloads are encrypted with that secret (the connector must decrypt
  at the edge just to route),
- a Discord interaction token lives inside the signed body — you can't both
  preserve the bytes and strip the credential.

Rewrites §6 to the actual model: the connector is the SOLE crypto/identity
boundary — verifies/decrypts at the edge, normalizes to a tenant-scoped
MessageEvent, strips shared-identity capabilities into its vault, and
forwards only the sanitized event. The gateway re-validates nothing (the
invariant test from the crypto-shed commit enforces this). Notes that this
unifies the passthrough + relay planes and points to the connector repo's
capability-trust-boundary.md.

Also documents the follow_up op in §4 (token-less capability action added
in the previous commit). The conformance test (§2/§3 tables) stays green;
contract is unpublished/EXPERIMENTAL so no version-bump ceremony. 55 passed.
2026-06-10 19:49:08 +10:00
Ben
6dd4caf378 feat(gateway): token-less follow_up outbound op (A2 capability action)
The relay outbound surface had send/edit/typing but no way to act on a
SHARED-identity capability (e.g. a Discord interaction follow-up token,
~15min) that the connector captured + stripped at the edge. Under A2 that
credential never reaches the gateway, so the gateway can't just 'send with
the token' — it needs a semantic op naming the session it's already in.

Adds the follow_up op end to end on the gateway side:
- RelayTransport.send_follow_up(action): protocol method. Action carries
  op='follow_up' + session_key + kind + content (+ metadata) and NO token.
- RelayAdapter.send_follow_up(session_key, kind, content, metadata): builds
  that action and returns a SendResult. The connector resolves the real
  capability (its resolveOutboundCapability), enforces the tenant match so
  tenant B can't wield tenant A's capability, and egresses; success=False
  when the capability is absent/expired/mismatched (nothing to retry — a
  leaked gateway holds zero capability material).
- StubConnector records follow_ups + a canned next_follow_up_result.

Tests: round-trips without a token; the wire action carries only session
refs (no credential value field — the 'kind' string is a type ref, not the
secret); failure surfaces when the connector can't resolve; no-transport
fails cleanly. 55 passed. §4 doc entry follows in the contract-rewrite commit.
2026-06-10 19:48:00 +10:00
Ben
d603371644 test(gateway): shed platform crypto from the relay path (A2 invariant)
Under the A2 trust model the connector is the SOLE crypto/identity
boundary: it verifies/decrypts every inbound platform payload at the edge
(it holds the tenant secrets), normalizes to a tenant-scoped MessageEvent,
and forwards only the sanitized event. The gateway re-validates nothing —
it cannot without being handed the shared signing secret, which on a
shared bot is itself the cross-tenant leak.

The relay path already imports no platform-crypto today; this locks that
in as an enforced invariant so nobody bolts re-validation (Discord
ed25519, Twilio HMAC, WeCom BizMsgCrypt, generic webhook signature checks)
onto the relay later and silently re-couples the gateway to platform
secrets it must never hold. Verification stays in the direct platform
adapters (gateway/platforms/*) which serve non-relay deployments.

- test_relay_package_imports_no_platform_crypto: AST-walks gateway/relay/*
  and fails on any import of a platform-crypto/verification module.
- test_relay_package_calls_no_signature_verification: fails on any
  verification-symbol reference (ed25519/hmac/bizmsg/verify_*).

Invariants (assert the relation 'relay re-validates nothing'), not frozen
snapshots. Verified the guard bites: injecting a wecom_crypto import makes
it fail, removing it goes green. docs §6 rewrite follows in a later commit.
2026-06-10 19:45:08 +10:00
Ben
4f59b4c657 test(gateway): Telegram relay round-trip (Phase 1 generalization proof)
The Phase 1 exit gate requires BOTH Discord and Telegram to round-trip
through the relay stub, but test_relay_roundtrip.py only covered Discord.
Add the Telegram companion exercising its distinct discriminator profile:

- no guild_id — two chats isolate on chat_id alone
- forum topics share one chat_id and isolate by thread_id (the Telegram
  analog of Discord per-guild isolation), shared across participants by
  default (thread_sessions_per_user=False)
- DM isolation by chat_id
- utf16 len_unit + markdown_v2 dialect round-trip and configure the adapter
- outbound send round-trips through the stub

Proves the CapabilityDescriptor + build_session_key generalize beyond
Discord, not just the struct (which the descriptor unit tests already
covered).
2026-06-09 15:59:51 +10:00
Ben
0d585df15d test(gateway): enforce relay contract-doc ⟷ Python conformance
Add an invariant test pinning docs/relay-connector-contract.md to the
Python source of truth so the doc (which the connector repo mirrors by
hand) cannot silently drift:

- CapabilityDescriptor §2 table ⟷ dataclass fields + required/optional
- SessionSource wire keys (to_dict output) ⟷ §3 documented fields
- per-platform discriminator columns exist as real SessionSource fields
- guard that is_bot stays off the wire until deliberately promoted

Writing the test surfaced a real gap: §3 only enumerated 5 discriminators
in its per-platform table while to_dict() emits 12 keys. Seven wire keys
the connector must populate (chat_name, chat_topic, user_id_alt,
chat_id_alt, parent_chat_id, message_id, user_name) were undocumented —
a connector author reading the doc would never know to set them. Added a
complete SessionSource wire-field table to §3. The connector's existing
contract.ts already carries all 12, so no connector change is needed; the
doc was the lagging artifact.
2026-06-09 15:57:24 +10:00
Ben
ada43042f8 fix(gateway): register relay connection checker
The platform-connected-checker invariant test requires every built-in
Platform enum member to have either a generic token path or a bespoke
entry in _PLATFORM_CONNECTED_CHECKERS. Platform.RELAY was added without
one, so test_all_builtins_have_checker_or_generic_token_path failed.

Relay dials OUT to a connector and is 'connected' once an endpoint URL
is configured (extra['relay_url'] or extra['url']); the capability
descriptor is negotiated at handshake time, so the URL is the only
config-level signal in the experimental phase. Add the checker plus a
synthetic-config case exercising its True path.
2026-06-09 14:08:23 +10:00
Ben
706b359cf8 Merge remote-tracking branch 'origin/main' into feat/gateway-relay-adapter
2026-06-09 13:59:08 +10:00
Ben
5869d594ab test(relay): assert connector stub never leaks into production paths
CI guard: fails if gateway/ or plugins/ ever imports the test-only stub
connector or defines StubConnector. Matches code leaks (imports / class defs),
not prose mentions, so the transport.py docstring reference to the stub's path
is allowed.

Phase 1 complete. Task 1.6 of the gateway-relay plan.
2026-06-08 15:20:51 +10:00
Ben
1b3491e8b5 docs: relay<->connector cross-repo contract (v1, experimental)
Formal interface between the Hermes gateway (RelayAdapter) and the Node
connector repo: handshake, CapabilityDescriptor field table, MessageEvent
inbound envelope with per-platform SessionSource discriminators (Discord
guild_id is REQUIRED for server isolation), outbound action set, /stop
interrupt routing, signed-body verify-at-edge/byte-preserving rule, and the
additive-only contract_version policy. Documents bot-identity-vs-tenant
separation so single-bot consolidation (Phase 6) stays open. Read-first
artifact for the connector implementer.

Phase 1, Task 1.5 of the gateway-relay plan.
2026-06-08 15:19:47 +10:00
Ben
96e138ed24 feat(relay): route mid-turn /stop over relay interrupt channel
RelayAdapter.on_interrupt(session_key, chat_id) bridges a connector-delivered
mid-turn /stop into the existing interrupt_session_activity path, setting the
per-session _active_sessions Event and clearing typing — cancelling exactly the
targeted session's turn without touching siblings (mirrors test_stop_thread_
sibling isolation). Transport.send_interrupt carries the gateway-side egress to
the connector for socket-owner routing.

Phase 1, Task 1.4 of the gateway-relay plan.
2026-06-08 15:18:53 +10:00
Ben
75a12474ec feat(relay): register RelayAdapter through platform registry (flagged off by default)
register_relay_adapter() registers the generic 'relay' platform via the same
PlatformRegistry path as plugin adapters — no core dispatch changes. OFF by
default (dark-launch): only registers when HERMES_GATEWAY_RELAY is truthy (or
force=True for tests), so existing single-tenant/direct deployments are
unaffected. Factory builds a transport-less RelayAdapter with a placeholder
descriptor; the real descriptor is negotiated at handshake.

Phase 1, Task 1.3 of the gateway-relay plan.
2026-06-08 15:17:43 +10:00
Ben
6729118a4a feat(relay): transport protocol + test-only stub connector
Defines RelayTransport (lifecycle/handshake/inbound/outbound/interrupt) as the
gateway<->connector wire contract; RelayAdapter.connect now registers an inbound
handler that bridges connector-delivered MessageEvents into handle_message.
Adds an in-memory StubConnector under tests/ and an E2E round-trip proving:
connect registers the handler, inbound events reach the adapter, guild_id drives
build_session_key isolation (two guilds -> two keys; same guild/channel/user ->
one), outbound send round-trips, get_chat_info is proxied.

Phase 1, Task 1.2 of the gateway-relay plan.
2026-06-08 15:16:41 +10:00
Ben
eaf1721b9f feat(relay): generic RelayAdapter advertising negotiated capabilities
One BasePlatformAdapter subclass that reads its capability profile from a
CapabilityDescriptor: MAX_MESSAGE_LENGTH attribute, message_len_fn (table-driven
by len_unit: chars=len, utf16=Telegram-style code units), supports_draft_streaming.
Implements the four abstract methods (connect/disconnect/send/get_chat_info) by
delegating to an injected RelayTransport (full protocol lands in Task 1.2). Adds
Platform.RELAY enum member. No per-platform gateway code.

Phase 1, Task 1.1 of the gateway-relay plan.
2026-06-08 15:14:08 +10:00
Ben
593fba5f5d feat(relay): derive descriptor from PlatformEntry
CapabilityDescriptor.from_platform_entry() projects an existing PlatformEntry
(label, max_message_length, emoji, platform_hint, pii_safe, name) into a
descriptor, proving the descriptor is a projection of existing config rather
than a parallel concept. Runtime-only capabilities (len_unit, draft/edit/
thread/markdown) are caller-supplied. max_message_length==0 ('no limit') maps
to the stream_consumer 4096 default.

Phase 0 complete. Task 0.3 of the gateway-relay plan.
2026-06-08 15:08:41 +10:00
Ben
2b09c95c33 feat(relay): experimental CapabilityDescriptor schema
Frozen, JSON-serializable handshake payload the connector hands the future
RelayAdapter: char limit, draft-streaming/edit/threading flags, markdown
dialect, len_unit. Mostly a wire projection of PlatformEntry + the adapter
capability methods. contract_version gates additive-only evolution; declared
EXPERIMENTAL until >=2 Class-1 platforms validate it. from_json ignores
unknown keys (forward-compat) and fills optional defaults.

Phase 0, Task 0.2 of the gateway-relay plan.
2026-06-08 15:07:45 +10:00
Ben
812a2977bd test: lock gateway adapter capability surface (relay phase 0)
Behavioral regression harness locking the capability surface that the future
RelayAdapter must reproduce: the abstract-method set (connect/disconnect/send/
get_chat_info), message_len_fn default, supports_draft_streaming default, and
the stream_consumer MAX_MESSAGE_LENGTH attribute read. Passes on main before
any RelayAdapter exists.

Phase 0, Task 0.1 of the gateway-relay plan.
2026-06-08 15:06:25 +10:00
47 changed files with 3928 additions and 4552 deletions

76
cli.py

@@ -7302,66 +7302,24 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
             self._handle_browser_command(cmd_original)
         elif canonical == "plugins":
             try:
-                # Discover from disk (bundled + user), matching `hermes plugins
-                # list` — so installed-but-not-enabled plugins are visible here
-                # too. The plugin manager only knows about *loaded* plugins, so
-                # using it alone made freshly-installed, not-yet-enabled plugins
-                # look like "nothing installed".
-                from hermes_cli.plugins_cmd import (
-                    _discover_all_plugins,
-                    _get_disabled_set,
-                    _get_enabled_set,
-                    _plugin_status,
-                )
-                entries = _discover_all_plugins()
-                enabled = _get_enabled_set()
-                disabled = _get_disabled_set()
-                # `/plugins` is a quick glance — default to user-installed
-                # plugins (what the user actually added). Bundled provider/
-                # platform plugins are summarized on one line; the full
-                # catalog lives behind `hermes plugins list`.
-                user_entries = [e for e in entries if e[3] != "bundled"]
-                bundled_count = len(entries) - len(user_entries)
-                if not user_entries:
-                    print("No user plugins installed.")
-                    print(" Install one: hermes plugins install owner/repo")
-                    print(f" Or drop a plugin directory into {display_hermes_home()}/plugins/")
-                    if bundled_count:
-                        print(f" ({bundled_count} bundled plugins available — see: hermes plugins list)")
+                from hermes_cli.plugins import get_plugin_manager
+                mgr = get_plugin_manager()
+                plugins = mgr.list_plugins()
+                if not plugins:
+                    print("No plugins installed.")
+                    print(f"Drop plugin directories into {display_hermes_home()}/plugins/ to get started.")
                 else:
-                    # Loaded-plugin details (tools/hooks/commands counts, errors)
-                    # keyed by name, when available.
-                    loaded: dict = {}
-                    try:
-                        from hermes_cli.plugins import get_plugin_manager
-                        for p in get_plugin_manager().list_plugins():
-                            loaded[p["name"]] = p
-                    except Exception:
-                        loaded = {}
-                    print(f"User plugins ({len(user_entries)}):")
-                    for name, version, _desc, source, _dir, key in sorted(user_entries):
-                        state = _plugin_status(name, enabled, disabled, key=key)
-                        glyph = {"enabled": "", "disabled": ""}.get(state, "")
-                        ver = f" v{version}" if version else ""
-                        info = loaded.get(name) or {}
-                        bits = []
-                        if info.get("tools"):
-                            bits.append(f"{info['tools']} tools")
-                        if info.get("hooks"):
-                            bits.append(f"{info['hooks']} hooks")
-                        if info.get("commands"):
-                            bits.append(f"{info['commands']} commands")
-                        detail = f" ({', '.join(bits)})" if bits else ""
-                        label = "" if state == "enabled" else f" [{state}]"
-                        error = f"{info['error']}" if info.get("error") else ""
-                        print(f" {glyph} {name}{ver}{label}{detail}{error}")
-                    if bundled_count:
-                        print(f" (+{bundled_count} bundled — see: hermes plugins list)")
-                    print(" Enable/disable: hermes plugins enable/disable <name>")
+                    print(f"Plugins ({len(plugins)}):")
+                    for p in plugins:
+                        status = "" if p["enabled"] else ""
+                        version = f" v{p['version']}" if p["version"] else ""
+                        tools = f"{p['tools']} tools" if p["tools"] else ""
+                        hooks = f"{p['hooks']} hooks" if p["hooks"] else ""
+                        commands = f"{p['commands']} commands" if p.get("commands") else ""
+                        parts = [x for x in [tools, hooks, commands] if x]
+                        detail = f" ({', '.join(parts)})" if parts else ""
+                        error = f"{p['error']}" if p["error"] else ""
+                        print(f" {status} {p['name']}{version}{detail}{error}")
             except Exception as e:
                 print(f"Plugin system error: {e}")
         elif canonical == "rollback":


@@ -0,0 +1,213 @@
# Relay ↔ Connector Contract (v1, EXPERIMENTAL)
> **Status:** EXPERIMENTAL. This contract MAY CHANGE without a deprecation
> cycle until at least two real Class-1 platforms (Discord + Telegram) have
> validated it. Evolution during the experimental phase is **additive-only**,
> gated by `contract_version`. A breaking change updates both repos in lockstep.
This document is the formal interface between the **Hermes gateway** (Python,
`gateway/relay/`) and the **connector** (Node/TypeScript,
`NousResearch/gateway-gateway`). The connector implementer's first action is to
read this file.
The gateway runs a generic `RelayAdapter` that dials **out** to the connector,
receives a `CapabilityDescriptor` at handshake, then exchanges normalized
`MessageEvent`s (inbound) and actions (outbound) over a per-turn bidirectional
WebSocket. The gateway never learns which concrete platform is fronting it; the
connector owns all platform-specific socket/identity logic.
---
## 1. Handshake
1. Gateway opens the transport (`connect`).
2. Gateway calls `handshake()`; connector returns a `CapabilityDescriptor`
(section 2) describing the platform this adapter instance fronts.
3. Gateway configures the adapter from the descriptor (char limit, length unit,
draft/edit/thread/markdown capabilities) and registers an inbound handler.
4. Connector then streams inbound events and accepts outbound actions.
`contract_version` (currently `1`) is carried in the descriptor. The gateway
ignores unknown descriptor fields (forward-compat) and fills missing optional
fields from defaults.
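
The forward-compat rule above (ignore unknown fields, fill missing optional ones from defaults) can be sketched as follows. This is a minimal illustration, not the code in `gateway/relay/descriptor.py`: the class name `DescriptorSketch` is hypothetical, only a subset of the descriptor fields is modeled, and the defaults for `platform_hint` and `pii_safe` are assumptions.

```python
import json
from dataclasses import dataclass, fields


@dataclass(frozen=True)
class DescriptorSketch:
    """Hypothetical stand-in for CapabilityDescriptor (subset of fields)."""

    # Required fields: a missing one makes construction fail with TypeError.
    contract_version: int
    platform: str
    label: str
    max_message_length: int
    # Optional fields with defaults. The emoji default comes from the contract;
    # the other two defaults are assumptions made for this sketch.
    emoji: str = "🔌"
    platform_hint: str = ""
    pii_safe: bool = False

    @classmethod
    def from_json(cls, raw: str) -> "DescriptorSketch":
        data = json.loads(raw)
        known = {f.name for f in fields(cls)}
        # Forward-compat: silently drop keys this gateway version does not know.
        kept = {key: value for key, value in data.items() if key in known}
        return cls(**kept)


raw = json.dumps({
    "contract_version": 1,
    "platform": "discord",
    "label": "Discord",
    "max_message_length": 2000,
    "some_future_field": True,  # unknown to this sketch, so it is ignored
})
descriptor = DescriptorSketch.from_json(raw)
```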
---
## 2. CapabilityDescriptor (handshake payload)
JSON object. Source of truth: `gateway/relay/descriptor.py`.
| Field | Type | Required | Meaning |
| --- | --- | --- | --- |
| `contract_version` | int | yes | Contract version (additive-only within a version). |
| `platform` | string | yes | Platform name (e.g. `"discord"`, `"telegram"`). |
| `label` | string | yes | Human-readable label. |
| `max_message_length` | int | yes | Char limit; gateway exposes as `MAX_MESSAGE_LENGTH`. 0 → treat as 4096. |
| `supports_draft_streaming` | bool | yes | Native draft-streaming preview support. |
| `supports_edit` | bool | yes | Edit-based streaming possible; if false, consumer degrades to one-message-per-segment. |
| `supports_threads` | bool | yes | `create_handoff_thread` capability. |
| `markdown_dialect` | string | yes | `"plain"`, `"markdown_v2"`, `"discord"`, … (drives `supports_code_blocks`). |
| `len_unit` | string | yes | `"chars"` (builtin len) or `"utf16"` (Telegram UTF-16 code units). |
| `emoji` | string | no | Display emoji (default 🔌). |
| `platform_hint` | string | no | System-prompt platform hint. |
| `pii_safe` | bool | no | Redact PII in session descriptions. |
Most fields are a projection of the gateway's existing `PlatformEntry`; the
runtime-only fields (`len_unit`, `supports_*`, `markdown_dialect`) come from the
live platform adapter's capability methods.
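
To make the `len_unit` and `max_message_length` rows concrete, here is a small sketch of a table-driven length check. The helper names (`utf16_len`, `LEN_FNS`, `effective_limit`, `fits`) are hypothetical and are not taken from the gateway source; only the `chars`/`utf16` units and the "0 is treated as 4096" rule come from the table above.

```python
from typing import Callable, Dict

DEFAULT_LIMIT = 4096  # per the table: max_message_length == 0 is treated as 4096


def utf16_len(text: str) -> int:
    """Length in UTF-16 code units; characters outside the BMP count as two."""
    return len(text.encode("utf-16-le")) // 2


# One length function per len_unit value documented in the descriptor table.
LEN_FNS: Dict[str, Callable[[str], int]] = {
    "chars": len,
    "utf16": utf16_len,
}


def effective_limit(max_message_length: int) -> int:
    """Map the 'no limit' sentinel (0) to the default limit."""
    return max_message_length or DEFAULT_LIMIT


def fits(text: str, len_unit: str, max_message_length: int) -> bool:
    """True when text is within the limit, measured in the descriptor's unit."""
    return LEN_FNS[len_unit](text) <= effective_limit(max_message_length)
```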
---
## 3. Inbound: `MessageEvent` envelope
The connector normalizes each platform wire event into a `MessageEvent`
(`gateway/platforms/base.py`) and delivers it to the gateway's inbound handler.
The gateway keys the session via `build_session_key()` from the embedded
`SessionSource` — so populating the right discriminators is the single
highest-correctness responsibility of the connector.
### SessionSource fields (the wire surface)
Source of truth: `SessionSource.to_dict()` in `gateway/session.py`. These are
every key the gateway accepts on the wire. `platform`, `chat_id`, `chat_type`,
`user_id`, `user_name`, `thread_id`, `chat_name`, and `chat_topic` are always
present (those typed `string|null` below may be `null`); the rest are included
only when set.
| Field | Type | Always sent | Meaning |
| --- | --- | --- | --- |
| `platform` | string | yes | Platform name (matches the descriptor's `platform`). |
| `chat_id` | string | yes | Primary conversation id (channel/chat). Session-key discriminator. |
| `chat_type` | string | yes | `dm` / `group` / `channel` / `thread` / `forum`. |
| `chat_name` | string\|null | yes | Human-readable chat name. |
| `user_id` | string\|null | yes | Message author id. Session-key discriminator. |
| `user_name` | string\|null | yes | Author display name. |
| `thread_id` | string\|null | yes | Thread/forum-topic id when in a thread. Session-key discriminator. |
| `chat_topic` | string\|null | yes | Channel topic/description (Discord, Slack). |
| `user_id_alt` | string | no | Platform-specific stable alt id (Signal UUID, Feishu union_id). |
| `chat_id_alt` | string | no | Alternate chat id (e.g. Signal group internal id). |
| `guild_id` | string | no | Discord guild / Slack workspace / Matrix server scope. **REQUIRED for Discord server isolation.** Session-key discriminator. |
| `parent_chat_id` | string | no | Parent channel when `chat_id` refers to a thread. |
| `message_id` | string | no | Id of the triggering message (for pin/reply/react). |
> `is_bot` (author-is-a-bot/webhook classification) exists on the gateway-side
> dataclass but is **intentionally NOT on the wire** in v1 — it is not part of
> `to_dict()`. Do not add it to the connector's `SessionSource` until it is
> first added here and to `to_dict()` (additive bump).
### SessionSource discriminators per platform
| Platform | chat_id | chat_type | user_id | thread_id | guild_id |
| --- | --- | --- | --- | --- | --- |
| **Discord** | channel id | `dm`/`group`/`thread` | author id | thread channel id (threads) | **guild id** (REQUIRED for server isolation) |
| **Telegram** | chat id | `dm`/`group`/`forum` | from id | forum topic id (forums) | — |
**Get Discord's `guild_id` wrong and two servers collide into one session.**
This is the #1 High-severity risk. The gateway's `build_session_key()` is the
conformance oracle: for a given `SessionSource`, the connector's normalization
must produce the same key the Python adapter would. (The Phase-1 stub tests
assert known-input → known-key.)
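
The collision risk can be illustrated with a toy key function. This is deliberately NOT the gateway's `build_session_key()`: the function `sketch_session_key` and its key format are hypothetical, and `user_id` is left out for brevity. It only shows why a dropped `guild_id` merges sessions that should stay separate.

```python
from typing import Optional


def sketch_session_key(platform: str, chat_id: str,
                       guild_id: Optional[str] = None,
                       thread_id: Optional[str] = None) -> str:
    """Illustrative key derivation only; the real algorithm lives in the gateway."""
    parts = [platform]
    if guild_id:
        parts.append(f"guild={guild_id}")
    parts.append(f"chat={chat_id}")
    if thread_id:
        parts.append(f"thread={thread_id}")
    return ":".join(parts)


# Same chat id reported from two different guilds: the keys stay distinct.
key_a = sketch_session_key("discord", "100", guild_id="guild-a")
key_b = sketch_session_key("discord", "100", guild_id="guild-b")

# If the connector forgets guild_id, both events land on the same key.
collided_a = sketch_session_key("discord", "100")
collided_b = sketch_session_key("discord", "100")
```

A connector-side conformance test would compare its own normalization against keys produced by the Python gateway for the same `SessionSource`, rather than against a toy function like this one.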
### Bot identity vs tenant (single-bot consolidation, Appendix A)
The envelope carries the **originating bot identity** as a field **distinct from
tenant**. Tenant is resolved from the event's own discriminator (Discord
`guild_id`, Telegram `chat_id`, webhook path/subdomain) — **never** from which
token/socket/process delivered it. This keeps one shared bot able to front many
tenants (Phase 6) without overloading an existing field.
---
## 4. Outbound: action set
The gateway calls the transport with action dicts. Source of truth:
`gateway/relay/transport.py` + `gateway/relay/adapter.py`.
| `op` | Fields | Result |
| --- | --- | --- |
| `send` | `chat_id`, `content`, `reply_to?`, `metadata?` | `{success: bool, message_id?, error?}` |
| `edit` | `chat_id`, `message_id`, `content`, `metadata?` | `{success: bool, error?}` |
| `typing` | `chat_id` | `{success: bool}` |
| `follow_up` | `session_key`, `kind`, `content`, `metadata?` | `{success: bool, message_id?, error?}` |
`get_chat_info(chat_id)` is a separate proxied call returning at least
`{name, type}`. Media actions follow the same envelope shape (deferred to a
later contract revision; additive).
**`follow_up` (A2 capability action).** Some inbound payloads carry a credential
that acts on the **shared** bot identity (e.g. a Discord interaction follow-up
token). Per §6 the connector strips that at the edge and binds it in its
capability vault keyed by the session; it **never reaches the gateway**. To use
it, the gateway issues `follow_up` naming the **session it is already in**
(`session_key`) plus the capability `kind` (e.g. `discord.interaction_token`) —
**never a token**. The connector resolves the real value from its vault,
enforces the tenant match (tenant B can never wield tenant A's capability), and
egresses. `success: false` when the capability is absent/expired or the tenant
doesn't match — the gateway has nothing to retry with, by design (a leaked
gateway holds zero capability material). Source of truth:
`gateway/relay/transport.py` (`send_follow_up`) + `gateway/relay/adapter.py`.
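
A minimal sketch of what a token-less `follow_up` action might look like on the gateway side. The field names follow the action table above; the helper `build_follow_up` and the `FORBIDDEN_KEYS` guard list are hypothetical additions for illustration and are not part of `gateway/relay/adapter.py`.

```python
from typing import Any, Dict, Mapping, Optional

# Hypothetical guard list: the contract only states that the action carries no token.
FORBIDDEN_KEYS = frozenset({"token", "interaction_token", "secret"})


def build_follow_up(session_key: str, kind: str, content: str,
                    metadata: Optional[Mapping[str, Any]] = None) -> Dict[str, Any]:
    """Build a follow_up action that names a session, never a credential."""
    action: Dict[str, Any] = {
        "op": "follow_up",
        "session_key": session_key,
        "kind": kind,  # a type reference such as "discord.interaction_token"
        "content": content,
    }
    if metadata:
        leaked = FORBIDDEN_KEYS & set(metadata)
        if leaked:
            raise ValueError(f"credential-like metadata keys: {sorted(leaked)}")
        action["metadata"] = dict(metadata)
    return action


action = build_follow_up("session-123", "discord.interaction_token", "Done.")
```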
---
## 5. Interrupt (`/stop`) routing
- **Gateway → connector:** `send_interrupt(session_key, reason?)` egresses a
mid-turn `/stop`. The connector MUST forward it down the socket owned by the
gateway instance running that `session_key` (the routing invariant).
- **Connector → gateway:** an inbound interrupt for a `session_key` is bridged
by the adapter's `on_interrupt(session_key, chat_id)` into the existing
per-session interrupt mechanism, cancelling exactly that turn (siblings
untouched).
The interrupt rides the same per-turn bidirectional socket as inbound/outbound.
---
## 6. Trust boundary & signed-body handling (A2)
**The connector is the sole crypto/identity boundary. The gateway re-validates
nothing.**
Webhook signatures (Discord ed25519, Twilio HMAC, WeCom BizMsgCrypt) are
computed over exact raw bytes, and some payloads are *encrypted* with a shared
secret. The connector fronts a **shared** bot for many tenants and holds every
tenant's platform secrets, so it:
- **verifies / decrypts at the edge** (the only place the secrets live),
- **normalizes** the payload into a tenant-scoped `MessageEvent` (§3),
- **strips any shared-identity capability** out of the payload and binds it in
its capability vault, keyed by the session (see §4 `follow_up`),
- **forwards only the sanitized `MessageEvent`** — never the raw signed body.
The gateway therefore performs **no** platform signature/crypto verification on
the relay path; it trusts the normalized event. This is an enforced invariant on
the gateway side (`tests/gateway/relay/test_relay_sheds_crypto.py`: the relay
package imports/calls no platform-crypto).
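
An import guard of this kind can be sketched with the standard `ast` module. This is an assumption-laden illustration, not the contents of `test_relay_sheds_crypto.py`: the function `banned_imports` and the `BANNED_MODULE_PARTS` list are hypothetical (only `wecom_crypto` is a name the commit log mentions).

```python
import ast
from typing import List

# Hypothetical deny-list of module name components.
BANNED_MODULE_PARTS = ("wecom_crypto", "nacl", "hmac")


def banned_imports(source: str) -> List[str]:
    """Return dotted names of imports that touch a banned module component."""
    hits: List[str] = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            names = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom):
            module = node.module or ""
            # Join module and imported name so `from pkg import wecom_crypto` is caught.
            names = [f"{module}.{alias.name}".lstrip(".") for alias in node.names]
        else:
            continue
        for name in names:
            if any(part in name.split(".") for part in BANNED_MODULE_PARTS):
                hits.append(name)
    return hits


clean = "import os\nfrom typing import Optional\n"
dirty = "import hmac\nfrom gateway.platforms import wecom_crypto\n"
```

In a real test the function would be applied to every file under `gateway/relay/` and the test would fail on any non-empty result.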
**Why not "forward the signed body byte-for-byte so the gateway re-validates"?**
That earlier model is incoherent under an untrusted, disposable tenant gateway:
- Re-validating Twilio HMAC / WeCom crypto would require handing the gateway the
**shared signing secret** — which is itself the leak, and on a shared bot it's
a *cross-tenant* leak.
- WeCom payloads are encrypted with the shared secret; the connector must decrypt
at the edge just to route, so forwarding ciphertext would again require giving
the gateway the secret.
- A Discord interaction token lives **inside** the signed JSON body — you cannot
both preserve the bytes and strip the credential; they are the same bytes.
So byte-preservation is abandoned deliberately: the connector re-serializes the
sanitized event and the gateway trusts it. This also unifies the passthrough and
relay planes — both are "verify at the edge → emit a normalized event," differing
only in transport. See `docs/capability-trust-boundary.md` (connector repo:
`gateway-gateway`) for the full A2 rationale and the connector-side vault.
---
## 7. Versioning policy
- `contract_version` is an int; bump **only** for additive changes during the
experimental phase (new optional fields, new `op`s).
- A breaking change (renamed/removed field, changed semantics) requires a
coordinated update of both repos and a version bump.
- The connector's first PR references the commit SHA of this file it implements
against.


@@ -163,6 +163,7 @@ class Platform(Enum):
     BLUEBUBBLES = "bluebubbles"
     QQBOT = "qqbot"
     YUANBAO = "yuanbao"
+    RELAY = "relay"  # generic relay adapter fronted by the connector (EXPERIMENTAL)
     @classmethod
     def _missing_(cls, value):
         """Accept unknown platform names only for known plugin adapters.
@@ -488,6 +489,13 @@ _PLATFORM_CONNECTED_CHECKERS: dict[Platform, Callable[[PlatformConfig], bool]] =
         (cfg.extra.get("client_id") or os.getenv("DINGTALK_CLIENT_ID"))
         and (cfg.extra.get("client_secret") or os.getenv("DINGTALK_CLIENT_SECRET"))
     ),
+    # Relay dials OUT to a connector; it is "connected" once an endpoint URL is
+    # configured (extra["relay_url"] or extra["url"]). The capability descriptor
+    # is negotiated at handshake time, so the URL is the only config-level
+    # signal in the experimental phase. EXPERIMENTAL — may change.
+    Platform.RELAY: lambda cfg: bool(
+        cfg.extra.get("relay_url") or cfg.extra.get("url")
+    ),
 }

112
gateway/relay/__init__.py Normal file

@@ -0,0 +1,112 @@
"""Relay/connector support package for the Hermes gateway.

EXPERIMENTAL. This package implements the gateway side of the "Gateway Gateway"
relay design: a generic ``RelayAdapter`` plus the wire-serializable
``CapabilityDescriptor`` the connector hands it at handshake time, and the
production ``WebSocketRelayTransport`` that dials the connector. The public API
(module names, descriptor field set, transport protocol) MAY CHANGE without a
deprecation cycle until at least two real Class-1 platforms (Discord + Telegram)
have shaken out the schema.

See ``docs/relay-connector-contract.md`` for the formal cross-repo interface.

Activation is driven by configuration, not a separate feature flag: the relay
platform is registered when a connector relay URL is configured
(``GATEWAY_RELAY_URL`` env or ``gateway.relay_url`` in config.yaml). Deployments
that don't set it are unaffected — exactly the same shape as ``gateway.proxy_url``.
"""

from __future__ import annotations

import os
from typing import Optional


def relay_url() -> Optional[str]:
    """The connector relay endpoint URL, or None when relay is not configured.

    Checks ``GATEWAY_RELAY_URL`` (convenient for Docker) first, then
    ``gateway.relay_url`` in config.yaml. A non-empty value activates the relay
    platform; absence means a normal direct/single-tenant gateway.
    """
    url = os.environ.get("GATEWAY_RELAY_URL", "").strip()
    if url:
        return url.rstrip("/")
    try:
        from gateway.run import _load_gateway_config  # late import to avoid cycle

        cfg = _load_gateway_config()
        url = (cfg.get("gateway") or {}).get("relay_url", "").strip()
        if url:
            return url.rstrip("/")
    except Exception:  # noqa: BLE001 - config absence/parse must never crash registration
        pass
    return None


def relay_platform_identity() -> tuple[str, str]:
    """Platform + bot id this gateway fronts over the relay (for the handshake hello).

    Defaults to ``("relay", "")``; overridable via ``GATEWAY_RELAY_PLATFORM`` /
    ``GATEWAY_RELAY_BOT_ID`` so one connector can front several platforms.
    """
    platform = os.environ.get("GATEWAY_RELAY_PLATFORM", "relay").strip() or "relay"
    bot_id = os.environ.get("GATEWAY_RELAY_BOT_ID", "").strip()
    return platform, bot_id


def register_relay_adapter(force: bool = False, url: Optional[str] = None) -> bool:
    """Register the generic ``relay`` platform via the platform registry.

    Registers when a relay URL is configured (or ``force=True`` for tests, which
    builds a transport-less adapter — the unit-test posture). Returns True if
    registration happened. Additive: uses the same registry path as plugin
    adapters, so no core dispatch changes are needed.

    When a URL is present the factory builds a live ``WebSocketRelayTransport``;
    the ``RelayAdapter`` negotiates the real ``CapabilityDescriptor`` at
    ``connect()`` time via ``transport.handshake()``.
    """
    resolved_url = url if url is not None else relay_url()
    if not (force or resolved_url):
        return False

    from gateway.platform_registry import PlatformEntry, platform_registry
    from gateway.relay.adapter import RelayAdapter
    from gateway.relay.descriptor import CONTRACT_VERSION, CapabilityDescriptor

    platform, bot_id = relay_platform_identity()

    def _factory(config):
        # Placeholder descriptor; replaced by the negotiated one at connect time
        # when a transport is present. With no URL (force/test) the adapter is
        # transport-less and keeps the placeholder.
        placeholder = CapabilityDescriptor(
            contract_version=CONTRACT_VERSION,
            platform=platform,
            label="Relay",
            max_message_length=4096,
            supports_draft_streaming=False,
            supports_edit=True,
            supports_threads=False,
            markdown_dialect="plain",
            len_unit="chars",
        )
        transport = None
        if resolved_url:
            from gateway.relay.ws_transport import WebSocketRelayTransport

            transport = WebSocketRelayTransport(resolved_url, platform, bot_id)
        return RelayAdapter(config, placeholder, transport=transport)

    platform_registry.register(
        PlatformEntry(
            name="relay",
            label="Relay",
            adapter_factory=_factory,
            check_fn=lambda: True,
            source="builtin",
            emoji="\U0001f50c",
        )
    )
    return True
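
The lookup order described in `relay_url()` (environment variable first, then `gateway.relay_url` from config, trailing slash stripped) can be sketched in isolation. This is a minimal illustration only: `resolve_relay_url` and its two mapping arguments are hypothetical stand-ins for `os.environ` and the loaded config, not part of the package.

```python
from typing import Any, Mapping, Optional


def resolve_relay_url(env: Mapping[str, str], config: Mapping[str, Any]) -> Optional[str]:
    """Hypothetical stand-in for relay_url(): env var wins, config is the fallback."""
    url = env.get("GATEWAY_RELAY_URL", "").strip()
    if url:
        return url.rstrip("/")
    # A missing or null "gateway" section is treated as empty.
    gateway_section = config.get("gateway") or {}
    url = str(gateway_section.get("relay_url") or "").strip()
    # An empty result means "relay not configured".
    return url.rstrip("/") or None


if __name__ == "__main__":
    print(resolve_relay_url({"GATEWAY_RELAY_URL": "wss://a.example/relay/"}, {}))
    print(resolve_relay_url({}, {"gateway": {"relay_url": "wss://b.example/"}}))
    print(resolve_relay_url({}, {}))
```

Passing the environment and config in as arguments keeps the sketch testable without touching process state; the real function reads them directly.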

gateway/relay/adapter.py

@@ -0,0 +1,178 @@
"""RelayAdapter — one generic gateway adapter fronted by the connector. EXPERIMENTAL.

A single ``BasePlatformAdapter`` subclass that, at handshake, receives a
``CapabilityDescriptor`` from the connector telling it which platform it is
fronting and which capabilities to advertise to the ``GatewayStreamConsumer``.
It implements the four abstract methods (``connect`` / ``disconnect`` / ``send``
/ ``get_chat_info``) plus the capability surface (``MAX_MESSAGE_LENGTH``,
``message_len_fn``, ``supports_draft_streaming``) by delegating wire I/O to an
injected transport and reading capabilities off the descriptor.

There is NO per-platform gateway code: the connector is the only side that knows
"this chat_id maps to a Discord channel, send it via the Discord websocket."
The gateway sees an ordinary ``MessageEvent`` in and calls ``adapter.send`` out.

EXPERIMENTAL: the transport protocol and descriptor schema may change without a
deprecation cycle until >=2 Class-1 platforms validate them.
"""

from __future__ import annotations

import logging
from typing import Any, Callable, Dict, Optional

from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import BasePlatformAdapter, SendResult
from gateway.relay.descriptor import CapabilityDescriptor
from gateway.relay.transport import RelayTransport

logger = logging.getLogger(__name__)


def _utf16_len(text: str) -> int:
    """Count UTF-16 code units (Telegram's length unit)."""
    return len(text.encode("utf-16-le")) // 2


# Table-driven length-unit selection from the descriptor's ``len_unit``.
_LEN_FNS: Dict[str, Callable[[str], int]] = {
    "chars": len,
    "utf16": _utf16_len,
}


class RelayAdapter(BasePlatformAdapter):
    """Generic relay adapter advertising a connector-negotiated capability profile."""

    def __init__(
        self,
        config: PlatformConfig,
        descriptor: CapabilityDescriptor,
        transport: Optional[RelayTransport] = None,
    ) -> None:
        # The relay adapter fronts many platforms but presents as a single
        # logical platform to the runner; Platform.RELAY identifies it.
        super().__init__(config, Platform.RELAY)
        self.descriptor = descriptor
        self._transport = transport
        # Capability surface read by stream_consumer (getattr(..., 4096)).
        self.MAX_MESSAGE_LENGTH = descriptor.max_message_length
        self.supports_code_blocks = descriptor.markdown_dialect not in ("", "plain")

    # ── capability surface (from descriptor) ─────────────────────────────
    @property
    def message_len_fn(self) -> Callable[[str], int]:
        return _LEN_FNS.get(self.descriptor.len_unit, len)

    def supports_draft_streaming(
        self,
        chat_type: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> bool:
        return self.descriptor.supports_draft_streaming

    # ── abstract methods (delegated to the transport) ────────────────────
    async def connect(self) -> bool:
        if self._transport is None:
            raise RuntimeError("RelayAdapter has no transport configured")
        self._transport.set_inbound_handler(self._on_inbound)
        ok = await self._transport.connect()
        if not ok:
            return False
        # Negotiate the real capability descriptor from the connector and adopt
        # it — the placeholder passed at construction is replaced by what the
        # connector advertises for the platform this gateway actually fronts.
        try:
            descriptor = await self._transport.handshake()
        except Exception as exc:  # noqa: BLE001 - a failed handshake = a failed connect
            logger.warning("relay handshake failed: %s", exc)
            return False
        self._apply_descriptor(descriptor)
        return True

    def _apply_descriptor(self, descriptor: CapabilityDescriptor) -> None:
        """Adopt a (re)negotiated descriptor into the live capability surface."""
        self.descriptor = descriptor
        self.MAX_MESSAGE_LENGTH = descriptor.max_message_length
        self.supports_code_blocks = descriptor.markdown_dialect not in ("", "plain")

    async def _on_inbound(self, event) -> None:
        """Bridge a connector-delivered MessageEvent into the normal adapter path."""
        await self.handle_message(event)

    async def on_interrupt(self, session_key: str, chat_id: str) -> None:
        """Bridge a connector-delivered /stop into the adapter's interrupt path.

        The connector forwards a mid-turn interrupt down the socket owned by
        the gateway instance running ``session_key``; this routes it to the
        existing per-session interrupt mechanism (sets the
        ``_active_sessions[session_key]`` Event and clears typing), cancelling
        the right turn without touching sibling sessions.
        """
        await self.interrupt_session_activity(session_key, chat_id)

    async def disconnect(self) -> None:
        if self._transport is not None:
            await self._transport.disconnect()

    async def send(
        self,
        chat_id: str,
        content: str,
        reply_to: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> SendResult:
        if self._transport is None:
            return SendResult(success=False, error="no transport")
        result = await self._transport.send_outbound(
            {
                "op": "send",
                "chat_id": chat_id,
                "content": content,
                "reply_to": reply_to,
                "metadata": metadata or {},
            }
        )
        return SendResult(
            success=bool(result.get("success")),
            message_id=result.get("message_id"),
            error=result.get("error"),
        )

    async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
        # Proxied to the connector (it owns the platform connection / cache).
        if self._transport is None:
            return {"name": chat_id, "type": "dm"}
        return await self._transport.get_chat_info(chat_id)

    async def send_follow_up(
        self,
        session_key: str,
        kind: str,
        content: str,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> SendResult:
        """Send via a shared-identity capability bound to a session (A2 outbound).

        The gateway never holds the credential: it names the session it is
        already in plus the capability ``kind``, and the connector resolves the
        real value from its vault and egresses (enforcing the tenant match). Used
        e.g. to post a Discord interaction follow-up as the shared bot without
        the token ever reaching the gateway. See RelayTransport.send_follow_up.
        """
        if self._transport is None:
            return SendResult(success=False, error="no transport")
        result = await self._transport.send_follow_up(
            {
                "op": "follow_up",
                "session_key": session_key,
                "kind": kind,
                "content": content,
                "metadata": metadata or {},
            }
        )
        return SendResult(
            success=bool(result.get("success")),
            message_id=result.get("message_id"),
            error=result.get("error"),
        )
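
The two length units selected by `len_unit` differ only for characters outside the Basic Multilingual Plane, which take two UTF-16 code units but count as one Python character. A minimal sketch of the table-driven selection follows; `pick_len_fn` is a hypothetical helper that mirrors the `message_len_fn` property, and the sample string is illustrative.

```python
from typing import Callable, Dict


def utf16_len(text: str) -> int:
    """Count UTF-16 code units: each code unit is two bytes in utf-16-le."""
    return len(text.encode("utf-16-le")) // 2


LEN_FNS: Dict[str, Callable[[str], int]] = {
    "chars": len,
    "utf16": utf16_len,
}


def pick_len_fn(len_unit: str) -> Callable[[str], int]:
    """Hypothetical helper: unknown units fall back to the builtin len."""
    return LEN_FNS.get(len_unit, len)


if __name__ == "__main__":
    sample = "hi \U0001F600"  # three ASCII characters plus one astral-plane emoji
    print(pick_len_fn("chars")(sample))  # counts code points
    print(pick_len_fn("utf16")(sample))  # counts UTF-16 code units
```

The practical consequence is that a chunker using the wrong unit can overshoot a platform limit on emoji-heavy text, which is why the descriptor carries the unit explicitly.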

gateway/relay/descriptor.py

@@ -0,0 +1,118 @@
"""CapabilityDescriptor — the relay handshake payload. EXPERIMENTAL.

The connector hands a ``CapabilityDescriptor`` to the gateway's ``RelayAdapter``
at handshake time; it tells the adapter which platform it is fronting and which
capabilities to advertise to the ``GatewayStreamConsumer`` (char limit,
draft-streaming, edit/threading support, markdown dialect, length unit). It is
the linchpin of the generalization: one gateway adapter serves Discord,
Telegram, Matrix, Signal, ... without per-platform branching.

EXPERIMENTAL: this schema MAY CHANGE without a deprecation cycle until at least
two real Class-1 platforms have validated it. Evolution during the experimental
phase is additive-only, gated by ``contract_version`` (see
docs/relay-connector-contract.md).

Field origins (most are a wire-serializable projection of ``PlatformEntry`` plus
the per-instance capability methods on ``BasePlatformAdapter``):

- ``max_message_length`` -> ``PlatformEntry.max_message_length`` / adapter
  ``MAX_MESSAGE_LENGTH`` attribute (read by stream_consumer).
- ``len_unit`` -> selects which ``message_len_fn`` the adapter installs
  ("chars" = builtin len; "utf16" = Telegram-style UTF-16 code-unit counting).
- ``supports_draft_streaming`` -> adapter ``supports_draft_streaming()`` probe.
- ``supports_edit`` -> whether edit-based streaming is possible (Discord/
  Telegram yes; Signal/SMS no -> consumer degrades to one-message-per-segment).
- ``supports_threads`` -> ``create_handoff_thread`` capability flag.
- ``markdown_dialect`` -> presentation hint (e.g. "markdown_v2", "discord").
- ``emoji`` / ``platform_hint`` / ``pii_safe`` -> ``PlatformEntry`` fields of the
  same name.
"""

from __future__ import annotations

import json
from dataclasses import asdict, dataclass

# Bump additively (never reinterpret an existing field) during the experimental
# phase; a breaking change requires updating both repos in lockstep.
CONTRACT_VERSION = 1


@dataclass(frozen=True)
class CapabilityDescriptor:
    """Immutable capability descriptor negotiated at relay handshake.

    Frozen so a descriptor cannot be mutated after handshake — the adapter
    advertises a fixed capability profile for the life of the connection.
    """

    contract_version: int
    platform: str
    label: str
    max_message_length: int
    supports_draft_streaming: bool
    supports_edit: bool
    supports_threads: bool
    markdown_dialect: str
    len_unit: str  # "chars" | "utf16"
    emoji: str = "\U0001f50c"  # 🔌 default (matches PlatformEntry default)
    platform_hint: str = ""
    pii_safe: bool = False

    def to_json(self) -> str:
        """Serialize to a compact, stable JSON string for the handshake frame."""
        return json.dumps(asdict(self), sort_keys=True, ensure_ascii=False)

    @classmethod
    def from_json(cls, data: str) -> "CapabilityDescriptor":
        """Deserialize from a handshake JSON string.

        Unknown keys are ignored (forward-compat: a newer connector may send
        fields this gateway does not know yet); missing optional keys fall back
        to dataclass defaults.
        """
        raw = json.loads(data)
        known = {f for f in cls.__dataclass_fields__}  # type: ignore[attr-defined]
        filtered = {k: v for k, v in raw.items() if k in known}
        return cls(**filtered)

    @classmethod
    def from_platform_entry(
        cls,
        entry,
        *,
        len_unit: str = "chars",
        supports_draft_streaming: bool = False,
        supports_edit: bool = True,
        supports_threads: bool = False,
        markdown_dialect: str = "plain",
    ) -> "CapabilityDescriptor":
        """Project a ``gateway.platform_registry.PlatformEntry`` into a descriptor.

        Demonstrates the descriptor is a *subset/projection* of what
        ``PlatformEntry`` already encodes, not a parallel concept: ``label``,
        ``max_message_length``, ``emoji``, ``platform_hint``, ``pii_safe`` and
        the platform name come straight off the entry. The runtime capability
        bits that ``PlatformEntry`` does NOT encode (length unit, draft/edit/
        thread/markdown behavior) are supplied by the caller — in production
        the connector fills these from the live adapter's capability methods.

        ``max_message_length`` of 0 on a ``PlatformEntry`` means "no limit";
        we map that to the stream_consumer default of 4096 so the descriptor
        always carries a concrete chunking bound.
        """
        max_len = getattr(entry, "max_message_length", 0) or 4096
        return cls(
            contract_version=CONTRACT_VERSION,
            platform=entry.name,
            label=entry.label,
            max_message_length=max_len,
            supports_draft_streaming=supports_draft_streaming,
            supports_edit=supports_edit,
            supports_threads=supports_threads,
            markdown_dialect=markdown_dialect,
            len_unit=len_unit,
            emoji=getattr(entry, "emoji", "\U0001f50c"),
            platform_hint=getattr(entry, "platform_hint", ""),
            pii_safe=getattr(entry, "pii_safe", False),
        )
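
The forward-compatibility rule in `from_json` (drop unknown keys, let missing optional keys take their defaults) can be shown with a trimmed-down dataclass. `MiniDescriptor` and the `supports_reactions` key below are hypothetical, chosen only to illustrate the filtering; the real descriptor has more fields.

```python
import json
from dataclasses import dataclass, fields


@dataclass(frozen=True)
class MiniDescriptor:
    """Hypothetical subset of CapabilityDescriptor, for illustration only."""

    contract_version: int
    platform: str
    max_message_length: int
    len_unit: str
    pii_safe: bool = False  # optional: falls back to this default when absent

    @classmethod
    def from_json(cls, data: str) -> "MiniDescriptor":
        raw = json.loads(data)
        known = {f.name for f in fields(cls)}
        # Keys a newer peer may send but this side does not know are dropped.
        return cls(**{k: v for k, v in raw.items() if k in known})


if __name__ == "__main__":
    frame = json.dumps(
        {
            "contract_version": 1,
            "platform": "discord",
            "max_message_length": 2000,
            "len_unit": "chars",
            "supports_reactions": True,  # hypothetical future field
        }
    )
    print(MiniDescriptor.from_json(frame))
```

Note that only unknown and optional keys are tolerated: a payload missing a required field still raises `TypeError` from the dataclass constructor, which the adapter treats as a failed handshake.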

gateway/relay/transport.py

@@ -0,0 +1,101 @@
"""Relay transport protocol — the gateway<->connector wire contract. EXPERIMENTAL.

The ``RelayAdapter`` (gateway side) delegates all wire I/O to a ``RelayTransport``.
The gateway dials OUT to the connector, so a production transport is a WebSocket
client; in tests it is an in-memory stub (``tests/gateway/relay/stub_connector.py``).

This module defines the protocol surface only — no concrete transport. The
contract has four concerns:

1. Lifecycle: ``connect`` / ``disconnect``.
2. Handshake: ``handshake`` returns the ``CapabilityDescriptor`` the connector
   advertises for the platform this adapter fronts.
3. Inbound: ``set_inbound_handler`` registers a callback the transport invokes
   with each normalized ``MessageEvent`` the connector delivers.
4. Outbound: ``send_outbound`` carries send/edit/typing actions back to the
   connector; ``get_chat_info`` proxies a chat-info lookup; ``send_interrupt``
   routes a mid-turn /stop down the socket that owns the session_key;
   ``send_follow_up`` acts on a shared-identity capability bound to a session.

EXPERIMENTAL: may change without a deprecation cycle until >=2 Class-1 platforms
validate it. See docs/relay-connector-contract.md.
"""

from __future__ import annotations

from typing import Any, Awaitable, Callable, Dict, Optional, Protocol, runtime_checkable

from gateway.platforms.base import MessageEvent
from gateway.relay.descriptor import CapabilityDescriptor

# Callback the transport invokes for each inbound normalized event.
InboundHandler = Callable[[MessageEvent], Awaitable[None]]


@runtime_checkable
class RelayTransport(Protocol):
    """Full gateway<->connector transport contract."""

    async def connect(self) -> bool:
        """Open the connection to the connector; return True on success."""
        ...

    async def disconnect(self) -> None:
        """Close the connection."""
        ...

    async def handshake(self) -> CapabilityDescriptor:
        """Return the capability descriptor the connector advertises."""
        ...

    def set_inbound_handler(self, handler: InboundHandler) -> None:
        """Register the callback invoked with each inbound MessageEvent."""
        ...

    async def send_outbound(self, action: Dict[str, Any]) -> Dict[str, Any]:
        """Carry an outbound action (send/edit/typing) to the connector.

        Returns a result dict; for ``op == "send"`` it carries
        ``success`` and optionally ``message_id`` / ``error``.
        """
        ...

    async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
        """Proxy a chat-info lookup to the connector."""
        ...

    async def send_interrupt(self, session_key: str, reason: Optional[str] = None) -> None:
        """Route a mid-turn /stop to the connector for ``session_key``.

        The connector forwards it down the socket owned by the gateway
        instance running that session (the /stop routing invariant). On the
        gateway side this is the OUTBOUND direction; the actual task
        cancellation happens when the connector echoes an interrupt inbound
        (handled in Task 1.4).
        """
        ...

    async def send_follow_up(self, action: Dict[str, Any]) -> Dict[str, Any]:
        """Act on a shared-identity capability bound to a session (A2 outbound).

        Some platforms hand the connector a credential that acts on the SHARED
        bot identity (e.g. a Discord interaction follow-up token, valid ~15min).
        Under A2 that credential NEVER reaches the gateway — the connector
        stripped it at the edge and bound it in its capability vault keyed by
        the session. To use it, the gateway issues a SEMANTIC action against the
        session it is already in; it never names or holds a token.

        The action dict carries:
            ``op``           == ``"follow_up"``
            ``session_key``  the session whose bound capability to wield
            ``kind``         the capability kind (e.g. ``"discord.interaction_token"``)
            ``content``      the message content to send via that capability
            ``metadata?``    optional extras

        The connector resolves the real capability (``resolveOutboundCapability``
        on its side), enforces the tenant match (tenant B can never wield tenant
        A's capability), and egresses. Returns ``{success, message_id?, error?}``;
        ``success`` is False when the capability is absent/expired or the tenant
        doesn't match — the gateway then has nothing to retry with (by design: a
        leaked gateway holds zero capability material).
        """
        ...
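
To make the follow-up contract concrete, here is a rough in-memory sketch of a transport stub that resolves a session-bound capability without ever returning it to the caller. Everything in it is an assumption for illustration: `OutboundTransport` is a hypothetical two-method subset of `RelayTransport`, `InMemoryStub` is not the stub in `tests/gateway/relay/stub_connector.py`, and the vault contents are made up.

```python
import asyncio
from typing import Any, Dict, List, Protocol, Tuple, runtime_checkable


@runtime_checkable
class OutboundTransport(Protocol):
    """Hypothetical two-method subset of RelayTransport."""

    async def send_outbound(self, action: Dict[str, Any]) -> Dict[str, Any]: ...

    async def send_follow_up(self, action: Dict[str, Any]) -> Dict[str, Any]: ...


class InMemoryStub:
    """Hypothetical connector stand-in: records actions and owns a fake vault."""

    def __init__(self) -> None:
        self.sent: List[Dict[str, Any]] = []
        # Vault keyed by (session_key, kind); the secret never leaves this object.
        self._vault: Dict[Tuple[str, str], str] = {
            ("sess-1", "discord.interaction_token"): "not-a-real-token",
        }

    async def send_outbound(self, action: Dict[str, Any]) -> Dict[str, Any]:
        self.sent.append(action)
        return {"success": True, "message_id": f"m{len(self.sent)}"}

    async def send_follow_up(self, action: Dict[str, Any]) -> Dict[str, Any]:
        key = (action["session_key"], action["kind"])
        if key not in self._vault:
            return {"success": False, "error": "capability absent or expired"}
        self.sent.append(action)
        return {"success": True, "message_id": f"m{len(self.sent)}"}


async def demo() -> Tuple[bool, Dict[str, Any], Dict[str, Any]]:
    stub = InMemoryStub()
    base = {"op": "follow_up", "kind": "discord.interaction_token", "content": "done", "metadata": {}}
    bound = await stub.send_follow_up({**base, "session_key": "sess-1"})
    unbound = await stub.send_follow_up({**base, "session_key": "sess-2"})
    # runtime_checkable isinstance only verifies that the methods exist.
    return isinstance(stub, OutboundTransport), bound, unbound


conforms, bound_result, unbound_result = asyncio.run(demo())
print(conforms, bound_result, unbound_result)
```

The caller only ever names a session and a capability kind; a failed lookup yields `success: False` with nothing to retry, matching the "zero capability material" property described in the docstring.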

@@ -0,0 +1,267 @@
"""Production WebSocket RelayTransport — the gateway's live link to the connector.

The gateway dials OUT to the connector's relay endpoint over a WebSocket and
speaks the newline-delimited JSON frame protocol defined in the connector repo
(``gateway-gateway`` ``src/relay/protocol.ts``) and mirrored in
``docs/relay-connector-contract.md``:

    gateway -> connector : hello, outbound, interrupt
    connector -> gateway : descriptor, inbound, outbound_result, interrupt_inbound

Frames:
    hello             {type, platform, botId}
    descriptor        {type, descriptor}             (handshake reply)
    inbound           {type, event, bufferId?}       (a normalized MessageEvent)
    outbound          {type, requestId, action}      (send/edit/typing/follow_up)
    outbound_result   {type, requestId, result}
    interrupt         {type, session_key, reason?}   (gateway egresses /stop)
    interrupt_inbound {type, session_key, chat_id}   (connector -> owning gateway)

This is the concrete transport behind the ``RelayTransport`` Protocol; the
``RelayAdapter`` delegates all wire I/O to it. Outbound calls block on a
per-request future keyed by ``requestId`` until the matching ``outbound_result``
arrives. A background reader task pumps inbound frames to the registered handler
and resolves pending outbound futures.

EXPERIMENTAL: the frame schema may change without a deprecation cycle until at
least two Class-1 platforms validate it.
"""

from __future__ import annotations

import asyncio
import json
import logging
import uuid
from typing import Any, Dict, Optional

from gateway.platforms.base import MessageEvent, MessageType
from gateway.session import SessionSource
from gateway.relay.descriptor import CapabilityDescriptor
from gateway.relay.transport import InboundHandler

logger = logging.getLogger(__name__)

try:  # lazy/optional dep — mirrors gateway/platforms/feishu.py
    import websockets
except ImportError:  # pragma: no cover - exercised only when the extra is absent
    websockets = None  # type: ignore[assignment]

WEBSOCKETS_AVAILABLE = websockets is not None

# How long to wait for the handshake descriptor and for each outbound result.
_HANDSHAKE_TIMEOUT_S = 30.0
_OUTBOUND_TIMEOUT_S = 30.0


def _event_from_wire(raw: Dict[str, Any]) -> MessageEvent:
    """Rebuild a MessageEvent from the connector's normalized inbound payload.

    The connector emits SessionSource as the snake_case wire form (§3); map it
    back onto the gateway dataclasses. Unknown message types fall back to TEXT.
    """
    src = raw.get("source", {}) or {}
    from gateway.config import Platform

    platform = src.get("platform", "relay")
    try:
        platform_enum = Platform(platform)
    except ValueError:
        platform_enum = Platform.RELAY
    source = SessionSource(
        platform=platform_enum,
        chat_id=src.get("chat_id", ""),
        chat_type=src.get("chat_type", "dm"),
        chat_name=src.get("chat_name"),
        user_id=src.get("user_id"),
        user_name=src.get("user_name"),
        thread_id=src.get("thread_id"),
        chat_topic=src.get("chat_topic"),
        user_id_alt=src.get("user_id_alt"),
        chat_id_alt=src.get("chat_id_alt"),
        guild_id=src.get("guild_id"),
        parent_chat_id=src.get("parent_chat_id"),
        message_id=src.get("message_id"),
    )
    try:
        msg_type = MessageType(raw.get("message_type", "text"))
    except ValueError:
        msg_type = MessageType.TEXT
    return MessageEvent(
        text=raw.get("text", ""),
        message_type=msg_type,
        source=source,
        message_id=raw.get("message_id"),
        reply_to_message_id=raw.get("reply_to_message_id"),
        media_urls=raw.get("media_urls") or [],
    )


class WebSocketRelayTransport:
    """RelayTransport over a WebSocket connection the gateway dials to the connector."""

    def __init__(
        self,
        url: str,
        platform: str,
        bot_id: str,
        *,
        connect_timeout_s: float = _HANDSHAKE_TIMEOUT_S,
        outbound_timeout_s: float = _OUTBOUND_TIMEOUT_S,
    ) -> None:
        if not WEBSOCKETS_AVAILABLE:
            raise RuntimeError(
                "WebSocketRelayTransport requires the 'websockets' package "
                "(install the messaging extra)."
            )
        self._url = url
        self._platform = platform
        self._bot_id = bot_id
        self._connect_timeout_s = connect_timeout_s
        self._outbound_timeout_s = outbound_timeout_s
        self._ws: Any = None
        self._reader: Optional[asyncio.Task[None]] = None
        self._inbound: Optional[InboundHandler] = None
        # Set via set_interrupt_inbound_handler(); declared here so the
        # attribute always exists.
        self._interrupt_inbound_handler: Any = None
        self._descriptor: Optional[CapabilityDescriptor] = None
        self._descriptor_ready: asyncio.Future[CapabilityDescriptor] | None = None
        # requestId -> future awaiting the matching outbound_result.
        self._pending: Dict[str, asyncio.Future[Dict[str, Any]]] = {}
        self._closing = False

    # ── lifecycle ────────────────────────────────────────────────────────
    async def connect(self) -> bool:
        loop = asyncio.get_running_loop()
        # Reset per-connection state so a reconnect on the same instance
        # renegotiates the descriptor (instead of returning the cached one)
        # and logs read-loop failures again after an earlier disconnect().
        self._closing = False
        self._descriptor = None
        self._descriptor_ready = loop.create_future()
        self._ws = await websockets.connect(self._url)  # type: ignore[union-attr]
        self._reader = asyncio.create_task(self._read_loop(), name="relay-ws-reader")
        # Send hello; the descriptor arrives via the reader and resolves handshake().
        await self._send({"type": "hello", "platform": self._platform, "botId": self._bot_id})
        return True

    async def disconnect(self) -> None:
        self._closing = True
        if self._reader is not None:
            self._reader.cancel()
            try:
                await self._reader
            except (asyncio.CancelledError, Exception):  # noqa: BLE001 - best-effort teardown
                pass
            self._reader = None
        if self._ws is not None:
            try:
                await self._ws.close()
            except Exception:  # noqa: BLE001
                pass
            self._ws = None
        # Fail any in-flight outbound waiters so callers don't hang.
        for fut in self._pending.values():
            if not fut.done():
                fut.set_exception(RuntimeError("relay transport closed"))
        self._pending.clear()

    async def handshake(self) -> CapabilityDescriptor:
        if self._descriptor is not None:
            return self._descriptor
        if self._descriptor_ready is None:
            raise RuntimeError("handshake() called before connect()")
        return await asyncio.wait_for(self._descriptor_ready, timeout=self._connect_timeout_s)

    def set_inbound_handler(self, handler: InboundHandler) -> None:
        self._inbound = handler

    # ── outbound ─────────────────────────────────────────────────────────
    async def send_outbound(self, action: Dict[str, Any]) -> Dict[str, Any]:
        return await self._request_response(action)

    async def send_follow_up(self, action: Dict[str, Any]) -> Dict[str, Any]:
        # follow_up rides the same outbound frame; the connector dispatches by
        # action.op. Kept as a distinct method to satisfy the transport Protocol
        # and to make the A2 call site explicit.
        return await self._request_response(action)

    async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
        result = await self._request_response(
            {"op": "get_chat_info", "chat_id": chat_id}, frame_type="outbound"
        )
        # The connector answers chat-info inside the outbound_result envelope.
        info = result.get("chat_info") or result
        return {"name": info.get("name", chat_id), "type": info.get("type", "dm")}

    async def send_interrupt(self, session_key: str, reason: Optional[str] = None) -> None:
        await self._send({"type": "interrupt", "session_key": session_key, "reason": reason})

    async def _request_response(
        self, action: Dict[str, Any], frame_type: str = "outbound"
    ) -> Dict[str, Any]:
        if self._ws is None:
            return {"success": False, "error": "relay transport not connected"}
        request_id = uuid.uuid4().hex
        loop = asyncio.get_running_loop()
        fut: asyncio.Future[Dict[str, Any]] = loop.create_future()
        self._pending[request_id] = fut
        try:
            await self._send({"type": frame_type, "requestId": request_id, "action": action})
            return await asyncio.wait_for(fut, timeout=self._outbound_timeout_s)
        except asyncio.TimeoutError:
            return {"success": False, "error": "relay outbound timed out"}
        finally:
            self._pending.pop(request_id, None)

    # ── wire I/O ─────────────────────────────────────────────────────────
    async def _send(self, frame: Dict[str, Any]) -> None:
        if self._ws is None:
            raise RuntimeError("relay transport not connected")
        await self._ws.send(json.dumps(frame) + "\n")

    async def _read_loop(self) -> None:
        assert self._ws is not None
        buf = ""
        try:
            async for chunk in self._ws:
                buf += chunk if isinstance(chunk, str) else chunk.decode("utf-8")
                # Newline-delimited frames; keep any trailing partial line.
                *lines, buf = buf.split("\n")
                for line in lines:
                    if line.strip():
                        await self._handle_frame(line)
        except asyncio.CancelledError:
            raise
        except Exception as exc:  # noqa: BLE001 - log + let the task end; reconnection is caller policy
            if not self._closing:
                logger.warning("relay ws read loop ended: %s", exc)

    async def _handle_frame(self, line: str) -> None:
        try:
            frame = json.loads(line)
        except json.JSONDecodeError:
            logger.warning("relay: skipping malformed frame")
            return
        ftype = frame.get("type")
        if ftype == "descriptor":
            descriptor = CapabilityDescriptor.from_json(json.dumps(frame.get("descriptor", {})))
            self._descriptor = descriptor
            if self._descriptor_ready is not None and not self._descriptor_ready.done():
                self._descriptor_ready.set_result(descriptor)
        elif ftype == "inbound":
            if self._inbound is not None:
                event = _event_from_wire(frame.get("event", {}))
                await self._inbound(event)
        elif ftype == "outbound_result":
            fut = self._pending.get(frame.get("requestId", ""))
            if fut is not None and not fut.done():
                fut.set_result(frame.get("result", {}))
        elif ftype == "interrupt_inbound":
            # Bridged into the adapter's interrupt path by the runner wiring.
            handler = getattr(self, "_interrupt_inbound_handler", None)
            if handler is not None:
                await handler(frame.get("session_key", ""), frame.get("chat_id", ""))
        else:
            # hello/outbound/interrupt are gateway->connector; ignore if echoed.
            pass

    def set_interrupt_inbound_handler(self, handler: Any) -> None:
        """Register the callback for connector->gateway interrupt_inbound frames."""
        self._interrupt_inbound_handler = handler
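
The request/response correlation used by `_request_response` and the `outbound_result` branch of `_handle_frame` can be sketched without a socket. In this minimal illustration `Correlator` and `fake_connector` are hypothetical names, an `asyncio.Queue` stands in for the wire, and the connector deliberately answers out of order to show that results are matched by `requestId` rather than by arrival order.

```python
import asyncio
import uuid
from typing import Any, Dict


class Correlator:
    """Hypothetical helper: pairs each outbound frame with its result by requestId."""

    def __init__(self, timeout_s: float = 1.0) -> None:
        self._pending: Dict[str, "asyncio.Future[Dict[str, Any]]"] = {}
        self._timeout_s = timeout_s
        self.wire: "asyncio.Queue[Dict[str, Any]]" = asyncio.Queue()  # stand-in for the socket

    async def request(self, action: Dict[str, Any]) -> Dict[str, Any]:
        request_id = uuid.uuid4().hex
        fut: "asyncio.Future[Dict[str, Any]]" = asyncio.get_running_loop().create_future()
        self._pending[request_id] = fut
        try:
            await self.wire.put({"type": "outbound", "requestId": request_id, "action": action})
            return await asyncio.wait_for(fut, timeout=self._timeout_s)
        except asyncio.TimeoutError:
            return {"success": False, "error": "relay outbound timed out"}
        finally:
            # Always drop the entry so late results cannot leak memory.
            self._pending.pop(request_id, None)

    def resolve(self, frame: Dict[str, Any]) -> None:
        fut = self._pending.get(frame.get("requestId", ""))
        if fut is not None and not fut.done():
            fut.set_result(frame.get("result", {}))


async def fake_connector(c: Correlator) -> None:
    """Read two frames, then answer them in reverse order."""
    first = await c.wire.get()
    second = await c.wire.get()
    for frame in (second, first):
        c.resolve(
            {
                "type": "outbound_result",
                "requestId": frame["requestId"],
                "result": {"success": True, "echo": frame["action"]["content"]},
            }
        )


async def demo():
    c = Correlator()
    connector = asyncio.create_task(fake_connector(c))
    a, b = await asyncio.gather(
        c.request({"op": "send", "content": "A"}),
        c.request({"op": "send", "content": "B"}),
    )
    await connector
    # Nobody answers this one, so it resolves to the timeout result.
    unanswered = await Correlator(timeout_s=0.05).request({"op": "send", "content": "C"})
    return a, b, unanswered


a, b, unanswered = asyncio.run(demo())
print(a, b, unanswered)
```

Returning a failure dict on timeout, rather than raising, keeps the caller's `SendResult` construction uniform across success, connector-reported errors, and timeouts.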

@@ -4614,6 +4614,21 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
                "plugin discovery failed at gateway startup", exc_info=True,
            )

        # Register the generic relay adapter when a connector relay URL is
        # configured (GATEWAY_RELAY_URL / gateway.relay_url). No URL -> no-op, so
        # direct/single-tenant deployments are unaffected. When configured, the
        # adapter dials the connector over a WebSocket, negotiates its capability
        # descriptor at handshake, and bridges inbound/outbound like any platform.
        try:
            from gateway.relay import register_relay_adapter, relay_url

            if register_relay_adapter():
                logger.info("relay adapter registered (connector at %s)", relay_url())
        except Exception:
            logger.warning(
                "relay adapter registration failed at gateway startup", exc_info=True,
            )

        # Register declarative shell hooks from cli-config.yaml. Gateway
        # has no TTY, so consent has to come from one of the three opt-in
        # channels (--accept-hooks on launch, HERMES_ACCEPT_HOOKS env var,

@@ -1,136 +1,123 @@
# Photon iMessage platform plugin
This plugin connects Hermes Agent to iMessage (and other Spectrum
interfaces) through [Photon][photon] — a managed service that handles
iMessage line allocation, delivery, and abuse-prevention so users don't
have to run their own Mac relay.
This plugin connects Hermes Agent to iMessage (and WhatsApp Business +
future Spectrum interfaces) through [Photon][photon] — a managed
service that handles the iMessage line allocation, delivery, and
abuse-prevention layer so users don't have to run their own Mac
relay.
The free tier uses Photon's shared iMessage line pool and is the path we
recommend for everyone who doesn't already pay for a dedicated number.
The free tier uses Photon's shared iMessage line pool (`type: shared`)
and is the path we recommend for everyone who doesn't already pay for a
dedicated number.
## Architecture
Like Discord and Slack, Photon is a **persistent-connection** channel — no
public URL, no webhook, no signing secret. The `spectrum-ts` SDK holds a
long-lived **gRPC stream** to Photon for both directions. Because the SDK is
TypeScript-only, Hermes runs it inside a small supervised Node sidecar and
talks to it over loopback.
```
gRPC (spectrum-ts)
┌─────────────────────────┐ ◄───────────────► ┌──────────────────────┐
Photon Spectrum cloud │ app.messages │ Node sidecar
│ (iMessage line owner) │ space.send() (plugins/…/sidecar)
└─────────────────────────┘ ─────────────────────
GET /inbound (NDJSON) │ ▲ POST /send
inbound events /typing
┌──────────────────────┐
PhotonAdapter
│ (Python, in gateway) │
└──────────────────────┘
┌─────────────────────────┐ HMAC-signed POSTs ┌──────────────────┐
│ Photon Spectrum cloud │ ──────────────────────► │ Hermes Agent │
(iMessage line owner) │ │ (Python)
└─────────────────────────┘ JSON over loopback │
────────────────────── │ PhotonAdapter │
│ + aiohttp recv │
│ spectrum-ts
│ SDK (Node) spawns + super- │
vises ▼
┌─────────────────────────┐ ├──────────────────┤
Node sidecar ◄──── X-Hermes- ─ │ Node sidecar │
│ (plugins/.../sidecar) │ Sidecar-Token │ child process │
└─────────────────────────┘ └──────────────────┘
```
- **Inbound**: the sidecar consumes the SDK's `app.messages` gRPC stream,
normalizes each message, and streams it to the adapter over a loopback
`GET /inbound` (NDJSON). The adapter dedupes on `messageId` and dispatches
a `MessageEvent` to the gateway. It reconnects automatically if the stream
drops; the sidecar owns the gRPC reconnect to Photon.
- **Outbound**: `send` / `send_typing` are loopback POSTs to the sidecar,
authenticated with a shared `X-Hermes-Sidecar-Token`.
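
As a rough illustration of the inbound path above, the sketch below parses an NDJSON stream and drops events whose `messageId` has already been seen. It is an assumption-heavy example rather than the adapter's code: `SeenIds`, `fresh_events`, the capacity, and the event shape (a JSON object with a `messageId` key and a `text` key) are all hypothetical.

```python
import json
from collections import OrderedDict
from typing import Any, Dict, Iterable, Iterator


class SeenIds:
    """Hypothetical bounded dedupe set; evicts the oldest id once past capacity."""

    def __init__(self, capacity: int = 1024) -> None:
        self._seen: "OrderedDict[str, None]" = OrderedDict()
        self._capacity = capacity

    def first_time(self, message_id: str) -> bool:
        if message_id in self._seen:
            return False
        self._seen[message_id] = None
        if len(self._seen) > self._capacity:
            self._seen.popitem(last=False)  # drop the oldest entry
        return True


def fresh_events(ndjson_lines: Iterable[str], seen: SeenIds) -> Iterator[Dict[str, Any]]:
    """Yield each event once, keyed on its messageId; blank lines are skipped."""
    for line in ndjson_lines:
        line = line.strip()
        if not line:
            continue
        event = json.loads(line)
        if seen.first_time(event["messageId"]):
            yield event


if __name__ == "__main__":
    stream = [
        '{"messageId": "m1", "text": "hello"}',
        "",
        '{"messageId": "m1", "text": "hello"}',  # redelivered after a reconnect
        '{"messageId": "m2", "text": "again"}',
    ]
    for event in fresh_events(stream, SeenIds()):
        print(event["messageId"], event["text"])
```

Bounding the set matters for a long-lived stream: redeliveries cluster around reconnects, so remembering only recent ids is usually enough.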
Inbound traffic is webhook-only — Hermes runs an aiohttp listener
that verifies `X-Spectrum-Signature` and dedupes on `message.id`.
Outbound traffic goes through a tiny Node sidecar that runs the
`spectrum-ts` SDK. Photon does not currently expose an HTTP
send-message endpoint; their own docs say:
> Pass `space.id` to `Space.send(...)` from a separate `spectrum-ts`
> SDK instance to reply. **No public HTTP send endpoint exists today.**
> — https://photon.codes/docs/webhooks/events
When Photon ships an HTTP send endpoint, `_sidecar_send` is the one
function that swaps and the sidecar disappears. The rest of the
plugin stays the same.
## First-time setup
```bash
# One-shot setup: device login (opens browser) + project + user + sidecar deps
# 1. One-shot setup: device login (opens browser) + project + user + sidecar deps
hermes photon setup --phone +15551234567
# Start the gateway
# 2. Expose your webhook URL to the public internet
# (cloudflared, ngrok, your gateway's public hostname, etc.)
# Then register it with Photon:
hermes photon webhook register https://your-host.example.com/photon/webhook
# 3. Save the signing secret it prints to ~/.hermes/.env
# as PHOTON_WEBHOOK_SECRET=...
# Photon only returns it ONCE.
# 4. Start the gateway
hermes gateway start --platform photon
```
`hermes photon setup` does, in order:
1. **Device login** (RFC 8628, `client_id=photon-cli`) — opens
`https://app.photon.codes/` for approval and stores the bearer token.
2. **Find or create** the `Hermes Agent` project on the Photon dashboard.
3. **Enable Spectrum**, read the project's `spectrumProjectId`, rotate the
project secret, and persist both.
4. **Register your phone number** as a Spectrum user (idempotent — skipped if
a user with that number already exists).
5. **Print the assigned iMessage line** — the number you text to reach your
agent.
6. **Install the sidecar deps** (`spectrum-ts`).
There is no separate `login` command; like every other Hermes channel,
onboarding goes through one setup surface. Re-running `setup` reuses an
existing token/project, so it's safe to run again to finish a partial setup.
Run `hermes photon status` to see what's configured.
`hermes photon setup` runs the RFC 8628 device-code login as its first
step — it opens `https://app.photon.codes/` for approval, then
provisions the Spectrum project + iMessage line. There is no separate
`login` command; like every other Hermes channel, onboarding goes
through one setup surface. Re-running `setup` reuses an existing token
and project, so it's safe to run again to finish a partial setup.
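For readers unfamiliar with RFC 8628, the polling half of a device-code login looks roughly like the sketch below. It follows the error codes the RFC defines (`authorization_pending`, `slow_down`), but everything else is an assumption: `wait_for_device_token` and the injected `poll` callable are hypothetical, and Photon's actual token endpoint and response shape are not shown in this document.

```python
import time
from typing import Callable, Dict


def wait_for_device_token(
    poll: Callable[[], Dict],
    *,
    interval: float = 5.0,
    expires_in: float = 900.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll until the user approves the device code, per RFC 8628 section 3.5.

    `poll` is a hypothetical callable that performs one token request and
    returns the decoded JSON reply.
    """
    waited = 0.0
    while waited < expires_in:
        sleep(interval)
        waited += interval
        reply = poll()
        if "access_token" in reply:
            return reply["access_token"]
        error = reply.get("error")
        if error == "authorization_pending":
            continue  # user has not approved yet; keep polling
        if error == "slow_down":
            interval += 5.0  # RFC 8628: back off by 5 seconds
            continue
        # access_denied, expired_token, or anything unexpected
        raise RuntimeError(f"device login failed: {error}")
    raise TimeoutError("device code expired before approval")
```

Injecting `poll` and `sleep` keeps the loop testable without network access or real delays.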
## Credentials
Runtime SDK credentials live in `~/.hermes/.env` (the same place every other
channel keeps its token), and the adapter reads them from the environment:
```bash
PHOTON_PROJECT_ID=<spectrumProjectId> # the SDK's projectId
PHOTON_PROJECT_SECRET=<projectSecret>
```
Management metadata lives in `~/.hermes/auth.json` under `credential_pool`:
Stored in `~/.hermes/auth.json` under `credential_pool`:
```jsonc
{
"credential_pool": {
"photon": [
{ "access_token": "<device-bearer>", "issued_at": ... }
{ "access_token": "<dashboard-bearer>", "issued_at": ... }
],
"photon_project": [
{
"dashboard_project_id": "<dashboard id>",
"spectrum_project_id": "<spectrumProjectId>",
"project_secret": "<projectSecret>",
"name": "Hermes Agent"
}
{ "project_id": "...", "project_secret": "...", "name": "Hermes Agent" }
]
}
}
```
> **Note on ids.** A Photon project has two identifiers: the dashboard `id`
> (used for management API calls) and the `spectrumProjectId` (what the SDK
> authenticates with). `PHOTON_PROJECT_ID` is the **spectrum** id.
The per-URL webhook signing secret is treated like an API key and
lives in `~/.hermes/.env` as `PHOTON_WEBHOOK_SECRET`.
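When testing the listener locally it can help to produce a correctly signed request by hand. The sketch below mirrors the scheme described in the adapter's docstring (HMAC-SHA256 over `v0:{timestamp}:{body}`, sent as `v0=<hex>` alongside `X-Spectrum-Timestamp`). The helper names are hypothetical and this is not Photon's reference implementation.

```python
import hashlib
import hmac
import time
from typing import Dict, Optional


def sign_webhook_body(body: bytes, secret: str, timestamp: int) -> str:
    """Return the signature header value for `body` (hypothetical helper)."""
    mac = hmac.new(
        secret.encode("utf-8"),
        f"v0:{timestamp}:".encode("utf-8") + body,
        hashlib.sha256,
    )
    return "v0=" + mac.hexdigest()


def build_signed_headers(
    body: bytes, secret: str, timestamp: Optional[int] = None
) -> Dict[str, str]:
    """Build the two headers the listener checks before accepting a delivery."""
    ts = int(time.time()) if timestamp is None else timestamp
    return {
        "X-Spectrum-Timestamp": str(ts),
        "X-Spectrum-Signature": sign_webhook_body(body, secret, ts),
    }
```

POSTing `body` with these headers to the local webhook path should pass verification as long as `secret` matches `PHOTON_WEBHOOK_SECRET` and the timestamp is within the allowed drift.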
## Configuration knobs
All env vars are documented in `plugin.yaml`. The most important:
All env vars are documented in `plugin.yaml`. The most important are:
| Env var | Default | Meaning |
|---------------------------|----------------------------|--------------------------------------|
| `PHOTON_PROJECT_ID` | from .env / auth.json | Spectrum project id (SDK `projectId`)|
| `PHOTON_PROJECT_SECRET` | from .env / auth.json | Project secret |
| `PHOTON_SIDECAR_PORT` | 8789 | Loopback port for the sidecar |
| `PHOTON_SIDECAR_AUTOSTART`| true | Spawn the sidecar on connect |
| `PHOTON_DASHBOARD_HOST` | https://app.photon.codes | Dashboard API host |
| `PHOTON_HOME_CHANNEL` | your number (set by setup) | Default space for cron delivery — a space id, or a bare E.164 number (resolved to a DM) |
| `PHOTON_ALLOWED_USERS` | your number (set by setup) | Comma-separated E.164 allowlist |
| `PHOTON_REQUIRE_MENTION` | false | Gate group chats on a wake word |
| `PHOTON_MAX_INLINE_ATTACHMENT_BYTES` | 20 MB | Max inbound attachment size the sidecar reads & inlines |
| Env var | Default | Meaning |
|--------------------------|--------------------|-----------------------------------------|
| `PHOTON_PROJECT_ID` | from auth.json | Spectrum project ID |
| `PHOTON_PROJECT_SECRET` | from auth.json | Spectrum project secret (HTTP Basic) |
| `PHOTON_WEBHOOK_SECRET` | (unset) | Signing secret returned at register |
| `PHOTON_WEBHOOK_PORT` | 8788 | Local port for the aiohttp listener |
| `PHOTON_WEBHOOK_PATH` | /photon/webhook | Path under which the listener mounts |
| `PHOTON_SIDECAR_PORT` | 8789 | Loopback port for sidecar control |
| `PHOTON_HOME_CHANNEL` | (unset) | Default space ID for cron delivery |
| `PHOTON_ALLOWED_USERS` | (unset) | Comma-separated E.164 allowlist |
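As an illustration of the allowlist format, a comma-separated E.164 value could be parsed as follows. `parse_allowed_users` is a hypothetical helper, and rejecting malformed entries (rather than ignoring them) is an assumption; the plugin's own handling is not shown here.

```python
import re
from typing import FrozenSet

# E.164: "+", a non-zero leading digit, at most 15 digits in total.
_E164 = re.compile(r"\+[1-9]\d{1,14}")


def parse_allowed_users(raw: str) -> FrozenSet[str]:
    """Parse a PHOTON_ALLOWED_USERS-style value into a set of numbers."""
    numbers = set()
    for part in raw.split(","):
        candidate = part.strip()
        if not candidate:
            continue  # tolerate stray commas and an unset/empty value
        if not _E164.fullmatch(candidate):
            raise ValueError(f"not an E.164 number: {candidate!r}")
        numbers.add(candidate)
    return frozenset(numbers)
```

For example, `parse_allowed_users(os.getenv("PHOTON_ALLOWED_USERS", ""))` yields an empty set when the variable is unset.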
## Attachments & limitations
## Limitations (current Photon API)
- **Inbound attachments are downloaded.** The sidecar reads the bytes
(`content.read()`) and base64-inlines them on the NDJSON event; the adapter
caches them to the shared media cache and populates `media_urls` /
`media_types`, so the agent sees the real image/file (vision included) —
parity with the BlueBubbles iMessage channel. Attachments larger than
`PHOTON_MAX_INLINE_ATTACHMENT_BYTES` (default 20 MB), or any byte read that
fails, fall back to a text marker (`[Photon attachment received: …]`) so the
agent still knows something arrived.
- **Outbound attachments are supported.** Images, voice notes, video, and
documents are sent via `space.send(attachment(...))` /
- **Inbound attachments are metadata only.** Inbound webhooks include the
filename + MIME type but no download URL. The plugin surfaces a
text marker (`[Photon attachment received: …]`) so the agent knows
something arrived, but cannot read the bytes. Photon's docs note
an attachment retrieval endpoint is on the roadmap.
- **Outbound attachments are supported.** Images, voice notes, video,
and documents are sent via `space.send(attachment(...))` /
`space.send(voice(...))` through the sidecar's `/send-attachment`
endpoint; a caption is delivered as a separate text bubble after the media.
- **Reactions, message effects, polls** — supported by `spectrum-ts` but not
yet exposed; the sidecar is the natural place to add them.
endpoint. A caption is delivered as a separate text bubble after the
media.
- **Reactions, message effects, polls** — not exposed yet; the
`spectrum-ts` SDK supports them, and the sidecar is the natural
place to add them when the agent has reason to use them.
[photon]: https://photon.codes/


@@ -1,30 +1,32 @@
"""
Photon Spectrum (iMessage) platform adapter for Hermes Agent.
Both directions of traffic flow through a small supervised Node sidecar
(see ``sidecar/index.mjs``) that runs the ``spectrum-ts`` SDK — the SDK is
TypeScript-only and there is no public HTTP message API, so a sidecar is
unavoidable.
Inbound:
The SDK's ``app.messages`` is a long-lived **gRPC** stream. The sidecar
serializes each message to a normalized JSON event and streams it to this
adapter over a loopback ``GET /inbound`` (NDJSON). A background task here
consumes that stream, dedupes on ``messageId``, and dispatches a
``MessageEvent`` to the gateway via ``BasePlatformAdapter.handle_message``.
No webhook, no public URL, no signing secret.
Photon delivers signed JSON ``POST``s to a URL we register. The
adapter spins up an aiohttp server on ``PHOTON_WEBHOOK_PORT``,
verifies ``X-Spectrum-Signature`` (HMAC-SHA256 of
``v0:{timestamp}:{body}`` keyed by the per-URL signing secret),
rejects deliveries with a timestamp drift > 5 minutes, dedupes on
``message.id``, and dispatches a normalized ``MessageEvent`` to the
gateway runner via ``BasePlatformAdapter.handle_message``.
Outbound:
``send`` / ``send_typing`` are loopback POSTs to the sidecar's control
endpoints, authenticated with a shared bearer token. Outbound media
(images, voice notes, video, documents) goes through spectrum-ts'
``attachment()`` / ``voice()`` content builders via the sidecar's
``/send-attachment`` endpoint.
Photon does not currently expose a public HTTP send-message
endpoint, so the adapter spawns a small Node sidecar (see
``sidecar/index.mjs``) that runs the ``spectrum-ts`` SDK. Each
``send`` / ``send_typing`` / attachment call from Hermes is a
loopback POST to the sidecar with a shared bearer token. Outbound
media (images, voice notes, video, documents) goes through
spectrum-ts' ``attachment()`` / ``voice()`` content builders.
When Photon ships an HTTP send endpoint we can collapse the sidecar
into ``_send_via_http`` and drop the Node dependency entirely.
"""
from __future__ import annotations
import asyncio
import base64
import hashlib
import hmac
import json
import logging
import os
@@ -37,21 +39,21 @@ import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from typing import Any, Dict, Optional
if TYPE_CHECKING:
# Type checkers see ``httpx`` as the always-imported module, so every use
# site type-checks cleanly. The runtime fallback below keeps the optional
# dependency truly optional (each use site is guarded by HTTPX_AVAILABLE).
try:
import httpx
HTTPX_AVAILABLE = True
else:
try:
import httpx
HTTPX_AVAILABLE = True
except ImportError: # pragma: no cover - httpx is already a Hermes dep
HTTPX_AVAILABLE = False
httpx = None
except ImportError: # pragma: no cover - httpx is already a Hermes dep
HTTPX_AVAILABLE = False
httpx = None # type: ignore[assignment]
try:
from aiohttp import web
AIOHTTP_AVAILABLE = True
except ImportError:
AIOHTTP_AVAILABLE = False
web = None # type: ignore[assignment]
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import (
@@ -61,13 +63,21 @@ from gateway.platforms.base import (
SendResult,
)
from .auth import load_project_credentials
from .auth import (
DEFAULT_SPECTRUM_HOST,
load_project_credentials,
_spectrum_host,
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
_DEFAULT_WEBHOOK_PORT = 8788
_DEFAULT_WEBHOOK_PATH = "/photon/webhook"
_DEFAULT_WEBHOOK_BIND = "0.0.0.0"
_DEFAULT_SIDECAR_PORT = 8789
_DEFAULT_SIDECAR_BIND = "127.0.0.1"
@@ -76,8 +86,11 @@ _DEFAULT_SIDECAR_BIND = "127.0.0.1"
# size to ~16 KB. Keep a conservative cap that matches BlueBubbles.
_MAX_MESSAGE_LENGTH = 8000
# Dedup parameters — the gRPC stream is at-least-once, and a sidecar
# reconnect can replay, so keep at least 1k ids for ~48h.
# The spec says to reject deliveries whose timestamp is more than ~5 minutes off, for replay protection.
_TIMESTAMP_DRIFT_SECONDS = 300
# Dedup parameters — keep at least 1k IDs for ~48h per Photon's
# at-least-once guidance.
_DEDUP_MAX_SIZE = 4000
_DEDUP_WINDOW_SECONDS = 48 * 3600
@@ -105,7 +118,7 @@ def _coerce_port(value: Any, default: int) -> int:
def check_requirements() -> bool:
"""Return True when both Python deps and the Node sidecar are available."""
if not HTTPX_AVAILABLE:
if not HTTPX_AVAILABLE or not AIOHTTP_AVAILABLE:
return False
if not shutil.which(os.getenv("PHOTON_NODE_BIN") or "node"):
return False
@@ -133,33 +146,61 @@ def is_connected(cfg: PlatformConfig) -> bool:
def _env_enablement() -> Optional[dict]:
"""Seed PlatformConfig.extra from env so env-only setups appear in status.
The special ``home_channel`` key is handled by the core plugin hook and
becomes a proper ``HomeChannel`` on ``PlatformConfig``.
"""
"""Seed PlatformConfig.extra from env so env-only setups appear in status."""
project_id, project_secret = load_project_credentials()
if not (project_id and project_secret):
return None
seed = {"project_id": project_id, "project_secret": project_secret}
home = os.getenv("PHOTON_HOME_CHANNEL", "").strip()
if home:
seed["home_channel"] = {
"chat_id": home,
"name": os.getenv("PHOTON_HOME_CHANNEL_NAME", "Home"),
}
return seed
return {
"project_id": project_id,
"project_secret": project_secret,
"webhook_port": _coerce_port(os.getenv("PHOTON_WEBHOOK_PORT"), _DEFAULT_WEBHOOK_PORT),
"webhook_path": os.getenv("PHOTON_WEBHOOK_PATH") or _DEFAULT_WEBHOOK_PATH,
}
# ---------------------------------------------------------------------------
# Signature verification
def verify_signature(
*,
body: bytes,
timestamp_header: str,
signature_header: str,
signing_secret: str,
now: Optional[float] = None,
drift: int = _TIMESTAMP_DRIFT_SECONDS,
) -> bool:
"""Constant-time verify a Photon webhook signature.
Returns True iff the timestamp is within ``drift`` of *now* AND
``signature_header == "v0=" + hmac_sha256(secret, "v0:{ts}:{body}")``.
Exposed at module scope so tests can exercise it without an adapter
instance.
"""
if not timestamp_header or not signature_header or not signing_secret:
return False
try:
ts = int(timestamp_header)
except ValueError:
return False
# Compare against None explicitly: ``now or time.time()`` would ignore now=0.
if abs((time.time() if now is None else now) - ts) > drift:
return False
if not signature_header.startswith("v0="):
return False
expected = hmac.new(
signing_secret.encode("utf-8"),
f"v0:{ts}:".encode("utf-8") + body,
hashlib.sha256,
).hexdigest()
return hmac.compare_digest(expected, signature_header[3:])
# ---------------------------------------------------------------------------
# Adapter
class PhotonAdapter(BasePlatformAdapter):
"""Bidirectional bridge to Photon Spectrum via the Node spectrum-ts sidecar.
Inbound: consume the sidecar's ``/inbound`` gRPC stream.
Outbound: loopback POSTs to the sidecar's control channel.
"""
"""Inbound: signed webhook on aiohttp. Outbound: Node sidecar via loopback HTTP."""
MAX_MESSAGE_LENGTH = _MAX_MESSAGE_LENGTH
@@ -168,8 +209,6 @@ class PhotonAdapter(BasePlatformAdapter):
extra = config.extra or {}
# Project credentials (env wins, then config.extra, then auth.json).
# ``project_id`` here is the project's spectrumProjectId — the value
# the spectrum-ts SDK authenticates with.
stored_id, stored_sec = load_project_credentials()
self._project_id: str = (
os.getenv("PHOTON_PROJECT_ID")
@@ -184,6 +223,27 @@ class PhotonAdapter(BasePlatformAdapter):
or ""
)
# Webhook receiver
self._webhook_port = _coerce_port(
extra.get("webhook_port") or os.getenv("PHOTON_WEBHOOK_PORT"),
_DEFAULT_WEBHOOK_PORT,
)
self._webhook_path = (
extra.get("webhook_path")
or os.getenv("PHOTON_WEBHOOK_PATH")
or _DEFAULT_WEBHOOK_PATH
)
self._webhook_bind = (
extra.get("webhook_bind")
or os.getenv("PHOTON_WEBHOOK_BIND")
or _DEFAULT_WEBHOOK_BIND
)
self._webhook_secret: str = (
os.getenv("PHOTON_WEBHOOK_SECRET")
or extra.get("webhook_secret")
or ""
)
# Sidecar
self._sidecar_port = _coerce_port(
extra.get("sidecar_port") or os.getenv("PHOTON_SIDECAR_PORT"),
@@ -199,13 +259,12 @@ class PhotonAdapter(BasePlatformAdapter):
self._node_bin = os.getenv("PHOTON_NODE_BIN") or shutil.which("node") or "node"
# Runtime state
self._runner: Optional["web.AppRunner"] = None
self._sidecar_proc: Optional[subprocess.Popen] = None
self._sidecar_supervisor_task: Optional[asyncio.Task] = None
self._inbound_task: Optional[asyncio.Task] = None
self._inbound_running = False
self._http_client: Optional["httpx.AsyncClient"] = None
# Lightweight in-memory dedup. The gRPC stream is at-least-once, so we
# may see the same messageId more than once (e.g. after a reconnect).
# Lightweight in-memory dedup. Photon's at-least-once guarantee
# means we WILL see the same message.id more than once.
self._seen_messages: Dict[str, float] = {}
# Group-chat mention gating (parity with BlueBubbles). When enabled,
@@ -286,6 +345,13 @@ class PhotonAdapter(BasePlatformAdapter):
# -- Connection lifecycle ---------------------------------------------
async def connect(self) -> bool:
if not AIOHTTP_AVAILABLE:
self._set_fatal_error(
"MISSING_DEP",
"aiohttp not installed. Run: pip install aiohttp",
retryable=False,
)
return False
if not HTTPX_AVAILABLE:
self._set_fatal_error(
"MISSING_DEP", "httpx not installed", retryable=False
@@ -300,11 +366,19 @@ class PhotonAdapter(BasePlatformAdapter):
)
return False
client = httpx.AsyncClient(timeout=30.0)
self._http_client = client
# Start the aiohttp receiver first; without it the sidecar would
# be able to forward inbound traffic to a closed port.
try:
await self._start_webhook_server()
except OSError as e:
self._set_fatal_error(
"PORT_IN_USE",
f"webhook port {self._webhook_port} unavailable: {e}",
retryable=True,
)
return False
# The sidecar holds the gRPC stream for BOTH directions, so it is
# required now (not just for outbound).
# Spin up the Node sidecar (required for outbound).
if self._autostart_sidecar:
try:
await self._start_sidecar()
@@ -314,39 +388,23 @@ class PhotonAdapter(BasePlatformAdapter):
f"failed to start Photon sidecar: {e}",
retryable=True,
)
await client.aclose()
self._http_client = None
await self._stop_webhook_server()
return False
else:
logger.warning(
"[photon] sidecar autostart disabled — inbound + outbound will fail"
)
# Start consuming the inbound gRPC stream from the sidecar.
self._inbound_running = True
self._inbound_task = asyncio.get_event_loop().create_task(
self._inbound_loop()
)
logger.info("[photon] sidecar autostart disabled — outbound will fail")
self._http_client = httpx.AsyncClient(timeout=30.0)
self._mark_connected()
logger.info(
"[photon] connected — sidecar on %s:%d, streaming inbound over gRPC",
"[photon] connected — webhook at %s:%d%s, sidecar on %s:%d",
self._webhook_bind, self._webhook_port, self._webhook_path,
self._sidecar_bind, self._sidecar_port,
)
return True
async def disconnect(self) -> None:
self._inbound_running = False
if self._inbound_task is not None:
self._inbound_task.cancel()
try:
await self._inbound_task
except asyncio.CancelledError:
pass
except Exception:
pass
self._inbound_task = None
await self._stop_sidecar()
await self._stop_webhook_server()
if self._http_client is not None:
try:
await self._http_client.aclose()
@@ -355,61 +413,68 @@ class PhotonAdapter(BasePlatformAdapter):
self._http_client = None
self._mark_disconnected()
# -- Inbound stream consumer ------------------------------------------
# -- Webhook server ----------------------------------------------------
async def _inbound_loop(self) -> None:
"""Consume the sidecar's ``/inbound`` NDJSON stream, with reconnect.
async def _start_webhook_server(self) -> None:
app = web.Application()
app.router.add_post(self._webhook_path, self._handle_webhook)
app.router.add_get("/healthz", lambda _: web.Response(text="ok"))
self._runner = web.AppRunner(app)
await self._runner.setup()
site = web.TCPSite(self._runner, self._webhook_bind, self._webhook_port)
await site.start()
The sidecar owns the gRPC reconnect/heartbeat to Photon; this loop
only has to re-open the loopback HTTP stream if it drops (e.g. the
sidecar restarts).
"""
client = self._http_client
if client is None:
return
url = f"http://{self._sidecar_bind}:{self._sidecar_port}/inbound"
headers = {"X-Hermes-Sidecar-Token": self._sidecar_token}
backoff = 1.0
while self._inbound_running:
async def _stop_webhook_server(self) -> None:
if self._runner is not None:
try:
async with client.stream(
"GET", url, headers=headers, timeout=None,
) as resp:
if resp.status_code != 200:
raise RuntimeError(f"/inbound returned {resp.status_code}")
backoff = 1.0 # reset on a successful connect
async for line in resp.aiter_lines():
if not self._inbound_running:
break
line = line.strip()
if not line:
continue # heartbeat
await self._on_inbound_line(line)
except asyncio.CancelledError:
raise
except Exception as e:
if not self._inbound_running:
break
logger.warning(
"[photon] inbound stream dropped (%s); reconnecting in %.1fs",
e, backoff,
)
await asyncio.sleep(backoff)
backoff = min(backoff * 2, 30.0)
await self._runner.cleanup()
except Exception:
pass
self._runner = None
async def _handle_webhook(self, request: "web.Request") -> "web.Response":
body = await request.read()
if self._webhook_secret:
ts = request.headers.get("X-Spectrum-Timestamp", "")
sig = request.headers.get("X-Spectrum-Signature", "")
if not verify_signature(
body=body,
timestamp_header=ts,
signature_header=sig,
signing_secret=self._webhook_secret,
):
logger.warning("[photon] rejected webhook with bad signature")
return web.Response(status=401, text="invalid signature")
else:
logger.warning(
"[photon] PHOTON_WEBHOOK_SECRET unset — accepting unsigned "
"deliveries. Set the per-URL signing secret returned by "
"register-webhook to enable verification."
)
async def _on_inbound_line(self, line: str) -> None:
try:
event = json.loads(line)
payload = json.loads(body or b"{}")
except json.JSONDecodeError:
logger.debug("[photon] skipping non-JSON inbound line")
return
msg_id = event.get("messageId")
if msg_id and self._is_duplicate(msg_id):
return
return web.Response(status=400, text="invalid json")
if payload.get("event") != "messages":
# Photon currently emits only `messages`; any future event
# types are ack'd 200 so they don't retry.
return web.Response(text="ok")
msg = payload.get("message") or {}
msg_id = msg.get("id")
if not msg_id:
return web.Response(status=400, text="missing message.id")
if self._is_duplicate(msg_id):
return web.Response(text="ok (dup)")
try:
await self._dispatch_inbound(event)
await self._dispatch_inbound(payload)
except Exception:
logger.exception("[photon] inbound dispatch failed")
# 200 anyway — we own the dedup; failing here would cause
# Photon to retry the same id.
return web.Response(text="ok")
def _is_duplicate(self, msg_id: str) -> bool:
now = time.time()
@@ -423,77 +488,44 @@ class PhotonAdapter(BasePlatformAdapter):
self._seen_messages[msg_id] = now
return False
async def _dispatch_inbound(self, event: Dict[str, Any]) -> None:
"""Normalize a sidecar inbound event and dispatch it to the gateway.
Event shape (from ``sidecar/index.mjs``)::
{
"messageId": "...",
"platform": "iMessage",
"space": {"id": "...", "type": "dm"|"group", "phone": "+E164"},
"sender": {"id": "+E164"},
"content": {"type": "text", "text": "..."}
| {"type": "attachment", "id", "name", "mimeType",
"size", "data"?, "encoding"?},
"timestamp": "2026-05-14T19:06:32.000Z"
}
Attachment content carries the bytes inline as base64 ``data`` (with
``encoding == "base64"``) when the sidecar could read them within its
size cap; otherwise only metadata is present and we surface a marker.
"""
space = event.get("space") or {}
sender = event.get("sender") or {}
content = event.get("content") or {}
async def _dispatch_inbound(self, payload: Dict[str, Any]) -> None:
msg = payload.get("message") or {}
space = msg.get("space") or payload.get("space") or {}
sender = msg.get("sender") or {}
content = msg.get("content") or {}
space_id = space.get("id") or ""
sender_id = sender.get("id") or ""
if not space_id:
logger.warning("[photon] inbound missing space.id")
return
# iMessage spaces carry their type directly — no id string-sniffing.
chat_type = "group" if space.get("type") == "group" else "dm"
sender_id = sender.get("id") or space.get("phone") or space_id
# Space type — Photon documents iMessage DM ids as `any;-;+E164`
# and group ids as `any;+;<chat-guid>`. Use that as the
# heuristic; everything else is treated as DM.
chat_type = "group" if ";+;" in space_id else "dm"
ts_str = event.get("timestamp") or ""
# Timestamp — ISO 8601 from the platform.
ts_str = msg.get("timestamp") or ""
try:
timestamp = (
datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
if ts_str
else datetime.now(tz=timezone.utc)
)
timestamp = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
except ValueError:
timestamp = datetime.now(tz=timezone.utc)
# Media attachments (local cached paths) handed to the agent via the
# gateway's image-routing path, exactly like the BlueBubbles channel.
media_urls: List[str] = []
media_types: List[str] = []
ctype = content.get("type")
if ctype == "text":
# Content normalization. Spectrum is a discriminated union;
# text vs attachment metadata. Attachments are metadata-only
# today (no download URL) — log + carry the name so the agent
# at least knows something was sent.
if content.get("type") == "text":
text = content.get("text") or ""
mtype = MessageType.TEXT
elif ctype == "attachment":
elif content.get("type") == "attachment":
name = content.get("name") or "(unnamed)"
mime = content.get("mimeType") or ""
text = f"[Photon attachment received: {name} ({mime}) — no download URL yet]"
mtype = _attachment_message_type(mime)
cached = _cache_inbound_attachment(content, name, mime)
if cached:
media_urls.append(cached)
media_types.append(mime or "application/octet-stream")
# The real bytes are attached, so the agent sees the media
# itself — a short marker is enough text, and it keeps group
# mention-gating consistent with plain messages.
text = "(attachment)"
else:
# No bytes (over the sidecar cap, a failed read, or a caching
# failure) — fall back to a metadata marker so the agent still
# knows something arrived.
text = f"[Photon attachment received: {name} ({mime})]"
else:
text = f"[Photon content type not handled: {ctype}]"
text = f"[Photon content type not handled: {content.get('type')}]"
mtype = MessageType.TEXT
# Group-mention gating (parity with BlueBubbles). In group chats with
@@ -513,20 +545,18 @@ class PhotonAdapter(BasePlatformAdapter):
chat_id=space_id,
chat_name=space_id,
chat_type=chat_type,
user_id=sender_id,
user_id=sender_id or space_id,
user_name=sender_id or None,
)
message_event = MessageEvent(
event = MessageEvent(
text=text,
message_type=mtype,
source=source,
message_id=event.get("messageId"),
raw_message=event,
message_id=msg.get("id"),
raw_message=payload,
timestamp=timestamp,
media_urls=media_urls,
media_types=media_types,
)
await self.handle_message(message_event)
await self.handle_message(event)
# -- Sidecar lifecycle -------------------------------------------------
@@ -640,7 +670,7 @@ class PhotonAdapter(BasePlatformAdapter):
reply_to: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
return await self._sidecar_send(chat_id, content)
return await self._sidecar_send(chat_id, content, reply_to=reply_to)
# -- Outbound media (parity with the BlueBubbles iMessage channel) -----
#
@@ -666,7 +696,7 @@ class PhotonAdapter(BasePlatformAdapter):
# Couldn't fetch the URL — fall back to sending it as text.
return await super().send_image(chat_id, image_url, caption, reply_to)
return await self._sidecar_send_attachment(
chat_id, local_path, caption=caption,
chat_id, local_path, caption=caption, reply_to=reply_to,
)
async def send_image_file(
@@ -679,7 +709,7 @@ class PhotonAdapter(BasePlatformAdapter):
**kwargs,
) -> SendResult:
return await self._sidecar_send_attachment(
chat_id, image_path, caption=caption,
chat_id, image_path, caption=caption, reply_to=reply_to,
)
async def send_voice(
@@ -692,7 +722,7 @@ class PhotonAdapter(BasePlatformAdapter):
**kwargs,
) -> SendResult:
return await self._sidecar_send_attachment(
chat_id, audio_path, caption=caption, kind="voice",
chat_id, audio_path, caption=caption, reply_to=reply_to, kind="voice",
)
async def send_video(
@@ -705,7 +735,7 @@ class PhotonAdapter(BasePlatformAdapter):
**kwargs,
) -> SendResult:
return await self._sidecar_send_attachment(
chat_id, video_path, caption=caption,
chat_id, video_path, caption=caption, reply_to=reply_to,
)
async def send_document(
@@ -719,7 +749,7 @@ class PhotonAdapter(BasePlatformAdapter):
**kwargs,
) -> SendResult:
return await self._sidecar_send_attachment(
chat_id, file_path, name=file_name, caption=caption,
chat_id, file_path, name=file_name, caption=caption, reply_to=reply_to,
)
async def send_animation(
@@ -737,29 +767,23 @@ class PhotonAdapter(BasePlatformAdapter):
async def send_typing(self, chat_id: str, metadata=None) -> None:
try:
await self._sidecar_call(
"/typing", {"spaceId": chat_id, "state": "start"}
)
await self._sidecar_call("/typing", {"spaceId": chat_id})
except Exception as e:
logger.debug("[photon] send_typing failed: %s", e)
async def stop_typing(self, chat_id: str) -> None:
try:
await self._sidecar_call(
"/typing", {"spaceId": chat_id, "state": "stop"}
)
except Exception as e:
logger.debug("[photon] stop_typing failed: %s", e)
async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
"""Return whatever we know about a Spectrum space id.
Photon's ``space.id`` is opaque; the inbound event also carries the
DM/group type, but here we only have the id, so infer conservatively.
Photon's `space.id` is nominally opaque, but iMessage ids follow a
documented shape (`any;-;+E164` for DMs, `any;+;<guid>` for groups).
We use that shape to infer the chat type so the gateway has something
to show in session pickers / logs.
"""
return {"name": chat_id, "type": "dm", "id": chat_id}
chat_type = "group" if ";+;" in chat_id else "dm"
return {"name": chat_id, "type": chat_type, "id": chat_id}
async def _sidecar_send(self, space_id: str, text: str) -> SendResult:
async def _sidecar_send(
self, space_id: str, text: str, *, reply_to: Optional[str] = None,
) -> SendResult:
if len(text) > self.MAX_MESSAGE_LENGTH:
logger.warning(
"[photon] truncating outbound from %d to %d chars",
@@ -767,6 +791,8 @@ class PhotonAdapter(BasePlatformAdapter):
)
text = text[: self.MAX_MESSAGE_LENGTH]
body: Dict[str, Any] = {"spaceId": space_id, "text": text}
if reply_to:
body["replyTo"] = reply_to
try:
data = await self._sidecar_call("/send", body)
except Exception as e:
@@ -781,6 +807,7 @@ class PhotonAdapter(BasePlatformAdapter):
name: Optional[str] = None,
mime_type: Optional[str] = None,
caption: Optional[str] = None,
reply_to: Optional[str] = None,
kind: str = "attachment",
) -> SendResult:
"""POST a local file to the sidecar's ``/send-attachment`` endpoint.
@@ -815,6 +842,8 @@ class PhotonAdapter(BasePlatformAdapter):
body["mimeType"] = mime_type
if caption:
body["caption"] = caption
if reply_to:
body["replyTo"] = reply_to
try:
data = await self._sidecar_call("/send-attachment", body)
except Exception as e:
@@ -858,81 +887,11 @@ def _attachment_message_type(mime: str) -> MessageType:
return MessageType.DOCUMENT
# MIME → file-extension maps for caching inbound attachment bytes. These mirror
# the BlueBubbles iMessage channel so both adapters name cached media the same.
_IMAGE_EXT_BY_MIME = {
"image/jpeg": ".jpg",
"image/png": ".png",
"image/gif": ".gif",
"image/webp": ".webp",
"image/heic": ".jpg",
"image/heif": ".jpg",
"image/tiff": ".jpg",
}
_AUDIO_EXT_BY_MIME = {
"audio/mp3": ".mp3",
"audio/mpeg": ".mp3",
"audio/ogg": ".ogg",
"audio/wav": ".wav",
"audio/x-caf": ".mp3",
"audio/mp4": ".m4a",
"audio/aac": ".m4a",
}
def _cache_inbound_attachment(
content: Dict[str, Any], name: str, mime: str
) -> Optional[str]:
"""Decode a base64-inlined inbound attachment and cache it locally.
The sidecar inlines the attachment bytes as ``content["data"]`` (base64).
We decode them and route to the shared media cache by MIME type, returning
the cached absolute path so the caller can populate ``media_urls`` (which
the gateway then hands to the model). Returns ``None`` when there are no
bytes (over the sidecar's inline cap or a failed read) or when caching
fails, so the caller can fall back to a text marker.
"""
data_b64 = content.get("data")
if not data_b64:
return None
try:
raw = base64.b64decode(data_b64)
except (ValueError, TypeError) as exc:
logger.warning("[photon] failed to decode inbound attachment bytes: %s", exc)
return None
from gateway.platforms.base import (
cache_audio_from_bytes,
cache_document_from_bytes,
cache_image_from_bytes,
)
mime = (mime or "").lower()
# Prefer the real extension from the filename; fall back to the MIME map.
suffix = Path(name).suffix if name else ""
try:
if mime.startswith("image/"):
ext = suffix or _IMAGE_EXT_BY_MIME.get(mime, ".jpg")
try:
return cache_image_from_bytes(raw, ext)
except ValueError:
# Bytes don't look like a supported image (e.g. HEIC magic) —
# still deliver them as a document rather than dropping them.
return cache_document_from_bytes(raw, name)
if mime.startswith("audio/"):
ext = suffix or _AUDIO_EXT_BY_MIME.get(mime, ".mp3")
return cache_audio_from_bytes(raw, ext)
# Video, application/*, and everything else → document cache.
return cache_document_from_bytes(raw, name)
except Exception as exc:
logger.warning("[photon] failed to cache inbound attachment %s: %s", name, exc)
return None
# ---------------------------------------------------------------------------
# Standalone (out-of-process) send for cron deliveries when the gateway
# is not co-resident. Reuses a live sidecar already listening on the
# configured port (cron processes cannot spawn the sidecar themselves).
# is not co-resident. Spins up an ephemeral sidecar call by spawning
# the existing sidecar binary one-shot; if a live sidecar is already
# listening on the configured port we reuse it.
async def _standalone_send(
pconfig: PlatformConfig,
@@ -1021,7 +980,7 @@ def register(ctx) -> None:
ctx.register_platform(
name="photon",
label="iMessage via Photon",
label="Photon iMessage",
adapter_factory=lambda cfg: PhotonAdapter(cfg),
check_fn=check_requirements,
validate_config=validate_config,
@@ -1052,7 +1011,7 @@ def register(ctx) -> None:
"Treat replies like regular text messages — short, friendly, no "
"markdown rendering. Recipient identifiers are E.164 phone "
"numbers; never expose them in responses unless the user asked. "
"Attachments arrive as metadata only."
"Attachments arrive as metadata only (no download URL yet)."
),
)
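
Reviewer sketch (not part of the diff): `_cache_inbound_attachment` above picks a cache and a file extension by preferring the filename's own suffix and falling back to a MIME lookup. The routing can be isolated as a pure function, roughly as below. The helper name `pick_cache_target` and the trimmed maps are hypothetical, introduced only for illustration. Unlike the real code, which passes the full `name` to the document cache, this sketch returns only the suffix for documents.

```python
from pathlib import Path
from typing import Tuple

# Trimmed copies of the MIME maps from the diff, for illustration only.
IMAGE_EXT_BY_MIME = {"image/gif": ".gif", "image/webp": ".webp", "image/heic": ".jpg"}
AUDIO_EXT_BY_MIME = {"audio/mpeg": ".mp3", "audio/ogg": ".ogg", "audio/mp4": ".m4a"}


def pick_cache_target(name: str, mime: str) -> Tuple[str, str]:
    """Return (cache_kind, extension) for an inbound attachment (hypothetical helper)."""
    mime = (mime or "").lower()
    # Prefer the real extension from the filename; fall back to the MIME map.
    suffix = Path(name).suffix if name else ""
    if mime.startswith("image/"):
        return "image", suffix or IMAGE_EXT_BY_MIME.get(mime, ".jpg")
    if mime.startswith("audio/"):
        return "audio", suffix or AUDIO_EXT_BY_MIME.get(mime, ".mp3")
    # Video, application/*, and everything else go to the document cache.
    return "document", suffix
```

Keeping this decision free of I/O would make the fallback order easy to unit-test without touching the shared media cache.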


@@ -1,37 +1,27 @@
"""
Photon Dashboard API client + device-code login flow.
Photon Dashboard + Spectrum API client and device-code login flow.
This module is pure Python — it intentionally does not depend on
``spectrum-ts``. Every management-plane operation (login, find/create
project, enable Spectrum, rotate the project secret, register a user,
list the assigned iMessage line) talks to Photon's **Dashboard API** on a
single host, exactly like the official Photon CLI (``photon-hq/cli``):
``spectrum-ts``. All management-plane operations (login, create
project, create user, register webhook) talk to Photon's HTTP API
directly:
Dashboard API https://app.photon.codes/api/...
OAuth 2.0 device flow, Bearer access token
OAuth bearer token from device flow
A Photon project carries two distinct identifiers:
Spectrum API https://spectrum.photon.codes/projects/{id}/...
HTTP Basic with (projectId, projectSecret)
* ``id`` — the Dashboard project id (used in API paths)
* ``spectrumProjectId`` — the Spectrum Cloud project id, populated when
Spectrum is enabled on the project
The webhook receiver + Node sidecar in ``adapter.py`` consume the
credentials this module persists to ``~/.hermes/auth.json``.
The ``spectrum-ts`` SDK (run by the Node sidecar) authenticates to Spectrum
Cloud with ``(spectrumProjectId, projectSecret)`` — so the value we persist
as ``PHOTON_PROJECT_ID`` for the runtime is the **spectrumProjectId**, not
the Dashboard ``id``. The Dashboard ``id`` is kept only for management
calls.
Credential storage mirrors every other Hermes channel:
* runtime SDK creds -> ``~/.hermes/.env`` (``PHOTON_PROJECT_ID`` =
spectrumProjectId, ``PHOTON_PROJECT_SECRET``) via ``save_env_value``
* management metadata -> ``~/.hermes/auth.json`` under
``credential_pool.photon`` (device token) and
``credential_pool.photon_project`` (dashboard id, spectrum id, name)
Reference: https://github.com/photon-hq/cli and
https://photon.codes/docs/api-reference/device-login/request-device-+-user-code
Reference docs (read at integration time):
https://photon.codes/docs/api-reference/introduction
https://photon.codes/docs/api-reference/device-login/request-device-+-user-code
https://photon.codes/docs/api-reference/device-login/exchange-device-code-for-token
https://photon.codes/docs/api-reference/projects/create-project
https://photon.codes/docs/api-reference/users/create-user
https://photon.codes/docs/webhooks/overview
"""
from __future__ import annotations
@@ -42,7 +32,7 @@ import re
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple
from typing import Any, Callable, Dict, Optional, Tuple
try:
import httpx
@@ -61,20 +51,17 @@ class PhotonDashboardAuthError(RuntimeError):
# Hosted Photon allowlists registered device clients on the device-code
# endpoint — an unregistered client_id is rejected with
# `400 {"error":"invalid_client"}`. Use Photon's published CLI device
# client (matches `CLI_CLIENT_ID` in photon-hq/cli) until the dashboard API
# registers Hermes as its own client_id.
# client until the dashboard API registers Hermes as its own client_id.
DEFAULT_CLIENT_ID = "photon-cli"
DEFAULT_SCOPE = "openid profile email"
DEFAULT_DASHBOARD_HOST = "https://app.photon.codes"
DEFAULT_SPECTRUM_HOST = "https://spectrum.photon.codes"
# Default name of the project Hermes provisions for the operator.
DEFAULT_PROJECT_NAME = "Hermes Agent"
# Polling defaults per RFC 8628. Photon overrides via `interval` /
# `expires_in` in the device-code response — those win.
# Polling defaults per RFC 8628. Photon may override via `interval` /
# `expires_in` fields in the device-code response — those win.
DEFAULT_POLL_INTERVAL = 5
DEFAULT_POLL_TIMEOUT = 1800 # 30 min, matching the CLI's fallback
DEFAULT_POLL_TIMEOUT = 900 # 15 minutes is conservative; Photon returns expires_in
E164_RE = re.compile(r"^\+[1-9]\d{6,14}$")
@@ -117,7 +104,7 @@ def _save_auth(data: Dict[str, Any]) -> None:
def load_photon_token() -> Optional[str]:
"""Return the device-flow bearer token stored by ``login()`` or ``None``."""
"""Return the bearer token stored by ``login()`` or ``None``."""
auth = _load_auth()
pool = auth.get("credential_pool", {}).get("photon") or []
if isinstance(pool, list) and pool:
@@ -141,13 +128,7 @@ def store_photon_token(token: str) -> None:
def load_project_credentials() -> Tuple[Optional[str], Optional[str]]:
"""Return the runtime SDK creds ``(spectrum_project_id, project_secret)``.
Precedence: process env (``~/.hermes/.env`` is loaded into the gateway's
environment at startup) wins, then ``auth.json`` for offline / status
use. This is the pair the Node sidecar feeds to ``spectrum-ts`` — the id
is the **spectrumProjectId**, not the Dashboard id.
"""
"""Return ``(project_id, project_secret)`` from auth.json + env override."""
env_id = os.getenv("PHOTON_PROJECT_ID")
env_sec = os.getenv("PHOTON_PROJECT_SECRET")
if env_id and env_sec:
@@ -156,72 +137,24 @@ def load_project_credentials() -> Tuple[Optional[str], Optional[str]]:
proj = auth.get("credential_pool", {}).get("photon_project") or []
if isinstance(proj, list) and proj:
entry = proj[0]
# back-compat: old records used "project_id" for the spectrum id
sid = entry.get("spectrum_project_id") or entry.get("project_id")
return (env_id or sid, env_sec or entry.get("project_secret"))
return (
env_id or entry.get("project_id"),
env_sec or entry.get("project_secret"),
)
return env_id, env_sec
def load_dashboard_project_id() -> Optional[str]:
"""Return the Dashboard project id (for management API calls)."""
env_id = os.getenv("PHOTON_DASHBOARD_PROJECT_ID")
if env_id:
return env_id
def store_project_credentials(project_id: str, project_secret: str, **extra: Any) -> None:
"""Persist the Spectrum project's id+secret under ``credential_pool.photon_project``."""
auth = _load_auth()
proj = auth.get("credential_pool", {}).get("photon_project") or []
if isinstance(proj, list) and proj:
return proj[0].get("dashboard_project_id") or proj[0].get("project_id")
return None
def store_project_credentials(
*,
spectrum_project_id: str,
project_secret: str,
dashboard_project_id: Optional[str] = None,
name: Optional[str] = None,
) -> None:
"""Persist project credentials to both .env (runtime) and auth.json (mgmt).
The runtime SDK creds land in ``~/.hermes/.env`` via the same
``save_env_value`` helper every other channel uses, so the gateway picks
them up from the environment with zero adapter changes. A copy of the
non-secret ids (plus the secret, for offline ``status``) is written to
``auth.json`` so management commands work even when ``.env`` hasn't been
loaded into the current process.
"""
auth = _load_auth()
record: Dict[str, Any] = {
"spectrum_project_id": spectrum_project_id,
record = {
"project_id": project_id,
"project_secret": project_secret,
"issued_at": int(time.time()),
}
if dashboard_project_id:
record["dashboard_project_id"] = dashboard_project_id
if name:
record["name"] = name
record.update(extra)
auth.setdefault("credential_pool", {})["photon_project"] = [record]
_save_auth(auth)
_persist_runtime_env(spectrum_project_id, project_secret)
def _persist_runtime_env(spectrum_project_id: str, project_secret: str) -> None:
"""Write the SDK creds to ``~/.hermes/.env`` (canonical runtime store).
Isolated in its own helper so the secret value flows straight into
``save_env_value`` without ever being bound to a printable local in a
caller — same CodeQL-clean-flow rationale as the rest of this module.
"""
try:
from hermes_cli.config import save_env_value
except ImportError:
logger.warning("photon: hermes_cli.config unavailable — skipping .env write")
return
try:
save_env_value("PHOTON_PROJECT_ID", spectrum_project_id)
save_env_value("PHOTON_PROJECT_SECRET", project_secret)
except Exception as e: # pragma: no cover - defensive
logger.warning("photon: could not write project creds to .env: %s", e)
# ---------------------------------------------------------------------------
@@ -248,8 +181,8 @@ def _dashboard_host() -> str:
return (os.getenv("PHOTON_DASHBOARD_HOST") or DEFAULT_DASHBOARD_HOST).rstrip("/")
def _bearer(token: str) -> Dict[str, str]:
return {"Authorization": f"Bearer {token}"}
def _spectrum_host() -> str:
return (os.getenv("PHOTON_API_HOST") or DEFAULT_SPECTRUM_HOST).rstrip("/")
def request_device_code(
@@ -285,22 +218,16 @@ def poll_for_token(
) -> str:
"""Poll ``/api/auth/device/token`` until the user approves.
Mirrors the official CLI's polling loop: sleep first, then poll;
``authorization_pending`` keeps the interval, ``slow_down`` adds 5s,
HTTP 429 adds 10s, and ``access_denied`` / ``expired_token`` abort.
The bearer token comes from the response body's top-level
``access_token`` (better-auth device-grant shape), with
``session.access_token`` and the ``set-auth-token`` header kept as
fallbacks for API drift.
Returns the bearer token from the ``set-auth-token`` response header
(Photon's documented mechanism). Falls back to ``session.access_token``
in the JSON body if the header is absent — see the API spec.
"""
if httpx is None:
raise RuntimeError("httpx is required for Photon device login")
url = f"{_dashboard_host()}/api/auth/device/token"
deadline = time.time() + (timeout or code.expires_in or DEFAULT_POLL_TIMEOUT)
sleep = interval if interval is not None else (code.interval or DEFAULT_POLL_INTERVAL)
sleep = interval or code.interval or DEFAULT_POLL_INTERVAL
while time.time() < deadline:
time.sleep(sleep)
try:
resp = httpx.post(
url,
@@ -313,6 +240,7 @@ def poll_for_token(
)
except httpx.RequestError as e:
logger.warning("photon: device-token poll failed: %s", e)
time.sleep(sleep)
continue
if resp.status_code == 200:
body: Dict[str, Any] = {}
@@ -331,35 +259,34 @@ def poll_for_token(
"data.access_token, accessToken, or set-auth-token)."
)
return candidates[0].token
if resp.status_code == 429:
# RFC 8628 §3.5 — treat 429 as slow_down.
sleep += 10
if on_pending:
_safe(on_pending)
continue
if resp.status_code == 400:
body = {}
# RFC 8628 §3.5 — error codes are returned with 400.
body: Dict[str, Any] = {}
try:
body = resp.json() or {}
except json.JSONDecodeError:
pass
err = body.get("error") or body.get("message") or ""
if err == "authorization_pending":
if err in ("authorization_pending", "slow_down"):
if on_pending:
_safe(on_pending)
continue
if err == "slow_down":
sleep += 5
if on_pending:
_safe(on_pending)
try:
on_pending()
except Exception:
pass
if err == "slow_down":
sleep += 5
time.sleep(sleep)
continue
if err in ("expired_token", "access_denied"):
raise RuntimeError(f"Photon login failed: {err}")
# Unknown error — surface it
raise RuntimeError(f"Photon device token error: {err or resp.text}")
# Unexpected status; log and retry
logger.warning(
"photon: device-token unexpected status %s: %s",
resp.status_code, resp.text[:200],
)
time.sleep(sleep)
raise TimeoutError("Photon device login timed out")
@@ -499,13 +426,6 @@ def _validated_dashboard_token(candidates: list) -> str:
raise RuntimeError("Photon did not return a usable dashboard token")
def _safe(fn: Callable[[], None]) -> None:
try:
fn()
except Exception:
pass
def login_device_flow(
*,
client_id: str = DEFAULT_CLIENT_ID,
@@ -514,12 +434,15 @@ def login_device_flow(
) -> str:
"""Run the full device-code login flow and persist the token.
Returns the bearer token. ``on_user_code`` receives the
:class:`DeviceCode` so callers can print it + optionally open a browser.
Returns the bearer token. ``on_user_code`` is a callback receiving the
:class:`DeviceCode` so callers can print + optionally open the browser.
"""
code = request_device_code(client_id=client_id)
if on_user_code:
_safe(lambda: on_user_code(code))
try:
on_user_code(code)
except Exception:
pass
if open_browser:
try:
import webbrowser
@@ -538,335 +461,280 @@ def login_device_flow(
return token
def get_session(token: str) -> Dict[str, Any]:
"""GET ``/api/auth/get-session`` — confirm the token + fetch the user."""
if httpx is None:
raise RuntimeError("httpx is required for Photon")
url = f"{_dashboard_host()}/api/auth/get-session"
resp = httpx.get(url, headers=_bearer(token), timeout=30.0)
resp.raise_for_status()
return resp.json() or {}
# ---------------------------------------------------------------------------
# Dashboard API: projects
def _unwrap_list(data: Any) -> List[Dict[str, Any]]:
if isinstance(data, list):
return data
if isinstance(data, dict):
for key in ("data", "projects", "users", "lines", "items"):
inner = data.get(key)
if isinstance(inner, list):
return inner
return []
def list_projects(token: str) -> List[Dict[str, Any]]:
"""GET ``/api/projects`` — return the caller's projects."""
if httpx is None:
raise RuntimeError("httpx is required for Photon")
url = f"{_dashboard_host()}/api/projects"
resp = httpx.get(url, headers=_bearer(token), timeout=30.0)
resp.raise_for_status()
return _unwrap_list(resp.json())
def find_project_by_name(token: str, name: str) -> Optional[Dict[str, Any]]:
"""Return the first project whose name matches (case-insensitive)."""
target = (name or "").strip().lower()
for proj in list_projects(token):
if (proj.get("name") or "").strip().lower() == target:
return proj
return None
def get_project(token: str, project_id: str) -> Dict[str, Any]:
"""GET ``/api/projects/{id}`` — includes ``spectrum`` + ``spectrumProjectId``."""
if httpx is None:
raise RuntimeError("httpx is required for Photon")
url = f"{_dashboard_host()}/api/projects/{project_id}"
resp = httpx.get(url, headers=_bearer(token), timeout=30.0)
resp.raise_for_status()
return resp.json() or {}
# Dashboard API: create project
def create_project(
token: str,
*,
name: str = DEFAULT_PROJECT_NAME,
name: str,
location: str = "United States",
platforms: Optional[list] = None,
) -> Dict[str, Any]:
"""POST ``/api/projects`` with ``spectrum: true`` and return ``{success, id}``."""
"""POST ``/api/projects/`` with ``spectrum: true`` and return the response.
The response includes ``spectrumProjectId`` and ``projectSecret`` — those
are the HTTP Basic credentials for the Spectrum API. Photon only
returns ``projectSecret`` to project owners at creation time.
"""
if httpx is None:
raise RuntimeError("httpx is required for Photon project creation")
url = f"{_dashboard_host()}/api/projects"
url = f"{_dashboard_host()}/api/projects/"
body: Dict[str, Any] = {
"name": name,
"location": location,
"spectrum": True,
"template": False,
"observability": False,
"platforms": platforms or ["imessage"],
}
resp = httpx.post(url, json=body, headers=_bearer(token), timeout=30.0)
resp = httpx.post(
url,
json=body,
headers={"Authorization": f"Bearer {token}"},
timeout=30.0,
)
resp.raise_for_status()
data = resp.json() or {}
if data.get("error"):
raise RuntimeError(f"Photon create-project failed: {data['error']}")
if not data.get("id"):
raise RuntimeError("Photon create-project did not return a project id")
return data
def ensure_spectrum_enabled(token: str, project_id: str) -> Dict[str, Any]:
"""Enable Spectrum on the project if needed; return the project dict.
The dashboard exposes Spectrum as a toggle, so we only flip it when
``spectrum`` is currently false, then re-fetch to pick up the freshly
populated ``spectrumProjectId``.
"""
if httpx is None:
raise RuntimeError("httpx is required for Photon")
proj = get_project(token, project_id)
if not proj.get("spectrum"):
url = f"{_dashboard_host()}/api/projects/{project_id}/spectrum/toggle"
resp = httpx.post(url, json={}, headers=_bearer(token), timeout=30.0)
resp.raise_for_status()
proj = get_project(token, project_id)
if not proj.get("spectrumProjectId"):
raise RuntimeError(
"Spectrum is enabled but the project has no spectrumProjectId yet — "
"retry in a moment, or enable Spectrum from the dashboard."
)
return proj
def regenerate_project_secret(token: str, project_id: str) -> str:
"""POST ``/api/projects/{id}/regenerate-secret`` → the new project secret.
This is the only way to read a project secret (the dashboard shows it
exactly once), so callers should persist the returned value immediately.
"""
if httpx is None:
raise RuntimeError("httpx is required for Photon")
url = f"{_dashboard_host()}/api/projects/{project_id}/regenerate-secret"
resp = httpx.post(url, json={}, headers=_bearer(token), timeout=30.0)
resp.raise_for_status()
data = resp.json() or {}
if data.get("error"):
raise RuntimeError(f"Photon regenerate-secret failed: {data['error']}")
secret = data.get("projectSecret")
if not secret:
raise RuntimeError("Photon regenerate-secret returned no projectSecret")
return str(secret)
return resp.json()
# ---------------------------------------------------------------------------
# Dashboard API: spectrum users
def _normalize_phone(phone: str) -> str:
"""Reduce a phone string to ``+`` and digits for dedup comparison."""
return re.sub(r"[^\d+]", "", phone or "")
def list_users(token: str, project_id: str) -> List[Dict[str, Any]]:
"""GET ``/api/projects/{id}/spectrum/users`` → ``SpectrumUser[]``."""
if httpx is None:
raise RuntimeError("httpx is required for Photon")
url = f"{_dashboard_host()}/api/projects/{project_id}/spectrum/users"
resp = httpx.get(url, headers=_bearer(token), timeout=30.0)
resp.raise_for_status()
return _unwrap_list(resp.json())
def find_user_by_phone(
token: str, project_id: str, phone_number: str,
) -> Optional[Dict[str, Any]]:
"""Return an existing Spectrum user with the given phone number, or None."""
target = _normalize_phone(phone_number)
for user in list_users(token, project_id):
if _normalize_phone(user.get("phoneNumber") or "") == target:
return user
return None
# Spectrum API: create user
def create_user(
token: str,
project_id: str,
project_secret: str,
*,
phone_number: str,
user_type: str = "shared",
first_name: Optional[str] = None,
last_name: Optional[str] = None,
email: Optional[str] = None,
send_invite: bool = False,
assigned_phone_number: Optional[str] = None,
) -> Dict[str, Any]:
"""POST ``/api/projects/{id}/spectrum/users`` and return the created user."""
"""POST ``/projects/{id}/users/`` on the Spectrum API.
For free users we always pass ``type=shared``; Photon's Cosmos pool
assigns the iMessage line. ``assigned_phone_number`` is only valid
for the paid ``dedicated`` mode.
"""
if httpx is None:
raise RuntimeError("httpx is required for Photon user creation")
if not E164_RE.match(phone_number):
raise ValueError(
f"phone_number must be E.164 (e.g. +15551234567); got {phone_number!r}"
)
url = f"{_dashboard_host()}/api/projects/{project_id}/spectrum/users"
body: Dict[str, Any] = {"phoneNumber": phone_number, "sendInvite": send_invite}
url = f"{_spectrum_host()}/projects/{project_id}/users/"
body: Dict[str, Any] = {"type": user_type, "phoneNumber": phone_number}
if first_name:
body["firstName"] = first_name
if last_name:
body["lastName"] = last_name
if email:
body["email"] = email
resp = httpx.post(url, json=body, headers=_bearer(token), timeout=30.0)
resp.raise_for_status()
data = resp.json() or {}
if data.get("error"):
raise RuntimeError(f"Photon create-user failed: {data['error']}")
return data.get("user") or data
def register_user_if_absent(
token: str,
project_id: str,
*,
phone_number: str,
first_name: Optional[str] = None,
last_name: Optional[str] = None,
email: Optional[str] = None,
) -> Tuple[Dict[str, Any], bool]:
"""Idempotently register a Spectrum user.
Returns ``(user, created)`` — ``created`` is False when a user with the
same phone number already exists (the official CLI does no dedup, so we
add it here to make ``setup`` safely re-runnable).
"""
existing = find_user_by_phone(token, project_id, phone_number)
if existing is not None:
return existing, False
user = create_user(
token, project_id,
phone_number=phone_number,
first_name=first_name,
last_name=last_name,
email=email,
)
return user, True
def user_assigned_line(user: Optional[Dict[str, Any]]) -> Optional[str]:
"""Return the iMessage number a Spectrum user is assigned to text on.
This is the user's ``assignedPhoneNumber`` (the dashboard's "TEXTS ON"
column) — i.e. the number to text to reach the agent, as opposed to the
user's own ``phoneNumber``. On shared-number plans there is no dedicated
entry in ``/lines``, so this per-user field is the source of truth.
Returns ``None`` when unset (e.g. a freshly created, not-yet-assigned user).
"""
if not user:
return None
val = user.get("assignedPhoneNumber")
return str(val) if val else None
# ---------------------------------------------------------------------------
# Dashboard API: iMessage lines (the assigned number inventory)
def list_lines(token: str, project_id: str) -> List[Dict[str, Any]]:
"""GET ``/api/projects/{id}/lines`` → ``[{id, platform, phoneNumber, status}]``."""
if httpx is None:
raise RuntimeError("httpx is required for Photon")
url = f"{_dashboard_host()}/api/projects/{project_id}/lines"
resp = httpx.get(url, headers=_bearer(token), timeout=30.0)
resp.raise_for_status()
return _unwrap_list(resp.json())
def add_line(
token: str, project_id: str, *, platform: str = "imessage",
) -> Dict[str, Any]:
"""POST ``/api/projects/{id}/lines`` to provision a new line."""
if httpx is None:
raise RuntimeError("httpx is required for Photon")
url = f"{_dashboard_host()}/api/projects/{project_id}/lines"
if assigned_phone_number:
body["assignedPhoneNumber"] = assigned_phone_number
resp = httpx.post(
url, json={"platform": platform}, headers=_bearer(token), timeout=30.0,
url,
json=body,
auth=(project_id, project_secret),
timeout=30.0,
)
resp.raise_for_status()
data = resp.json() or {}
if data.get("error"):
raise RuntimeError(f"Photon add-line failed: {data['error']}")
return data.get("line") or data
def get_imessage_line(
token: str, project_id: str, *, create_if_missing: bool = True,
) -> Optional[Dict[str, Any]]:
"""Return the project's iMessage line (the number to text the agent).
If none exists and ``create_if_missing`` is set, provision one. Returns
``None`` if there is no line and provisioning failed.
"""
for line in list_lines(token, project_id):
if (line.get("platform") or "").lower() == "imessage":
return line
if create_if_missing:
try:
return add_line(token, project_id, platform="imessage")
except Exception as e:
logger.warning("photon: could not auto-provision iMessage line: %s", e)
return None
return None
if not data.get("succeed"):
raise RuntimeError(
f"Photon create-user failed: {data.get('message') or data}"
)
return data.get("data") or {}
# ---------------------------------------------------------------------------
# Credential status (display-only — never emits raw secret material)
# Spectrum API: webhook registration
#
# Endpoints from https://photon.codes/docs/webhooks/overview:
# POST /projects/{id}/webhooks/ register, returns signing secret ONCE
# GET /projects/{id}/webhooks/ list
# DELETE /projects/{id}/webhooks/{wid} remove
def register_webhook(
project_id: str, project_secret: str, *, webhook_url: str,
) -> Dict[str, Any]:
"""Register a webhook URL with Photon and return the API response.
Photon returns the per-URL signing secret exactly once in this
response, so callers who need to persist it should hand the
response to :func:`persist_webhook_signing_secret` immediately —
that helper writes the value into ``~/.hermes/.env`` (mode 0o600,
existing entries preserved) without the secret value ever needing
to leave this module.
"""
if httpx is None:
raise RuntimeError("httpx is required for Photon webhook registration")
url = f"{_spectrum_host()}/projects/{project_id}/webhooks/"
resp = httpx.post(
url,
json={"webhookUrl": webhook_url},
auth=(project_id, project_secret),
timeout=30.0,
)
resp.raise_for_status()
data = resp.json() or {}
if not data.get("succeed"):
raise RuntimeError(
f"Photon register-webhook failed: {data.get('message') or data}"
)
return data.get("data") or {}
def print_credential_summary(emit: Any = print) -> None:
"""Pretty-print the credential status table via the *emit* callback.
Every secret-bearing read is reduced to a display literal inside this
function (``"✓ stored"`` / ``"✗ missing"`` / a non-secret id); the
callback only ever receives the assembled banner string, so no tainted
value escapes into the caller's scope.
Same isolation rationale as :func:`persist_webhook_signing_secret`:
all secret-bearing reads happen inside this function; the *emit*
callback only ever receives display literals like ``"✓ stored"``
or a project UUID. No tainted variable ever escapes into the
caller's scope. Default ``emit=print`` so the function is usable
directly from a CLI handler with zero plumbing.
"""
# Resolve every credential read into a plain display string FIRST,
# in a tight block. The intermediate `labels` dict only ever stores
# literals from a finite set ("✓ stored" / "✗ missing" / "✓ set" /
# "⚠ unset — verification disabled" / a project UUID) — never a
# credential's raw bytes. We then assemble the whole banner into
# one string and call emit() exactly once with that string, so the
# static taint analyzer sees a single sink that consumes only a
# joined literal blob.
labels: Dict[str, str] = {}
labels["device_token"] = (
"✓ stored" if load_photon_token()
else "✗ missing (run `hermes photon setup`)"
)
sid, sec = load_project_credentials()
labels["spectrum_project_id"] = sid if sid else "✗ missing"
labels["dashboard_project_id"] = load_dashboard_project_id() or ""
if load_photon_token():
labels["device_token"] = "✓ stored"
else:
labels["device_token"] = "✗ missing (run `hermes photon setup`)"
pid, sec = load_project_credentials()
labels["project_id"] = pid if pid else "✗ missing"
labels["project_key"] = "✓ stored" if sec else "✗ missing"
if os.getenv("PHOTON_WEBHOOK_SECRET"):
labels["webhook_key"] = "✓ set"
else:
labels["webhook_key"] = "⚠ unset — verification disabled"
rows = [
"Photon iMessage status",
"──────────────────────",
" device token : " + labels["device_token"],
" dashboard project : " + labels["dashboard_project_id"],
" spectrum project id : " + labels["spectrum_project_id"],
" project secret : " + labels["project_key"],
" project id : " + labels["project_id"],
" project key : " + labels["project_key"],
" webhook key : " + labels["webhook_key"],
]
emit("\n".join(rows))
def credential_summary() -> Dict[str, str]:
"""Return a fully pre-formatted credential status dict (no raw secrets)."""
"""Return a fully pre-formatted credential status dict.
Caller-safe: every value is one of ``"✓ stored"`` / ``"✗ missing"``
/ ``"⚠ unset — verification disabled"`` / ``"✓ set"`` literals, or a
UUID for the project id. No secret-bearing string ever leaves this
function — read-and-bool-cast happens entirely inside the closure.
"""
def _present_token() -> str:
return (
"✓ stored" if load_photon_token()
else "✗ missing (run `hermes photon setup`)"
)
return "✓ stored" if load_photon_token() else "✗ missing (run `hermes photon setup`)"
def _present_spectrum_id() -> str:
sid, _sec = load_project_credentials()
return sid or "✗ missing"
def _present_project_id() -> str:
pid, _sec = load_project_credentials()
return pid or "✗ missing"
def _present_secret() -> str:
_sid, sec = load_project_credentials()
def _present_project_secret() -> str:
_pid, sec = load_project_credentials()
return "✓ stored" if sec else "✗ missing"
def _present_webhook_secret() -> str:
return "✓ set" if os.getenv("PHOTON_WEBHOOK_SECRET") else "⚠ unset — verification disabled"
return {
"device_token": _present_token(),
"dashboard_project_id": load_dashboard_project_id() or "",
"spectrum_project_id": _present_spectrum_id(),
"project_key": _present_secret(),
"project_id": _present_project_id(),
"project_key": _present_project_secret(),
"webhook_key": _present_webhook_secret(),
}
def persist_webhook_signing_secret(
webhook_data: Dict[str, Any],
*,
on_summary: Optional[Any] = None,
) -> bool:
"""Persist a webhook signing secret via Hermes' canonical .env writer.
Delegates to :func:`hermes_cli.config.save_env_value` — the same
helper that backs every other API-key persistence path in Hermes
Agent (OpenAI key, Anthropic key, Telegram token, ...). The secret
value is read directly from ``webhook_data['signingSecret']`` (or
``['secret']`` fallback) and handed to that helper without ever
being bound to a local in any module that prints or logs.
Returns ``True`` on success, ``False`` if the response had no
secret OR the write failed. The optional ``on_summary`` callable
receives a plain string with no credential material, suitable for
printing — e.g. ``"Wrote to /home/u/.hermes/.env"`` or
``"register response: {redacted dict json}"``. We do the
formatting here so callers stay clear of the taint flow CodeQL
tracks through functions that touch secrets.
"""
if not isinstance(webhook_data, dict):
return False
has_secret = bool(webhook_data.get("signingSecret") or webhook_data.get("secret"))
redacted = {
k: ("<redacted>" if k in ("signingSecret", "secret") else v)
for k, v in webhook_data.items()
}
if on_summary is not None:
try:
on_summary("webhook registration response (redacted):")
on_summary(json.dumps(redacted, indent=2))
except Exception:
pass
if not has_secret:
return False
try:
from hermes_cli.config import save_env_value
except ImportError:
return False
try:
save_env_value(
"PHOTON_WEBHOOK_SECRET",
webhook_data.get("signingSecret") or webhook_data.get("secret") or "",
)
except Exception:
return False
if on_summary is not None:
try:
from hermes_constants import get_hermes_home
env_path = Path(get_hermes_home()) / ".env"
except Exception:
env_path = Path(os.path.expanduser("~/.hermes")) / ".env"
try:
on_summary(f"signing key saved to {env_path}")
on_summary("(Photon only returns this once — keep the file safe)")
except Exception:
pass
return True
def list_webhooks(project_id: str, project_secret: str) -> list:
if httpx is None:
raise RuntimeError("httpx is required for Photon webhook listing")
url = f"{_spectrum_host()}/projects/{project_id}/webhooks/"
resp = httpx.get(url, auth=(project_id, project_secret), timeout=30.0)
resp.raise_for_status()
data = resp.json() or {}
return data.get("data") or []
def delete_webhook(
project_id: str, project_secret: str, *, webhook_id: str,
) -> None:
if httpx is None:
raise RuntimeError("httpx is required for Photon webhook deletion")
url = f"{_spectrum_host()}/projects/{project_id}/webhooks/{webhook_id}"
resp = httpx.delete(url, auth=(project_id, project_secret), timeout=30.0)
if resp.status_code not in (200, 204, 404):
resp.raise_for_status()


@@ -7,26 +7,25 @@ Subcommands:
setup full first-time setup (device login + project + user + sidecar)
status show login + project + sidecar dep state
install-sidecar npm install inside plugins/platforms/photon/sidecar/
webhook register register the local webhook URL with Photon
webhook list list registered webhooks
webhook delete delete a webhook by id
The device-code login runs automatically as the first step of ``setup``;
there is no standalone ``login`` verb (matching how every other Hermes
gateway channel onboards through a single setup surface).
Photon uses the spectrum-ts gRPC stream for inbound — there is no webhook
to register, so there are no webhook subcommands.
"""
from __future__ import annotations
import argparse
import getpass
import json
import os
import shutil
import subprocess
import sys
from pathlib import Path
from hermes_cli.colors import Colors, color
from . import auth as photon_auth
_SIDECAR_DIR = Path(__file__).parent / "sidecar"
@@ -39,14 +38,9 @@ def register_cli(parser: argparse.ArgumentParser) -> None:
"""Wire up `hermes photon ...` subcommands."""
subs = parser.add_subparsers(dest="photon_command", required=False)
p_setup = subs.add_parser(
"setup",
help="First-time setup (device login + project + user + sidecar)",
)
p_setup.add_argument("--project-name", default=None,
help="Project name (default: 'Hermes Agent')")
p_setup.add_argument("--phone", default=None,
help="Your E.164 phone number (e.g. +15551234567)")
p_setup = subs.add_parser("setup", help="First-time setup (device login + project + user + sidecar)")
p_setup.add_argument("--project-name", default=None, help="Project name (default: 'Hermes Agent')")
p_setup.add_argument("--phone", default=None, help="Your E.164 phone number (e.g. +15551234567)")
p_setup.add_argument("--first-name", default=None)
p_setup.add_argument("--last-name", default=None)
p_setup.add_argument("--email", default=None)
@@ -58,6 +52,14 @@ def register_cli(parser: argparse.ArgumentParser) -> None:
subs.add_parser("status", help="Show login + project + sidecar dep state")
subs.add_parser("install-sidecar", help="Run npm install inside the sidecar directory")
p_hook = subs.add_parser("webhook", help="Manage Photon webhook registrations")
hook_subs = p_hook.add_subparsers(dest="photon_webhook_command", required=True)
p_hook_reg = hook_subs.add_parser("register", help="Register a webhook URL")
p_hook_reg.add_argument("url", help="Publicly reachable URL Photon should POST to")
hook_subs.add_parser("list", help="List registered webhooks for the current project")
p_hook_del = hook_subs.add_parser("delete", help="Delete a webhook by id")
p_hook_del.add_argument("webhook_id")
parser.set_defaults(func=dispatch)
@@ -75,6 +77,8 @@ def dispatch(args: argparse.Namespace) -> int:
return _cmd_status(args)
if sub == "install-sidecar":
return _cmd_install_sidecar(args)
if sub == "webhook":
return _cmd_webhook(args)
print(f"unknown subcommand: {sub}", file=sys.stderr)
return 2
@@ -118,7 +122,7 @@ def _cmd_setup(args: argparse.Namespace) -> int:
# 1. Login (skip if we already have a token).
token = photon_auth.load_photon_token()
if not token:
print("[1/5] No Photon token found — running device login...")
print("[1/4] No Photon token found — running device login...")
rc = _run_device_login(args)
if rc != 0:
return rc
@@ -127,163 +131,85 @@ def _cmd_setup(args: argparse.Namespace) -> int:
print("login completed but token was not stored", file=sys.stderr)
return 1
else:
print("[1/5] Reusing existing Photon token")
print("[1/4] Reusing existing Photon token")
# 2. Find or create the "Hermes Agent" project.
name = args.project_name or photon_auth.DEFAULT_PROJECT_NAME
dashboard_id = photon_auth.load_dashboard_project_id()
try:
if dashboard_id:
print("[2/5] Reusing configured Photon project")
else:
existing = photon_auth.find_project_by_name(token, name)
if existing and existing.get("id"):
dashboard_id = existing["id"]
print(f"[2/5] Found existing project '{name}'")
else:
print(f"[2/5] Creating Photon project '{name}'...")
created = photon_auth.create_project(token, name=name)
dashboard_id = created.get("id")
print(" ✓ project created")
except Exception as e:
print(f"project setup failed: {e}", file=sys.stderr)
return 1
if not dashboard_id:
print("could not resolve a Photon project id", file=sys.stderr)
return 1
# 3. Enable Spectrum, fetch the spectrum project id, rotate the secret,
# and persist both (runtime creds -> ~/.hermes/.env, ids -> auth.json).
try:
print("[3/5] Enabling Spectrum and provisioning credentials...")
proj = photon_auth.ensure_spectrum_enabled(token, dashboard_id)
spectrum_id = proj.get("spectrumProjectId")
if not spectrum_id:
print("spectrum provisioning failed: no spectrum project id", file=sys.stderr)
return 1
spectrum_id = str(spectrum_id)
secret = photon_auth.regenerate_project_secret(token, dashboard_id)
photon_auth.store_project_credentials(
spectrum_project_id=spectrum_id,
project_secret=secret,
dashboard_project_id=dashboard_id,
name=name,
)
# spectrum_id is an opaque non-secret id; safe to show.
print(f" ✓ Spectrum enabled (project id {spectrum_id}) — secret saved")
except Exception as e:
print(f"spectrum provisioning failed: {e}", file=sys.stderr)
return 1
# 4. Register the operator's phone number as a Spectrum user (idempotent).
phone = args.phone or _prompt(
color(
"[4/5] Your iMessage phone number (E.164, e.g. +15551234567): ",
Colors.CYAN,
)
)
agent_number = None
if not phone:
print(" Skipped user registration (no phone given). Re-run with --phone later.")
# 2. Create (or surface existing) project.
existing_id, existing_secret = photon_auth.load_project_credentials()
project_id: str
project_secret: str
if existing_id and existing_secret:
project_id, project_secret = existing_id, existing_secret
# `project_id` is a Photon-assigned UUID, not a secret — but we
# keep the print terse to avoid CodeQL flow noise.
print("[2/4] Reusing existing Photon project")
else:
# Name/email are optional and never prompted for — pass --first-name /
# --email if you want them sent to the dashboard.
first_name = args.first_name
email = args.email
name = args.project_name or "Hermes Agent"
print(f"[2/4] Creating Photon project '{name}' (spectrum=true, imessage)...")
try:
user, created = photon_auth.register_user_if_absent(
token, dashboard_id,
phone_number=phone,
first_name=first_name,
last_name=args.last_name,
email=email,
data = photon_auth.create_project(token, name=name)
except Exception as e:
print(f"create-project failed: {e}", file=sys.stderr)
return 1
project_id = data.get("spectrumProjectId") or data.get("id") or ""
project_secret = data.get("projectSecret") or ""
if not project_id or not project_secret:
print(
"create-project did not return spectrumProjectId + "
"projectSecret. Re-run after enabling Spectrum on the "
"project, or open https://app.photon.codes/ to fetch the "
"secret manually.",
file=sys.stderr,
)
except ValueError as e:
print(f" invalid phone number: {e}", file=sys.stderr)
return 1
except Exception as e:
print(f" user registration failed: {e}", file=sys.stderr)
return 1
print(" ✓ phone registered" if created else " ✓ phone already registered")
# The number to text the agent is the user's assigned iMessage line
# (the dashboard's "TEXTS ON" column). On shared-number plans there is
# no dedicated entry in /lines, so this per-user field is the source of
# truth — and we already have it from the (reused) user object.
agent_number = photon_auth.user_assigned_line(user)
# Allowlist the operator and make their DM the cron home channel —
# otherwise the gateway denies their own inbound messages
# ("Unauthorized user") and has no default space for cron delivery.
_autoconfigure_access(phone)
photon_auth.store_project_credentials(project_id, project_secret, name=name)
print(" ✓ project provisioned (run `hermes photon status` to see the id)")
# 5. Surface the agent's iMessage number (the number to text the agent).
if not agent_number:
# No per-user assignment — fall back to a dedicated line if the project
# has one provisioned in its line inventory.
# 3. Create a Spectrum user for the operator.
phone = args.phone or _prompt(
"Your iMessage phone number (E.164, e.g. +15551234567): "
)
if not phone:
print("[3/4] Skipped user creation (no phone given). Re-run with --phone later.")
else:
print("[3/4] Creating shared Spectrum user...")
try:
line = photon_auth.get_imessage_line(token, dashboard_id)
if line:
agent_number = line.get("phoneNumber")
photon_auth.create_user(
project_id, project_secret,
phone_number=phone,
first_name=args.first_name,
last_name=args.last_name,
email=args.email,
)
except Exception as e:
print(f" (could not fetch the assigned line: {e})", file=sys.stderr)
if agent_number:
print()
print(color("┌─ Your agent's iMessage number ───────────────────────────────", Colors.GREEN))
print(
color("│ 📱 ", Colors.GREEN)
+ color(str(agent_number), Colors.GREEN, Colors.BOLD)
)
print(color("│ Text this number from your phone to talk to your agent.", Colors.GREEN))
print(color("└──────────────────────────────────────────────────────────────", Colors.GREEN))
else:
print(" No iMessage line assigned yet — check the Photon dashboard.")
print(f"create-user failed: {e}", file=sys.stderr)
return 1
print(" ✓ user created — check `hermes photon status` or the dashboard for the assigned iMessage line")
# 6. Sidecar deps (spectrum-ts).
# 4. Sidecar deps.
if args.skip_sidecar_install:
print("[5/5] Skipping sidecar npm install (--skip-sidecar-install)")
print("[4/4] Skipping sidecar npm install (--skip-sidecar-install)")
else:
print("[5/5] Installing Node sidecar deps (spectrum-ts)...")
print("[4/4] Installing Node sidecar deps (spectrum-ts)...")
rc = _install_sidecar()
if rc != 0:
return rc
print()
print("✓ Photon setup complete.")
print(" Start the gateway: hermes gateway start --platform photon")
print(" Next: register a webhook URL Photon can reach:")
print(" hermes photon webhook register https://YOUR-PUBLIC-URL/photon/webhook")
print(" Then start the gateway:")
print(" hermes gateway start --platform photon")
return 0
def _autoconfigure_access(phone: str) -> None:
"""Allowlist the operator and set their DM as the cron home channel.
Writes ``PHOTON_ALLOWED_USERS`` (so the gateway authorizes the operator's
own inbound messages instead of denying them) and ``PHOTON_HOME_CHANNEL``
(the default space for cron delivery) to the operator's E.164 number. Each
is only filled when unset, so a hand-tuned allowlist / home channel is
never clobbered on a re-run.
"""
try:
from hermes_cli.config import get_env_value, save_env_value
except ImportError:
return
for key, label in (
("PHOTON_ALLOWED_USERS", "allowlisted your number"),
("PHOTON_HOME_CHANNEL", "set your DM as the cron home channel"),
):
try:
if get_env_value(key):
print(f" {key} already set — leaving it as-is.")
continue
save_env_value(key, phone)
print(f"{label} ({key})")
except Exception as e:
print(f" could not set {key}: {e}", file=sys.stderr)
def _cmd_status(_args: argparse.Namespace) -> int:
# Defer the credential rows to auth.print_credential_summary — its emit
# Defer the whole table to auth.print_credential_summary — its emit
# callback is the only sink that sees credential-derived strings, so
# cli.py keeps zero taint flow according to CodeQL.
photon_auth.print_credential_summary(print)
# The two non-credential rows live here so the helper stays purely
# about credentials.
node_bin = os.getenv("PHOTON_NODE_BIN") or shutil.which("node")
sidecar_installed = (_SIDECAR_DIR / "node_modules").exists()
print(f" node binary : {node_bin or '✗ missing (install Node 18+)'}")
@@ -292,7 +218,8 @@ def _cmd_status(_args: argparse.Namespace) -> int:
def _cmd_install_sidecar(_args: argparse.Namespace) -> int:
return _install_sidecar()
rc = _install_sidecar()
return rc
def _install_sidecar() -> int:
@@ -315,6 +242,64 @@ def _install_sidecar() -> int:
return proc.returncode
def _cmd_webhook(args: argparse.Namespace) -> int:
sub = getattr(args, "photon_webhook_command", None)
project_id, project_secret = photon_auth.load_project_credentials()
if not (project_id and project_secret):
print(
"no Photon project configured — run `hermes photon setup` first",
file=sys.stderr,
)
return 1
if sub == "register":
try:
data = photon_auth.register_webhook(
project_id, project_secret, webhook_url=args.url
)
except Exception as e:
print(f"register failed: {e}", file=sys.stderr)
return 1
# The helper does all the formatting + writing; cli.py never
# touches the signing-secret value, the path it was written
# to, or even the redacted-response dict. on_summary is a
# plain printer callback.
ok = photon_auth.persist_webhook_signing_secret(data, on_summary=print)
if not ok:
print(
"‼ Photon returned no signing secret in the response, "
"or the file write failed. Check your home directory "
"permissions, delete the orphaned webhook from the Photon "
"dashboard, and then re-run the registration.",
file=sys.stderr,
)
return 1
return 0
if sub == "list":
try:
data = photon_auth.list_webhooks(project_id, project_secret)
except Exception as e:
print(f"list failed: {e}", file=sys.stderr)
return 1
print(json.dumps(data, indent=2))
return 0
if sub == "delete":
try:
photon_auth.delete_webhook(
project_id, project_secret, webhook_id=args.webhook_id
)
except Exception as e:
print(f"delete failed: {e}", file=sys.stderr)
return 1
print(f"deleted webhook {args.webhook_id}")
return 0
print(f"unknown webhook subcommand: {sub}", file=sys.stderr)
return 2
# ---------------------------------------------------------------------------
# Gateway-setup entry point
#
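
The `webhook register | list | delete` verbs in this file rely on a second level of `argparse` subparsers hanging off the `webhook` parser. The sketch below reproduces just that shape in isolation, using the same `dest` names as the diff. `build_parser` and `describe` are illustrative names introduced here, and the returned strings stand in for the real `photon_auth` calls.

```python
from __future__ import annotations

import argparse


def build_parser() -> argparse.ArgumentParser:
    """Two-level subcommand tree: `<prog> webhook {register,list,delete}`."""
    parser = argparse.ArgumentParser(prog="hermes photon")
    subs = parser.add_subparsers(dest="photon_command", required=False)
    subs.add_parser("status")

    p_hook = subs.add_parser("webhook")
    # required=True: `webhook` with no verb is a usage error, not a no-op.
    hook_subs = p_hook.add_subparsers(dest="photon_webhook_command", required=True)
    p_reg = hook_subs.add_parser("register")
    p_reg.add_argument("url")
    hook_subs.add_parser("list")
    p_del = hook_subs.add_parser("delete")
    p_del.add_argument("webhook_id")
    return parser


def describe(argv: list[str]) -> str:
    """Dispatch on the parsed namespace; returns a label instead of doing I/O."""
    args = build_parser().parse_args(argv)
    if args.photon_command == "webhook":
        sub = args.photon_webhook_command
        if sub == "register":
            return f"register {args.url}"
        if sub == "delete":
            return f"delete {args.webhook_id}"
        return sub
    # With required=False on the top level, no subcommand leaves dest as None.
    return args.photon_command or "help"
```

For example, `describe(["webhook", "delete", "wh_1"])` takes the `delete` branch and reads `args.webhook_id`, mirroring how `_cmd_webhook` reads `args.webhook_id` and `args.url`.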


@@ -1,37 +1,52 @@
name: photon-platform
label: iMessage via Photon
label: Photon iMessage
kind: platform
version: 0.2.0
version: 0.1.0
description: >
Photon Spectrum gateway adapter for Hermes Agent.
Connects to iMessage (and other Spectrum interfaces) through Photon's
managed Spectrum platform. Both directions run over the `spectrum-ts`
SDK's long-lived gRPC stream via a small supervised Node sidecar —
inbound messages arrive on the SDK's `app.messages` stream (no webhook,
no public URL, no signing secret), and outbound messages are sent over
the same sidecar.
managed Spectrum platform. Inbound messages arrive as signed webhooks
on a local aiohttp server; outbound messages are sent via a small
supervised Node sidecar that runs the `spectrum-ts` SDK (Photon does
not currently expose a public HTTP send endpoint).
The plugin ships with a `hermes photon` CLI for the one-time device
login + project + user setup. Runtime credentials are written to
``~/.hermes/.env`` (``PHOTON_PROJECT_ID`` = the Spectrum project id,
``PHOTON_PROJECT_SECRET``) like every other channel, with management
metadata (device token, dashboard project id) in ``~/.hermes/auth.json``.
Photon's free shared-line model lets users get started without a paid plan.
The plugin ships with a `hermes photon` CLI for the one-time login
+ project + user setup, persists Spectrum credentials to
``~/.hermes/auth.json`` under ``credential_pool.photon`` (token) and
``credential_pool.photon_project`` (project id + secret), and exposes
Photon's free shared-line model so users can get started without a
paid plan.
author: NousResearch
requires_env:
- name: PHOTON_PROJECT_ID
description: "Spectrum project id (the project's spectrumProjectId; set by `hermes photon setup`)"
prompt: "Photon Spectrum project id"
description: "Spectrum project ID (set by `hermes photon setup`)"
prompt: "Photon Spectrum project ID"
url: "https://app.photon.codes/"
password: false
- name: PHOTON_PROJECT_SECRET
description: "Project secret paired with the Spectrum project id (set by `hermes photon setup`)"
prompt: "Photon project secret"
description: "Spectrum project secret (set by `hermes photon setup`)"
prompt: "Photon Spectrum project secret"
url: "https://app.photon.codes/"
password: true
optional_env:
- name: PHOTON_WEBHOOK_SECRET
description: "Per-URL HMAC-SHA256 signing secret returned at webhook registration"
prompt: "Photon webhook signing secret"
password: true
- name: PHOTON_WEBHOOK_PORT
description: "Local port the webhook receiver listens on (default 8788)"
prompt: "Webhook receiver port"
password: false
- name: PHOTON_WEBHOOK_PATH
description: "Path the webhook receiver listens on (default /photon/webhook)"
prompt: "Webhook receiver path"
password: false
- name: PHOTON_WEBHOOK_BIND
description: "Bind address for the webhook receiver (default 0.0.0.0)"
prompt: "Webhook bind address"
password: false
- name: PHOTON_SIDECAR_PORT
description: "Loopback port for the Node sidecar control + inbound channel (default 8789)"
description: "Loopback port for the Node sidecar control channel (default 8789)"
prompt: "Sidecar control port"
password: false
- name: PHOTON_SIDECAR_AUTOSTART
@@ -42,8 +57,12 @@ optional_env:
description: "Path to the node binary (default: shutil.which('node'))"
prompt: "Node executable path"
password: false
- name: PHOTON_API_HOST
description: "Spectrum management API host (default https://spectrum.photon.codes)"
prompt: "Spectrum API host"
password: false
- name: PHOTON_DASHBOARD_HOST
description: "Photon Dashboard API host (default https://app.photon.codes)"
description: "Dashboard API host (default https://app.photon.codes)"
prompt: "Dashboard host"
password: false
- name: PHOTON_ALLOWED_USERS
@@ -63,8 +82,8 @@ optional_env:
prompt: "Group mention patterns"
password: false
- name: PHOTON_HOME_CHANNEL
description: "Default Photon target for cron / notification delivery: Spectrum space id, DM GUID, or bare E.164 phone number"
prompt: "Home Photon target"
description: "Default Spectrum space ID for cron / notification delivery"
prompt: "Home space ID"
password: false
- name: PHOTON_HOME_CHANNEL_NAME
description: "Human label for the home channel"
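
The manifest describes `PHOTON_WEBHOOK_SECRET` as a per-URL HMAC-SHA256 signing secret. The receiver code is not in this part of the diff, so the following is only a sketch of how such a check is commonly done with the standard library. It assumes the signature is the hex digest of the raw request body; the actual header name, encoding, and signed payload used by Photon are not shown here and may differ.

```python
import hashlib
import hmac


def sign_body(secret: str, body: bytes) -> str:
    """Hex HMAC-SHA256 of the raw body (assumed scheme, see note above)."""
    return hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()


def verify_signature(secret: str, body: bytes, received_hex: str) -> bool:
    """Constant-time comparison of the expected and received signatures."""
    expected = sign_body(secret, body).encode("ascii")
    # Compare as bytes so a non-ASCII header value cannot raise TypeError.
    received = received_hex.strip().lower().encode("utf-8")
    return hmac.compare_digest(expected, received)
```

The check must run over the exact bytes received, before any JSON parsing, since re-serializing the body can change whitespace or key order and invalidate the digest.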


@@ -1,46 +1,40 @@
// Hermes Agent — Photon Spectrum sidecar
//
// Spawned by `plugins/platforms/photon/adapter.py` to bridge BOTH directions
// of messaging to Photon's Spectrum platform via the `spectrum-ts` SDK (the
// SDK is TypeScript-only, so a Node sidecar is unavoidable — there is no
// Python SDK and no public HTTP message API).
// Spawned by `plugins/platforms/photon/adapter.py` to bridge outbound
// messaging to Photon's Spectrum platform. Inbound messages go directly
// from Photon's webhook to Hermes' Python aiohttp receiver — this
// sidecar handles ONLY outbound calls (which require the spectrum-ts
// SDK because Photon has no public HTTP send endpoint today).
//
// Inbound (gRPC -> Hermes): the SDK's `app.messages` async iterator is a
// long-lived gRPC stream. We serialize each `[space, message]` to a
// normalized JSON event and stream it to the Python adapter over a
// loopback `GET /inbound` (NDJSON). We pause pulling from the stream while
// no consumer is attached so a backlog isn't pulled-and-lost before the
// gateway connects.
// Outbound (Hermes -> gRPC): `/send` drives `space.send(...)`; `/typing`
// sends the documented `typing("start" | "stop")` content builder.
//
// Protocol (all requests require `X-Hermes-Sidecar-Token: ${TOKEN}`):
// - GET /inbound -> 200 NDJSON stream; one JSON event per line, blank
// lines are heartbeats. One consumer at a time.
// - POST /healthz -> {"ok": true}
// - POST /send -> {"ok": true, "messageId": "..."}
// body: {"spaceId": "...", "text": "..."}
// - POST /send-attachment -> {"ok": true, "messageId": "..."}
// Protocol:
// - The sidecar listens on http://127.0.0.1:${PORT} (loopback only)
// - Each request must include `X-Hermes-Sidecar-Token: ${TOKEN}`
// - POST /healthz -> {"ok": true}
// - POST /send -> {"ok": true, "messageId": "..."}
// body: {"spaceId": "...", "text": "...", "replyTo": "..." | null}
// - POST /send-attachment -> {"ok": true, "messageId": "..."}
// body: {"spaceId": "...", "path": "...", "name": "..." | null,
// "mimeType": "..." | null, "caption": "..." | null,
// "kind": "attachment" | "voice"}
// - POST /typing -> {"ok": true}
// body: {"spaceId": "...", "state": "start" | "stop"}
// - POST /shutdown -> {"ok": true}; then process exits
// "kind": "attachment" | "voice", "replyTo": "..." | null}
// - POST /typing -> {"ok": true}
// body: {"spaceId": "..."}
// - POST /shutdown -> {"ok": true}; then process exits
//
// On SIGINT/SIGTERM the sidecar calls `app.stop()` (3s graceful) before
// exiting. Logs go to stderr; Python supervises restart.
// exiting. Errors are logged to stderr; Python supervises restart.
//
// Env vars (required):
// PHOTON_PROJECT_ID (== the project's spectrumProjectId)
// Env vars (all required):
// PHOTON_PROJECT_ID
// PHOTON_PROJECT_SECRET
// PHOTON_SIDECAR_PORT
// PHOTON_SIDECAR_TOKEN
//
// Optional:
// PHOTON_SIDECAR_BIND (default 127.0.0.1)
// PHOTON_SIDECAR_BIND (default 127.0.0.1)
// PHOTON_API_HOST (passed through to spectrum-ts if its config
// honours it)
import http from "node:http";
import { once } from "node:events";
const projectId = process.env.PHOTON_PROJECT_ID;
const projectSecret = process.env.PHOTON_PROJECT_SECRET;
@@ -48,17 +42,6 @@ const port = parseInt(process.env.PHOTON_SIDECAR_PORT || "8789", 10);
const bind = process.env.PHOTON_SIDECAR_BIND || "127.0.0.1";
const sharedToken = process.env.PHOTON_SIDECAR_TOKEN;
// Inbound attachments are read into memory and base64-inlined on the NDJSON
// event so the Python adapter can cache the real bytes (and the agent can see
// the image). Cap the size we inline — above it we forward metadata only and
// the adapter surfaces a text marker, so one large video can't balloon a
// single NDJSON line. Override via PHOTON_MAX_INLINE_ATTACHMENT_BYTES.
const MAX_INLINE_ATTACHMENT_BYTES =
Number(process.env.PHOTON_MAX_INLINE_ATTACHMENT_BYTES) || 20 * 1024 * 1024;
const DM_CHAT_GUID_RE = /^any;-;(\+\d{6,})$/;
const E164_RE = /^\+\d{6,}$/;
const MAX_KNOWN_SPACES = 2048;
if (!projectId || !projectSecret || !sharedToken) {
console.error(
"photon-sidecar: PHOTON_PROJECT_ID, PHOTON_PROJECT_SECRET and " +
@@ -69,15 +52,9 @@ if (!projectId || !projectSecret || !sharedToken) {
// Lazy-load spectrum-ts so a missing install fails with a clear message
// instead of a cryptic module-resolution error during import.
let Spectrum, imessage, attachment, voice, spectrumText, spectrumTyping;
let Spectrum, imessage, attachment, voice;
try {
({
Spectrum,
attachment,
voice,
text: spectrumText,
typing: spectrumTyping,
} = await import("spectrum-ts"));
({ Spectrum, attachment, voice } = await import("spectrum-ts"));
({ imessage } = await import("spectrum-ts/providers/imessage"));
} catch (e) {
console.error(
@@ -94,168 +71,17 @@ const app = await Spectrum({
providers: [imessage.config()],
});
// ---------------------------------------------------------------------------
// Inbound: forward `app.messages` (gRPC stream) to the Python consumer.
// At most one Python consumer is attached at a time (the gateway adapter).
let consumerRes = null;
let consumerWaiters = [];
const knownSpaces = new Map();
function rememberKnownSpace(id, space) {
if (!id || typeof id !== "string" || !space) return;
if (knownSpaces.has(id)) knownSpaces.delete(id);
knownSpaces.set(id, space);
if (knownSpaces.size > MAX_KNOWN_SPACES) {
const oldest = knownSpaces.keys().next().value;
if (oldest) knownSpaces.delete(oldest);
}
}
function phoneTargetFromSpaceId(spaceId) {
if (typeof spaceId !== "string") return null;
if (E164_RE.test(spaceId)) return spaceId;
const dmGuid = spaceId.match(DM_CHAT_GUID_RE);
return dmGuid ? dmGuid[1] : null;
}
function rememberInboundSpace(space, message) {
const msgSpace = message?.space || {};
const ids = [space?.id, msgSpace.id];
for (const id of ids) {
rememberKnownSpace(id, space);
const phone = phoneTargetFromSpaceId(id);
if (phone) rememberKnownSpace(phone, space);
}
}
function waitForConsumer() {
if (consumerRes) return Promise.resolve();
return new Promise((resolve) => consumerWaiters.push(resolve));
}
function setConsumer(res) {
consumerRes = res;
const waiters = consumerWaiters;
consumerWaiters = [];
for (const resolve of waiters) resolve();
}
function clearConsumer(res) {
if (consumerRes === res) consumerRes = null;
}
// Write one NDJSON line to the active consumer. Blocks until a consumer is
// connected; if the write fails (consumer vanished mid-flight) we wait for a
// new consumer and retry, so a message is never silently dropped here.
async function deliver(line) {
for (;;) {
await waitForConsumer();
const res = consumerRes;
if (!res) continue;
try {
const flushed = res.write(line + "\n");
if (!flushed) await once(res, "drain");
return;
} catch {
clearConsumer(res);
}
}
}
async function normalizeContent(content) {
if (!content || typeof content !== "object") {
return { type: "unknown" };
}
if (content.type === "text") {
return { type: "text", text: content.text || "" };
}
if (content.type === "attachment") {
const meta = {
type: "attachment",
id: content.id ?? null,
name: content.name ?? null,
mimeType: content.mimeType ?? null,
size: typeof content.size === "number" ? content.size : null,
};
// Read the bytes eagerly and base64-inline them as `data` so the Python
// adapter can cache the real file (the agent then sees the image itself).
// The spectrum-ts attachment object may not outlive this stream
// iteration, so a lazy/on-demand fetch isn't safe. Over-cap attachments
// (when size is known up front) are forwarded as metadata only and the
// adapter falls back to a text marker. A read failure must never break
// the inbound loop — we just drop `data` and forward metadata.
if (meta.size !== null && meta.size > MAX_INLINE_ATTACHMENT_BYTES) {
console.error(
`photon-sidecar: attachment ${meta.name ?? meta.id} (${meta.size} bytes) ` +
`exceeds inline cap ${MAX_INLINE_ATTACHMENT_BYTES}; forwarding metadata only`
);
return meta;
}
if (typeof content.read === "function") {
try {
const buf = await content.read();
// Guard the case where size was unknown but the bytes turn out to be
// over the cap.
if (buf && buf.length > MAX_INLINE_ATTACHMENT_BYTES) {
console.error(
`photon-sidecar: attachment ${meta.name ?? meta.id} (${buf.length} bytes) ` +
`exceeds inline cap after read; forwarding metadata only`
);
return meta;
}
meta.data = Buffer.from(buf).toString("base64");
meta.encoding = "base64";
} catch (e) {
console.error(
"photon-sidecar: failed to read attachment bytes " +
"(forwarding metadata only): " +
(e && e.stack ? e.stack : String(e))
);
}
}
return meta;
}
return { type: content.type || "unknown" };
}
async function normalizeEvent(space, message) {
try {
const msgSpace = message.space || {};
const ts = message.timestamp;
return {
messageId: message.id ?? null,
platform: message.platform || space.__platform || "iMessage",
space: {
id: space.id ?? msgSpace.id ?? null,
// iMessage spaces carry `type` ("dm"|"group") and `phone` directly.
type: space.type ?? msgSpace.type ?? "dm",
phone: space.phone ?? msgSpace.phone ?? null,
},
sender: { id: message.sender ? message.sender.id : null },
content: await normalizeContent(message.content),
timestamp:
ts instanceof Date ? ts.toISOString() : ts ? String(ts) : null,
};
} catch (e) {
console.error(
"photon-sidecar: failed to normalize inbound message: " + String(e)
);
return null;
}
}
// Drain the inbound stream — Photon's webhook is the canonical inbound
// path, but we still consume `app.messages` so spectrum-ts' internal
// reconnect/heartbeat logic keeps running. Each event is logged at
// debug level; everything else is a no-op here.
(async () => {
try {
for await (const [space, message] of app.messages) {
// Only forward inbound messages (ignore our own outbound echoes).
if (message && message.direction && message.direction !== "inbound") {
continue;
}
rememberInboundSpace(space, message);
const event = await normalizeEvent(space, message);
if (!event) continue;
await deliver(JSON.stringify(event));
for await (const [, message] of app.messages) {
console.error(
`photon-sidecar: drained inbound from ${message.platform} ` +
`space=${message.space?.id}`
);
}
} catch (e) {
console.error(
@@ -265,9 +91,6 @@ async function normalizeEvent(space, message) {
}
})();
// ---------------------------------------------------------------------------
// HTTP control + inbound server (loopback only).
async function readBody(req) {
const chunks = [];
for await (const chunk of req) chunks.push(chunk);
@@ -307,73 +130,27 @@ function ok(res, data) {
res.end(JSON.stringify({ ok: true, ...data }));
}
function handleInbound(req, res) {
res.statusCode = 200;
res.setHeader("Content-Type", "application/x-ndjson");
res.setHeader("Cache-Control", "no-store");
res.setHeader("Connection", "keep-alive");
// One consumer at a time — a fresh connection (e.g. after a reconnect)
// supersedes the previous one.
if (consumerRes && consumerRes !== res) {
try {
consumerRes.end();
} catch {
/* ignore */
}
}
setConsumer(res);
// Heartbeat keeps the socket warm through idle periods and lets the Python
// side detect a dead pipe promptly.
const heartbeat = setInterval(() => {
try {
res.write("\n");
} catch {
/* ignore */
}
}, 25000);
const cleanup = () => {
clearInterval(heartbeat);
clearConsumer(res);
};
req.on("close", cleanup);
req.on("aborted", cleanup);
res.on("error", cleanup);
}
async function resolveSpace(spaceId) {
const cached = knownSpaces.get(spaceId);
if (cached) return cached;
const phoneTarget = phoneTargetFromSpaceId(spaceId);
// A bare E.164 phone number addresses a DM. Resolve the user, then the (DM)
// space — `imessage(app).user(phone)` -> `im.space(user)` — so callers can
// pass just "+1..." (e.g. PHOTON_HOME_CHANNEL for cron delivery) instead of
// an opaque inbound space id. Photon also represents DM chat ids as
// `any;-;+1...`; normalize those through the same path so replies to inbound
// DMs still resolve after Python stores the inbound `space.id`.
if (phoneTarget && imessage) {
try {
const im = imessage(app);
const user = await im.user(phoneTarget);
const space = await im.space(user);
rememberKnownSpace(spaceId, space);
rememberKnownSpace(phoneTarget, space);
rememberKnownSpace(space?.id, space);
return space;
} catch (e) {
console.error(
"photon-sidecar: phone->DM resolution failed: " +
(e && e.stack ? e.stack : String(e))
);
// spectrum-ts exposes the same Space methods via `app.space(spaceId)` /
// narrowed helpers; we fall back through a few accessor shapes to
// tolerate small SDK API drift.
if (typeof app.space === "function") {
return await app.space(spaceId);
}
if (app.spaces && typeof app.spaces.get === "function") {
return await app.spaces.get(spaceId);
}
// Last resort — the platform-narrowed helper.
if (imessage) {
const im = imessage(app);
if (typeof im.space === "function") {
try {
return await im.space({ id: spaceId });
} catch {
/* fall through */
}
}
}
// No cache hit and not a phone/DM target. spectrum-ts exposes no API to
// rehydrate an arbitrary opaque space id: a Space is only obtained from the
// inbound `[space, message]` stream (cached above in `knownSpaces`) or
// reconstructed for a DM from its phone number. So a group space whose cache
// entry was lost — e.g. after a sidecar restart with no fresh inbound message
// in that group — cannot be resolved here; a new inbound message in the group
// re-warms the cache. DMs are unaffected (reconstructed from the phone).
throw new Error(`unable to resolve space id ${spaceId}`);
}
@@ -381,10 +158,6 @@ const server = http.createServer(async (req, res) => {
if (req.headers["x-hermes-sidecar-token"] !== sharedToken) {
return unauthorized(res);
}
// Long-lived inbound NDJSON stream.
if (req.method === "GET" && req.url === "/inbound") {
return handleInbound(req, res);
}
if (req.method !== "POST") {
res.statusCode = 405;
return res.end();
@@ -400,16 +173,18 @@ const server = http.createServer(async (req, res) => {
}
const body = await readBody(req);
if (req.url === "/send") {
const { spaceId, text } = body || {};
const { spaceId, text, replyTo } = body || {};
if (!spaceId || typeof text !== "string") {
return badRequest(res, "spaceId and text are required");
}
const space = await resolveSpace(spaceId);
const result = await space.send(spectrumText(text));
return ok(res, { messageId: result?.id || null });
const result = replyTo
? await space.send(text, { replyTo })
: await space.send(text);
return ok(res, { messageId: result?.id || result?.messageId || null });
}
if (req.url === "/send-attachment") {
const { spaceId, path, name, mimeType, caption, kind } =
const { spaceId, path, name, mimeType, caption, kind, replyTo } =
body || {};
if (!spaceId || typeof path !== "string" || !path) {
return badRequest(res, "spaceId and path are required");
@@ -427,13 +202,16 @@ const server = http.createServer(async (req, res) => {
? voice(path, Object.keys(opts).length ? opts : undefined)
: attachment(path, Object.keys(opts).length ? opts : undefined);
const result = await space.send(builder);
const sendOpts = replyTo ? { replyTo } : undefined;
const result = sendOpts
? await space.send(builder, sendOpts)
: await space.send(builder);
// iMessage delivers the caption as a separate bubble; send it
// after the media so the attachment renders first.
if (caption && typeof caption === "string") {
try {
await space.send(spectrumText(caption));
await space.send(caption);
} catch (e) {
console.error(
"photon-sidecar: attachment sent but caption failed: " +
@@ -441,16 +219,17 @@ const server = http.createServer(async (req, res) => {
);
}
}
return ok(res, { messageId: result?.id || null });
return ok(res, { messageId: result?.id || result?.messageId || null });
}
if (req.url === "/typing") {
const { spaceId, state = "start" } = body || {};
const { spaceId } = body || {};
if (!spaceId) return badRequest(res, "spaceId is required");
if (state !== "start" && state !== "stop") {
return badRequest(res, "state must be start or stop");
}
const space = await resolveSpace(spaceId);
await space.send(spectrumTyping(state));
if (typeof space.typing === "function") {
await space.typing();
} else if (typeof space.setTyping === "function") {
await space.setTyping(true);
}
return ok(res, {});
}
res.statusCode = 404;
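
For readers calling the sidecar from Python, the `/send` route in the hunk above could be exercised roughly as follows. This is a minimal sketch, not code from this change: the base URL, port, token value, and the `build_send_request` helper are hypothetical, and sending a `Content-Type` header is an assumption. Only the `x-hermes-sidecar-token` header and the `spaceId` / `text` / `replyTo` body fields come from the diff, and `replyTo` applies only to the variant of the handler that destructures it.

```python
import json
import urllib.request
from typing import Optional


def build_send_request(
    base_url: str,  # hypothetical sidecar address, e.g. "http://127.0.0.1:8765"
    token: str,     # must equal the sidecar's shared token
    space_id: str,
    text: str,
    reply_to: Optional[str] = None,
) -> urllib.request.Request:
    """Build (but do not send) a POST /send request for the sidecar."""
    body = {"spaceId": space_id, "text": text}
    if reply_to:
        # The handler only forwards { replyTo } when the value is truthy.
        body["replyTo"] = reply_to
    return urllib.request.Request(
        base_url.rstrip("/") + "/send",
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Content-Type": "application/json",  # assumption, see lead-in
            "x-hermes-sidecar-token": token,
        },
        method="POST",
    )
```

Passing the result to `urllib.request.urlopen` would perform the call; a successful response is expected to carry a `messageId` field, which may be `null`.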

File diff suppressed because it is too large

View File

@@ -1,7 +1,7 @@
{
"name": "@hermes-agent/photon-sidecar",
"private": true,
"version": "0.2.0",
"version": "0.1.0",
"description": "Spectrum-ts bridge for the Hermes Agent Photon platform plugin.",
"type": "module",
"main": "index.mjs",
@@ -12,6 +12,6 @@
"node": ">=18.17"
},
"dependencies": {
"spectrum-ts": "^1.17.1"
"spectrum-ts": "^0.1.0"
}
}

File diff suppressed because it is too large

View File

@@ -1,7 +1,7 @@
name: simplex-platform
label: SimpleX Chat
kind: platform
version: 1.1.0
version: 1.0.0
description: >
SimpleX Chat gateway adapter for Hermes Agent.
Connects to a local simplex-chat daemon via WebSocket and relays
@@ -9,7 +9,7 @@ description: >
SimpleX is decentralised and assigns no persistent user IDs —
every contact is an opaque internal ID generated at connection
time, making it one of the most private messengers available.
author: Mibayy, jooray
author: Mibayy
# ``requires_env`` and ``optional_env`` entries are surfaced in the
# ``hermes config`` UI via the platform-plugin env var injector in
# ``hermes_cli/config.py``.
@@ -27,18 +27,6 @@ optional_env:
description: "Allow any contact to talk to the bot (dev only — disables allowlist)"
prompt: "Allow all contacts? (true/false)"
password: false
- name: SIMPLEX_AUTO_ACCEPT
description: "Auto-accept incoming contact requests (default: true)"
prompt: "Auto-accept contact requests? (true/false)"
password: false
- name: SIMPLEX_GROUP_ALLOWED
description: >-
Comma-separated SimpleX group IDs the bot should participate in, or
'*' to allow any group. Omit to ignore group messages entirely
(safer default — a bot in a group otherwise processes every
member's traffic).
prompt: "Allowed group IDs (comma-separated, or '*' for any)"
password: false
- name: SIMPLEX_HOME_CHANNEL
description: "Default contact/group ID for cron / notification delivery"
prompt: "Home channel contact/group ID (or empty)"
@@ -47,10 +35,3 @@ optional_env:
description: "Human label for the home channel (defaults to the ID)"
prompt: "Home channel display name (or empty)"
password: false
- name: HERMES_SIMPLEX_TEXT_BATCH_DELAY
description: >-
Quiet-period seconds (default: 0.8) used to concatenate rapid-fire
inbound text messages into a single MessageEvent — same pattern as
Telegram's text batching.
prompt: "Text batch flush delay in seconds (default 0.8)"
password: false

View File

@@ -45,7 +45,6 @@ ACP_REGISTRY_MANIFEST = REPO_ROOT / "acp_registry" / "agent.json"
# Auto-extracted from noreply emails + manual overrides
AUTHOR_MAP = {
"zhuhaoyu0909@icloud.com": "underthestars-zhy",
"raysun12142006@gmail.com": "yanxue06",
"alberto.regalado@ymail.com": "ARegalado1",
"alchemistchaos@protonmail.com": "AlchemistChaos", # co-author only
@@ -973,7 +972,6 @@ AUTHOR_MAP = {
"draixagent@gmail.com": "draix",
"martin.alca@gmail.com": "draix",
"junminliu@gmail.com": "JimLiu",
"juraj@bednar.io": "jooray",
"jarvischer@gmail.com": "maxchernin",
"levantam.98.2324@gmail.com": "LVT382009",
"zhurongcheng@rcrai.com": "heykb",

View File

@@ -0,0 +1,75 @@
"""Test-only in-memory stub connector implementing RelayTransport.
MUST stay under tests/ — never under plugins/ or gateway/ (a CI guard in
test_no_stub_leak.py asserts this). It lets Phase 1 prove the gateway side of
the relay end-to-end with zero dependency on the real (Node) connector.
The stub:
- hands back a fixed CapabilityDescriptor at handshake,
- lets a test push synthetic inbound MessageEvents (push_inbound),
- records every outbound action (sent/interrupts) for assertions,
- answers get_chat_info from a small fixture map.
"""
from __future__ import annotations
from typing import Any, Dict, List, Optional
from gateway.platforms.base import MessageEvent
from gateway.relay.descriptor import CapabilityDescriptor
from gateway.relay.transport import InboundHandler
class StubConnector:
"""In-memory RelayTransport for tests."""
def __init__(self, descriptor: CapabilityDescriptor) -> None:
self._descriptor = descriptor
self._inbound: Optional[InboundHandler] = None
self.connected = False
self.sent: List[Dict[str, Any]] = []
self.interrupts: List[Dict[str, Any]] = []
self.follow_ups: List[Dict[str, Any]] = []
self.chat_info: Dict[str, Dict[str, Any]] = {}
# Canned result for the next send_outbound (override per-test).
self.next_send_result: Dict[str, Any] = {"success": True, "message_id": "m1"}
# Canned result for the next send_follow_up (override per-test). Default
# mimics a resolved capability egress; set success=False to simulate an
# absent/expired capability or a tenant mismatch on the connector side.
self.next_follow_up_result: Dict[str, Any] = {"success": True, "message_id": "f1"}
async def connect(self) -> bool:
self.connected = True
return True
async def disconnect(self) -> None:
self.connected = False
async def handshake(self) -> CapabilityDescriptor:
return self._descriptor
def set_inbound_handler(self, handler: InboundHandler) -> None:
self._inbound = handler
async def send_outbound(self, action: Dict[str, Any]) -> Dict[str, Any]:
self.sent.append(action)
if action.get("op") == "send":
return dict(self.next_send_result)
return {"success": True}
async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
return self.chat_info.get(chat_id, {"name": chat_id, "type": "dm"})
async def send_interrupt(self, session_key: str, reason: Optional[str] = None) -> None:
self.interrupts.append({"session_key": session_key, "reason": reason})
async def send_follow_up(self, action: Dict[str, Any]) -> Dict[str, Any]:
self.follow_ups.append(action)
return dict(self.next_follow_up_result)
# ── test driver ──────────────────────────────────────────────────────
async def push_inbound(self, event: MessageEvent) -> None:
"""Simulate the connector delivering a normalized inbound event."""
if self._inbound is None:
raise RuntimeError("no inbound handler registered (call adapter.connect first)")
await self._inbound(event)

View File

@@ -0,0 +1,184 @@
"""Cross-repo contract conformance: docs/relay-connector-contract.md ⟷ Python.
The contract doc is the formal interface the connector repo
(NousResearch/gateway-gateway) implements against. The connector's TypeScript
structs are hand-mirrored from the doc, so if the Python source of truth drifts
from the doc, the two repos silently diverge and the handshake / session-keying
breaks only at integration time.
These tests make the doc ⟷ code relationship an enforced invariant:
* Every ``CapabilityDescriptor`` field (§2 table) is documented with the
correct required/optional flag, and the doc lists no fields the dataclass
lacks.
* Every ``SessionSource`` wire key (what ``to_dict()`` actually serializes)
is named in the contract doc's §3 discriminator section, and every
discriminator the doc calls out as a column header exists on the dataclass.
They are invariants, NOT change-detector snapshots: they assert the *relation*
between two artifacts that must move together, not a frozen list of names. Add
a field to the descriptor and the doc, and the test stays green; add it to only
one, and CI fails — which is exactly the lockstep guarantee the plan's
Cross-Repo Coordination Checklist calls for.
"""
from __future__ import annotations
import re
from pathlib import Path
import pytest
from gateway.relay.descriptor import CapabilityDescriptor
from gateway.session import SessionSource
# Repo root: tests/gateway/relay/ -> repo root is parents[3]
_CONTRACT_DOC = (
Path(__file__).resolve().parents[3] / "docs" / "relay-connector-contract.md"
)
def _doc_text() -> str:
assert _CONTRACT_DOC.exists(), (
f"Contract doc missing at {_CONTRACT_DOC}. It is the formal cross-repo "
f"interface (Phase 1, Task 1.5) and must ship with the relay adapter."
)
return _CONTRACT_DOC.read_text(encoding="utf-8")
def _parse_descriptor_table(text: str) -> dict[str, bool]:
"""Parse §2's markdown table → {field_name: required}.
Rows look like: ``| `field` | type | yes|no | meaning |``. Returns a map of
field name to whether the Required column says "yes".
"""
fields: dict[str, bool] = {}
# Restrict to the §2 section so §3/§4 tables don't bleed in.
section = text.split("## 2. CapabilityDescriptor", 1)[-1].split("## 3.", 1)[0]
row_re = re.compile(r"^\|\s*`([a-z_]+)`\s*\|[^|]*\|\s*(yes|no)\s*\|", re.M)
for name, required in row_re.findall(section):
fields[name] = required.strip() == "yes"
return fields
def test_descriptor_fields_match_contract_doc():
"""§2 table ⟷ CapabilityDescriptor dataclass, names + required/optional."""
documented = _parse_descriptor_table(_doc_text())
assert documented, "Failed to parse any descriptor fields from the §2 table."
dc_fields = CapabilityDescriptor.__dataclass_fields__ # type: ignore[attr-defined]
# A dataclass field is "required" iff it has no default and no default_factory.
import dataclasses
code_required = {
name
for name, f in dc_fields.items()
if f.default is dataclasses.MISSING
and f.default_factory is dataclasses.MISSING # type: ignore[misc]
}
code_names = set(dc_fields.keys())
doc_names = set(documented.keys())
missing_from_doc = code_names - doc_names
assert not missing_from_doc, (
f"CapabilityDescriptor fields missing from the §2 contract-doc table: "
f"{sorted(missing_from_doc)}. Document them so the connector mirrors them."
)
extra_in_doc = doc_names - code_names
assert not extra_in_doc, (
f"Contract-doc §2 table documents fields the dataclass does not have: "
f"{sorted(extra_in_doc)}. Remove them or add them to descriptor.py."
)
# Required/optional must agree, so the connector knows which fields it may omit.
for name, doc_required in documented.items():
assert doc_required == (name in code_required), (
f"Field '{name}': contract doc says required={doc_required}, but the "
f"dataclass says required={name in code_required}. Reconcile them."
)
def _session_source_wire_keys() -> set[str]:
"""Keys ``SessionSource.to_dict()`` can emit (the actual wire surface).
Build a maximally-populated source so conditionally-included keys (the
``if self.x:`` branches in ``to_dict``) all appear.
"""
from gateway.config import Platform
src = SessionSource(
platform=Platform.DISCORD,
chat_id="c",
chat_name="n",
chat_type="channel",
user_id="u",
user_name="un",
thread_id="t",
chat_topic="topic",
user_id_alt="ua",
chat_id_alt="ca",
guild_id="g",
parent_chat_id="p",
message_id="m",
)
return set(src.to_dict().keys())
def test_session_source_wire_keys_documented_in_contract():
"""Every wire key SessionSource.to_dict() emits is named in the contract doc.
The doc enumerates discriminators in prose + a per-platform table (§3) rather
than a strict field table, so this asserts presence-by-name: a wire key the
connector must populate but which appears nowhere in the doc is a silent gap.
"""
text = _doc_text()
# Limit to §3 (the MessageEvent / SessionSource section).
section = text.split("## 3. Inbound", 1)[-1].split("## 4.", 1)[0]
wire_keys = _session_source_wire_keys()
# Keys that are self-evidently covered by the §3 narrative/table.
# We assert each wire key appears as a backticked token or table cell.
undocumented = sorted(k for k in wire_keys if k not in section)
assert not undocumented, (
f"SessionSource wire keys absent from the §3 contract-doc section: "
f"{undocumented}. The connector normalizes events into these keys; if the "
f"doc doesn't name them the connector author can't know to populate them. "
f"Document them (prose or the discriminator table)."
)
def test_internal_only_session_fields_stay_off_the_wire():
"""Guard the inverse: fields deliberately NOT serialized must not leak.
``is_bot`` is an internal author-classification flag that today is NOT in
``to_dict()`` (so the connector's TS contract correctly omits it). If someone
adds it to the wire without updating the contract doc + connector, this flips
and forces the conversation. This documents the intentional omission.
"""
wire_keys = _session_source_wire_keys()
assert "is_bot" not in wire_keys, (
"is_bot is now serialized by SessionSource.to_dict(). If this is "
"intentional, add it to docs/relay-connector-contract.md §3 and the "
"connector's SessionSource interface, then update this guard."
)
@pytest.mark.parametrize("discriminator", ["chat_id", "chat_type", "user_id", "thread_id", "guild_id"])
def test_discord_telegram_discriminator_columns_present(discriminator):
"""§3's per-platform table headers must exist as SessionSource fields.
These five columns drive build_session_key() and are the #1 High-severity
risk surface (Discord guild_id collision). If the doc advertises a
discriminator column the dataclass can't carry, the connector has nowhere to
put it.
"""
assert discriminator in SessionSource.__dataclass_fields__, ( # type: ignore[attr-defined]
f"Contract doc §3 lists '{discriminator}' as a session discriminator, "
f"but SessionSource has no such field."
)
# And it must be reachable on the wire (chat_type is always emitted; the rest
# are conditional but still possible keys).
assert discriminator in _session_source_wire_keys(), (
f"Discriminator '{discriminator}' never appears in SessionSource.to_dict() "
f"output — the connector cannot transmit it to the gateway."
)
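
To make the table-parsing step concrete, here is the same regex and section split applied to a small sample. The sample text is a hypothetical excerpt written for illustration; the real `docs/relay-connector-contract.md` is not part of this view.

```python
import re
from typing import Dict

# Hypothetical excerpt, shaped like the section-2 table the test expects.
SAMPLE_DOC = """\
## 2. CapabilityDescriptor

| Field | Type | Required | Meaning |
|-------|------|----------|---------|
| `platform` | string | yes | Platform name |
| `emoji` | string | no | Display emoji |

## 3. Inbound events

| `chat_id` | string | yes | Belongs to section 3, must be ignored |
"""

# Same pattern as in the test: backticked name, any type cell, then yes|no.
ROW_RE = re.compile(r"^\|\s*`([a-z_]+)`\s*\|[^|]*\|\s*(yes|no)\s*\|", re.M)


def parse_descriptor_table(text: str) -> Dict[str, bool]:
    """Map each section-2 field name to whether its Required cell is 'yes'."""
    section = text.split("## 2. CapabilityDescriptor", 1)[-1].split("## 3.", 1)[0]
    return {name: required == "yes" for name, required in ROW_RE.findall(section)}


print(parse_descriptor_table(SAMPLE_DOC))
# → {'platform': True, 'emoji': False}
```

The header and separator rows are skipped because they contain no backticked field name, and the `chat_id` row is skipped because it sits after the `## 3.` heading.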

View File

@@ -0,0 +1,66 @@
"""Tests for the experimental CapabilityDescriptor (relay Phase 0, Task 0.2)."""
from gateway.relay.descriptor import CONTRACT_VERSION, CapabilityDescriptor
def _telegram_descriptor(**overrides) -> CapabilityDescriptor:
base = dict(
contract_version=CONTRACT_VERSION,
platform="telegram",
label="Telegram",
max_message_length=4096,
supports_draft_streaming=False,
supports_edit=True,
supports_threads=True,
markdown_dialect="markdown_v2",
len_unit="utf16",
emoji="\u2708\ufe0f",
platform_hint="You are on Telegram.",
pii_safe=False,
)
base.update(overrides)
return CapabilityDescriptor(**base)
def test_descriptor_roundtrips_json():
d = _telegram_descriptor()
assert CapabilityDescriptor.from_json(d.to_json()) == d
def test_descriptor_is_frozen():
d = _telegram_descriptor()
try:
d.max_message_length = 1 # type: ignore[misc]
except Exception as exc: # FrozenInstanceError
assert "cannot assign" in str(exc) or "frozen" in str(exc).lower()
else: # pragma: no cover
raise AssertionError("descriptor should be immutable (frozen)")
def test_from_json_ignores_unknown_keys():
"""A newer connector may send fields this gateway doesn't know — those are
dropped, not fatal (forward-compat during the experimental phase)."""
d = _telegram_descriptor()
raw = d.to_json()[:-1] + ', "future_field": "ignored"}'
restored = CapabilityDescriptor.from_json(raw)
assert restored == d
def test_from_json_fills_optional_defaults():
"""Optional fields (emoji/platform_hint/pii_safe) fall back to defaults."""
minimal = (
'{"contract_version": 1, "platform": "x", "label": "X", '
'"max_message_length": 2000, "supports_draft_streaming": false, '
'"supports_edit": false, "supports_threads": false, '
'"markdown_dialect": "plain", "len_unit": "chars"}'
)
d = CapabilityDescriptor.from_json(minimal)
assert d.pii_safe is False
assert d.platform_hint == ""
assert d.emoji == "\U0001f50c"
def test_module_is_marked_experimental():
import gateway.relay.descriptor as m
assert "EXPERIMENTAL" in (m.__doc__ or "")
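
The behaviours these tests pin down (immutability, JSON round-trip, unknown keys dropped, optional defaults filled) can be sketched with a small frozen dataclass. `MiniDescriptor` is a hypothetical stand-in with a reduced field set; the real `CapabilityDescriptor` in `gateway/relay/descriptor.py` is not shown in this view and may be implemented differently.

```python
import dataclasses
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class MiniDescriptor:  # hypothetical stand-in, not the real CapabilityDescriptor
    platform: str
    max_message_length: int
    emoji: str = "\U0001f50c"  # optional field with a default

    def to_json(self) -> str:
        return json.dumps(dataclasses.asdict(self))

    @classmethod
    def from_json(cls, raw: str) -> "MiniDescriptor":
        data = json.loads(raw)
        known = {f.name for f in dataclasses.fields(cls)}
        # Drop keys this version does not know instead of failing on them.
        return cls(**{k: v for k, v in data.items() if k in known})
```

Filtering against `dataclasses.fields` is one way to get forward compatibility: a newer peer can add fields without breaking an older reader, while a missing required field still raises `TypeError`.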

View File

@@ -0,0 +1,64 @@
"""Descriptor <- PlatformEntry projection (relay Phase 0, Task 0.3).
Proves the CapabilityDescriptor is a projection of the existing PlatformEntry,
not a parallel concept: the entry's label/limit/emoji/hint/pii fields carry
straight through.
"""
from gateway.platform_registry import PlatformEntry
from gateway.relay.descriptor import CONTRACT_VERSION, CapabilityDescriptor
def _entry(**overrides) -> PlatformEntry:
base = dict(
name="telegram",
label="Telegram",
adapter_factory=lambda cfg: None,
check_fn=lambda: True,
max_message_length=4096,
pii_safe=False,
emoji="\u2708\ufe0f",
platform_hint="You are on Telegram.",
)
base.update(overrides)
return PlatformEntry(**base)
def test_projection_carries_platform_entry_fields():
d = CapabilityDescriptor.from_platform_entry(_entry(), len_unit="utf16")
assert d.contract_version == CONTRACT_VERSION
assert d.platform == "telegram"
assert d.label == "Telegram"
assert d.max_message_length == 4096
assert d.emoji == "\u2708\ufe0f"
assert d.platform_hint == "You are on Telegram."
assert d.pii_safe is False
assert d.len_unit == "utf16"
def test_zero_max_length_maps_to_4096_default():
"""PlatformEntry.max_message_length == 0 means 'no limit'; the descriptor
carries a concrete bound matching the stream_consumer default."""
d = CapabilityDescriptor.from_platform_entry(_entry(max_message_length=0))
assert d.max_message_length == 4096
def test_runtime_capabilities_supplied_by_caller():
"""PlatformEntry doesn't encode draft/edit/thread/markdown behavior — those
come from the caller (the connector, reading the live adapter)."""
d = CapabilityDescriptor.from_platform_entry(
_entry(),
supports_draft_streaming=True,
supports_edit=False,
supports_threads=True,
markdown_dialect="discord",
)
assert d.supports_draft_streaming is True
assert d.supports_edit is False
assert d.supports_threads is True
assert d.markdown_dialect == "discord"
def test_projection_roundtrips_through_json():
d = CapabilityDescriptor.from_platform_entry(_entry(), len_unit="utf16")
assert CapabilityDescriptor.from_json(d.to_json()) == d

View File

@@ -0,0 +1,44 @@
"""CI guard: the test-only StubConnector must never leak into production paths.
The relay stub connector lives under tests/ and exists only to prove the
gateway side of the relay without the real (Node) connector. If it ever appears
under gateway/ or plugins/, that's a production leak — fail loudly.
"""
from __future__ import annotations
import pathlib
import re
_REPO_ROOT = pathlib.Path(__file__).resolve().parents[3]
_FORBIDDEN_DIRS = ("gateway", "plugins")
# Match actual code leaks (imports / class definitions), not prose mentions in
# docstrings/comments. A production file that *imports* the stub or *defines*
# StubConnector is a real leak; a docstring that references the stub's path as
# documentation is not.
_LEAK_PATTERNS = (
re.compile(r"^\s*(?:from|import)\s+.*stub_connector", re.MULTILINE),
re.compile(r"^\s*(?:from|import)\s+.*\bStubConnector\b", re.MULTILINE),
re.compile(r"^\s*class\s+StubConnector\b", re.MULTILINE),
)
def test_stub_connector_does_not_leak_into_production_paths():
offenders: list[str] = []
for top in _FORBIDDEN_DIRS:
base = _REPO_ROOT / top
if not base.is_dir():
continue
for path in base.rglob("*.py"):
try:
text = path.read_text(encoding="utf-8", errors="ignore")
except OSError: # pragma: no cover
continue
for pat in _LEAK_PATTERNS:
if pat.search(text):
offenders.append(
f"{path.relative_to(_REPO_ROOT)} matches {pat.pattern!r}"
)
assert not offenders, (
"relay test stub leaked into production paths:\n " + "\n ".join(offenders)
)
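
As a quick illustration of what the three leak patterns do and do not catch, the sketch below runs them against a few sample strings. The sample strings and the `find_leaks` helper are made up for this example; the patterns are copied from the guard above.

```python
import re
from typing import List

LEAK_PATTERNS = (
    re.compile(r"^\s*(?:from|import)\s+.*stub_connector", re.MULTILINE),
    re.compile(r"^\s*(?:from|import)\s+.*\bStubConnector\b", re.MULTILINE),
    re.compile(r"^\s*class\s+StubConnector\b", re.MULTILINE),
)


def find_leaks(text: str) -> List[str]:
    """Return the source of every leak pattern that matches the given text."""
    return [pat.pattern for pat in LEAK_PATTERNS if pat.search(text)]


# A real import trips both import patterns.
leaky = "from tests.gateway.relay.stub_connector import StubConnector\n"
# A docstring that merely mentions the path does not match.
prose = '"""See tests/gateway/relay/stub_connector.py for the test stub."""\n'
# Caveat: a prose line that happens to start with "from" still matches.
false_positive = "    from the stub_connector docs, see above\n"
```

The patterns are line-anchored heuristics rather than a parser, so the last case is a known false positive: a comment or docstring line that begins with `from` or `import` and mentions the stub would fail the guard.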

View File

@@ -0,0 +1,77 @@
"""RelayAdapter capability-advertisement tests (relay Phase 1, Task 1.1)."""
import pytest
from gateway.config import Platform, PlatformConfig
from gateway.relay.adapter import RelayAdapter
from gateway.relay.descriptor import CONTRACT_VERSION, CapabilityDescriptor
def make_desc(**kw) -> CapabilityDescriptor:
base = dict(
contract_version=CONTRACT_VERSION,
platform="telegram",
label="Telegram",
max_message_length=4096,
supports_draft_streaming=False,
supports_edit=True,
supports_threads=True,
markdown_dialect="markdown_v2",
len_unit="utf16",
emoji="\u2708\ufe0f",
platform_hint="",
pii_safe=False,
)
base.update(kw)
return CapabilityDescriptor(**base)
def _adapter(**desc_kw) -> RelayAdapter:
return RelayAdapter(PlatformConfig(), make_desc(**desc_kw))
def test_relay_platform_member_exists():
assert Platform("relay") is Platform.RELAY
def test_advertises_descriptor_max_length():
a = _adapter(max_message_length=2000)
assert a.MAX_MESSAGE_LENGTH == 2000
def test_supports_draft_streaming_follows_descriptor():
assert _adapter(supports_draft_streaming=False).supports_draft_streaming() is False
assert _adapter(supports_draft_streaming=True).supports_draft_streaming() is True
def test_len_fn_utf16_counts_code_units():
a = _adapter(len_unit="utf16")
# An astral-plane emoji is two UTF-16 code units.
assert a.message_len_fn("\U0001f600") == 2
def test_len_fn_chars_uses_builtin_len():
a = _adapter(len_unit="chars")
assert a.message_len_fn("\U0001f600") == 1
def test_is_a_base_platform_adapter():
# stream_consumer's isinstance(adapter, BasePlatformAdapter) guard must pass.
from gateway.platforms.base import BasePlatformAdapter
assert isinstance(_adapter(), BasePlatformAdapter)
@pytest.mark.asyncio
async def test_connect_without_transport_raises():
a = _adapter()
with pytest.raises(RuntimeError, match="no transport"):
await a.connect()
@pytest.mark.asyncio
async def test_send_without_transport_returns_failure():
a = _adapter()
result = await a.send("chat1", "hello")
assert result.success is False
assert result.error == "no transport"
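
The two `len_fn` tests rest on the difference between Python code points and UTF-16 code units. A minimal sketch of one way to count code units follows; `utf16_len` is a hypothetical helper, and the adapter's actual `message_len_fn` is not shown in this view.

```python
def utf16_len(text: str) -> int:
    """Count UTF-16 code units, the unit a 'utf16' len_unit would measure."""
    # Each code unit is two bytes in UTF-16-LE; the "-le" variant adds no BOM.
    return len(text.encode("utf-16-le")) // 2


grinning = "\U0001f600"  # astral-plane emoji, encoded as a surrogate pair
print(len(grinning), utf16_len(grinning))
# → 1 2
```

This matters for limits such as Telegram's 4096, which the descriptor in these tests labels as `utf16`: a message made of astral-plane characters reaches the limit at half the `len()` a naive check would allow.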

View File

@@ -0,0 +1,117 @@
"""A2 outbound capability action: the token-less ``follow_up`` op.
Proves the gateway can act on a shared-identity capability (e.g. a Discord
interaction follow-up token) WITHOUT ever holding the credential: it names the
session it is in plus the capability ``kind``, and the connector resolves the
real value from its vault and egresses. See gateway/relay/transport.py
(send_follow_up) and docs/relay-connector-contract.md §4.
The gateway side is what's exercised here (against the stub connector); the
connector's resolve + tenant-match enforcement lives in the connector repo
(resolveOutboundCapability). The key gateway-side guarantees:
- the wire action carries NO token (only session_key + kind + content),
- success/failure surfaces from the connector's resolve result,
- a failed resolve (absent/expired/tenant mismatch) returns success=False
with nothing for the gateway to retry with.
"""
from __future__ import annotations
import pytest
from gateway.config import PlatformConfig
from gateway.relay.adapter import RelayAdapter
from gateway.relay.descriptor import CONTRACT_VERSION, CapabilityDescriptor
from tests.gateway.relay.stub_connector import StubConnector
def _discord_descriptor() -> CapabilityDescriptor:
return CapabilityDescriptor(
contract_version=CONTRACT_VERSION,
platform="discord",
label="Discord",
max_message_length=2000,
supports_draft_streaming=False,
supports_edit=True,
supports_threads=True,
markdown_dialect="discord",
len_unit="chars",
)
@pytest.fixture
def wired():
stub = StubConnector(_discord_descriptor())
adapter = RelayAdapter(PlatformConfig(), _discord_descriptor(), transport=stub)
return adapter, stub
@pytest.mark.asyncio
async def test_follow_up_round_trips_without_a_token(wired):
adapter, stub = wired
await adapter.connect()
stub.next_follow_up_result = {"success": True, "message_id": "fu-7"}
result = await adapter.send_follow_up(
session_key="agent:main:discord:group:chanA:userX",
kind="discord.interaction_token",
content="here is your follow-up",
)
assert result.success is True
assert result.message_id == "fu-7"
assert len(stub.follow_ups) == 1
action = stub.follow_ups[0]
assert action["op"] == "follow_up"
assert action["session_key"] == "agent:main:discord:group:chanA:userX"
assert action["kind"] == "discord.interaction_token"
assert action["content"] == "here is your follow-up"
@pytest.mark.asyncio
async def test_follow_up_wire_action_carries_no_credential(wired):
"""The action dict must carry only session refs — no credential VALUE.
Note the capability ``kind`` legitimately names the credential type
(e.g. ``"discord.interaction_token"``) — that's a reference, not the secret.
The guarantee is structural: the action has exactly the token-less semantic
fields, and no field holds an actual credential value.
"""
adapter, stub = wired
await adapter.connect()
await adapter.send_follow_up(
session_key="sess-1", kind="discord.interaction_token", content="x", metadata={"a": 1}
)
action = stub.follow_ups[0]
# Exactly the token-less semantic fields (+ metadata); no value/secret field.
assert set(action.keys()) == {"op", "session_key", "kind", "content", "metadata"}
# No field NAMES a credential carrier (the kind string is a type ref, allowed).
assert "value" not in action
assert "token" not in action
assert "secret" not in action
assert "credential" not in action
@pytest.mark.asyncio
async def test_follow_up_failure_surfaces_when_capability_unresolvable(wired):
"""Connector couldn't resolve (absent/expired/tenant mismatch) -> success=False."""
adapter, stub = wired
await adapter.connect()
stub.next_follow_up_result = {"success": False, "error": "capability absent or tenant mismatch"}
result = await adapter.send_follow_up(
session_key="sess-1", kind="discord.interaction_token", content="x"
)
assert result.success is False
assert result.message_id is None
assert "tenant mismatch" in (result.error or "")
@pytest.mark.asyncio
async def test_follow_up_without_transport_fails_cleanly():
adapter = RelayAdapter(PlatformConfig(), _discord_descriptor(), transport=None)
result = await adapter.send_follow_up(session_key="s", kind="k", content="c")
assert result.success is False
assert result.error == "no transport"
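
The token-less wire shape asserted above can be sketched as a plain dict builder. `build_follow_up_action` is a hypothetical helper; the real construction lives in `RelayAdapter.send_follow_up`, which is not shown here. Whether `metadata` is omitted when unset is an assumption, since the tests only check the key set when metadata is passed.

```python
from typing import Any, Dict, Optional


def build_follow_up_action(
    session_key: str,
    kind: str,
    content: str,
    metadata: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Build a follow_up action that references a capability by kind only."""
    action: Dict[str, Any] = {
        "op": "follow_up",
        "session_key": session_key,  # which session the gateway is acting in
        "kind": kind,                # names the credential type, not its value
        "content": content,
    }
    if metadata is not None:  # assumption: omitted when not supplied
        action["metadata"] = metadata
    return action
```

Because the action has no slot for a credential value, the connector has to resolve the secret on its side, which is the property the tests call structural.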

View File

@@ -0,0 +1,69 @@
"""Relay /stop interrupt routing (relay Phase 1, Task 1.4).
Proves a connector-delivered mid-turn interrupt reaches the existing per-session
interrupt mechanism and cancels exactly the targeted session_key's turn — never
a sibling's. Mirrors the isolation discipline of test_stop_thread_sibling.py.
"""
from __future__ import annotations
import asyncio
import pytest
from gateway.config import PlatformConfig
from gateway.relay.adapter import RelayAdapter
from gateway.relay.descriptor import CONTRACT_VERSION, CapabilityDescriptor
from tests.gateway.relay.stub_connector import StubConnector
def _desc() -> CapabilityDescriptor:
return CapabilityDescriptor(
contract_version=CONTRACT_VERSION,
platform="discord",
label="Discord",
max_message_length=2000,
supports_draft_streaming=False,
supports_edit=True,
supports_threads=True,
markdown_dialect="discord",
len_unit="chars",
)
@pytest.fixture
def adapter():
return RelayAdapter(PlatformConfig(), _desc(), transport=StubConnector(_desc()))
@pytest.mark.asyncio
async def test_interrupt_sets_only_target_session_event(adapter):
key_a = "agent:main:discord:group:chanA:userX"
key_b = "agent:main:discord:group:chanB:userY"
ev_a = asyncio.Event()
ev_b = asyncio.Event()
adapter._active_sessions[key_a] = ev_a
adapter._active_sessions[key_b] = ev_b
await adapter.on_interrupt(key_a, chat_id="chanA")
assert ev_a.is_set() is True, "target session's interrupt Event must be set"
assert ev_b.is_set() is False, "sibling session must be untouched"
@pytest.mark.asyncio
async def test_interrupt_unknown_session_is_noop(adapter):
# No active session for this key — must not raise.
await adapter.on_interrupt("agent:main:discord:group:nope:userZ", chat_id="nope")
@pytest.mark.asyncio
async def test_outbound_interrupt_reaches_connector(adapter):
"""The gateway-side /stop egress: send_interrupt is carried to the connector
so it can forward down the socket owning the session_key."""
stub = adapter._transport
await stub.send_interrupt("agent:main:discord:group:chanA:userX", reason="stop")
assert stub.interrupts == [
{"session_key": "agent:main:discord:group:chanA:userX", "reason": "stop"}
]
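
The isolation property in the first two tests comes down to a per-session map of `asyncio.Event` objects. The sketch below shows that idea in isolation; `InterruptRegistry` is a hypothetical class, and the real `RelayAdapter.on_interrupt` (which also takes a `chat_id`) may do more.

```python
import asyncio
from typing import Dict


class InterruptRegistry:  # hypothetical, illustrates the per-session lookup only
    def __init__(self) -> None:
        self.active: Dict[str, asyncio.Event] = {}

    def on_interrupt(self, session_key: str) -> bool:
        """Set the interrupt Event for exactly one session, if it is active."""
        event = self.active.get(session_key)
        if event is None:
            return False  # unknown session: a no-op, never an error
        event.set()
        return True


async def demo() -> Dict[str, bool]:
    reg = InterruptRegistry()
    reg.active["chanA:userX"] = asyncio.Event()
    reg.active["chanB:userY"] = asyncio.Event()
    reg.on_interrupt("chanA:userX")
    return {key: ev.is_set() for key, ev in reg.active.items()}
```

Keying the lookup on the full session key, rather than on the chat or user alone, is what keeps a `/stop` in one channel from cancelling a sibling session.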

View File

@@ -0,0 +1,66 @@
"""RelayAdapter registration via the platform registry.
The relay platform is registered when a connector relay URL is configured
(``GATEWAY_RELAY_URL`` env or ``gateway.relay_url`` in config.yaml) — the same
config-driven shape as ``gateway.proxy_url``, not a separate feature flag. With
no URL configured, registration is a no-op so direct/single-tenant deployments
are unaffected. ``force=True`` registers a transport-less adapter for tests.
"""
from __future__ import annotations
import pytest
from gateway.config import PlatformConfig
from gateway.platform_registry import platform_registry
from gateway.relay import register_relay_adapter, relay_url
from gateway.relay.adapter import RelayAdapter
@pytest.fixture(autouse=True)
def _clean_registry(monkeypatch):
"""Each test starts/ends with no 'relay' entry and a clean relay env."""
monkeypatch.delenv("GATEWAY_RELAY_URL", raising=False)
monkeypatch.delenv("GATEWAY_RELAY_PLATFORM", raising=False)
monkeypatch.delenv("GATEWAY_RELAY_BOT_ID", raising=False)
platform_registry.unregister("relay")
yield
platform_registry.unregister("relay")
def test_off_when_no_url_configured(monkeypatch):
# No GATEWAY_RELAY_URL and (assuming) no gateway.relay_url in config.
monkeypatch.setattr("gateway.relay.relay_url", lambda: None)
assert register_relay_adapter() is False
assert platform_registry.is_registered("relay") is False
def test_registers_when_url_configured(monkeypatch):
monkeypatch.setenv("GATEWAY_RELAY_URL", "wss://connector.example/relay")
assert relay_url() == "wss://connector.example/relay"
assert register_relay_adapter() is True
assert platform_registry.is_registered("relay") is True
def test_explicit_url_arg_registers():
assert register_relay_adapter(url="wss://connector.example/relay") is True
assert platform_registry.is_registered("relay") is True
def test_force_registers_without_url():
assert register_relay_adapter(force=True) is True
assert platform_registry.is_registered("relay") is True
def test_trailing_slash_stripped(monkeypatch):
monkeypatch.setenv("GATEWAY_RELAY_URL", "wss://connector.example/relay/")
assert relay_url() == "wss://connector.example/relay"
def test_create_adapter_yields_relay_adapter():
# force=True builds a transport-less adapter (no live dial in unit tests).
register_relay_adapter(force=True)
adapter = platform_registry.create_adapter("relay", PlatformConfig())
assert isinstance(adapter, RelayAdapter)
# Placeholder descriptor until handshake negotiates the real one.
assert adapter.descriptor.platform == "relay"
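
The env-driven half of the registration gate can be sketched as below. `relay_url_from_env` is a hypothetical helper that covers only the `GATEWAY_RELAY_URL` variable; the `gateway.relay_url` fallback in config.yaml is left out, and the real `relay_url()` may normalise the value differently (for example, this sketch strips every trailing slash, not just one).

```python
import os
from typing import Mapping, Optional


def relay_url_from_env(env: Optional[Mapping[str, str]] = None) -> Optional[str]:
    """Return the configured relay URL without trailing slashes, or None."""
    source = os.environ if env is None else env
    raw = (source.get("GATEWAY_RELAY_URL") or "").strip()
    # An empty or unset value means "no relay configured".
    return raw.rstrip("/") or None


def should_register(url: Optional[str], force: bool = False) -> bool:
    """Register when a URL is present, or when a test forces it."""
    return force or bool(url)
```

Taking the environment as an optional mapping keeps the helper testable without `monkeypatch`; the tests above instead patch the process environment, which exercises the real lookup path.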

View File

@@ -0,0 +1,122 @@
"""End-to-end relay round-trip against the in-memory stub connector.
Proves the gateway side of the relay works with no real connector:
- connect() registers the inbound handler,
- a connector-delivered MessageEvent reaches the adapter's message path,
- SessionSource discriminators (guild_id) drive build_session_key isolation,
- an outbound send round-trips through the transport.
These target the transport contract + session-key derivation (Task 1.2's gate),
not the full agent turn — handle_message is patched to capture the event.
"""
from __future__ import annotations
import pytest
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import MessageEvent, MessageType
from gateway.session import SessionSource, build_session_key
from gateway.relay.adapter import RelayAdapter
from gateway.relay.descriptor import CONTRACT_VERSION, CapabilityDescriptor
from tests.gateway.relay.stub_connector import StubConnector
def _discord_descriptor() -> CapabilityDescriptor:
return CapabilityDescriptor(
contract_version=CONTRACT_VERSION,
platform="discord",
label="Discord",
max_message_length=2000,
supports_draft_streaming=False,
supports_edit=True,
supports_threads=True,
markdown_dialect="discord",
len_unit="chars",
emoji="\U0001f47e",
platform_hint="You are on Discord.",
pii_safe=False,
)
def _discord_event(guild_id: str, channel_id: str, user_id: str, text: str) -> MessageEvent:
"""Synthetic inbound the connector would build from a discord.js message."""
source = SessionSource(
platform=Platform.DISCORD,
chat_id=channel_id,
chat_type="group",
user_id=user_id,
guild_id=guild_id,
)
return MessageEvent(text=text, message_type=MessageType.TEXT, source=source)
@pytest.fixture
def wired():
stub = StubConnector(_discord_descriptor())
adapter = RelayAdapter(PlatformConfig(), _discord_descriptor(), transport=stub)
return adapter, stub
@pytest.mark.asyncio
async def test_connect_registers_inbound_handler(wired):
adapter, stub = wired
assert stub._inbound is None
ok = await adapter.connect()
assert ok is True
assert stub.connected is True
assert stub._inbound is not None
@pytest.mark.asyncio
async def test_inbound_event_reaches_adapter(wired, monkeypatch):
adapter, stub = wired
captured = []
monkeypatch.setattr(adapter, "handle_message", lambda ev: _async_capture(captured, ev))
await adapter.connect()
ev = _discord_event("guildA", "chan1", "userX", "hello")
await stub.push_inbound(ev)
assert len(captured) == 1
assert captured[0].text == "hello"
assert captured[0].source.guild_id == "guildA"
@pytest.mark.asyncio
async def test_two_guilds_isolate_into_distinct_session_keys(wired):
adapter, _ = wired
ev_a = _discord_event("guildA", "chan1", "userX", "hi from A")
ev_b = _discord_event("guildB", "chan2", "userX", "hi from B")
key_a = build_session_key(ev_a.source)
key_b = build_session_key(ev_b.source)
assert key_a != key_b
# Same guild + channel + user collapses to one session.
ev_a2 = _discord_event("guildA", "chan1", "userX", "again")
assert build_session_key(ev_a2.source) == key_a
@pytest.mark.asyncio
async def test_outbound_send_round_trips(wired):
adapter, stub = wired
await adapter.connect()
stub.next_send_result = {"success": True, "message_id": "msg-42"}
result = await adapter.send("chan1", "a reply", metadata={"k": "v"})
assert result.success is True
assert result.message_id == "msg-42"
assert len(stub.sent) == 1
assert stub.sent[0]["op"] == "send"
assert stub.sent[0]["chat_id"] == "chan1"
assert stub.sent[0]["content"] == "a reply"
@pytest.mark.asyncio
async def test_get_chat_info_proxied_to_connector(wired):
adapter, stub = wired
stub.chat_info["chan1"] = {"name": "general", "type": "group"}
info = await adapter.get_chat_info("chan1")
assert info == {"name": "general", "type": "group"}
async def _async_capture(sink, event):
sink.append(event)
return None


@@ -0,0 +1,167 @@
"""End-to-end relay round-trip for Telegram against the in-memory stub.
Companion to ``test_relay_roundtrip.py`` (Discord). Proves the relay generalizes
beyond Discord — the Phase 1 exit gate requires *both* Telegram and Discord
descriptors to round-trip and their inbound ``MessageEvent``s to drive
``build_session_key()`` correctly.
Telegram's discriminator profile differs from Discord's, which is the point:
- No ``guild_id``; isolation between chats comes from ``chat_id`` alone.
- Forum topics live inside ONE ``chat_id`` and isolate by ``thread_id`` (the
Telegram analog of Discord's per-guild isolation).
- Forum/thread sessions are shared across participants by default
(``thread_sessions_per_user=False``) — user_id is NOT appended in a thread.
- ``len_unit="utf16"`` (Telegram counts UTF-16 code units) and
``markdown_dialect="markdown_v2"`` — distinct from Discord's chars/discord.
If the descriptor or session-keying only worked for Discord, these fail.
"""
from __future__ import annotations
import pytest
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import MessageEvent, MessageType
from gateway.session import SessionSource, build_session_key
from gateway.relay.adapter import RelayAdapter
from gateway.relay.descriptor import CONTRACT_VERSION, CapabilityDescriptor
from tests.gateway.relay.stub_connector import StubConnector
def _telegram_descriptor() -> CapabilityDescriptor:
return CapabilityDescriptor(
contract_version=CONTRACT_VERSION,
platform="telegram",
label="Telegram",
max_message_length=4096,
supports_draft_streaming=True, # Telegram DMs support sendMessageDraft
supports_edit=True,
supports_threads=True, # forum topics
markdown_dialect="markdown_v2",
len_unit="utf16",
emoji="\u2708\ufe0f",
platform_hint="You are on Telegram.",
pii_safe=False,
)
def _tg_group_event(chat_id: str, user_id: str, text: str, thread_id: str | None = None) -> MessageEvent:
"""Synthetic inbound the connector would build from a Telegram update.
A plain group message has no thread_id; a forum-topic message carries the
topic id as thread_id (no guild_id — Telegram has no guild concept).
"""
source = SessionSource(
platform=Platform.TELEGRAM,
chat_id=chat_id,
chat_type="forum" if thread_id else "group",
user_id=user_id,
thread_id=thread_id,
)
return MessageEvent(text=text, message_type=MessageType.TEXT, source=source)
def _tg_dm_event(chat_id: str, user_id: str, text: str) -> MessageEvent:
source = SessionSource(
platform=Platform.TELEGRAM,
chat_id=chat_id,
chat_type="dm",
user_id=user_id,
)
return MessageEvent(text=text, message_type=MessageType.TEXT, source=source)
@pytest.fixture
def wired():
desc = _telegram_descriptor()
stub = StubConnector(desc)
adapter = RelayAdapter(PlatformConfig(), desc, transport=stub)
return adapter, stub
@pytest.mark.asyncio
async def test_telegram_descriptor_round_trips_through_stub(wired):
"""The connector's handshake descriptor for Telegram survives JSON + the
adapter configures itself from it (utf16 length unit, 4096 limit)."""
adapter, stub = wired
desc = _telegram_descriptor()
assert CapabilityDescriptor.from_json(desc.to_json()) == desc
# Adapter reflects the descriptor's capability profile.
assert adapter.MAX_MESSAGE_LENGTH == 4096
assert adapter.supports_draft_streaming() is True
# utf16 length unit selects a non-default len fn (Telegram counts UTF-16).
assert adapter.message_len_fn is not len
@pytest.mark.asyncio
async def test_inbound_telegram_event_reaches_adapter(wired, monkeypatch):
adapter, stub = wired
captured: list[MessageEvent] = []
monkeypatch.setattr(adapter, "handle_message", lambda ev: _async_capture(captured, ev))
await adapter.connect()
await stub.push_inbound(_tg_group_event("chat-100", "userX", "hello"))
assert len(captured) == 1
assert captured[0].text == "hello"
assert captured[0].source.platform == Platform.TELEGRAM
assert captured[0].source.guild_id is None # Telegram has no guild
@pytest.mark.asyncio
async def test_two_telegram_chats_isolate_by_chat_id(wired):
"""No guild_id on Telegram — two distinct chats must still isolate, keyed
on chat_id alone (the Discord-guild role is played by chat_id here)."""
ev_a = _tg_group_event("chat-A", "userX", "hi A")
ev_b = _tg_group_event("chat-B", "userX", "hi B")
key_a = build_session_key(ev_a.source)
key_b = build_session_key(ev_b.source)
assert key_a != key_b
# Same chat + same user collapses to one session.
ev_a2 = _tg_group_event("chat-A", "userX", "again")
assert build_session_key(ev_a2.source) == key_a
@pytest.mark.asyncio
async def test_forum_topics_isolate_by_thread_id_within_one_chat(wired):
"""Telegram forum topics share a single chat_id and isolate by thread_id —
the Telegram analog of Discord per-guild isolation. Two topics in the same
forum must NOT collide, and (threads shared across participants by default)
a second user in the same topic shares the session."""
topic1 = _tg_group_event("forum-1", "userX", "in topic 1", thread_id="t-1")
topic2 = _tg_group_event("forum-1", "userX", "in topic 2", thread_id="t-2")
k1 = build_session_key(topic1.source)
k2 = build_session_key(topic2.source)
assert k1 != k2, "two forum topics in one chat must not share a session"
# Same chat, no topic → distinct from any topic session.
plain = _tg_group_event("forum-1", "userX", "no topic")
assert build_session_key(plain.source) not in {k1, k2}
# Threads are shared across participants by default: a different user in the
# same topic lands on the SAME session key (user_id not appended in threads).
topic1_other_user = _tg_group_event("forum-1", "userY", "me too", thread_id="t-1")
assert build_session_key(topic1_other_user.source) == k1
@pytest.mark.asyncio
async def test_telegram_dm_isolates_by_chat_id(wired):
dm_a = _tg_dm_event("dm-111", "userX", "hey")
dm_b = _tg_dm_event("dm-222", "userY", "yo")
assert build_session_key(dm_a.source) != build_session_key(dm_b.source)
assert build_session_key(dm_a.source).startswith("agent:main:telegram:dm:")
@pytest.mark.asyncio
async def test_outbound_send_round_trips_telegram(wired):
adapter, stub = wired
await adapter.connect()
stub.next_send_result = {"success": True, "message_id": "tg-77"}
result = await adapter.send("chat-100", "a reply")
assert result.success is True
assert result.message_id == "tg-77"
assert stub.sent[0]["op"] == "send"
assert stub.sent[0]["chat_id"] == "chat-100"
async def _async_capture(sink, event):
sink.append(event)
return None
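
The isolation rules these tests assert (chat isolation by `chat_id`, forum topics by `thread_id`, threads shared across users by default, DMs under a `dm` segment) can be illustrated with a toy key builder. This is a sketch only: `ToySource` and `toy_session_key` are hypothetical, and apart from the `agent:main:telegram:dm:` prefix asserted above, the real key layout produced by `build_session_key` is an assumption.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ToySource:
    """Hypothetical stand-in for SessionSource."""

    platform: str
    chat_id: str
    chat_type: str
    user_id: Optional[str] = None
    guild_id: Optional[str] = None
    thread_id: Optional[str] = None


def toy_session_key(src: ToySource, thread_sessions_per_user: bool = False) -> str:
    """Join the discriminators that should isolate one session from another."""
    parts = ["agent", "main", src.platform, src.chat_type]
    if src.guild_id:
        parts.append(src.guild_id)
    parts.append(src.chat_id)
    if src.thread_id:
        parts.append(src.thread_id)
        # Threads are shared by default: user_id is added only on request.
        if thread_sessions_per_user and src.user_id:
            parts.append(src.user_id)
    elif src.chat_type != "dm" and src.user_id:
        # Plain group chats keep one session per participant.
        parts.append(src.user_id)
    return ":".join(parts)
```

The design point is that each platform contributes only the discriminators it has: Discord supplies `guild_id`, Telegram supplies `thread_id` for forum topics, and the key builder does not need to know which platform it is serving.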


@@ -0,0 +1,91 @@
"""Invariant: the relay path sheds platform crypto — it re-validates nothing.
Under the A2 trust model (see docs/relay-connector-contract.md §6), the
*connector* is the sole crypto/identity boundary: it verifies/decrypts every
inbound platform payload at the edge (it holds the tenant secrets), normalizes
it to a tenant-scoped ``MessageEvent``, and forwards only the sanitized event.
The gateway re-validates nothing — it cannot, without being handed the shared
signing secret, which would itself be the leak on a shared bot.
The relay package therefore MUST NOT import or call platform signature/crypto
verification (Discord ed25519, Twilio HMAC, WeCom BizMsgCrypt, generic webhook
signature checks). Those live in the *direct* platform adapters
(``gateway/platforms/*``) which serve non-relay deployments; the relay receives
already-trusted events. This test fails if someone bolts re-validation onto the
relay path, re-coupling the gateway to platform secrets it must never hold.
It is an invariant (asserts the *relation* "relay imports no crypto"), not a
change-detector snapshot of a frozen import list.
"""
from __future__ import annotations
import ast
import re
from pathlib import Path
# gateway/relay package directory: tests/gateway/relay/ -> repo root parents[3].
_REPO_ROOT = Path(__file__).resolve().parents[3]
_RELAY_PKG = _REPO_ROOT / "gateway" / "relay"
# Modules / symbols that mean "platform crypto re-validation". If the relay path
# imports any of these it has re-coupled the gateway to a platform secret.
_FORBIDDEN_MODULE_TOKENS = (
"wecom_crypto",
"wecom_callback",
"webhook", # gateway.platforms.webhook holds signature verification
)
_FORBIDDEN_SYMBOL_RE = re.compile(
r"(ed25519|verify_key|verifykey|verify_signature|verify_ed25519|"
r"verify_webhook|bizmsg|hmac|x[-_]signature)",
re.IGNORECASE,
)
def _relay_py_files() -> list[Path]:
assert _RELAY_PKG.is_dir(), f"relay package missing at {_RELAY_PKG}"
return sorted(_RELAY_PKG.glob("*.py"))
def test_relay_package_imports_no_platform_crypto():
    """No module in gateway/relay imports a platform-crypto / verification module."""
    offenders: list[str] = []
    for path in _relay_py_files():
        tree = ast.parse(path.read_text(encoding="utf-8"), filename=str(path))
        for node in ast.walk(tree):
            mods: list[str] = []
            if isinstance(node, ast.Import):
                mods = [alias.name for alias in node.names]
            elif isinstance(node, ast.ImportFrom):
                mods = [node.module or ""]
                mods += [f"{node.module or ''}.{a.name}" for a in node.names]
            for mod in mods:
                if any(tok in mod for tok in _FORBIDDEN_MODULE_TOKENS):
                    offenders.append(f"{path.name}: imports '{mod}'")
    assert not offenders, (
        "The relay path must re-validate NOTHING (A2: connector is the sole "
        "crypto boundary). Found platform-crypto imports in the relay package:\n "
        + "\n ".join(offenders)
        + "\nMove verification to the connector edge; the gateway trusts the "
        "normalized MessageEvent. See docs/relay-connector-contract.md §6."
    )
def test_relay_package_calls_no_signature_verification():
    """No relay module references a signature/crypto-verification symbol by name."""
    offenders: list[str] = []
    for path in _relay_py_files():
        for lineno, line in enumerate(path.read_text(encoding="utf-8").splitlines(), 1):
            # Skip full-line comments only. Docstrings and trailing inline
            # comments are still scanned, so prose that names a forbidden
            # symbol inside a docstring will be flagged.
            stripped = line.strip()
            if stripped.startswith("#"):
                continue
            m = _FORBIDDEN_SYMBOL_RE.search(line)
            if m:
                offenders.append(f"{path.name}:{lineno}: '{m.group(0)}' in: {stripped[:80]}")
    assert not offenders, (
        "The relay path must not perform platform signature/crypto verification "
        "(A2). Found verification-symbol references:\n "
        + "\n ".join(offenders)
        + "\nThe connector verifies at the edge; the gateway re-validates nothing."
    )
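
The import check above walks the AST rather than grepping text, so a forbidden name inside a string literal does not count as an import. A self-contained sketch of the same technique on a source string follows. `forbidden_imports` is a hypothetical helper, and the token tuple is copied from `_FORBIDDEN_MODULE_TOKENS`.

```python
from __future__ import annotations

import ast

FORBIDDEN_TOKENS = ("wecom_crypto", "wecom_callback", "webhook")


def forbidden_imports(source: str, tokens: tuple[str, ...] = FORBIDDEN_TOKENS) -> list[str]:
    """Return each imported module path in `source` that contains a forbidden token."""
    hits: list[str] = []
    for node in ast.walk(ast.parse(source)):
        mods: list[str] = []
        if isinstance(node, ast.Import):
            mods = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom):
            base = node.module or ""
            # Check both the package and each "package.name" it pulls in.
            mods = [base] + [f"{base}.{alias.name}" for alias in node.names]
        hits.extend(m for m in mods if any(tok in m for tok in tokens))
    return hits
```

Matching on substrings keeps the check loose on purpose: any module path containing `webhook` is flagged, including unrelated modules that happen to share the name.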


@@ -0,0 +1,179 @@
"""WebSocketRelayTransport against a real in-process WebSocket server.
Exercises the production transport over an actual ``websockets`` server (no
mock socket): handshake (hello -> descriptor), inbound frame -> handler,
outbound request/response correlation, and follow_up routing. Proves the wire
framing (newline-delimited JSON) and the request/response future plumbing work
end to end on a live socket.
Skipped cleanly if the optional ``websockets`` dependency is absent.
"""
from __future__ import annotations
import asyncio
import json
import pytest
import pytest_asyncio
from gateway.relay.ws_transport import WebSocketRelayTransport, WEBSOCKETS_AVAILABLE
pytestmark = pytest.mark.skipif(not WEBSOCKETS_AVAILABLE, reason="websockets not installed")
if WEBSOCKETS_AVAILABLE:
    import websockets

DESCRIPTOR = {
    "contract_version": 1,
    "platform": "discord",
    "label": "Discord",
    "max_message_length": 2000,
    "supports_draft_streaming": False,
    "supports_edit": True,
    "supports_threads": True,
    "markdown_dialect": "discord",
    "len_unit": "chars",
}


class _StubConnectorServer:
    """Minimal connector: answers hello with a descriptor, echoes outbound."""

    def __init__(self):
        self.received: list[dict] = []
        self._server = None
        self.url = ""
        # Push channel: tests set this to a list of frame dicts, which are
        # delivered as inbound frames right after the handshake.
        self._to_push: list[dict] = []

    async def start(self):
        self._server = await websockets.serve(self._handle, "127.0.0.1", 0)
        sock = next(iter(self._server.sockets))
        port = sock.getsockname()[1]
        self.url = f"ws://127.0.0.1:{port}"

    async def stop(self):
        if self._server is not None:
            self._server.close()
            await self._server.wait_closed()

    async def _handle(self, ws):
        async for raw in ws:
            for line in str(raw).split("\n"):
                if not line.strip():
                    continue
                frame = json.loads(line)
                self.received.append(frame)
                await self._on_frame(ws, frame)

    async def _on_frame(self, ws, frame):
        ftype = frame.get("type")
        if ftype == "hello":
            await ws.send(json.dumps({"type": "descriptor", "descriptor": DESCRIPTOR}) + "\n")
            # Deliver any queued inbound frames right after handshake.
            for f in self._to_push:
                await ws.send(json.dumps(f) + "\n")
        elif ftype == "outbound":
            action = frame.get("action", {})
            # Echo a successful result correlated by requestId.
            result = {"success": True, "message_id": f"srv-{action.get('op')}"}
            await ws.send(
                json.dumps({"type": "outbound_result", "requestId": frame["requestId"], "result": result})
                + "\n"
            )
@pytest_asyncio.fixture
async def server():
srv = _StubConnectorServer()
await srv.start()
yield srv
await srv.stop()
@pytest.mark.asyncio
async def test_handshake_negotiates_descriptor(server):
t = WebSocketRelayTransport(server.url, "discord", "appShared")
await t.connect()
try:
desc = await t.handshake()
assert desc.platform == "discord"
assert desc.max_message_length == 2000
# The hello carried the platform + botId.
hello = next(f for f in server.received if f["type"] == "hello")
assert hello["platform"] == "discord"
assert hello["botId"] == "appShared"
finally:
await t.disconnect()
@pytest.mark.asyncio
async def test_inbound_frame_reaches_handler(server):
server._to_push = [
{
"type": "inbound",
"event": {
"text": "hello from connector",
"message_type": "text",
"source": {"platform": "discord", "chat_id": "chan1", "chat_type": "group", "guild_id": "guildA"},
},
"bufferId": "buf-1",
}
]
received = []
t = WebSocketRelayTransport(server.url, "discord", "appShared")
t.set_inbound_handler(lambda ev: received.append(ev) or asyncio.sleep(0))
await t.connect()
try:
await t.handshake()
# Give the reader a tick to deliver the pushed inbound frame.
await asyncio.sleep(0.05)
assert len(received) == 1
assert received[0].text == "hello from connector"
assert received[0].source.guild_id == "guildA"
finally:
await t.disconnect()
@pytest.mark.asyncio
async def test_outbound_round_trips_with_correlation(server):
t = WebSocketRelayTransport(server.url, "discord", "appShared")
await t.connect()
try:
await t.handshake()
result = await t.send_outbound({"op": "send", "chat_id": "chan1", "content": "hi"})
assert result["success"] is True
assert result["message_id"] == "srv-send"
finally:
await t.disconnect()
@pytest.mark.asyncio
async def test_follow_up_round_trips(server):
t = WebSocketRelayTransport(server.url, "discord", "appShared")
await t.connect()
try:
await t.handshake()
result = await t.send_follow_up(
{"op": "follow_up", "session_key": "s1", "kind": "discord.interaction_token", "content": "fu"}
)
assert result["success"] is True
assert result["message_id"] == "srv-follow_up"
# The follow_up rode an outbound frame the connector saw.
outbound = [f for f in server.received if f["type"] == "outbound"]
assert any(f["action"]["op"] == "follow_up" for f in outbound)
finally:
await t.disconnect()
@pytest.mark.asyncio
async def test_disconnect_fails_pending_waiters_cleanly(server):
t = WebSocketRelayTransport(server.url, "discord", "appShared", outbound_timeout_s=5)
await t.connect()
await t.handshake()
await t.disconnect()
# After disconnect, an outbound returns a structured failure rather than hanging.
result = await t.send_outbound({"op": "send", "chat_id": "c", "content": "x"})
assert result["success"] is False


@@ -98,6 +98,8 @@ def test_checker_returns_true_when_configured(platform, checker, monkeypatch):
mock_config.extra = {"app_id": "app", "app_secret": "sec"}
elif platform == Platform.DINGTALK:
mock_config.extra = {"client_id": "id", "client_secret": "sec"}
elif platform == Platform.RELAY:
mock_config.extra = {"relay_url": "wss://connector.example/relay"}
else:
pytest.skip(f"No synthetic config defined for {platform.value}")


@@ -0,0 +1,99 @@
"""Phase 0 regression harness for the relay/connector work.
Locks the *behavioral contract* that the future ``RelayAdapter`` must reproduce:
the gateway's ``stream_consumer`` and ``BasePlatformAdapter`` read per-platform
capabilities through a small, stable surface. A relay adapter that exposes the
same surface (``MAX_MESSAGE_LENGTH`` attribute, ``message_len_fn`` property,
``supports_draft_streaming`` probe, and only the abstract methods) slots into
the existing consumer with no consumer changes.
These are deliberately *behavioral* (construct an adapter, drive the code,
assert the observable outcome) rather than source-string snapshots, per the
repo's "don't write change-detector tests" rule. They pass on ``main`` before
any ``RelayAdapter`` exists — they describe the contract, not the relay.
"""
import inspect
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import BasePlatformAdapter
class _MinAdapter(BasePlatformAdapter):
"""Smallest concrete adapter: implements exactly the abstract methods."""
async def connect(self): # pragma: no cover - not called
return True
async def disconnect(self): # pragma: no cover - not called
return None
async def send(self, *args, **kwargs): # pragma: no cover - not called
return None
async def get_chat_info(self, chat_id): # pragma: no cover - not called
return {}
def _make() -> BasePlatformAdapter:
return _MinAdapter(PlatformConfig(), Platform.LOCAL)
def test_abstract_methods_are_the_known_set():
"""The relay adapter must implement exactly this set of abstract methods.
Everything else on BasePlatformAdapter has a default, so ONE generic
RelayAdapter overriding the right subset is feasible without per-platform
gateway classes. If a new abstractmethod is added here, the relay design
(and the cross-repo contract) must be revisited — hence the lock.
NOTE: this is four methods, not three — ``get_chat_info`` is abstract too
(defined far below the connect/disconnect/send cluster in base.py). The
RelayAdapter must implement it (proxying a chat-info lookup to the
connector, or returning a descriptor-derived stub).
"""
abstract = {
name
for name, member in inspect.getmembers(BasePlatformAdapter)
if getattr(member, "__isabstractmethod__", False)
}
assert abstract == {"connect", "disconnect", "send", "get_chat_info"}
def test_message_len_fn_defaults_to_len():
"""message_len_fn is the per-platform length-unit hook (Telegram overrides
it for UTF-16). The default is plain ``len``; the relay adapter will
override it from its negotiated descriptor's ``len_unit``."""
inst = _make()
assert inst.message_len_fn("hello") == 5
def test_supports_draft_streaming_defaults_false():
"""Draft streaming is opt-in per platform; the consumer falls back to the
edit-based path when False. The relay adapter flips this from its
descriptor's ``supports_draft_streaming`` flag."""
inst = _make()
assert inst.supports_draft_streaming() is False
def test_stream_consumer_reads_max_message_length_by_attribute():
"""The consumer resolves the per-platform char limit by reading the
adapter's ``MAX_MESSAGE_LENGTH`` attribute (defaulting to 4096 when
absent). The relay adapter exposes this as an attribute set from its
descriptor — so a relay adapter that sets the attribute is chunked
correctly with no consumer change.
"""
from gateway import stream_consumer
class _NoLimit:
pass
class _WithLimit:
MAX_MESSAGE_LENGTH = 1234
assert getattr(_NoLimit(), "MAX_MESSAGE_LENGTH", 4096) == 4096
assert getattr(_WithLimit(), "MAX_MESSAGE_LENGTH", 4096) == 1234
# The consumer depends on BasePlatformAdapter for the message_len_fn
# isinstance guard (import-level contract the relay adapter satisfies by
# subclassing BasePlatformAdapter).
assert stream_consumer._BasePlatformAdapter is BasePlatformAdapter
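
The harness above describes `message_len_fn` as the length-unit hook that a relay adapter would set from its descriptor's `len_unit`. A minimal sketch of a UTF-16 code-unit counter and a unit-to-function mapping follows. `utf16_len` and `len_fn_for` are hypothetical names, and falling back to `len` for unknown units is an assumption.

```python
def utf16_len(text: str) -> int:
    """Count UTF-16 code units; characters outside the BMP count as two."""
    return len(text.encode("utf-16-le")) // 2


def len_fn_for(len_unit: str):
    """Hypothetical mapping from a descriptor's len_unit to a length function."""
    if len_unit == "utf16":
        return utf16_len
    # Assumption: "chars" and any unknown unit use plain code-point length.
    return len
```

For ASCII text the two functions agree; they diverge on emoji and other astral characters, which is where a 4096-unit Telegram limit would be reached before 4096 Python characters.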


@@ -205,13 +205,6 @@ def test_corr_id_pending_set_self_trims():
@pytest.mark.asyncio
async def test_send_dm():
"""DMs use the bare ``@<id> text`` chat-command form.
The bracketed form ``@[<id>] text`` is what the daemon's man page
documents, but in practice both addressing styles route through
the same chat-command parser; bare ``@<id>`` matches what every
Hermes deployment has been using in production for months.
"""
from gateway.config import PlatformConfig
cfg = PlatformConfig(enabled=True, extra={"ws_url": "ws://localhost:5225"})
adapter = SimplexAdapter(cfg)
@@ -229,14 +222,6 @@ async def test_send_dm():
@pytest.mark.asyncio
async def test_send_group():
"""Groups use the structured ``/_send #<id> json [...]`` form.
The bracket chat-command form ``#[<id>] text`` *looks* like an exact
ID match in the daemon docs but is parsed as a display-name lookup
— so messages to groups whose display name isn't literally the ID
silently drop. The structured ``/_send`` form addresses by numeric
ID and survives newlines/quoting through ``json.dumps``.
"""
from gateway.config import PlatformConfig
cfg = PlatformConfig(enabled=True, extra={"ws_url": "ws://localhost:5225"})
adapter = SimplexAdapter(cfg)
@@ -246,11 +231,7 @@ async def test_send_group():
result = await adapter.send("group:grp-99", "Hello, group!")
payload = json.loads(mock_ws.send.call_args[0][0])
assert payload["cmd"].startswith("/_send #grp-99 json ")
msg_content = json.loads(payload["cmd"].split(" json ", 1)[1])[0][
"msgContent"
]
assert msg_content == {"type": "text", "text": "Hello, group!"}
assert payload["cmd"] == "#[grp-99] Hello, group!"
assert result.success is True


@@ -1,8 +1,8 @@
"""Tests for the Photon auth module (device login + dashboard API)."""
"""Tests for the Photon auth module (device login + project + user creation)."""
from __future__ import annotations
import json
import os
import time
from pathlib import Path
from typing import Any, Dict
@@ -36,91 +36,51 @@ class _FakeResponse:
raise RuntimeError(f"HTTP {self.status_code}")
_PHOTON_ENV = (
"PHOTON_PROJECT_ID",
"PHOTON_PROJECT_SECRET",
"PHOTON_DASHBOARD_PROJECT_ID",
)
@pytest.fixture
def tmp_hermes_home(tmp_path: Path, monkeypatch: pytest.MonkeyPatch):
def tmp_hermes_home(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Path:
home = tmp_path / "hermes"
home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(home))
for key in _PHOTON_ENV:
monkeypatch.delenv(key, raising=False)
yield home
# save_env_value() mutates os.environ directly, so scrub any leakage.
for key in _PHOTON_ENV:
os.environ.pop(key, None)
# The auth module memoises by reading get_hermes_home at call time
# so the env var is what matters.
return home
# ---------------------------------------------------------------------------
# Credential storage
def test_store_and_load_photon_token(tmp_hermes_home: Path) -> None:
photon_auth.store_photon_token("abc123def456")
assert photon_auth.load_photon_token() == "abc123def456"
auth_json = json.loads((tmp_hermes_home / "auth.json").read_text())
assert "credential_pool" in auth_json
assert auth_json["credential_pool"]["photon"][0]["access_token"] == "abc123def456"
def test_store_project_credentials_round_trip(
tmp_hermes_home: Path, monkeypatch: pytest.MonkeyPatch,
) -> None:
# Don't touch .env / os.environ here — exercise the auth.json path.
monkeypatch.setattr(photon_auth, "_persist_runtime_env", lambda *a, **k: None)
def test_store_and_load_project_credentials(tmp_hermes_home: Path) -> None:
photon_auth.store_project_credentials(
spectrum_project_id="sp-123",
project_secret="secret-key",
dashboard_project_id="dash-456",
name="Hermes Agent",
"proj-uuid", "secret-key", name="Test Project",
)
for key in _PHOTON_ENV:
monkeypatch.delenv(key, raising=False)
sid, secret = photon_auth.load_project_credentials()
assert sid == "sp-123"
pid, secret = photon_auth.load_project_credentials()
assert pid == "proj-uuid"
assert secret == "secret-key"
assert photon_auth.load_dashboard_project_id() == "dash-456"
def test_store_project_credentials_writes_env(tmp_hermes_home: Path) -> None:
photon_auth.store_project_credentials(
spectrum_project_id="sp-789",
project_secret="sek-ret",
dashboard_project_id="dash-1",
)
env_text = (tmp_hermes_home / ".env").read_text()
assert "PHOTON_PROJECT_ID=sp-789" in env_text
assert "PHOTON_PROJECT_SECRET=sek-ret" in env_text
def test_load_project_credentials_env_override(
tmp_hermes_home: Path, monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.setattr(photon_auth, "_persist_runtime_env", lambda *a, **k: None)
photon_auth.store_project_credentials(
spectrum_project_id="from-file", project_secret="secret-file",
)
photon_auth.store_project_credentials("from-file", "secret-file")
monkeypatch.setenv("PHOTON_PROJECT_ID", "from-env")
monkeypatch.setenv("PHOTON_PROJECT_SECRET", "secret-env")
sid, secret = photon_auth.load_project_credentials()
assert sid == "from-env"
pid, secret = photon_auth.load_project_credentials()
assert pid == "from-env"
assert secret == "secret-env"
# ---------------------------------------------------------------------------
# Device login flow
def test_request_device_code_uses_photon_cli(monkeypatch: pytest.MonkeyPatch) -> None:
def test_request_device_code(monkeypatch: pytest.MonkeyPatch) -> None:
captured: Dict[str, Any] = {}
def fake_post(url: str, **kwargs: Any) -> _FakeResponse:
def fake_post(url: str, *, json: Dict[str, Any], timeout: float) -> _FakeResponse:
captured["url"] = url
captured["body"] = kwargs.get("json")
captured["body"] = json
return _FakeResponse(json_body={
"device_code": "dev-code-xyz",
"user_code": "ABCD-1234",
@@ -135,6 +95,7 @@ def test_request_device_code_uses_photon_cli(monkeypatch: pytest.MonkeyPatch) ->
code = photon_auth.request_device_code()
assert code.device_code == "dev-code-xyz"
assert code.user_code == "ABCD-1234"
assert code.expires_in == 600
assert "/api/auth/device/code" in captured["url"]
# Hosted Photon allowlists registered device clients — an unregistered
# client_id is rejected with 400 invalid_client. We use Photon's published
@@ -143,298 +104,187 @@ def test_request_device_code_uses_photon_cli(monkeypatch: pytest.MonkeyPatch) ->
assert captured["body"]["scope"] == "openid profile email"
def _device_code() -> "photon_auth.DeviceCode":
return photon_auth.DeviceCode(
def test_poll_for_token_via_header(monkeypatch: pytest.MonkeyPatch) -> None:
"""Token from set-auth-token header is the documented mechanism."""
def fake_post(url: str, *, json: Dict[str, Any], timeout: float) -> _FakeResponse:
return _FakeResponse(
status=200,
json_body={"session": {}, "user": {}},
headers={"set-auth-token": "bearer-xyz"},
)
monkeypatch.setattr(photon_auth.httpx, "post", fake_post)
code = photon_auth.DeviceCode(
device_code="d", user_code="u",
verification_uri="https://x", verification_uri_complete=None,
expires_in=10, interval=0,
)
token = photon_auth.poll_for_token(code, interval=0, timeout=2)
assert token == "bearer-xyz"
def test_poll_for_token_body_access_token(monkeypatch: pytest.MonkeyPatch) -> None:
def fake_post(url: str, **kwargs: Any) -> _FakeResponse:
return _FakeResponse(status=200, json_body={"access_token": "tok-body"})
def test_poll_for_token_via_body_fallback(monkeypatch: pytest.MonkeyPatch) -> None:
"""If the header is absent we fall back to session.access_token."""
def fake_post(url: str, *, json: Dict[str, Any], timeout: float) -> _FakeResponse:
return _FakeResponse(
status=200,
json_body={"session": {"access_token": "from-body"}, "user": {}},
)
monkeypatch.setattr(photon_auth.httpx, "post", fake_post)
assert photon_auth.poll_for_token(_device_code(), interval=0, timeout=2) == "tok-body"
code = photon_auth.DeviceCode(
device_code="d", user_code="u",
verification_uri="https://x", verification_uri_complete=None,
expires_in=10, interval=0,
)
assert photon_auth.poll_for_token(code, interval=0, timeout=2) == "from-body"
def test_poll_for_token_session_fallback(monkeypatch: pytest.MonkeyPatch) -> None:
def fake_post(url: str, **kwargs: Any) -> _FakeResponse:
return _FakeResponse(status=200, json_body={"session": {"access_token": "tok-sess"}})
monkeypatch.setattr(photon_auth.httpx, "post", fake_post)
assert photon_auth.poll_for_token(_device_code(), interval=0, timeout=2) == "tok-sess"
def test_poll_for_token_header_fallback(monkeypatch: pytest.MonkeyPatch) -> None:
def fake_post(url: str, **kwargs: Any) -> _FakeResponse:
return _FakeResponse(status=200, json_body={}, headers={"set-auth-token": "tok-hdr"})
monkeypatch.setattr(photon_auth.httpx, "post", fake_post)
assert photon_auth.poll_for_token(_device_code(), interval=0, timeout=2) == "tok-hdr"
def test_poll_for_token_pending_then_success(monkeypatch: pytest.MonkeyPatch) -> None:
calls = {"n": 0}
def fake_post(url: str, **kwargs: Any) -> _FakeResponse:
calls["n"] += 1
if calls["n"] == 1:
return _FakeResponse(status=400, json_body={"error": "authorization_pending"})
return _FakeResponse(status=200, json_body={"access_token": "tok-eventual"})
monkeypatch.setattr(photon_auth.httpx, "post", fake_post)
assert photon_auth.poll_for_token(_device_code(), interval=0, timeout=5) == "tok-eventual"
assert calls["n"] == 2
def test_poll_for_token_access_denied(monkeypatch: pytest.MonkeyPatch) -> None:
def fake_post(url: str, **kwargs: Any) -> _FakeResponse:
return _FakeResponse(status=400, json_body={"error": "access_denied"})
def test_poll_for_token_propagates_access_denied(
monkeypatch: pytest.MonkeyPatch,
) -> None:
def fake_post(url: str, *, json: Dict[str, Any], timeout: float) -> _FakeResponse:
return _FakeResponse(
status=400, json_body={"error": "access_denied"},
)
monkeypatch.setattr(photon_auth.httpx, "post", fake_post)
code = photon_auth.DeviceCode(
device_code="d", user_code="u",
verification_uri="https://x", verification_uri_complete=None,
expires_in=10, interval=0,
)
with pytest.raises(RuntimeError, match="access_denied"):
photon_auth.poll_for_token(_device_code(), interval=0, timeout=2)
photon_auth.poll_for_token(code, interval=0, timeout=2)
# ---------------------------------------------------------------------------
# Projects
def test_list_projects_unwraps_list(monkeypatch: pytest.MonkeyPatch) -> None:
def fake_get(url: str, **kwargs: Any) -> _FakeResponse:
return _FakeResponse(json_body=[{"id": "p1", "name": "Hermes Agent"}])
monkeypatch.setattr(photon_auth.httpx, "get", fake_get)
projects = photon_auth.list_projects("tok")
assert projects[0]["id"] == "p1"
def test_find_project_by_name_case_insensitive(monkeypatch: pytest.MonkeyPatch) -> None:
def fake_get(url: str, **kwargs: Any) -> _FakeResponse:
return _FakeResponse(json_body={"data": [
{"id": "p1", "name": "Other"},
{"id": "p2", "name": "hermes agent"},
]})
monkeypatch.setattr(photon_auth.httpx, "get", fake_get)
proj = photon_auth.find_project_by_name("tok", "Hermes Agent")
assert proj is not None and proj["id"] == "p2"
def test_create_project_sends_spectrum_true(monkeypatch: pytest.MonkeyPatch) -> None:
captured: Dict[str, Any] = {}
def fake_post(url: str, **kwargs: Any) -> _FakeResponse:
captured["url"] = url
captured["body"] = kwargs.get("json")
captured["headers"] = kwargs.get("headers")
return _FakeResponse(json_body={"success": True, "id": "new-proj"})
monkeypatch.setattr(photon_auth.httpx, "post", fake_post)
data = photon_auth.create_project("tok", name="Hermes Agent")
assert data["id"] == "new-proj"
assert captured["body"]["spectrum"] is True
assert captured["body"]["name"] == "Hermes Agent"
assert captured["headers"]["Authorization"] == "Bearer tok"
assert captured["url"].endswith("/api/projects")
def test_create_project_raises_without_id(monkeypatch: pytest.MonkeyPatch) -> None:
def fake_post(url: str, **kwargs: Any) -> _FakeResponse:
return _FakeResponse(json_body={"success": True})
monkeypatch.setattr(photon_auth.httpx, "post", fake_post)
with pytest.raises(RuntimeError, match="project id"):
photon_auth.create_project("tok")
def test_ensure_spectrum_enabled_toggles_when_off(monkeypatch: pytest.MonkeyPatch) -> None:
get_calls = {"n": 0}
posted = {"toggle": False}
def fake_get(url: str, **kwargs: Any) -> _FakeResponse:
get_calls["n"] += 1
if get_calls["n"] == 1:
return _FakeResponse(json_body={"id": "p", "spectrum": False, "spectrumProjectId": None})
return _FakeResponse(json_body={"id": "p", "spectrum": True, "spectrumProjectId": "sp-1"})
def fake_post(url: str, **kwargs: Any) -> _FakeResponse:
if url.endswith("/spectrum/toggle"):
posted["toggle"] = True
return _FakeResponse(json_body={"success": True})
monkeypatch.setattr(photon_auth.httpx, "get", fake_get)
monkeypatch.setattr(photon_auth.httpx, "post", fake_post)
proj = photon_auth.ensure_spectrum_enabled("tok", "p")
assert posted["toggle"] is True
assert proj["spectrumProjectId"] == "sp-1"
def test_ensure_spectrum_enabled_skips_toggle_when_on(monkeypatch: pytest.MonkeyPatch) -> None:
posted = {"toggle": False}
def fake_get(url: str, **kwargs: Any) -> _FakeResponse:
return _FakeResponse(json_body={"id": "p", "spectrum": True, "spectrumProjectId": "sp-1"})
def fake_post(url: str, **kwargs: Any) -> _FakeResponse:
if url.endswith("/spectrum/toggle"):
posted["toggle"] = True
return _FakeResponse(json_body={"success": True})
monkeypatch.setattr(photon_auth.httpx, "get", fake_get)
monkeypatch.setattr(photon_auth.httpx, "post", fake_post)
proj = photon_auth.ensure_spectrum_enabled("tok", "p")
assert posted["toggle"] is False
assert proj["spectrumProjectId"] == "sp-1"
def test_regenerate_project_secret(monkeypatch: pytest.MonkeyPatch) -> None:
def fake_post(url: str, **kwargs: Any) -> _FakeResponse:
assert url.endswith("/regenerate-secret")
return _FakeResponse(json_body={"success": True, "projectSecret": "rotated"})
monkeypatch.setattr(photon_auth.httpx, "post", fake_post)
assert photon_auth.regenerate_project_secret("tok", "p") == "rotated"
# ---------------------------------------------------------------------------
# Users
def test_create_user_rejects_invalid_phone() -> None:
with pytest.raises(ValueError, match="E.164"):
photon_auth.create_user("tok", "proj", phone_number="not-a-number")
photon_auth.create_user(
"proj", "secret", phone_number="not-a-number",
)
def test_create_user_posts_dashboard_shape(monkeypatch: pytest.MonkeyPatch) -> None:
def test_create_user_posts_shared_type(monkeypatch: pytest.MonkeyPatch) -> None:
captured: Dict[str, Any] = {}
def fake_post(url: str, **kwargs: Any) -> _FakeResponse:
def fake_post(url: str, *, json: Dict[str, Any], auth: tuple, timeout: float) -> _FakeResponse:
captured["url"] = url
captured["body"] = kwargs.get("json")
captured["headers"] = kwargs.get("headers")
return _FakeResponse(json_body={"success": True, "user": {
"id": "user-uuid", "phoneNumber": "+15551234567",
}})
captured["body"] = json
captured["auth"] = auth
return _FakeResponse(json_body={
"succeed": True,
"data": {
"id": "user-uuid",
"phoneNumber": "+15551234567",
"assignedPhoneNumber": "+15559999999",
},
})
monkeypatch.setattr(photon_auth.httpx, "post", fake_post)
user = photon_auth.create_user("tok", "proj-id", phone_number="+15551234567")
assert user["id"] == "user-uuid"
user = photon_auth.create_user(
"proj-id", "proj-secret",
phone_number="+15551234567",
)
assert user["assignedPhoneNumber"] == "+15559999999"
assert captured["auth"] == ("proj-id", "proj-secret")
assert captured["body"]["type"] == "shared"
assert captured["body"]["phoneNumber"] == "+15551234567"
assert captured["headers"]["Authorization"] == "Bearer tok"
assert "/projects/proj-id/spectrum/users" in captured["url"]
assert "/projects/proj-id/users/" in captured["url"]
def test_register_user_if_absent_dedup(monkeypatch: pytest.MonkeyPatch) -> None:
posted = {"n": 0}
def test_register_webhook_surfaces_secret(monkeypatch: pytest.MonkeyPatch) -> None:
def fake_post(url: str, *, json: Dict[str, Any], auth: tuple, timeout: float) -> _FakeResponse:
return _FakeResponse(json_body={
"succeed": True,
"data": {
"id": "wh-uuid",
"webhookUrl": json["webhookUrl"],
"signingSecret": "0" * 64,
},
})
def fake_get(url: str, **kwargs: Any) -> _FakeResponse:
return _FakeResponse(json_body=[{
"id": "u1",
"phoneNumber": "+1 (555) 123-4567",
"assignedPhoneNumber": "+16282679185",
}])
def fake_post(url: str, **kwargs: Any) -> _FakeResponse:
posted["n"] += 1
return _FakeResponse(json_body={"success": True, "user": {}})
monkeypatch.setattr(photon_auth.httpx, "get", fake_get)
monkeypatch.setattr(photon_auth.httpx, "post", fake_post)
# Same number, different formatting — should match and NOT create.
user, created = photon_auth.register_user_if_absent(
"tok", "proj", phone_number="+15551234567",
data = photon_auth.register_webhook(
"proj", "secret", webhook_url="https://x.example.com/hook",
)
assert created is False
assert user["id"] == "u1"
assert posted["n"] == 0
# The reused user carries the assigned iMessage line ("TEXTS ON").
assert photon_auth.user_assigned_line(user) == "+16282679185"
assert data["signingSecret"] == "0" * 64
assert data["webhookUrl"] == "https://x.example.com/hook"
def test_user_assigned_line() -> None:
assert (
photon_auth.user_assigned_line({"assignedPhoneNumber": "+16282679185"})
== "+16282679185"
)
# Own number present but no assignment yet (e.g. freshly created user).
assert photon_auth.user_assigned_line({"phoneNumber": "+15551234567"}) is None
assert photon_auth.user_assigned_line({"assignedPhoneNumber": ""}) is None
assert photon_auth.user_assigned_line({}) is None
assert photon_auth.user_assigned_line(None) is None
def test_register_user_if_absent_creates(monkeypatch: pytest.MonkeyPatch) -> None:
def fake_get(url: str, **kwargs: Any) -> _FakeResponse:
return _FakeResponse(json_body=[])
def fake_post(url: str, **kwargs: Any) -> _FakeResponse:
return _FakeResponse(json_body={"success": True, "user": {"id": "u-new"}})
monkeypatch.setattr(photon_auth.httpx, "get", fake_get)
monkeypatch.setattr(photon_auth.httpx, "post", fake_post)
user, created = photon_auth.register_user_if_absent(
"tok", "proj", phone_number="+15551234567",
)
assert created is True
assert user["id"] == "u-new"
# ---------------------------------------------------------------------------
# Lines (assigned number)
def test_get_imessage_line_returns_existing(monkeypatch: pytest.MonkeyPatch) -> None:
def fake_get(url: str, **kwargs: Any) -> _FakeResponse:
return _FakeResponse(json_body=[
{"id": "l1", "platform": "imessage", "phoneNumber": "+15559999999", "status": "active"},
])
monkeypatch.setattr(photon_auth.httpx, "get", fake_get)
line = photon_auth.get_imessage_line("tok", "proj")
assert line is not None and line["phoneNumber"] == "+15559999999"
def test_get_imessage_line_provisions_when_missing(monkeypatch: pytest.MonkeyPatch) -> None:
added = {"n": 0}
def fake_get(url: str, **kwargs: Any) -> _FakeResponse:
return _FakeResponse(json_body=[])
def fake_post(url: str, **kwargs: Any) -> _FakeResponse:
added["n"] += 1
assert kwargs.get("json", {}).get("platform") == "imessage"
return _FakeResponse(json_body={"success": True, "line": {
"id": "l-new", "platform": "imessage", "phoneNumber": "+15558888888",
}})
monkeypatch.setattr(photon_auth.httpx, "get", fake_get)
monkeypatch.setattr(photon_auth.httpx, "post", fake_post)
line = photon_auth.get_imessage_line("tok", "proj")
assert added["n"] == 1
assert line["phoneNumber"] == "+15558888888"
# ---------------------------------------------------------------------------
# Credential summary (no secret leakage)
def test_credential_summary_no_secret_leak(
tmp_hermes_home: Path, monkeypatch: pytest.MonkeyPatch,
def test_persist_webhook_signing_secret_writes_env(
tmp_hermes_home: Path,
) -> None:
monkeypatch.setattr(photon_auth, "_persist_runtime_env", lambda *a, **k: None)
photon_auth.store_photon_token("token-aaaaaaaaaaaaaaaa")
photon_auth.store_project_credentials(
spectrum_project_id="sp-uuid",
project_secret="secret-bbbbbbbbbbb",
dashboard_project_id="dash-uuid",
"""The helper hands the secret to save_env_value, never returns it."""
summary: list = []
response = {
"id": "wh-uuid",
"webhookUrl": "https://x.example.com/hook",
"signingSecret": "ABCDEF1234567890" * 4,
}
ok = photon_auth.persist_webhook_signing_secret(
response, on_summary=summary.append,
)
assert ok is True
env_path = tmp_hermes_home / ".env"
assert env_path.exists()
env_text = env_path.read_text()
assert "PHOTON_WEBHOOK_SECRET=ABCDEF1234567890" in env_text
# The on_summary callback gets the redacted response + a saved-to path;
# none of those strings should leak the raw secret.
joined = "\n".join(summary)
assert "<redacted>" in joined
assert "ABCDEF1234567890" not in joined
def test_persist_webhook_signing_secret_no_secret_no_write(
tmp_hermes_home: Path,
) -> None:
summary: list = []
ok = photon_auth.persist_webhook_signing_secret(
{"id": "wh-uuid", "webhookUrl": "https://x"},
on_summary=summary.append,
)
assert ok is False
# No env file written; summary callback still received the redacted
# response (without a signingSecret key, nothing to redact).
assert not (tmp_hermes_home / ".env").exists()
def test_credential_summary_returns_only_display_strings(
tmp_hermes_home: Path,
) -> None:
"""credential_summary must not leak raw token/secret material."""
photon_auth.store_photon_token("token-aaaaaaaaaaaaaaaa")
photon_auth.store_project_credentials("proj-uuid", "secret-bbbbbbbbbbb")
summary = photon_auth.credential_summary()
blob = "\n".join(summary.values())
assert "token-aaaa" not in blob
assert "secret-bbbb" not in blob
assert summary["device_token"].startswith("")
assert summary["project_key"].startswith("")
assert summary["spectrum_project_id"] == "sp-uuid"
assert summary["dashboard_project_id"] == "dash-uuid"
assert summary["project_id"] == "proj-uuid"
def test_print_credential_summary_emits_only_display_strings(
tmp_hermes_home: Path,
) -> None:
"""The emit callback must never receive raw credential bytes."""
photon_auth.store_photon_token("token-aaaaaaaaaaaaaaaa")
photon_auth.store_project_credentials("proj-uuid", "secret-bbbbbbbbbbb")
lines: list = []
photon_auth.print_credential_summary(lines.append)
blob = "\n".join(lines)
assert "token-aaaa" not in blob
assert "secret-bbbb" not in blob
assert "✓ stored" in blob # device token line
assert "proj-uuid" in blob # project id is intentionally surfaced
# Header is always emitted
assert any("Photon iMessage status" in line for line in lines)
# ---------------------------------------------------------------------------


@@ -1,15 +1,12 @@
"""Inbound dispatch + dedup tests for PhotonAdapter.
These bypass the loopback HTTP stream — they call ``_dispatch_inbound`` /
``_on_inbound_line`` / ``_is_duplicate`` directly, exercising the
sidecar-event parsing without spawning the Node sidecar or binding ports.
These tests bypass the aiohttp server — they call ``_dispatch_inbound``
and ``_is_duplicate`` directly. That keeps them fast and means we can
exercise the message-shape parsing logic without binding ports.
"""
from __future__ import annotations
import base64
import json
from pathlib import Path
from typing import Any, Dict, List
from typing import List
import pytest
@@ -19,39 +16,38 @@ from plugins.platforms.photon.adapter import PhotonAdapter
def _make_adapter(monkeypatch: pytest.MonkeyPatch) -> PhotonAdapter:
# Avoid touching real auth.json / env.
monkeypatch.setenv("PHOTON_PROJECT_ID", "test-project-id")
monkeypatch.setenv("PHOTON_PROJECT_SECRET", "test-project-secret")
monkeypatch.delenv("PHOTON_WEBHOOK_SECRET", raising=False)
cfg = PlatformConfig(enabled=True, token="", extra={})
return PhotonAdapter(cfg)
def _capture(adapter: PhotonAdapter, monkeypatch: pytest.MonkeyPatch) -> List[MessageEvent]:
@pytest.mark.asyncio
async def test_dispatch_text_dm(monkeypatch: pytest.MonkeyPatch) -> None:
adapter = _make_adapter(monkeypatch)
captured: List[MessageEvent] = []
async def fake_handle(event: MessageEvent) -> None:
captured.append(event)
monkeypatch.setattr(adapter, "handle_message", fake_handle)
return captured
def _dm_event(text: str, msg_id: str = "spc-msg-abc") -> Dict[str, Any]:
return {
"messageId": msg_id,
"platform": "iMessage",
"space": {"id": "+15551234567", "type": "dm", "phone": "+15551234567"},
"sender": {"id": "+15551234567"},
"content": {"type": "text", "text": text},
"timestamp": "2026-05-14T19:06:32.000Z",
payload = {
"event": "messages",
"space": {"id": "any;-;+15551234567", "platform": "iMessage"},
"message": {
"id": "spc-msg-abc",
"platform": "iMessage",
"direction": "inbound",
"timestamp": "2026-05-14T19:06:32.000Z",
"sender": {"id": "+15551234567", "platform": "iMessage"},
"space": {"id": "any;-;+15551234567", "platform": "iMessage"},
"content": {"type": "text", "text": "hello world"},
},
}
@pytest.mark.asyncio
async def test_dispatch_text_dm(monkeypatch: pytest.MonkeyPatch) -> None:
adapter = _make_adapter(monkeypatch)
captured = _capture(adapter, monkeypatch)
await adapter._dispatch_inbound(_dm_event("hello world"))
await adapter._dispatch_inbound(payload)
assert len(captured) == 1
event = captured[0]
@@ -61,157 +57,70 @@ async def test_dispatch_text_dm(monkeypatch: pytest.MonkeyPatch) -> None:
src = event.source
assert src is not None
assert src.platform == Platform("photon")
assert src.chat_id == "+15551234567"
assert src.chat_id == "any;-;+15551234567"
assert src.chat_type == "dm"
assert src.user_id == "+15551234567"
@pytest.mark.asyncio
async def test_dispatch_group_type(monkeypatch: pytest.MonkeyPatch) -> None:
async def test_dispatch_group_id_detected(monkeypatch: pytest.MonkeyPatch) -> None:
adapter = _make_adapter(monkeypatch)
captured = _capture(adapter, monkeypatch)
captured: List[MessageEvent] = []
event = {
"messageId": "spc-msg-grp",
"space": {"id": "group-guid-xyz", "type": "group", "phone": None},
"sender": {"id": "+15551234567"},
"content": {"type": "text", "text": "hi group"},
"timestamp": "2026-05-14T19:06:32.000Z",
async def fake_handle(event: MessageEvent) -> None:
captured.append(event)
monkeypatch.setattr(adapter, "handle_message", fake_handle)
payload = {
"event": "messages",
"space": {"id": "any;+;group-guid-xyz", "platform": "iMessage"},
"message": {
"id": "spc-msg-grp",
"timestamp": "2026-05-14T19:06:32.000Z",
"sender": {"id": "+15551234567"},
"space": {"id": "any;+;group-guid-xyz"},
"content": {"type": "text", "text": "hi group"},
},
}
await adapter._dispatch_inbound(event)
await adapter._dispatch_inbound(payload)
assert captured[0].source.chat_type == "group"
# A real 1x1 transparent PNG (passes base.py's _looks_like_image magic check).
_PNG_1X1_B64 = (
"iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR42mNkYPhf"
"DwAChwGA60e6kgAAAABJRU5ErkJggg=="
)
@pytest.mark.asyncio
async def test_dispatch_attachment_surfaces_marker(
monkeypatch: pytest.MonkeyPatch,
) -> None:
adapter = _make_adapter(monkeypatch)
captured: List[MessageEvent] = []
async def fake_handle(event: MessageEvent) -> None:
captured.append(event)
def _attachment_event(
content: Dict[str, Any], msg_id: str = "spc-msg-att"
) -> Dict[str, Any]:
return {
"messageId": msg_id,
"space": {"id": "+15551234567", "type": "dm", "phone": "+15551234567"},
"sender": {"id": "+15551234567"},
"content": {"type": "attachment", **content},
"timestamp": "2026-05-14T19:06:32.000Z",
monkeypatch.setattr(adapter, "handle_message", fake_handle)
payload = {
"event": "messages",
"message": {
"id": "spc-msg-att",
"timestamp": "2026-05-14T19:06:32.000Z",
"sender": {"id": "+15551234567"},
"space": {"id": "any;-;+15551234567"},
"content": {
"type": "attachment",
"name": "IMG_4127.HEIC",
"mimeType": "image/heic",
"size": 12345,
},
},
}
@pytest.mark.asyncio
async def test_dispatch_attachment_without_bytes_surfaces_marker(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""No inline ``data`` (over cap / failed sidecar read) -> text marker, no media."""
adapter = _make_adapter(monkeypatch)
captured = _capture(adapter, monkeypatch)
event = _attachment_event(
{"name": "IMG_4127.HEIC", "mimeType": "image/heic", "size": 12345}
)
await adapter._dispatch_inbound(event)
await adapter._dispatch_inbound(payload)
assert len(captured) == 1
ev = captured[0]
assert "Photon attachment received" in ev.text
assert "IMG_4127.HEIC" in ev.text
assert ev.message_type == MessageType.PHOTO
assert ev.media_urls == []
assert ev.media_types == []
@pytest.mark.asyncio
async def test_dispatch_attachment_downloads_image(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Inline base64 image bytes are decoded, cached, and exposed as media."""
adapter = _make_adapter(monkeypatch)
captured = _capture(adapter, monkeypatch)
raw = base64.b64decode(_PNG_1X1_B64)
event = _attachment_event(
{
"name": "photo.png",
"mimeType": "image/png",
"size": len(raw),
"data": _PNG_1X1_B64,
"encoding": "base64",
}
)
await adapter._dispatch_inbound(event)
assert len(captured) == 1
ev = captured[0]
assert ev.message_type == MessageType.PHOTO
assert ev.media_types == ["image/png"]
assert len(ev.media_urls) == 1
cached = Path(ev.media_urls[0])
try:
assert cached.is_file()
assert cached.read_bytes() == raw
assert ev.text == "(attachment)"
finally:
cached.unlink(missing_ok=True)
@pytest.mark.asyncio
async def test_dispatch_attachment_downloads_document(
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""Non-image attachments route through the document cache as DOCUMENT."""
adapter = _make_adapter(monkeypatch)
captured = _capture(adapter, monkeypatch)
raw = b"%PDF-1.4 hermes test document"
event = _attachment_event(
{
"name": "report.pdf",
"mimeType": "application/pdf",
"size": len(raw),
"data": base64.b64encode(raw).decode("ascii"),
"encoding": "base64",
}
)
await adapter._dispatch_inbound(event)
assert len(captured) == 1
ev = captured[0]
assert ev.message_type == MessageType.DOCUMENT
assert ev.media_types == ["application/pdf"]
assert len(ev.media_urls) == 1
cached = Path(ev.media_urls[0])
try:
assert cached.is_file()
assert cached.read_bytes() == raw
assert ev.text == "(attachment)"
finally:
cached.unlink(missing_ok=True)
@pytest.mark.asyncio
async def test_on_inbound_line_dispatches_and_dedups(
monkeypatch: pytest.MonkeyPatch,
) -> None:
adapter = _make_adapter(monkeypatch)
captured = _capture(adapter, monkeypatch)
line = json.dumps(_dm_event("ping", msg_id="dup-1"))
await adapter._on_inbound_line(line)
await adapter._on_inbound_line(line) # same messageId -> deduped
assert len(captured) == 1
assert captured[0].text == "ping"
@pytest.mark.asyncio
async def test_on_inbound_line_ignores_bad_json(monkeypatch: pytest.MonkeyPatch) -> None:
adapter = _make_adapter(monkeypatch)
captured = _capture(adapter, monkeypatch)
await adapter._on_inbound_line("{not json")
assert captured == []
event = captured[0]
# Attachment carries metadata marker; mime → MessageType.PHOTO.
assert "Photon attachment received" in event.text
assert "IMG_4127.HEIC" in event.text
assert event.message_type == MessageType.PHOTO
def test_is_duplicate_window(monkeypatch: pytest.MonkeyPatch) -> None:


@@ -22,6 +22,7 @@ from plugins.platforms.photon.adapter import PhotonAdapter
def _make_adapter(monkeypatch: pytest.MonkeyPatch, extra: dict | None = None) -> PhotonAdapter:
monkeypatch.setenv("PHOTON_PROJECT_ID", "test-project-id")
monkeypatch.setenv("PHOTON_PROJECT_SECRET", "test-project-secret")
monkeypatch.delenv("PHOTON_WEBHOOK_SECRET", raising=False)
monkeypatch.delenv("PHOTON_REQUIRE_MENTION", raising=False)
monkeypatch.delenv("PHOTON_MENTION_PATTERNS", raising=False)
cfg = PlatformConfig(enabled=True, token="", extra=extra or {})
@@ -30,21 +31,27 @@ def _make_adapter(monkeypatch: pytest.MonkeyPatch, extra: dict | None = None) ->
def _group_payload(text: str) -> dict:
return {
"messageId": f"grp-{abs(hash(text))}",
"space": {"id": "group-guid-xyz", "type": "group", "phone": None},
"sender": {"id": "+15551234567"},
"content": {"type": "text", "text": text},
"timestamp": "2026-05-14T19:06:32.000Z",
"event": "messages",
"message": {
"id": f"grp-{abs(hash(text))}",
"timestamp": "2026-05-14T19:06:32.000Z",
"sender": {"id": "+15551234567"},
"space": {"id": "any;+;group-guid-xyz"},
"content": {"type": "text", "text": text},
},
}
def _dm_payload(text: str) -> dict:
return {
"messageId": f"dm-{abs(hash(text))}",
"space": {"id": "+15551234567", "type": "dm", "phone": "+15551234567"},
"sender": {"id": "+15551234567"},
"content": {"type": "text", "text": text},
"timestamp": "2026-05-14T19:06:32.000Z",
"event": "messages",
"message": {
"id": f"dm-{abs(hash(text))}",
"timestamp": "2026-05-14T19:06:32.000Z",
"sender": {"id": "+15551234567"},
"space": {"id": "any;-;+15551234567"},
"content": {"type": "text", "text": text},
},
}
@@ -119,6 +126,7 @@ def test_custom_mention_patterns_from_config(monkeypatch: pytest.MonkeyPatch) ->
def test_mention_patterns_env_comma_separated(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("PHOTON_PROJECT_ID", "test-project-id")
monkeypatch.setenv("PHOTON_PROJECT_SECRET", "test-project-secret")
monkeypatch.delenv("PHOTON_WEBHOOK_SECRET", raising=False)
monkeypatch.setenv("PHOTON_REQUIRE_MENTION", "true")
monkeypatch.setenv("PHOTON_MENTION_PATTERNS", r"bot\b, assistant\b")
cfg = PlatformConfig(enabled=True, token="", extra={})


@@ -1,69 +0,0 @@
"""Tests for `hermes photon setup`'s access auto-configuration.
`_autoconfigure_access` allowlists the operator and points the cron home
channel at their DM, writing to the per-test ~/.hermes/.env (the hermetic
HERMES_HOME fixture isolates this). It must fill only unset keys so a re-run
never clobbers a hand-tuned allowlist.
"""
from __future__ import annotations
import pytest
from hermes_cli.config import get_env_value, save_env_value
from plugins.platforms.photon.adapter import _env_enablement
from plugins.platforms.photon import cli
def test_autoconfigure_access_fills_unset(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.delenv("PHOTON_ALLOWED_USERS", raising=False)
monkeypatch.delenv("PHOTON_HOME_CHANNEL", raising=False)
cli._autoconfigure_access("+15551234567")
assert get_env_value("PHOTON_ALLOWED_USERS") == "+15551234567"
assert get_env_value("PHOTON_HOME_CHANNEL") == "+15551234567"
def test_autoconfigure_access_preserves_existing_allowlist(
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.delenv("PHOTON_ALLOWED_USERS", raising=False)
monkeypatch.delenv("PHOTON_HOME_CHANNEL", raising=False)
# A hand-tuned allowlist already in place must survive a setup re-run.
save_env_value("PHOTON_ALLOWED_USERS", "+19998887777,+15551112222")
cli._autoconfigure_access("+15551234567")
assert get_env_value("PHOTON_ALLOWED_USERS") == "+19998887777,+15551112222"
# The still-unset home channel is filled.
assert get_env_value("PHOTON_HOME_CHANNEL") == "+15551234567"
def test_env_enablement_seeds_home_channel(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("PHOTON_PROJECT_ID", "project_123")
monkeypatch.setenv("PHOTON_PROJECT_SECRET", "secret_123")
monkeypatch.setenv("PHOTON_HOME_CHANNEL", "+15551234567")
monkeypatch.setenv("PHOTON_HOME_CHANNEL_NAME", "Primary DM")
seed = _env_enablement()
assert seed is not None
assert seed["home_channel"] == {
"chat_id": "+15551234567",
"name": "Primary DM",
}
def test_env_enablement_home_channel_defaults_name(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("PHOTON_PROJECT_ID", "project_123")
monkeypatch.setenv("PHOTON_PROJECT_SECRET", "secret_123")
monkeypatch.setenv("PHOTON_HOME_CHANNEL", "+15551234567")
monkeypatch.delenv("PHOTON_HOME_CHANNEL_NAME", raising=False)
seed = _env_enablement()
assert seed is not None
assert seed["home_channel"] == {
"chat_id": "+15551234567",
"name": "Home",
}


@@ -0,0 +1,95 @@
"""Signature verification tests for the Photon webhook receiver."""
from __future__ import annotations
import hashlib
import hmac
import time
import pytest
from plugins.platforms.photon.adapter import verify_signature
def _sign(secret: str, body: bytes, ts: int) -> str:
return "v0=" + hmac.new(
secret.encode(), f"v0:{ts}:".encode() + body, hashlib.sha256,
).hexdigest()
def test_accepts_valid_signature() -> None:
secret = "topsecret-32chars-or-whatever"
body = b'{"event":"messages"}'
ts = int(time.time())
sig = _sign(secret, body, ts)
assert verify_signature(
body=body, timestamp_header=str(ts), signature_header=sig,
signing_secret=secret,
)
def test_rejects_tampered_body() -> None:
secret = "s"
body = b'{"event":"messages"}'
ts = int(time.time())
sig = _sign(secret, body, ts)
assert not verify_signature(
body=body + b" tamper", timestamp_header=str(ts),
signature_header=sig, signing_secret=secret,
)
def test_rejects_wrong_secret() -> None:
body = b"x"
ts = int(time.time())
sig = _sign("right", body, ts)
assert not verify_signature(
body=body, timestamp_header=str(ts), signature_header=sig,
signing_secret="wrong",
)
def test_rejects_drifted_timestamp() -> None:
secret = "s"
body = b"x"
ts = int(time.time()) - 3600 # 1h old; drift window is 5 min
sig = _sign(secret, body, ts)
assert not verify_signature(
body=body, timestamp_header=str(ts), signature_header=sig,
signing_secret=secret,
)
def test_rejects_missing_v0_prefix() -> None:
secret = "s"
body = b"x"
ts = int(time.time())
raw_hex = hmac.new(
secret.encode(), f"v0:{ts}:".encode() + body, hashlib.sha256,
).hexdigest()
# Strip the "v0=" prefix — verify_signature must reject.
assert not verify_signature(
body=body, timestamp_header=str(ts), signature_header=raw_hex,
signing_secret=secret,
)
def test_rejects_empty_inputs() -> None:
assert not verify_signature(
body=b"x", timestamp_header="", signature_header="v0=abc",
signing_secret="s",
)
assert not verify_signature(
body=b"x", timestamp_header="123", signature_header="",
signing_secret="s",
)
assert not verify_signature(
body=b"x", timestamp_header="123", signature_header="v0=abc",
signing_secret="",
)
def test_rejects_non_integer_timestamp() -> None:
assert not verify_signature(
body=b"x", timestamp_header="not-an-int",
signature_header="v0=abc", signing_secret="s",
)


@@ -1,20 +0,0 @@
"""Parser-only tests for send_message targets.
These stay separate from ``test_send_message_tool.py`` because that module
skips wholesale when optional Telegram dependencies are not installed.
"""
from tools.send_message_tool import _parse_target_ref
def test_photon_e164_target_is_explicit() -> None:
chat_id, thread_id, is_explicit = _parse_target_ref("photon", "+15551234567")
assert chat_id == "+15551234567"
assert thread_id is None
assert is_explicit is True
def test_e164_target_still_requires_phone_platform() -> None:
assert _parse_target_ref("matrix", "+15551234567")[2] is False


@@ -1199,11 +1199,6 @@ class TestParseTargetRefE164:
assert chat_id == "+15551234567"
assert is_explicit is True
def test_photon_e164_is_explicit(self):
chat_id, _, is_explicit = _parse_target_ref("photon", "+15551234567")
assert chat_id == "+15551234567"
assert is_explicit is True
def test_signal_bare_digits_still_work(self):
"""Bare digit strings continue to match the generic numeric branch."""
chat_id, _, is_explicit = _parse_target_ref("signal", "15551234567")


@@ -38,7 +38,7 @@ _NUMERIC_TOPIC_RE = _TELEGRAM_TOPIC_TARGET_RE
# below and falls through to channel-name resolution, which has no way to
# resolve a raw phone number. Keeping the '+' preserves the E.164 form that
# downstream adapters (signal, etc.) expect.
_PHONE_PLATFORMS = frozenset({"photon", "signal", "sms", "whatsapp"})
_PHONE_PLATFORMS = frozenset({"signal", "sms", "whatsapp"})
_E164_TARGET_RE = re.compile(r"^\s*\+(\d{7,15})\s*$")
# Email addresses — a valid email like "user@domain.com" should be treated as
# an explicit target for the email platform, not fall through to channel-name


@@ -22,30 +22,26 @@ your account.
## Architecture
Photon is a **persistent-connection** channel, like Discord or Slack —
**no webhook, no public URL, no signing secret to manage.**
Inbound messages arrive as **signed webhooks**: Photon POSTs JSON with
an `X-Spectrum-Signature` header to a URL you register, and Hermes'
aiohttp listener verifies the HMAC-SHA256 signature before dispatching
the event into the agent.
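The signature check can be sketched as follows. This is a minimal illustration, not the plugin's actual `verify_signature`. It assumes the scheme exercised by the adapter's signature tests: a `v0=` prefix, HMAC-SHA256 over `v0:{timestamp}:` followed by the raw body, and a five-minute drift window. The function name `verify_signature_sketch` and the `max_drift_seconds` parameter are hypothetical.

```python
import hashlib
import hmac
import time


def verify_signature_sketch(
    body: bytes,
    timestamp_header: str,
    signature_header: str,
    signing_secret: str,
    max_drift_seconds: int = 300,  # assumption: 5-minute drift window
) -> bool:
    """Return True only if the signature matches and the timestamp is fresh."""
    # Any missing input is an immediate rejection.
    if not (timestamp_header and signature_header and signing_secret):
        return False
    # The signature must carry the version prefix.
    if not signature_header.startswith("v0="):
        return False
    try:
        ts = int(timestamp_header)
    except ValueError:
        return False
    # Reject stale (or far-future) timestamps to limit replay.
    if abs(time.time() - ts) > max_drift_seconds:
        return False
    expected = "v0=" + hmac.new(
        signing_secret.encode(),
        f"v0:{ts}:".encode() + body,
        hashlib.sha256,
    ).hexdigest()
    # Constant-time comparison; bytes avoid errors on non-ASCII header input.
    return hmac.compare_digest(expected.encode(), signature_header.encode())
```

The comparison uses `hmac.compare_digest` rather than `==` so that a mismatch does not reveal, through timing, how many leading characters were correct.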
The `spectrum-ts` SDK holds a long-lived **gRPC stream** to Photon for
both directions. Because the SDK is TypeScript-only, Hermes runs it in a
small supervised **Node sidecar** and talks to it over loopback:
- **Inbound** — the sidecar consumes the SDK's `app.messages` gRPC
stream and forwards each message to the Python adapter over a loopback
`GET /inbound` (NDJSON). The adapter dedupes and dispatches it to the
agent, reconnecting automatically if the stream drops.
- **Outbound** — replies are loopback POSTs to the sidecar, which calls
`space.send(...)` on the SDK.
The Python plugin starts, supervises, and shuts down the sidecar
automatically.
Outbound replies go through a small supervised **Node sidecar** that
runs the `spectrum-ts` SDK on loopback. Photon does not currently
expose a public HTTP send-message endpoint — that's a roadmap item on
their side — so until then the sidecar is the only way to call
`Space.send(...)`. The Python plugin starts, supervises, and shuts
down the sidecar automatically. When Photon ships an HTTP send
endpoint we'll retire the sidecar in a follow-up release.
## Prerequisites
- A Photon account — sign up at [app.photon.codes][app]
- **Node.js 18.17 or newer** on PATH (`node --version`)
- A phone number that can receive iMessage (used to bind your account)
That's it — there is no public URL or tunnel to set up.
- A publicly reachable URL for the webhook receiver — Cloudflare
Tunnel, ngrok, or your own gateway hostname all work
## First-time setup
@@ -62,24 +58,17 @@ hermes gateway setup
hermes photon setup --phone +15551234567
```
The setup, in order:
The setup:
1. **Device login** (`client_id=photon-cli`) — opens
`https://app.photon.codes/` for approval and stores the bearer token.
2. **Finds or creates** the `Hermes Agent` project on your account.
3. **Enables Spectrum**, reads the project's Spectrum id, and rotates
the project secret.
4. **Registers your phone number** as a Spectrum user — skipped if a
user with that number already exists, so re-running is safe.
5. **Prints your assigned iMessage line** — the number you text to reach
your agent.
6. **Runs `npm install`** inside the plugin's sidecar directory.
1. Opens `https://app.photon.codes/` for device approval
2. Creates a Spectrum-enabled project under your account
3. Calls the Spectrum `create-user` endpoint with `type: shared` so
Photon allocates an iMessage line from the free pool
4. Runs `npm install` inside the plugin's sidecar directory
Runtime credentials are written to `~/.hermes/.env`
(`PHOTON_PROJECT_ID` = the Spectrum project id, `PHOTON_PROJECT_SECRET`),
the same place every other channel keeps its token. Management metadata
(device token, dashboard project id) lives in `~/.hermes/auth.json` under
`credential_pool.photon` / `credential_pool.photon_project`.
Credentials are stored in `~/.hermes/auth.json` under
`credential_pool.photon` (bearer token) and
`credential_pool.photon_project` (project id + secret).
## Authorizing users
@@ -142,6 +131,26 @@ Both keys also accept env vars (`PHOTON_REQUIRE_MENTION`,
`PHOTON_MENTION_PATTERNS`). This is the same mention-gating model the
BlueBubbles iMessage channel uses.
## Registering the webhook
Photon needs a public URL it can POST to. Expose your local listener
(default port 8788, path `/photon/webhook`) via Cloudflare Tunnel or
ngrok, then:
```bash
hermes photon webhook register https://YOUR-PUBLIC-URL/photon/webhook
```
The response includes a `signingSecret` — **Photon only returns it
once.** Save it to `~/.hermes/.env`:
```bash
PHOTON_WEBHOOK_SECRET=v0_64-char-hex...
```
The plugin verifies every inbound `POST` against this secret and
rejects deliveries with a timestamp drift greater than 5 minutes.
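As a rough illustration of that check, the sketch below verifies an HMAC-SHA256 signature and enforces the 5-minute drift window. The signed payload layout (`timestamp + "." + body`), the hex encoding, and the function name `verify_webhook` are assumptions made for the example; this section does not specify Photon's actual signing scheme, so treat it as a pattern rather than the plugin's code.

```python
import hashlib
import hmac
import time

MAX_DRIFT_SECONDS = 5 * 60  # the 5-minute window mentioned above


def verify_webhook(secret, timestamp, body, signature_hex, now=None):
    """Return True if the signature matches and the timestamp is fresh.

    Assumed layout (hypothetical): HMAC-SHA256 over b"<timestamp>.<body>",
    hex-encoded, with the timestamp given as Unix seconds in a string.
    """
    try:
        sent_at = int(timestamp)
    except (TypeError, ValueError):
        return False

    current = time.time() if now is None else now
    if abs(current - sent_at) > MAX_DRIFT_SECONDS:
        return False  # too old or too far in the future

    signed = timestamp.encode("utf-8") + b"." + body
    expected = hmac.new(secret.encode("utf-8"), signed, hashlib.sha256).hexdigest()
    # Constant-time comparison on bytes avoids leaking match length via timing.
    return hmac.compare_digest(expected.encode("utf-8"), signature_hex.encode("utf-8"))
```

Rejecting stale timestamps before computing the digest keeps replayed deliveries cheap to discard.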
## Start the gateway
```bash
@@ -151,7 +160,7 @@ hermes gateway start --platform photon
You'll see something like:
```
[photon] connected — sidecar on 127.0.0.1:8789, streaming inbound over gRPC
[photon] connected — webhook at 0.0.0.0:8788/photon/webhook, sidecar on 127.0.0.1:8789
```
Send an iMessage to your assigned number and Hermes will reply.
@@ -168,9 +177,9 @@ Prints:
Photon iMessage status
──────────────────────
device token : ✓ stored
dashboard project : 3c90c3cc-0d44-4b50-...
spectrum project id : sp-...
project secret : ✓ stored
project id : 3c90c3cc-0d44-4b50-...
project key : ✓ stored
webhook key : ✓ set
node binary : /usr/bin/node
sidecar deps : ✓ installed
```
@@ -179,19 +188,27 @@ Common issues:
- **`sidecar deps : ✗ run hermes photon install-sidecar`** — Node is
installed but `spectrum-ts` isn't. Run the suggested command.
- **`device token : ✗ missing`** — run `hermes photon setup` to log in.
- **`No iMessage line assigned yet`** — Spectrum is enabled but no line
has been provisioned; re-run `hermes photon setup` or check the
[dashboard][app].
- **Sidecar won't start** — confirm `node --version` is 18.17+ and that
`hermes photon install-sidecar` completed without errors.
- **`webhook key : ⚠ unset — verification disabled`** — the
plugin will accept ANY POST to the webhook URL, which is unsafe.
Re-run `hermes photon webhook register` and store the secret.
- **`PHOTON_WEBHOOK_PORT` already in use** — set a different port via
`~/.hermes/.env`.
- **Webhook reachable from localhost but Photon can't deliver** —
Photon needs a public hostname. Cloudflare Tunnel is the easiest
free option.
## Webhook management
```bash
hermes photon webhook list # show registered hooks
hermes photon webhook delete <webhook-id> # remove one
```
## Limits today
- **Inbound attachments are metadata-only.** Inbound events carry the
filename + MIME type; the agent sees a marker but can't yet read the
bytes. The SDK exposes attachment bytes via `content.read()`, so this
is a sidecar follow-up.
- **Inbound attachments are metadata-only.** Inbound webhooks carry the
filename + MIME type but no download URL — Photon documents an
attachment retrieval endpoint as roadmap.
- **Outbound attachments are supported.** Hermes sends images, voice
notes, video, and documents through spectrum-ts' `attachment()` /
`voice()` content builders via the sidecar's `/send-attachment`
@@ -205,17 +222,22 @@ Common issues:
| Variable | Default | Notes |
|---------------------------|--------------------|--------------------------------------------|
| `PHOTON_PROJECT_ID` | from `.env` | Spectrum project id (the SDK's `projectId`); set by setup |
| `PHOTON_PROJECT_SECRET` | from `.env` | Project secret; set by setup |
| `PHOTON_SIDECAR_PORT` | `8789` | Loopback port for the sidecar control + inbound channel |
| `PHOTON_PROJECT_ID` | from `auth.json` | Set by `hermes photon setup` |
| `PHOTON_PROJECT_SECRET` | from `auth.json` | Set by `hermes photon setup` |
| `PHOTON_WEBHOOK_SECRET` | (unset) | From `hermes photon webhook register` |
| `PHOTON_WEBHOOK_PORT` | `8788` | Local port for the aiohttp listener |
| `PHOTON_WEBHOOK_PATH` | `/photon/webhook` | Path under which the listener mounts |
| `PHOTON_WEBHOOK_BIND` | `0.0.0.0` | Bind address for the listener |
| `PHOTON_SIDECAR_PORT` | `8789` | Loopback port for sidecar control |
| `PHOTON_SIDECAR_AUTOSTART`| `true` | Whether the adapter spawns the sidecar |
| `PHOTON_NODE_BIN` | `which node` | Override the Node binary path |
| `PHOTON_HOME_CHANNEL` | (unset) | Default space id for cron / notifications |
| `PHOTON_HOME_CHANNEL` | (unset) | Default space ID for cron / notifications |
| `PHOTON_HOME_CHANNEL_NAME`| (unset) | Human label for the home channel |
| `PHOTON_ALLOWED_USERS` | (unset) | Comma-separated E.164 allowlist |
| `PHOTON_ALLOW_ALL_USERS` | `false` | Dev only — accept any sender |
| `PHOTON_REQUIRE_MENTION` | `false` | Require a wake word before responding in groups |
| `PHOTON_MENTION_PATTERNS` | Hermes wake words | JSON list / comma / newline regex patterns for group mentions |
| `PHOTON_API_HOST` | `spectrum.photon.codes` | Override the Spectrum management API host |
| `PHOTON_DASHBOARD_HOST` | `app.photon.codes` | Override the dashboard / device-login host |
[photon]: https://photon.codes/


@@ -54,11 +54,8 @@ SIMPLEX_HOME_CHANNEL=<contact-id>
| `SIMPLEX_WS_URL` | Yes | WebSocket URL of the simplex-chat daemon |
| `SIMPLEX_ALLOWED_USERS` | Recommended | Comma-separated allowlist. Each entry can be a numeric `contactId` **or** a display name — both forms work. |
| `SIMPLEX_ALLOW_ALL_USERS` | Optional | Set `true` to allow every contact (use carefully) |
| `SIMPLEX_AUTO_ACCEPT` | Optional | Auto-accept incoming contact requests (default: `true`) |
| `SIMPLEX_GROUP_ALLOWED` | Optional | Comma-separated group IDs the bot participates in, or `*` for any group. Omit to ignore group messages entirely |
| `SIMPLEX_HOME_CHANNEL` | Optional | Default contact/group ID for cron job delivery |
| `SIMPLEX_HOME_CHANNEL` | Optional | Default contact ID for cron job delivery |
| `SIMPLEX_HOME_CHANNEL_NAME` | Optional | Human label for the home channel |
| `HERMES_SIMPLEX_TEXT_BATCH_DELAY` | Optional | Quiet-period seconds (default: `0.8`) used to concatenate rapid-fire inbound text messages into one event |
## Find your contact ID or display name
@@ -71,37 +68,6 @@ By default **all contacts are denied**. You must either:
1. Set `SIMPLEX_ALLOWED_USERS` to a comma-separated list of `contactId`s and/or display names (e.g. `SIMPLEX_ALLOWED_USERS=4,alice` matches either contactId 4 or the contact whose display name is "alice"), or
2. Use **DM pairing** — send any message to the bot and it will reply with a pairing code. Enter that code via `hermes pairing approve simplex <CODE>`.
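The allowlist rule in option 1 (each entry is either a numeric `contactId` or a display name) can be sketched as below. The helper names are hypothetical, and exact, case-sensitive name matching is an assumption; the adapter may normalize names differently.

```python
def parse_allowlist(raw: str) -> set:
    """Split a comma-separated SIMPLEX_ALLOWED_USERS value into entries."""
    return {entry.strip() for entry in raw.split(",") if entry.strip()}


def is_allowed(contact_id: int, display_name: str, allowlist: set) -> bool:
    """A contact passes if its id OR its display name is listed.

    An empty allowlist denies everyone, mirroring the deny-by-default rule.
    """
    return str(contact_id) in allowlist or display_name in allowlist


allowlist = parse_allowlist("4,alice")
print(is_allowed(4, "bob", allowlist))    # matched by contactId
print(is_allowed(9, "alice", allowlist))  # matched by display name
print(is_allowed(9, "bob", allowlist))    # no match, denied
```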
## Group chats
By default the adapter ignores group messages — a bot in a group otherwise
processes every member's traffic. Opt-in explicitly:
```
SIMPLEX_GROUP_ALLOWED=12,34 # specific group IDs
# or
SIMPLEX_GROUP_ALLOWED=* # any group the bot is in
```
Address groups by prefixing the chat ID with `group:`, e.g.
`simplex:group:12` in `send_message` or as a cron `deliver=` target.
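A minimal sketch of the opt-in rule follows, assuming the setting is read as a plain string. The function name `group_is_allowed` is hypothetical and not part of the adapter's API.

```python
from typing import Optional


def group_is_allowed(group_id: str, raw_setting: Optional[str]) -> bool:
    """Apply the SIMPLEX_GROUP_ALLOWED rule to one group id.

    Unset or empty means group messages are ignored entirely;
    "*" admits any group; otherwise the id must be listed.
    """
    if not raw_setting:
        return False
    entries = {entry.strip() for entry in raw_setting.split(",") if entry.strip()}
    return "*" in entries or group_id in entries


print(group_is_allowed("12", "12,34"))  # listed group
print(group_is_allowed("56", "12,34"))  # unlisted group
print(group_is_allowed("56", "*"))      # wildcard
print(group_is_allowed("12", None))     # setting omitted
```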
## Attachments
The adapter supports native SimpleX attachments in both directions:
- **Inbound** — incoming images, voice notes, and files are accepted via
the daemon's XFTP flow (`rcvFileDescrReady` → `/freceive` → wait for
`rcvFileComplete`) and surfaced as `MessageEvent.media_urls` with the
appropriate `MessageType` (`PHOTO`, `VOICE`, `TEXT` + document).
- **Outbound** — `send_image_file`, `send_voice`, `send_document`, and
`send_video` all use the structured `/_send` form with `filePath`, so
the receiving SimpleX client renders images inline and plays voice
notes inline rather than offering them as downloads.
Agent replies can also embed `MEDIA:/path/to/file` tags in plain text —
the adapter strips the tag from the body and sends the file as either a
voice note (audio extensions) or a document.
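The tag handling can be pictured with the sketch below. The regular expression, the `AUDIO_EXTENSIONS` set, and `split_media` are illustrative assumptions; the adapter's real extension list and parsing rules are not given in this section.

```python
import re
from pathlib import PurePosixPath

MEDIA_TAG = re.compile(r"MEDIA:(\S+)")
# Hypothetical set; the adapter's actual list of audio extensions may differ.
AUDIO_EXTENSIONS = {".ogg", ".opus", ".mp3", ".m4a", ".wav"}


def split_media(text: str):
    """Strip MEDIA: tags from a reply and classify each referenced file.

    Returns (body, attachments), where attachments is a list of
    (kind, path) pairs and kind is "voice" or "document".
    """
    attachments = []
    for path in MEDIA_TAG.findall(text):
        suffix = PurePosixPath(path).suffix.lower()
        kind = "voice" if suffix in AUDIO_EXTENSIONS else "document"
        attachments.append((kind, path))

    body = MEDIA_TAG.sub("", text)
    body = re.sub(r"[ \t]{2,}", " ", body).strip()  # tidy gaps left by removed tags
    return body, attachments


body, files = split_media("Here you go MEDIA:/tmp/note.ogg and MEDIA:/tmp/report.pdf")
print(body)
print(files)
```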
## Using SimpleX with cron jobs
```python