Compare commits

...

6 Commits

Author SHA1 Message Date
Ben Barclay
f9af07e4bf docs(gateway): document multiplexing opt-in + contract changes
Extend the 'Running Many Gateways at Once' user-guide page with a
'one gateway for all profiles (multiplexing)' section, kept to a single page:

- How to opt in (gateway.multiplex_profiles on the default profile) and when to
  prefer it vs one-process-per-profile.
- Every contract change a user sees when the flag is on:
  1. secondary-profile 'gateway start' is a hard error (--force escape hatch),
  2. HTTP-inbound reached via /p/<profile>/ prefix; secondary profiles must NOT
     enable a port-binding platform (webhook/api_server/msgraph_webhook/feishu/
     wecom_callback/bluebubbles/sms) — config error at startup,
  3. per-credential platforms still need their own token per profile,
  4. session keys namespaced agent:<profile>: (default stays agent:main:),
  5. single PID/lock + aggregated hermes status, per-profile runtime_status.json.
- What does NOT change: per-profile .env credential isolation (stricter, incl.
  MCP/Kanban subprocess env), Kanban, profile-scoped skills/memory/SOUL, routing.

All inert when the flag is off.
2026-06-19 15:49:25 +10:00
Ben Barclay
5627099949 feat(gateway): multiplex phase 4 — lifecycle guard + per-profile observability
- _guard_named_profile_under_multiplexer: when the default gateway is running
  with gateway.multiplex_profiles=on, a named-profile 'hermes gateway run' hard
  -errors (pointing at the multiplexer) instead of double-binding that
  profile's platforms. Inert unless all hold: this invocation is a named
  profile, a default-profile gateway is alive, and its config has multiplexing
  on. --force overrides. Wired into run_gateway's guard chain.
- write_runtime_status gains served_profiles: the secondary-adapter startup
  records [active] + multiplexed profiles into runtime_status.json so
  'hermes status' can show per-profile coverage without a second probe. Absent
  for single-profile gateways.

Tests: served_profiles round-trips and is absent by default; guard is inert for
the default profile / under --force / when no default gateway is running.
2026-06-19 15:43:42 +10:00
Ben Barclay
f9f51b798f feat(gateway): multiplex phase 3 — secondary-profile adapter registry + conflict detection
Bring up adapters for every profile the gateway serves, not just the active
one. Keeps self.adapters as the default/active profile's map (the ~93 existing
self.adapters[...] sites are untouched) and adds secondary profiles under
self._profile_adapters[profile][platform].

- _start_secondary_profile_adapters loops profiles_to_serve(multiplex=True),
  skips the active profile (handled by the primary startup loop), and for each
  other profile loads its gateway config and creates+connects its enabled
  adapters under that profile's _profile_runtime_scope (home + secret scope).
- Each secondary adapter gets _make_profile_message_handler(profile): stamps
  source.profile (when unset) before delegating to the shared _handle_message,
  so the agent turn and session key resolve to that profile.
- Same-platform credential-conflict detection: _adapter_credential_fingerprint
  hashes the adapter's bot token (salted, truncated — never logs the token);
  two profiles claiming the same (platform, token) refuse the duplicate with a
  clear error naming both, since one token can't be polled twice.
- Port-binding hard-error: a SECONDARY profile that enables a port-binding
  platform (webhook, api_server, msgraph_webhook, feishu, wecom_callback,
  bluebubbles, sms) is a config error and aborts startup via MultiplexConfigError
  — the default profile owns the single shared HTTP listener and serves every
  profile through the /p/<profile>/ prefix, so a second bind can only collide.
  Distinct from a transient connect failure (which logs + stays alive to retry):
  a config error writes gateway_state=startup_failed and exits cleanly with an
  actionable message (names the profile, the platform, and the fix). There is no
  valid reason to bind a second port once you've opted into a multiplexer.
- Shutdown tears down secondary adapters alongside the primary ones.
- Defensive getattr guards keep partial-construction unit tests (stop(),
  _run_agent on bare instances) working.

No-op when multiplex_profiles is off (self._profile_adapters stays empty).

Tests: fingerprint stability/log-safety/distinctness, profile message-handler
stamping (and not overriding an already-stamped source), port-binding hard-error
raises + names the profile/platform, non-binding platform is not rejected, and
the guard set covers every TCP-binding adapter.
2026-06-19 15:43:42 +10:00
Ben Barclay
98b3cf2956 feat(gateway): multiplex phase 1 — HTTP-inbound /p/<profile>/ routing (webhook)
Serve webhook inbound for multiple profiles off the one shared listener via a
URL prefix, with no second port bound.

- SessionSource gains a 'profile' field (round-trips through to_dict/from_dict;
  omitted when unset so existing serialization is unchanged). It carries which
  profile an inbound message was routed to.
- WebhookAdapter registers /p/{profile}/webhooks/{route_name} alongside the
  existing /webhooks/{route_name}. _resolve_request_profile validates the
  prefix against profiles_to_serve(): None when absent or multiplexing is off
  (ignored, handled as default — no spurious 404), the profile name when valid,
  _PROFILE_REJECTED (→ 404) when the profile isn't served. The resolved profile
  is stamped onto the SessionSource.
- session-key namespacing and the per-turn home/credential scope now prefer
  source.profile: SessionStore._resolve_profile_for_key(source),
  _session_key_for_source fallback, and _resolve_profile_home_for_source all
  honor it (→ the agent turn resolves that profile's config/skills/credentials
  via the Phase 2 _profile_runtime_scope).

Constraint: routing inbound needs no per-profile platform credential, but the
agent still needs the routed profile's provider key — delivered by Phase 2's
secret scope. api_server (OpenAI-compatible surface) profile routing is a
focused follow-on; its source-construction path differs from webhook's.

Tests: SessionSource.profile round-trip + namespace drive; _resolve_request_
profile accept/reject/ignore matrix.
2026-06-18 15:56:13 +10:00
Ben Barclay
cdd708c6bf feat(gateway): multiplex phase 2 — fail-closed profile credential isolation (Workstream A)
The credential gate. When multiplexing is active, a profile's secrets resolve
from a context-local scope, never the process-global os.environ (which in a
multiplexer may hold another profile's keys, and is inherited by every
subprocess spawned with env=dict(os.environ)).

- agent/secret_scope.py: get_secret() backed by a secret-scope contextvar.
  FAIL-CLOSED: when multiplex is active and no scope is installed, an unscoped
  read RAISES UnscopedSecretError instead of falling back to os.environ — a
  missed/new call site crashes loudly at that line rather than leaking a
  cross-profile value. Genuinely-global vars (HERMES_*, PATH, kanban paths,
  …) keep reading os.environ via an allowlist. load_env_file/build_profile_
  secret_scope parse a profile .env into an isolated dict WITHOUT mutating
  os.environ. Off by default => transparent os.getenv behavior.
- hermes_cli/runtime_provider.py: all credential/provider/base-url reads go
  through _getenv -> get_secret.
- agent/credential_pool.py: env fallbacks route through get_secret (the
  ~/.hermes/.env-first preference is preserved and already profile-correct via
  the home override).
- tools/mcp_tool.py: MCP config  interpolation resolves through
  get_secret, so a server's  picks up the routed profile's value.
- gateway/run.py: set_multiplex_active() at GatewayRunner init; per-turn .env
  reload is a no-op for credentials in multiplex mode (secrets come from the
  scope, not global env); _profile_runtime_scope context manager combines the
  HERMES_HOME override + secret scope; _run_agent wraps _run_agent_inner in
  that scope (resolved via _resolve_profile_home_for_source) when multiplexing.

Propagates into the agent worker thread for free via the existing
copy_context() in _run_in_executor_with_context.

Tests: 13 unit (fail-closed, scope isolation, global allowlist, .env parsing
without environ mutation) + 7 E2E (runtime_provider + MCP interpolation prove
two profiles isolated, unscoped read raises, globals still read environ).
2026-06-18 15:51:33 +10:00
Ben Barclay
b8b24c3ba5 feat(gateway): multiplex phase 0 — config flag, profile enumeration, profile-stamped session keys
Foundations for serving multiple profiles from one gateway process, inert
when off:

- gateway.multiplex_profiles config flag (default false), round-trips through
  GatewayConfig and load_gateway_config (top-level + nested gateway.* form).
- hermes_cli.profiles.profiles_to_serve(multiplex): the single chokepoint for
  which (profile, HERMES_HOME) pairs the gateway serves. Lightweight dir scan;
  active-profile-only when off, default + all named profiles when on.
- build_session_key gains a profile= namespace slot. Default/None reuse the
  historical 'agent:main:...' literal BYTE-IDENTICALLY (no session migration,
  positional parsers unaffected); a named profile becomes 'agent:<profile>:...'
  so two profiles on the same platform/chat never collide.
- SessionStore._resolve_profile_for_key + _session_key_for_source fallback
  resolve the namespace from the flag (legacy when off, active profile when on).

Tests: byte-identical-when-off (parametrized), namespace isolation, positional
layout preserved, config round-trip, profiles_to_serve enumeration.
2026-06-18 15:50:57 +10:00
19 changed files with 1730 additions and 42 deletions

View File

@@ -15,6 +15,7 @@ from typing import Any, Dict, List, Optional, Set, Tuple
from hermes_constants import OPENROUTER_BASE_URL
from hermes_cli.config import load_env
from agent.secret_scope import get_secret as _get_secret
from agent.credential_persistence import (
is_borrowed_credential_source,
sanitize_borrowed_credential_payload,
@@ -1666,7 +1667,7 @@ def _seed_from_singletons(provider: str, entries: List[PooledCredential]) -> Tup
_env_file = load_env()
def _env_val(key: str) -> str:
return (_env_file.get(key) or os.environ.get(key) or "").strip()
return (_env_file.get(key) or _get_secret(key, "") or "").strip()
anthropic_api_key = _env_val("ANTHROPIC_API_KEY")
anthropic_oauth_env = (
@@ -1952,7 +1953,7 @@ def _seed_from_env(provider: str, entries: List[PooledCredential]) -> Tuple[bool
# changes to the .env file.
def _get_env_prefer_dotenv(key: str) -> str:
env_file = load_env()
val = env_file.get(key) or os.environ.get(key) or ""
val = env_file.get(key) or _get_secret(key, "") or ""
return val.strip()
# Honour user suppression — `hermes auth remove <provider> <N>` for an

205
agent/secret_scope.py Normal file
View File

@@ -0,0 +1,205 @@
"""Profile-scoped credential resolution for multi-profile gateway multiplexing.
The multiplexing gateway serves many profiles from one process. Each profile
has its own ``.env`` with its own provider keys and platform tokens, so we
**cannot** union them into the process-global ``os.environ`` (that would leak
profile A's keys to profile B's turns, and to every subprocess spawned with
``env=dict(os.environ)``).
This module provides a fail-closed, context-local secret scope:
- ``set_secret_scope(mapping)`` installs the active profile's secrets for the
current task (a contextvar, so it propagates into the agent's worker thread
via ``copy_context()`` exactly like the HERMES_HOME override).
- ``get_secret(name)`` reads from that scope. When multiplexing is **active**
and no scope is set, it RAISES rather than silently falling back to
``os.environ`` — an un-migrated or newly-added call site fails loud at that
exact line instead of leaking another profile's value. When multiplexing is
**off** (the default), it transparently reads ``os.environ`` so the
single-profile gateway and every non-gateway caller behave exactly as before.
Design rationale lives in ``docs/design/multiplexing-gateway.md`` (Workstream A).
"""
from __future__ import annotations
import os
from contextvars import ContextVar, Token
from pathlib import Path
from typing import Dict, Mapping, Optional
# ── multiplex-active flag ────────────────────────────────────────────────
# Process-global: set once at gateway startup when gateway.multiplex_profiles
# is true. Governs whether get_secret() fails closed on an unscoped read.
# A plain module global (not a contextvar): it describes the deployment mode,
# not a per-task value.
_MULTIPLEX_ACTIVE: bool = False
def set_multiplex_active(active: bool) -> None:
"""Mark whether the process is running as a profile multiplexer.
Called once at gateway startup. When True, ``get_secret`` fails closed on
an unscoped read instead of falling back to ``os.environ``.
"""
global _MULTIPLEX_ACTIVE
_MULTIPLEX_ACTIVE = bool(active)
def is_multiplex_active() -> bool:
"""Return whether the process is running as a profile multiplexer."""
return _MULTIPLEX_ACTIVE
# ── the secret scope contextvar ──────────────────────────────────────────
_SECRET_SCOPE: ContextVar[Optional[Mapping[str, str]]] = ContextVar(
"_SECRET_SCOPE", default=None
)
class UnscopedSecretError(RuntimeError):
"""Raised when a secret is read in multiplex mode with no scope installed.
This is the fail-closed signal: it means a credential read reached
``get_secret`` without a profile scope active, which in a multiplexer would
otherwise leak whichever profile's value happened to be in ``os.environ``.
The fix is to wrap the call path in ``set_secret_scope(...)`` (the per-turn
/ per-adapter profile scope), not to widen the allowlist.
"""
def set_secret_scope(secrets: Optional[Mapping[str, str]]) -> Token:
"""Install the active profile's secret mapping for the current context.
Returns a token for ``reset_secret_scope``. Pass ``None`` to clear.
"""
return _SECRET_SCOPE.set(secrets)
def reset_secret_scope(token: Token) -> None:
"""Restore the previous secret scope."""
_SECRET_SCOPE.reset(token)
def current_secret_scope() -> Optional[Mapping[str, str]]:
"""Return the active secret mapping, or None when no scope is installed."""
return _SECRET_SCOPE.get()
# ── genuinely-global env vars (NOT per-profile secrets) ──────────────────
# These are process/deployment-level settings, not profile credentials. They
# legitimately live in os.environ and must keep reading from it even in
# multiplex mode — routing them through the fail-closed path would wrongly
# crash. Anything matching is read from os.environ regardless of scope.
#
# Membership test is by exact name OR prefix (see _is_global_env). Keep this
# list tight: when in doubt a value is a profile secret, not a global.
_GLOBAL_ENV_EXACT = frozenset({
# Hermes runtime / deployment
"HERMES_HOME", "HERMES_PROFILE", "HERMES_GATEWAY_LOCK_DIR",
"HERMES_MAX_ITERATIONS", "HERMES_MAX_TOKENS", "HERMES_API_TIMEOUT",
"HERMES_REDACT_SECRETS", "HERMES_NOUS_TIMEOUT_SECONDS",
"_HERMES_GATEWAY",
# OS / interpreter
"PATH", "HOME", "USER", "LANG", "LC_ALL", "TZ", "PWD", "SHELL", "TMPDIR",
"VIRTUAL_ENV", "PYTHONPATH", "SSL_CERT_FILE",
# Kanban paths (per-board, not per-profile-secret)
"HERMES_KANBAN_DB", "HERMES_KANBAN_WORKSPACES_ROOT", "HERMES_KANBAN_BOARD",
})
_GLOBAL_ENV_PREFIXES = (
"HERMES_KANBAN_",
"HERMES_TELEGRAM_", # tuning knobs (batch delays, fallback toggles) — NOT the token
"TERMINAL_", # terminal/sandbox backend settings
)
def _is_global_env(name: str) -> bool:
"""Return True for genuinely process-global (non-profile-secret) env vars."""
if name in _GLOBAL_ENV_EXACT:
return True
return any(name.startswith(p) for p in _GLOBAL_ENV_PREFIXES)
def get_secret(name: str, default: Optional[str] = None) -> Optional[str]:
"""Resolve a credential by env-var name, honoring the active profile scope.
Resolution order:
1. Genuinely-global vars (``_is_global_env``) always read ``os.environ`` —
they are deployment settings, not profile secrets.
2. When a secret scope is installed (multiplexed turn), read from it; an
absent key returns ``default``. The scope is authoritative — we do NOT
fall through to ``os.environ``, because in a multiplexer ``os.environ``
may hold another profile's value.
3. No scope installed:
- multiplex INACTIVE (default deployment): read ``os.environ`` —
identical to the legacy ``os.getenv`` behavior every caller had before.
- multiplex ACTIVE: FAIL CLOSED. Raise ``UnscopedSecretError`` so the
missing scope is caught loudly instead of leaking a cross-profile value.
"""
if _is_global_env(name):
val = os.environ.get(name)
return val if val is not None else default
scope = _SECRET_SCOPE.get()
if scope is not None:
val = scope.get(name)
return val if val is not None else default
if _MULTIPLEX_ACTIVE:
raise UnscopedSecretError(
f"get_secret({name!r}) called with no profile secret scope active "
f"while multiplexing is on. This credential read must run inside a "
f"set_secret_scope(...) block (the per-turn / per-adapter profile "
f"scope). Reading os.environ here would risk leaking another "
f"profile's value. See docs/design/multiplexing-gateway.md "
f"(Workstream A)."
)
val = os.environ.get(name)
return val if val is not None else default
def load_env_file(env_path: Path) -> Dict[str, str]:
"""Parse a ``.env`` file into a plain dict WITHOUT touching ``os.environ``.
Used to load a profile's secrets into an isolated mapping for
``set_secret_scope``. Mirrors python-dotenv's basic parsing (KEY=VALUE,
``export`` prefix, ``#`` comments, optional matching quotes) but never
mutates the process environment — that isolation is the whole point.
"""
secrets: Dict[str, str] = {}
try:
text = env_path.read_text(encoding="utf-8")
except (FileNotFoundError, OSError, UnicodeDecodeError):
return secrets
for raw in text.splitlines():
line = raw.strip()
if not line or line.startswith("#"):
continue
if line.startswith("export "):
line = line[len("export "):].lstrip()
if "=" not in line:
continue
key, _, value = line.partition("=")
key = key.strip()
if not key:
continue
value = value.strip()
if len(value) >= 2 and value[0] == value[-1] and value[0] in ("'", '"'):
value = value[1:-1]
secrets[key] = value
return secrets
def build_profile_secret_scope(hermes_home: Path) -> Dict[str, str]:
"""Build a profile's secret mapping from its ``<home>/.env``.
Returns a fresh dict (safe to install via ``set_secret_scope``). Genuinely
global vars are intentionally NOT copied in — ``get_secret`` reads those
from ``os.environ`` directly, so the scope holds only profile secrets.
"""
return load_env_file(Path(hermes_home) / ".env")

View File

@@ -545,6 +545,13 @@ class GatewayConfig:
thread_sessions_per_user: bool = False # When False (default), threads are shared across all participants
max_concurrent_sessions: Optional[int] = None # Positive int caps simultaneous active chat sessions
# Multi-profile multiplexing (opt-in; default off preserves one-gateway-per-profile).
# When True, the default profile's gateway serves inbound messages for every
# profile on the host: profiles are stamped into session keys and (in later
# phases) per-profile adapters/credentials are resolved. When False, the
# gateway behaves exactly as before — single HERMES_HOME, no profile stamping.
multiplex_profiles: bool = False
# Unauthorized DM policy
unauthorized_dm_behavior: str = "pair" # "pair" or "ignore"
@@ -650,6 +657,7 @@ class GatewayConfig:
"group_sessions_per_user": self.group_sessions_per_user,
"thread_sessions_per_user": self.thread_sessions_per_user,
"max_concurrent_sessions": self.max_concurrent_sessions,
"multiplex_profiles": self.multiplex_profiles,
"unauthorized_dm_behavior": self.unauthorized_dm_behavior,
"streaming": self.streaming.to_dict(),
"session_store_max_age_days": self.session_store_max_age_days,
@@ -695,7 +703,12 @@ class GatewayConfig:
group_sessions_per_user = data.get("group_sessions_per_user")
thread_sessions_per_user = data.get("thread_sessions_per_user")
multiplex_profiles = data.get("multiplex_profiles")
nested_gateway = data.get("gateway") if isinstance(data.get("gateway"), dict) else {}
if multiplex_profiles is None and isinstance(nested_gateway, dict):
# Also honor gateway.multiplex_profiles written by
# ``hermes config set gateway.multiplex_profiles true``.
multiplex_profiles = nested_gateway.get("multiplex_profiles")
if "max_concurrent_sessions" in data:
max_concurrent_raw = data.get("max_concurrent_sessions")
max_concurrent_key = "max_concurrent_sessions"
@@ -732,6 +745,7 @@ class GatewayConfig:
stt_enabled=_coerce_bool(stt_enabled, True),
group_sessions_per_user=_coerce_bool(group_sessions_per_user, True),
thread_sessions_per_user=_coerce_bool(thread_sessions_per_user, False),
multiplex_profiles=_coerce_bool(multiplex_profiles, False),
max_concurrent_sessions=max_concurrent_sessions,
unauthorized_dm_behavior=unauthorized_dm_behavior,
streaming=StreamingConfig.from_dict(data.get("streaming", {})),
@@ -823,6 +837,13 @@ def load_gateway_config() -> GatewayConfig:
if "thread_sessions_per_user" in yaml_cfg:
gw_data["thread_sessions_per_user"] = yaml_cfg["thread_sessions_per_user"]
# Multiplexing flag: accept both the top-level key and the nested
# gateway.multiplex_profiles form (from_dict resolves the nested
# fallback, but surface the top-level key here for parity with the
# other session-scope flags above).
if "multiplex_profiles" in yaml_cfg:
gw_data["multiplex_profiles"] = yaml_cfg["multiplex_profiles"]
gateway_section = yaml_cfg.get("gateway")
if isinstance(gateway_section, dict) and "max_concurrent_sessions" in gateway_section:
gw_data["max_concurrent_sessions"] = gateway_section["max_concurrent_sessions"]

View File

@@ -57,6 +57,11 @@ from gateway.platforms.base import (
logger = logging.getLogger(__name__)
# Sentinel returned by _resolve_request_profile when a /p/<profile>/ prefix
# names a profile this gateway does not serve (→ 404). Distinct from None
# (no prefix / multiplexing off → handle as the default profile).
_PROFILE_REJECTED = object()
_BUILTIN_DELIVER_PLATFORMS = {
"telegram", "discord", "slack", "signal", "sms", "whatsapp",
"matrix", "mattermost", "homeassistant", "email", "dingtalk",
@@ -189,6 +194,14 @@ class WebhookAdapter(BasePlatformAdapter):
app = web.Application()
app.router.add_get("/health", self._handle_health)
app.router.add_post("/webhooks/{route_name}", self._handle_webhook)
# Multi-profile multiplexing: a /p/<profile>/webhooks/<route> prefix
# routes the inbound event to that profile. Same handler; the profile is
# captured from the path and stamped onto the SessionSource so the agent
# turn resolves that profile's config/skills/credentials. Only honored
# when gateway.multiplex_profiles is on (the handler validates).
app.router.add_post(
"/p/{profile}/webhooks/{route_name}", self._handle_webhook
)
# Port conflict detection — fail fast if port is already in use
import socket as _socket
@@ -397,6 +410,35 @@ class WebhookAdapter(BasePlatformAdapter):
except Exception as e:
logger.error("[webhook] Failed to reload dynamic routes: %s", e)
def _resolve_request_profile(self, request: "web.Request"):
"""Resolve + validate the /p/<profile>/ URL prefix on a webhook request.
Returns:
- ``None`` when no profile prefix is present, or multiplexing is off
(the prefix is ignored, request handled as the default profile).
- the profile name (str) when present, multiplexing is on, and the
profile is one this gateway serves.
- ``_PROFILE_REJECTED`` when a prefix is present but the profile is
unknown/unconfigured (handler returns 404).
"""
profile = (request.match_info.get("profile") or "").strip()
if not profile:
return None
runner = self.gateway_runner
cfg = getattr(runner, "config", None)
if not getattr(cfg, "multiplex_profiles", False):
# Prefix supplied but multiplexing is off — ignore it, behave as
# the single-profile gateway (don't 404 a would-be valid route).
return None
try:
from hermes_cli.profiles import profiles_to_serve
served = {name for name, _ in profiles_to_serve(multiplex=True)}
except Exception:
return _PROFILE_REJECTED
if profile not in served:
return _PROFILE_REJECTED
return profile
async def _handle_webhook(self, request: "web.Request") -> "web.Response":
"""POST /webhooks/{route_name} — receive and process a webhook event."""
# Hot-reload dynamic subscriptions on each request (mtime-gated, cheap)
@@ -405,6 +447,13 @@ class WebhookAdapter(BasePlatformAdapter):
route_name = request.match_info.get("route_name", "")
route_config = self._routes.get(route_name)
# Multi-profile: resolve + validate the /p/<profile>/ prefix if present.
profile = self._resolve_request_profile(request)
if profile is _PROFILE_REJECTED:
return web.json_response(
{"error": "Unknown or unconfigured profile"}, status=404
)
if not route_config:
return web.json_response(
{"error": f"Unknown route: {route_name}"}, status=404
@@ -641,6 +690,8 @@ class WebhookAdapter(BasePlatformAdapter):
user_id=f"webhook:{route_name}",
user_name=route_name,
)
if profile and isinstance(profile, str):
source.profile = profile
event = MessageEvent(
text=prompt,
message_type=MessageType.TEXT,

View File

@@ -1173,13 +1173,31 @@ def _reload_runtime_env_preserving_config_authority() -> None:
pick up rotated API keys. config.yaml remains authoritative for agent budget
settings such as agent.max_turns; otherwise a stale HERMES_MAX_ITERATIONS in
.env can replace the startup bridge on later turns.
In multiplex mode this is a NO-OP for the credential reload: secrets come
from the per-turn ``set_secret_scope`` (installed by ``_profile_runtime_scope``)
which loads the routed profile's ``.env`` into an isolated mapping. Mutating
the process-global ``os.environ`` here would defeat that isolation and leak
the default profile's keys to every profile's turns and subprocesses.
"""
from agent.secret_scope import is_multiplex_active
if is_multiplex_active():
# Credentials are resolved from the active profile's secret scope, not
# os.environ. Still honor config.yaml's agent.max_turns bridge below
# using the scoped home, but never reload .env into global env.
_bridge_max_turns_from_config(_hermes_home)
return
load_hermes_dotenv(
hermes_home=_hermes_home,
project_env=Path(__file__).resolve().parents[1] / '.env',
)
_bridge_max_turns_from_config(_hermes_home)
config_path = _hermes_home / 'config.yaml'
def _bridge_max_turns_from_config(home: "Path") -> None:
"""Bridge config.yaml agent.max_turns into HERMES_MAX_ITERATIONS (a global)."""
config_path = home / 'config.yaml'
if not config_path.exists():
return
try:
@@ -1196,6 +1214,71 @@ def _reload_runtime_env_preserving_config_authority() -> None:
os.environ["HERMES_MAX_ITERATIONS"] = str(agent_cfg["max_turns"])
from contextlib import contextmanager as _contextmanager
# Platforms that bind a host TCP port (HTTP/webhook listeners). In a profile
# multiplexer the default profile owns the single shared listener and serves
# every profile through the /p/<profile>/ URL prefix, so a SECONDARY profile
# enabling one of these is always a misconfiguration: it would try to bind a
# port already held by the default's listener. We hard-error on it rather than
# silently dropping the adapter (see _start_one_profile_adapters).
# Stored as platform .value strings since the Platform enum is imported below.
_PORT_BINDING_PLATFORM_VALUES = frozenset({
"webhook",
"api_server",
"msgraph_webhook",
"feishu",
"wecom_callback",
"bluebubbles",
"sms",
})
class MultiplexConfigError(RuntimeError):
"""A profile multiplexer config is invalid (fail-fast at startup).
Distinct from a transient adapter-connect failure: a transient error is
logged and the gateway stays alive to retry, but a config error means the
operator must fix config.yaml, so it aborts startup cleanly.
"""
@_contextmanager
def _profile_runtime_scope(profile_home: "Path"):
"""Scope config/skills/memory AND credentials to a profile for one turn.
Combines the two seams the multiplexer needs:
1. ``set_hermes_home_override`` — redirects ``get_hermes_home()`` (config,
skills, memory, SOUL, sessions) to the profile's home. Contextvar, so
it propagates into the agent worker thread via ``copy_context()``.
2. ``set_secret_scope`` — installs the profile's ``.env`` secrets as the
authoritative credential source, so ``get_secret`` reads this profile's
keys and never the process-global ``os.environ`` (which in a
multiplexer may hold another profile's values).
Only used on the multiplexed inbound path. Single-profile gateways never
enter this scope, so their behavior is unchanged. Loading the profile's
``.env`` here does NOT mutate ``os.environ`` — ``build_profile_secret_scope``
returns an isolated dict — which is what keeps subprocesses (MCP, kanban)
from inheriting cross-profile secrets.
"""
from hermes_constants import set_hermes_home_override, reset_hermes_home_override
from agent.secret_scope import (
build_profile_secret_scope,
set_secret_scope,
reset_secret_scope,
)
home_token = set_hermes_home_override(str(profile_home))
secret_token = set_secret_scope(build_profile_secret_scope(Path(profile_home)))
try:
yield
finally:
reset_secret_scope(secret_token)
reset_hermes_home_override(home_token)
_DOCKER_VOLUME_SPEC_RE = re.compile(r"^(?P<host>.+):(?P<container>/[^:]+?)(?::(?P<options>[^:]+))?$")
_DOCKER_MEDIA_OUTPUT_CONTAINER_PATHS = {"/output", "/outputs"}
@@ -2240,7 +2323,22 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
def __init__(self, config: Optional[GatewayConfig] = None):
global _gateway_runner_ref
self.config = config or load_gateway_config()
# Mark the process as a profile multiplexer when configured. This flips
# agent.secret_scope.get_secret() to fail-closed on any unscoped
# credential read, so a missed migration crashes loudly instead of
# leaking a cross-profile value (Workstream A). Inert when off.
try:
from agent.secret_scope import set_multiplex_active
set_multiplex_active(bool(getattr(self.config, "multiplex_profiles", False)))
except Exception:
logger.debug("could not set multiplex-active flag", exc_info=True)
self.adapters: Dict[Platform, BasePlatformAdapter] = {}
# Multi-profile multiplexing: adapters for NON-default profiles live
# here, keyed by profile name then Platform. self.adapters stays the
# default/active profile's map so the ~93 existing self.adapters[...]
# sites are untouched when multiplexing is off (this dict is empty).
# Populated by _start_secondary_profile_adapters().
self._profile_adapters: Dict[str, Dict[Platform, BasePlatformAdapter]] = {}
self._warn_if_docker_media_delivery_is_risky()
_gateway_runner_ref = _weakref.ref(self)
@@ -2792,10 +2890,24 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
except Exception:
pass
config = getattr(self, "config", None)
# Mirror SessionStore._resolve_profile_for_key so this fallback path
# produces the same namespace as the primary path: None (legacy
# agent:main) unless multiplexing is on, then the active profile.
_profile = None
if getattr(config, "multiplex_profiles", False):
if source.profile:
_profile = source.profile
else:
try:
from hermes_cli.profiles import get_active_profile_name
_profile = get_active_profile_name() or "default"
except Exception:
_profile = None
return build_session_key(
source,
group_sessions_per_user=getattr(config, "group_sessions_per_user", True),
thread_sessions_per_user=getattr(config, "thread_sessions_per_user", False),
profile=_profile,
)
def _telegram_topic_mode_enabled(self, source: SessionSource) -> bool:
@@ -5324,7 +5436,30 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
"attempts": 1,
"next_retry": time.monotonic() + 30,
}
# Multi-profile multiplexing: bring up adapters for every OTHER profile
# this gateway serves. Each profile's adapters connect under that
# profile's home + credential scope and stamp their inbound events with
# the profile so the agent turn resolves correctly. No-op when off.
try:
_secondary_connected = await self._start_secondary_profile_adapters()
connected_count += _secondary_connected
except MultiplexConfigError as e:
# Invalid multiplexer config — abort startup cleanly so the operator
# fixes config.yaml rather than running a half-wired gateway.
reason = str(e)
logger.error("Gateway multiplexer config error: %s", reason)
try:
from gateway.status import write_runtime_status
write_runtime_status(gateway_state="startup_failed", exit_reason=reason)
except Exception:
pass
self._request_clean_exit(reason)
self._startup_restore_in_progress = False
return True
except Exception as e:
logger.error("Secondary-profile adapter startup failed: %s", e, exc_info=True)
if connected_count == 0:
if startup_nonretryable_errors:
reason = "; ".join(startup_nonretryable_errors)
@@ -6331,6 +6466,22 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
time.monotonic() - _adapter_started_at,
e,
)
# Disconnect secondary-profile adapters (multiplex mode).
for _prof, _amap in list(getattr(self, "_profile_adapters", {}).items()):
for platform, adapter in list(_amap.items()):
try:
await adapter.cancel_background_tasks()
except Exception as e:
logger.debug("%s bg-cancel error (profile %s): %s", platform.value, _prof, e)
try:
await adapter.disconnect()
logger.info("%s disconnected (profile: %s)", platform.value, _prof)
except Exception as e:
logger.error("%s disconnect error (profile %s): %s", platform.value, _prof, e)
_amap.clear()
if hasattr(self, "_profile_adapters"):
self._profile_adapters.clear()
logger.info(
"Shutdown phase: all adapters disconnected at +%.2fs",
_phase_elapsed(),
@@ -6500,6 +6651,175 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
"""Wait for shutdown signal."""
await self._shutdown_event.wait()
async def _start_secondary_profile_adapters(self) -> int:
"""Bring up adapters for every non-active profile this gateway serves.
Returns the number of secondary adapters that connected. No-op (returns
0) unless ``gateway.multiplex_profiles`` is on.
Each profile's adapters are created and connected under that profile's
HERMES_HOME + secret scope (``_profile_runtime_scope``), stored in
``self._profile_adapters[profile]``, and given a message handler that
stamps ``source.profile`` before delegating to the shared
``_handle_message`` — so the agent turn resolves that profile's config,
skills, and credentials. Same-platform credential collisions (two
profiles polling the same bot token) are detected and refused here, the
only point that sees every profile's resolved credentials together.
"""
if not getattr(self.config, "multiplex_profiles", False):
return 0
try:
from hermes_cli.profiles import profiles_to_serve, get_active_profile_name
except Exception:
return 0
active = get_active_profile_name() or "default"
connected = 0
# (platform, token-fingerprint) -> profile that claimed it. Detects two
# profiles trying to poll the same bot credential (impossible to do
# concurrently). Seed with the active profile's adapters.
claimed: Dict[tuple, str] = {}
for _plat, _ad in self.adapters.items():
fp = self._adapter_credential_fingerprint(_ad)
if fp is not None:
claimed[(_plat, fp)] = active
for profile_name, profile_home in profiles_to_serve(multiplex=True):
if profile_name == active:
continue # handled by the primary startup loop
try:
connected += await self._start_one_profile_adapters(
profile_name, profile_home, claimed
)
except MultiplexConfigError:
# Config error (e.g. a secondary profile binding a port) is not
# transient — propagate so startup aborts cleanly instead of
# limping along with a half-configured multiplexer.
raise
except Exception as e:
logger.error(
"Failed to start adapters for profile '%s': %s",
profile_name, e, exc_info=True,
)
# Record served profiles in runtime status for `hermes status`.
try:
from gateway.status import write_runtime_status
served = [active] + sorted(self._profile_adapters.keys())
write_runtime_status(served_profiles=served)
except Exception:
logger.debug("could not record served_profiles", exc_info=True)
return connected
async def _start_one_profile_adapters(
self, profile_name: str, profile_home: "Path", claimed: Dict[tuple, str]
) -> int:
"""Create+connect one profile's adapters under its runtime scope."""
from gateway.config import load_gateway_config
with _profile_runtime_scope(profile_home):
profile_cfg = load_gateway_config()
profile_map = self._profile_adapters.setdefault(profile_name, {})
connected = 0
for platform, platform_config in profile_cfg.platforms.items():
if not platform_config.enabled:
continue
# A secondary profile must NOT enable a port-binding platform: the
# default profile's listener already serves every profile via the
# /p/<profile>/ prefix, so a second bind can only collide. This is a
# config error, not a transient failure — fail fast and loud.
if platform.value in _PORT_BINDING_PLATFORM_VALUES:
raise MultiplexConfigError(
f"Profile '{profile_name}' enables the port-binding platform "
f"'{platform.value}', but gateway.multiplex_profiles is on. The "
f"default profile owns the single shared HTTP listener and "
f"serves every profile through the /p/{profile_name}/ URL "
f"prefix — a secondary profile cannot bind its own port. "
f"Remove platforms.{platform.value} from profile "
f"'{profile_name}'s config.yaml (configure it only on the "
f"default profile)."
)
with _profile_runtime_scope(profile_home):
adapter = self._create_adapter(platform, platform_config)
if not adapter:
continue
# Same-token conflict detection — refuse a duplicate poll.
fp = self._adapter_credential_fingerprint(adapter)
if fp is not None:
owner = claimed.get((platform, fp))
if owner is not None:
logger.error(
"Profile '%s' and '%s' both configure %s with the same "
"credential — refusing to start the duplicate (a single "
"bot token cannot be polled twice). Give each profile its "
"own %s credential.",
owner, profile_name, platform.value, platform.value,
)
await self._safe_adapter_disconnect(adapter, platform)
continue
claimed[(platform, fp)] = profile_name
# Stamp every inbound event from this adapter with its profile so
# the agent turn (and session key) resolve to the right home.
adapter.set_message_handler(
self._make_profile_message_handler(profile_name)
)
adapter.set_fatal_error_handler(self._handle_adapter_fatal_error)
adapter.set_session_store(self.session_store)
adapter.set_busy_session_handler(self._handle_active_session_busy_message)
adapter.set_topic_recovery_fn(self._recover_telegram_topic_thread_id)
adapter._busy_text_mode = self._busy_text_mode
try:
with _profile_runtime_scope(profile_home):
success = await self._connect_adapter_with_timeout(adapter, platform)
if success:
profile_map[platform] = adapter
connected += 1
logger.info("%s connected (profile: %s)", platform.value, profile_name)
else:
logger.warning("%s failed to connect (profile: %s)", platform.value, profile_name)
await self._safe_adapter_disconnect(adapter, platform)
except Exception as e:
logger.error("%s error (profile: %s): %s", platform.value, profile_name, e)
await self._safe_adapter_disconnect(adapter, platform)
return connected
def _make_profile_message_handler(self, profile_name: str):
"""Return a message handler that stamps source.profile then delegates."""
async def _handler(event):
try:
if getattr(event, "source", None) is not None and not event.source.profile:
event.source.profile = profile_name
except Exception:
pass
return await self._handle_message(event)
return _handler
@staticmethod
def _adapter_credential_fingerprint(adapter: Any) -> Optional[str]:
"""Return a stable, log-safe fingerprint of an adapter's credential.
Used only to detect two profiles claiming the same bot token. Returns a
salted hash (never the token itself) of the adapter's primary
credential, or None when no credential is discoverable (in which case
we don't attempt conflict detection for it).
"""
token = None
for attr in ("token", "bot_token", "_token", "api_token", "_bot_token"):
val = getattr(adapter, attr, None)
if isinstance(val, str) and val.strip():
token = val.strip()
break
if not token:
return None
import hashlib
return hashlib.sha256(("hermes-mux:" + token).encode("utf-8")).hexdigest()[:16]
def _create_adapter(
self,
platform: Platform,
@@ -13729,6 +14049,64 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
channel_prompt: Optional[str] = None,
persist_user_message: Optional[str] = None,
persist_user_timestamp: Optional[float] = None,
) -> Dict[str, Any]:
"""Profile-scoping wrapper around the agent run.
When multiplexing is active, resolve the inbound source's profile and
run the whole turn inside ``_profile_runtime_scope`` so config/skills/
memory resolve to that profile's home AND credentials resolve from that
profile's secret scope (never the process-global ``os.environ``). When
multiplexing is off this is a transparent pass-through — zero behavior
change for single-profile gateways.
"""
if not getattr(getattr(self, "config", None), "multiplex_profiles", False):
return await self._run_agent_inner(
message, context_prompt, history, source, session_id,
session_key=session_key, run_generation=run_generation,
_interrupt_depth=_interrupt_depth, event_message_id=event_message_id,
channel_prompt=channel_prompt, persist_user_message=persist_user_message,
persist_user_timestamp=persist_user_timestamp,
)
profile_home = self._resolve_profile_home_for_source(source)
with _profile_runtime_scope(profile_home):
return await self._run_agent_inner(
message, context_prompt, history, source, session_id,
session_key=session_key, run_generation=run_generation,
_interrupt_depth=_interrupt_depth, event_message_id=event_message_id,
channel_prompt=channel_prompt, persist_user_message=persist_user_message,
persist_user_timestamp=persist_user_timestamp,
)
def _resolve_profile_home_for_source(self, source: SessionSource) -> "Path":
"""Resolve which profile's HERMES_HOME should serve this inbound source.
Prefers the profile the source was routed to (``source.profile`` — set
by the /p/<profile>/ URL prefix or a per-credential adapter), falling
back to the active profile (the multiplexer's own home).
"""
from hermes_cli.profiles import get_active_profile_name, get_profile_dir
try:
name = (source.profile or "").strip() or get_active_profile_name() or "default"
return get_profile_dir(name)
except Exception:
from hermes_constants import get_hermes_home
return get_hermes_home()
async def _run_agent_inner(
self,
message: str,
context_prompt: str,
history: List[Dict[str, Any]],
source: SessionSource,
session_id: str,
session_key: str = None,
run_generation: Optional[int] = None,
_interrupt_depth: int = 0,
event_message_id: Optional[str] = None,
channel_prompt: Optional[str] = None,
persist_user_message: Optional[str] = None,
persist_user_timestamp: Optional[float] = None,
) -> Dict[str, Any]:
"""
Run the agent with the given message and context.

View File

@@ -92,6 +92,11 @@ class SessionSource:
parent_chat_id: Optional[str] = None # Parent channel when chat_id refers to a thread
message_id: Optional[str] = None # ID of the triggering message (for pin/reply/react)
role_authorized: bool = False # True when adapter granted access via role (not user ID)
# Profile this inbound message is routed to in a multiplexing gateway
# (from the /p/<profile>/ URL prefix or per-credential adapter ownership).
# None => the gateway's active/default profile. Drives both session-key
# namespacing and the per-turn config/credential scope.
profile: Optional[str] = None
@property
def description(self) -> str:
@@ -135,6 +140,8 @@ class SessionSource:
d["parent_chat_id"] = self.parent_chat_id
if self.message_id:
d["message_id"] = self.message_id
if self.profile:
d["profile"] = self.profile
return d
@classmethod
@@ -153,6 +160,7 @@ class SessionSource:
guild_id=data.get("guild_id"),
parent_chat_id=data.get("parent_chat_id"),
message_id=data.get("message_id"),
profile=data.get("profile"),
)
@@ -615,15 +623,41 @@ def is_shared_multi_user_session(
return not group_sessions_per_user
def _session_key_namespace(profile: Optional[str]) -> str:
"""Return the ``agent:<ns>`` namespace prefix for a session key.
The historical key format is ``agent:main:<platform>:<chat_type>:...`` where
``main`` is a static namespace literal (NOT a branch name — branching keys
off ``session_id``, not this slot). Multi-profile multiplexing reuses this
slot to carry the profile:
- default profile (or ``None``/``""``/``"default"``) → ``agent:main`` —
BYTE-IDENTICAL to every key ever generated, so existing sessions and all
positional parsers (``parts[2]`` == platform, etc.) are unaffected.
- named profile ``coder`` → ``agent:coder`` — keeps the same positional
layout, just a different namespace, so two profiles serving the same
platform/chat never collide.
"""
if not profile or profile == "default":
return "agent:main"
return f"agent:{profile}"
def build_session_key(
source: SessionSource,
group_sessions_per_user: bool = True,
thread_sessions_per_user: bool = False,
profile: Optional[str] = None,
) -> str:
"""Build a deterministic session key from a message source.
This is the single source of truth for session key construction.
``profile`` selects the key namespace (see :func:`_session_key_namespace`).
It defaults to ``None`` ⇒ the legacy ``agent:main`` namespace, so callers
that don't multiplex produce byte-identical keys to before. Only the
multiplexing gateway passes a non-default profile.
DM rules:
- DMs include chat_id when present, so each private conversation is isolated.
- thread_id further differentiates threaded DMs within the same DM chat.
@@ -643,6 +677,7 @@ def build_session_key(
shared session per chat.
- Without identifiers, messages fall back to one session per platform/chat_type.
"""
ns = _session_key_namespace(profile)
platform = source.platform.value
if source.chat_type == "dm":
dm_chat_id = source.chat_id
@@ -651,12 +686,12 @@ def build_session_key(
if dm_chat_id:
if source.thread_id:
return f"agent:main:{platform}:dm:{dm_chat_id}:{source.thread_id}"
return f"agent:main:{platform}:dm:{dm_chat_id}"
return f"{ns}:{platform}:dm:{dm_chat_id}:{source.thread_id}"
return f"{ns}:{platform}:dm:{dm_chat_id}"
# No chat_id — fall back to the sender's own identifier before the
# bare per-platform sink. Without this, every DM from every user that
# arrives without a chat_id (non-standard adapters / synthetic sources)
# collapses into one shared "agent:main:<platform>:dm" session, and a
# collapses into one shared "<ns>:<platform>:dm" session, and a
# single cached agent ends up serving multiple people's conversations —
# cross-user history bleed. participant_id keeps DMs isolated per user.
dm_participant_id = source.user_id_alt or source.user_id
@@ -667,11 +702,11 @@ def build_session_key(
)
if dm_participant_id:
if source.thread_id:
return f"agent:main:{platform}:dm:{dm_participant_id}:{source.thread_id}"
return f"agent:main:{platform}:dm:{dm_participant_id}"
return f"{ns}:{platform}:dm:{dm_participant_id}:{source.thread_id}"
return f"{ns}:{platform}:dm:{dm_participant_id}"
if source.thread_id:
return f"agent:main:{platform}:dm:{source.thread_id}"
return f"agent:main:{platform}:dm"
return f"{ns}:{platform}:dm:{source.thread_id}"
return f"{ns}:{platform}:dm"
participant_id = source.user_id_alt or source.user_id
if participant_id and source.platform == Platform.WHATSAPP:
@@ -679,7 +714,7 @@ def build_session_key(
# single group member gets two isolated per-user sessions when the
# bridge reshuffles alias forms.
participant_id = canonical_whatsapp_identifier(str(participant_id)) or participant_id
key_parts = ["agent:main", platform, source.chat_type]
key_parts = [ns, platform, source.chat_type]
if source.chat_id:
key_parts.append(source.chat_id)
@@ -775,12 +810,32 @@ class SessionStore:
logger.debug("Could not remove temp file %s: %s", tmp_path, e)
raise
def _resolve_profile_for_key(self, source: Optional[SessionSource] = None) -> Optional[str]:
"""Return the profile namespace for session keys, or None when off.
When ``multiplex_profiles`` is disabled (default), returns ``None`` so
keys stay in the legacy ``agent:main`` namespace — byte-identical to
before. When enabled, prefers the profile the inbound source was routed
to (``source.profile`` — set by the /p/<profile>/ URL prefix or
per-credential adapter), falling back to the active profile name.
"""
if not getattr(self.config, "multiplex_profiles", False):
return None
if source is not None and source.profile:
return source.profile
try:
from hermes_cli.profiles import get_active_profile_name
return get_active_profile_name() or "default"
except Exception:
return None
def _generate_session_key(self, source: SessionSource) -> str:
"""Generate a session key from a source."""
return build_session_key(
source,
group_sessions_per_user=getattr(self.config, "group_sessions_per_user", True),
thread_sessions_per_user=getattr(self.config, "thread_sessions_per_user", False),
profile=self._resolve_profile_for_key(source),
)
def _is_session_expired(self, entry: SessionEntry) -> bool:

View File

@@ -515,6 +515,7 @@ def write_runtime_status(
platform_state: Any = _UNSET,
error_code: Any = _UNSET,
error_message: Any = _UNSET,
served_profiles: Any = _UNSET,
) -> None:
"""Persist gateway runtime health information for diagnostics/status."""
path = _get_runtime_status_path()
@@ -535,6 +536,11 @@ def write_runtime_status(
payload["restart_requested"] = bool(restart_requested)
if active_agents is not _UNSET:
payload["active_agents"] = max(0, int(active_agents))
if served_profiles is not _UNSET:
# Profiles this gateway multiplexes (multi-profile mode). Absent/empty
# for a single-profile gateway. Lets `hermes status` show per-profile
# coverage without a second probe.
payload["served_profiles"] = list(served_profiles or [])
if platform is not _UNSET:
platform_payload = payload["platforms"].get(platform, {})

View File

@@ -3865,6 +3865,86 @@ def _running_under_gateway_supervisor() -> bool:
return False
def _guard_named_profile_under_multiplexer(force: bool = False) -> None:
"""Refuse a named-profile gateway when a multiplexer is already serving it.
When the default profile's gateway runs with gateway.multiplex_profiles=on,
it is the sole inbound process for EVERY profile on the host. Starting a
separate gateway for a named profile would double-bind that profile's
platforms (two pollers on one bot token, port fights). In that mode a
named-profile ``hermes gateway run`` is always a misconfiguration, so we
hard-error with a pointer to the multiplexer. ``--force`` overrides.
Inert unless ALL of: (a) this invocation is a named profile, (b) a default-
profile gateway is running, (c) that gateway's config has multiplexing on.
"""
if force:
return
# (a) Are we a named profile? Default/custom-hash homes return "".
try:
suffix = _profile_suffix()
except Exception:
return
if not suffix:
return # default profile (or unrecognized) — this guard doesn't apply
try:
from hermes_constants import get_default_hermes_root
default_root = get_default_hermes_root()
# (b) Is the default-profile gateway running?
from gateway.status import get_running_pid as _default_running_pid # noqa
except Exception:
return
try:
import yaml as _yaml
from gateway.status import _read_pid_record # type: ignore
# (b) default gateway PID file present + alive
default_pid_path = default_root / "gateway.pid"
rec = _read_pid_record(default_pid_path)
if not rec:
return
from gateway.status import _pid_exists, _pid_from_record
pid = _pid_from_record(rec)
if not pid or not _pid_exists(pid):
return
# (c) default config has multiplexing on
cfg_path = default_root / "config.yaml"
if not cfg_path.exists():
return
with open(cfg_path, encoding="utf-8") as f:
cfg = _yaml.safe_load(f) or {}
multiplex = bool(
cfg.get("multiplex_profiles")
or (cfg.get("gateway", {}) or {}).get("multiplex_profiles")
)
if not multiplex:
return
except Exception:
logger.debug("Multiplexer-conflict probe failed", exc_info=True)
return
print_error(
f"The default gateway is running as a profile multiplexer and already "
f"serves profile '{suffix}'."
)
print(
" When gateway.multiplex_profiles is on, the default gateway is the\n"
" single inbound process for every profile. Starting a separate\n"
" gateway for this profile would double-bind its platforms (two\n"
" pollers on one bot token, port conflicts).\n"
)
print(" Manage the multiplexer instead (from the default profile):")
print()
print(" hermes gateway restart")
print()
print(" Pass --force to start a separate profile gateway anyway (not")
print(" recommended while the multiplexer is running).")
sys.exit(1)
def _guard_supervised_gateway_conflict(force: bool = False) -> None:
"""Refuse a foreground gateway when a service manager already supervises one.
@@ -3977,6 +4057,7 @@ def run_gateway(verbose: int = 0, quiet: bool = False, replace: bool = False, fo
systemd/launchd service is already supervising this profile.
"""
_guard_official_docker_root_gateway()
_guard_named_profile_under_multiplexer(force=force)
_guard_supervised_gateway_conflict(force=force)
_guard_existing_gateway_process_conflict(replace=replace)
sys.path.insert(0, str(PROJECT_ROOT))

View File

@@ -29,7 +29,7 @@ import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path, PurePosixPath, PureWindowsPath
from typing import List, Optional
from typing import List, Optional, Tuple
from agent.skill_utils import is_excluded_skill_path
@@ -781,6 +781,47 @@ def list_profiles() -> List[ProfileInfo]:
return profiles
def profiles_to_serve(multiplex: bool) -> List[Tuple[str, Path]]:
"""Return the ``(profile_name, hermes_home)`` pairs a gateway should serve.
This is the single chokepoint for "which profiles does the inbound gateway
handle" so later multiplexing phases never re-derive the set.
- ``multiplex=False`` (default): returns exactly one entry for the *active*
profile — byte-for-byte the single-profile behavior the gateway has
always had. The name is ``"default"`` for the default profile or the
active named profile's id.
- ``multiplex=True``: returns the default profile plus every valid named
profile under ``profiles/``, each paired with its own HERMES_HOME.
Intentionally lightweight (a directory scan + name validation only): no
per-profile config reads, gateway-running probes, or skill counts like
:func:`list_profiles`. It runs on gateway startup and must stay cheap.
The returned ``hermes_home`` is the path to pass to
``set_hermes_home_override`` when scoping a turn to that profile.
"""
active = get_active_profile_name() or "default"
if not multiplex:
return [(active, get_profile_dir(active))]
serve: List[Tuple[str, Path]] = [("default", _get_default_hermes_home())]
profiles_root = _get_profiles_root()
if profiles_root.is_dir():
for entry in sorted(profiles_root.iterdir()):
if not entry.is_dir():
continue
name = entry.name
if name == "default":
continue # default is the built-in entry already added above
if not _PROFILE_ID_RE.match(name):
continue
serve.append((name, entry))
return serve
def create_profile(
name: str,
clone_from: Optional[str] = None,

View File

@@ -12,6 +12,7 @@ logger = logging.getLogger(__name__)
from hermes_cli import auth as auth_mod
from agent.credential_pool import CredentialPool, PooledCredential, get_custom_provider_pool_key, load_pool
from agent.secret_scope import get_secret as _get_secret
from hermes_cli.auth import (
AuthError,
DEFAULT_CODEX_BASE_URL,
@@ -35,6 +36,19 @@ from hermes_constants import OPENROUTER_BASE_URL
from utils import base_url_host_matches, base_url_hostname, env_int
def _getenv(name: str, default: str = "") -> str:
"""Profile-scoped replacement for ``os.getenv`` on credential/provider reads.
Routes through the secret scope (Workstream A): identical to ``os.getenv``
when multiplexing is off, scope-aware (and fail-closed on an unscoped read)
when on. Genuinely-global vars are handled inside ``get_secret`` and still
read ``os.environ``. Keeps the ``(name, default) -> str`` contract every
call site here already relies on.
"""
val = _get_secret(name, default)
return val if val is not None else default
def _normalize_custom_provider_name(value: str) -> str:
return value.strip().lower().replace(" ", "-")
@@ -156,7 +170,7 @@ def _host_derived_api_key(base_url: str) -> str:
if sanitized in ("OPENAI", "OPENROUTER", "OLLAMA"):
return ""
env_name = f"{sanitized}_API_KEY"
return (os.getenv(env_name, "") or "").strip()
return (_getenv(env_name, "") or "").strip()
def _auto_detect_local_model(base_url: str) -> str:
@@ -437,7 +451,7 @@ def resolve_requested_provider(requested: Optional[str] = None) -> str:
# Prefer the persisted config selection over any stale shell/.env
# provider override so chat uses the endpoint the user last saved.
env_provider = os.getenv("HERMES_INFERENCE_PROVIDER", "").strip().lower()
env_provider = _getenv("HERMES_INFERENCE_PROVIDER", "").strip().lower()
if env_provider:
return env_provider
@@ -542,7 +556,7 @@ def _get_named_custom_provider(requested_provider: str) -> Optional[Dict[str, An
name_norm = _normalize_custom_provider_name(ep_name)
# Resolve the API key from the env var name stored in key_env
key_env = str(entry.get("key_env", "") or "").strip()
resolved_api_key = os.getenv(key_env, "").strip() if key_env else ""
resolved_api_key = _getenv(key_env, "").strip() if key_env else ""
# Fall back to inline api_key when key_env is absent or unresolvable
if not resolved_api_key:
resolved_api_key = str(entry.get("api_key", "") or "").strip()
@@ -761,8 +775,8 @@ def _resolve_named_custom_runtime(
api_key_candidates = [
(explicit_api_key or "").strip(),
# Gate env key fallbacks on authoritative hosts (#28660)
(os.getenv("OPENAI_API_KEY", "").strip() if _da_is_openai_url else ""),
(os.getenv("OPENROUTER_API_KEY", "").strip() if _da_is_openrouter else ""),
(_getenv("OPENAI_API_KEY", "").strip() if _da_is_openai_url else ""),
(_getenv("OPENROUTER_API_KEY", "").strip() if _da_is_openrouter else ""),
# Bonus (#28660): derive `<VENDOR>_API_KEY` from the host so users
# who set DEEPSEEK_API_KEY / GROQ_API_KEY / MISTRAL_API_KEY get the
# intuitive match without configuring `custom_providers` first.
@@ -815,11 +829,11 @@ def _resolve_named_custom_runtime(
api_key_candidates = [
(explicit_api_key or "").strip(),
str(custom_provider.get("api_key", "") or "").strip(),
os.getenv(str(custom_provider.get("key_env", "") or "").strip(), "").strip(),
_getenv(str(custom_provider.get("key_env", "") or "").strip(), "").strip(),
# Gate provider env keys on their authoritative hosts — sending
# OPENAI_API_KEY to a local-llm endpoint leaks credentials (#28660).
(os.getenv("OPENAI_API_KEY", "").strip() if _cp_is_openai_url else ""),
(os.getenv("OPENROUTER_API_KEY", "").strip() if _cp_is_openrouter else ""),
(_getenv("OPENAI_API_KEY", "").strip() if _cp_is_openai_url else ""),
(_getenv("OPENROUTER_API_KEY", "").strip() if _cp_is_openrouter else ""),
# Bonus (#28660): derive `<VENDOR>_API_KEY` from the host as a final
# fallback when key_env wasn't set explicitly.
_host_derived_api_key(base_url),
@@ -878,8 +892,8 @@ def _resolve_openrouter_runtime(
except Exception:
pass
env_openrouter_base_url = os.getenv("OPENROUTER_BASE_URL", "").strip()
env_custom_base_url = os.getenv("CUSTOM_BASE_URL", "").strip()
env_openrouter_base_url = _getenv("OPENROUTER_BASE_URL", "").strip()
env_custom_base_url = _getenv("CUSTOM_BASE_URL", "").strip()
# Use config base_url when available and the provider context matches.
# OPENAI_BASE_URL env var is no longer consulted — config.yaml is
@@ -919,8 +933,8 @@ def _resolve_openrouter_runtime(
if _is_openrouter_context:
api_key_candidates = [
explicit_api_key,
os.getenv("OPENROUTER_API_KEY"),
os.getenv("OPENAI_API_KEY"),
_getenv("OPENROUTER_API_KEY"),
_getenv("OPENAI_API_KEY"),
]
else:
# Custom endpoint: use api_key from config when using config base_url (#1760).
@@ -940,9 +954,9 @@ def _resolve_openrouter_runtime(
api_key_candidates = [
explicit_api_key,
(cfg_api_key if use_config_base_url else ""),
(os.getenv("OLLAMA_API_KEY") if _is_ollama_url else ""),
(os.getenv("OPENAI_API_KEY") if (_is_openai_url or _is_openai_azure) else ""),
(os.getenv("OPENROUTER_API_KEY") if _is_openrouter_url else ""),
(_getenv("OLLAMA_API_KEY") if _is_ollama_url else ""),
(_getenv("OPENAI_API_KEY") if (_is_openai_url or _is_openai_azure) else ""),
(_getenv("OPENROUTER_API_KEY") if _is_openrouter_url else ""),
# Bonus (#28660): derive `<VENDOR>_API_KEY` from the host so users
# who set DEEPSEEK_API_KEY / GROQ_API_KEY / MISTRAL_API_KEY get the
# intuitive match. Helper returns "" for IPs/loopback and for env
@@ -1045,7 +1059,7 @@ def _resolve_azure_foundry_runtime(
if inferred:
cfg_api_mode = inferred
env_base_url = os.getenv("AZURE_FOUNDRY_BASE_URL", "").strip().rstrip("/")
env_base_url = _getenv("AZURE_FOUNDRY_BASE_URL", "").strip().rstrip("/")
base_url = explicit_base_url_clean or cfg_base_url or env_base_url
if not base_url:
raise AuthError(
@@ -1134,7 +1148,7 @@ def _resolve_azure_foundry_runtime(
except Exception:
api_key = ""
if not api_key:
api_key = os.getenv("AZURE_FOUNDRY_API_KEY", "").strip()
api_key = _getenv("AZURE_FOUNDRY_API_KEY", "").strip()
if not api_key:
raise AuthError(
"Azure Foundry requires an API key. Set AZURE_FOUNDRY_API_KEY in "
@@ -1234,7 +1248,7 @@ def _resolve_explicit_runtime(
expires_at = state.get("agent_key_expires_at") or state.get("expires_at")
if not api_key:
creds = resolve_nous_runtime_credentials(
timeout_seconds=float(os.getenv("HERMES_NOUS_TIMEOUT_SECONDS", "15")),
timeout_seconds=float(_getenv("HERMES_NOUS_TIMEOUT_SECONDS", "15")),
)
api_key = creds.get("api_key", "")
expires_at = creds.get("expires_at")
@@ -1263,7 +1277,7 @@ def _resolve_explicit_runtime(
if pconfig and pconfig.auth_type == "api_key":
env_url = ""
if pconfig.base_url_env_var:
env_url = os.getenv(pconfig.base_url_env_var, "").strip().rstrip("/")
env_url = _getenv(pconfig.base_url_env_var, "").strip().rstrip("/")
base_url = explicit_base_url
if not base_url:
@@ -1335,8 +1349,8 @@ def resolve_runtime_provider(
if requested_provider == "anthropic" and "azure.com" in _eff_base:
_azure_key = (
(explicit_api_key or "").strip()
or os.getenv("AZURE_ANTHROPIC_KEY", "").strip()
or os.getenv("ANTHROPIC_API_KEY", "").strip()
or _getenv("AZURE_ANTHROPIC_KEY", "").strip()
or _getenv("ANTHROPIC_API_KEY", "").strip()
)
return {
"provider": "anthropic",
@@ -1391,8 +1405,8 @@ def resolve_runtime_provider(
if provider == "openrouter":
cfg_provider = str(model_cfg.get("provider") or "").strip().lower()
cfg_base_url = str(model_cfg.get("base_url") or "").strip()
env_openai_base_url = os.getenv("OPENAI_BASE_URL", "").strip()
env_openrouter_base_url = os.getenv("OPENROUTER_BASE_URL", "").strip()
env_openai_base_url = _getenv("OPENAI_BASE_URL", "").strip()
env_openrouter_base_url = _getenv("OPENROUTER_BASE_URL", "").strip()
has_custom_endpoint = bool(
explicit_base_url
or env_openai_base_url
@@ -1448,7 +1462,7 @@ def resolve_runtime_provider(
if provider == "nous":
try:
creds = resolve_nous_runtime_credentials(
timeout_seconds=float(os.getenv("HERMES_NOUS_TIMEOUT_SECONDS", "15")),
timeout_seconds=float(_getenv("HERMES_NOUS_TIMEOUT_SECONDS", "15")),
)
return {
"provider": "nous",
@@ -1601,7 +1615,7 @@ def resolve_runtime_provider(
for hint_key in ("key_env", "api_key_env"):
env_var = str(model_cfg.get(hint_key) or "").strip()
if env_var:
token = os.getenv(env_var, "").strip()
token = _getenv(env_var, "").strip()
if token:
break
# Next: an inline api_key on the model config (useful in multi-profile
@@ -1611,8 +1625,8 @@ def resolve_runtime_provider(
# Finally fall back to the historical fixed names.
if not token:
token = (
os.getenv("AZURE_ANTHROPIC_KEY", "").strip()
or os.getenv("ANTHROPIC_API_KEY", "").strip()
_getenv("AZURE_ANTHROPIC_KEY", "").strip()
or _getenv("ANTHROPIC_API_KEY", "").strip()
)
if not token:
raise AuthError(

View File

@@ -0,0 +1,130 @@
"""Tests for the profile-scoped credential primitive (Workstream A / Phase 2)."""
import pytest
from agent import secret_scope as ss
@pytest.fixture(autouse=True)
def _reset_multiplex():
"""Ensure each test starts and ends with multiplexing off (it's a global)."""
ss.set_multiplex_active(False)
yield
ss.set_multiplex_active(False)
class TestMultiplexInactiveBackwardCompat:
"""Default deployment: get_secret transparently reads os.environ."""
def test_reads_environ(self, monkeypatch):
monkeypatch.setenv("ANTHROPIC_API_KEY", "sk-test")
assert ss.get_secret("ANTHROPIC_API_KEY") == "sk-test"
def test_missing_returns_default(self, monkeypatch):
monkeypatch.delenv("NOPE_KEY", raising=False)
assert ss.get_secret("NOPE_KEY") is None
assert ss.get_secret("NOPE_KEY", "fallback") == "fallback"
def test_no_raise_without_scope(self, monkeypatch):
monkeypatch.delenv("SOME_KEY", raising=False)
# multiplex off => unscoped read is fine, returns default
assert ss.get_secret("SOME_KEY") is None
class TestMultiplexActiveFailClosed:
"""Multiplex on: an unscoped secret read raises instead of leaking."""
def test_unscoped_read_raises(self, monkeypatch):
monkeypatch.setenv("ANTHROPIC_API_KEY", "sk-leaky")
ss.set_multiplex_active(True)
with pytest.raises(ss.UnscopedSecretError):
ss.get_secret("ANTHROPIC_API_KEY")
def test_scoped_read_uses_scope_not_environ(self, monkeypatch):
monkeypatch.setenv("ANTHROPIC_API_KEY", "sk-from-environ")
ss.set_multiplex_active(True)
token = ss.set_secret_scope({"ANTHROPIC_API_KEY": "sk-from-scope"})
try:
assert ss.get_secret("ANTHROPIC_API_KEY") == "sk-from-scope"
finally:
ss.reset_secret_scope(token)
def test_scoped_missing_key_returns_default_not_environ(self, monkeypatch):
# Even though the value exists in os.environ, a scope is authoritative:
# an absent scope key must NOT fall through to the (cross-profile) env.
monkeypatch.setenv("OPENAI_API_KEY", "sk-other-profile")
ss.set_multiplex_active(True)
token = ss.set_secret_scope({"ANTHROPIC_API_KEY": "sk-mine"})
try:
assert ss.get_secret("OPENAI_API_KEY") is None
assert ss.get_secret("OPENAI_API_KEY", "d") == "d"
finally:
ss.reset_secret_scope(token)
def test_global_env_still_reads_environ_under_multiplex(self, monkeypatch):
monkeypatch.setenv("HERMES_HOME", "/opt/data")
ss.set_multiplex_active(True)
# No scope, multiplex on — but HERMES_HOME is global, so no raise.
assert ss.get_secret("HERMES_HOME") == "/opt/data"
def test_kanban_prefix_is_global(self, monkeypatch):
monkeypatch.setenv("HERMES_KANBAN_DB", "/x/kanban.db")
ss.set_multiplex_active(True)
assert ss.get_secret("HERMES_KANBAN_DB") == "/x/kanban.db"
class TestScopeIsolation:
"""Two scopes never see each other's secrets."""
def test_nested_scopes_restore(self):
ss.set_multiplex_active(True)
t1 = ss.set_secret_scope({"K": "a"})
try:
assert ss.get_secret("K") == "a"
t2 = ss.set_secret_scope({"K": "b"})
try:
assert ss.get_secret("K") == "b"
finally:
ss.reset_secret_scope(t2)
assert ss.get_secret("K") == "a"
finally:
ss.reset_secret_scope(t1)
class TestEnvFileParsing:
"""load_env_file parses without mutating os.environ."""
def test_parses_basic(self, tmp_path):
env = tmp_path / ".env"
env.write_text(
"# comment\n"
"ANTHROPIC_API_KEY=sk-abc\n"
"export OPENAI_API_KEY=sk-def\n"
'QUOTED="quoted-value"\n'
"SINGLE='single'\n"
"\n"
"BAD_LINE_NO_EQUALS\n"
)
out = ss.load_env_file(env)
assert out == {
"ANTHROPIC_API_KEY": "sk-abc",
"OPENAI_API_KEY": "sk-def",
"QUOTED": "quoted-value",
"SINGLE": "single",
}
def test_does_not_mutate_environ(self, tmp_path, monkeypatch):
monkeypatch.delenv("ZZZ_KEY", raising=False)
env = tmp_path / ".env"
env.write_text("ZZZ_KEY=secret\n")
ss.load_env_file(env)
import os
assert "ZZZ_KEY" not in os.environ
def test_missing_file_returns_empty(self, tmp_path):
assert ss.load_env_file(tmp_path / "nope.env") == {}
def test_build_profile_secret_scope(self, tmp_path):
(tmp_path / ".env").write_text("ANTHROPIC_API_KEY=sk-profile\n")
assert ss.build_profile_secret_scope(tmp_path) == {
"ANTHROPIC_API_KEY": "sk-profile"
}

View File

@@ -0,0 +1,136 @@
"""Phase 3: secondary-profile adapter registry + same-token conflict detection."""
import pytest
from gateway.run import GatewayRunner
class _FakeAdapter:
def __init__(self, token=None):
self.token = token
class TestCredentialFingerprint:
def test_none_without_token(self):
assert GatewayRunner._adapter_credential_fingerprint(_FakeAdapter()) is None
def test_stable_and_log_safe(self):
a = _FakeAdapter(token="secret-bot-token")
fp1 = GatewayRunner._adapter_credential_fingerprint(a)
fp2 = GatewayRunner._adapter_credential_fingerprint(_FakeAdapter(token="secret-bot-token"))
assert fp1 == fp2 # stable
assert "secret-bot-token" not in (fp1 or "") # never the raw token
assert len(fp1) == 16
def test_distinct_tokens_distinct_fp(self):
a = GatewayRunner._adapter_credential_fingerprint(_FakeAdapter(token="tok-A"))
b = GatewayRunner._adapter_credential_fingerprint(_FakeAdapter(token="tok-B"))
assert a != b
def test_reads_alt_attrs(self):
class _AltAdapter:
def __init__(self):
self.bot_token = "alt-token"
assert GatewayRunner._adapter_credential_fingerprint(_AltAdapter()) is not None
class TestProfileMessageHandler:
@pytest.mark.asyncio
async def test_stamps_profile_on_unstamped_source(self):
runner = GatewayRunner.__new__(GatewayRunner)
seen = {}
async def _fake_handle(event):
seen["profile"] = event.source.profile
return "ok"
runner._handle_message = _fake_handle
handler = runner._make_profile_message_handler("coder")
class _Src:
profile = None
class _Evt:
source = _Src()
result = await handler(_Evt())
assert result == "ok"
assert seen["profile"] == "coder"
@pytest.mark.asyncio
async def test_does_not_override_existing_profile(self):
runner = GatewayRunner.__new__(GatewayRunner)
seen = {}
async def _fake_handle(event):
seen["profile"] = event.source.profile
return "ok"
runner._handle_message = _fake_handle
handler = runner._make_profile_message_handler("coder")
class _Src:
profile = "writer" # already stamped (e.g. by URL prefix)
class _Evt:
source = _Src()
await handler(_Evt())
assert seen["profile"] == "writer"
class TestPortBindingHardError:
"""A secondary profile enabling a port-binding platform aborts startup."""
@pytest.mark.asyncio
async def test_secondary_webhook_raises(self, monkeypatch):
from gateway.run import MultiplexConfigError
from gateway.config import GatewayConfig, Platform, PlatformConfig
runner = GatewayRunner.__new__(GatewayRunner)
runner.config = GatewayConfig(multiplex_profiles=True)
runner._profile_adapters = {}
# reviewer profile config enables webhook (a port-binding platform)
reviewer_cfg = GatewayConfig(multiplex_profiles=True)
reviewer_cfg.platforms = {
Platform.WEBHOOK: PlatformConfig(enabled=True, extra={"port": 8644}),
}
monkeypatch.setattr(
"gateway.config.load_gateway_config", lambda: reviewer_cfg
)
with pytest.raises(MultiplexConfigError) as ei:
await runner._start_one_profile_adapters("reviewer", "/tmp/x", {})
assert "webhook" in str(ei.value)
assert "reviewer" in str(ei.value)
@pytest.mark.asyncio
async def test_secondary_non_binding_platform_ok(self, monkeypatch):
"""A non-port-binding platform (e.g. telegram) is NOT rejected."""
from gateway.config import GatewayConfig, Platform, PlatformConfig
runner = GatewayRunner.__new__(GatewayRunner)
runner.config = GatewayConfig(multiplex_profiles=True)
runner._profile_adapters = {}
reviewer_cfg = GatewayConfig(multiplex_profiles=True)
reviewer_cfg.platforms = {
Platform.TELEGRAM: PlatformConfig(enabled=True, token="t"),
}
monkeypatch.setattr(
"gateway.config.load_gateway_config", lambda: reviewer_cfg
)
# _create_adapter returns None here (no real telegram token wiring), so
# the loop simply connects nothing — the key assertion is NO raise.
monkeypatch.setattr(runner, "_create_adapter", lambda p, c: None)
connected = await runner._start_one_profile_adapters("reviewer", "/tmp/x", {})
assert connected == 0 # nothing connected, but no MultiplexConfigError
def test_port_binding_set_covers_known_listeners(self):
from gateway.run import _PORT_BINDING_PLATFORM_VALUES
# Every adapter that binds a TCP port must be in the guard set.
for p in ("webhook", "api_server", "msgraph_webhook", "feishu",
"wecom_callback", "bluebubbles", "sms"):
assert p in _PORT_BINDING_PLATFORM_VALUES

View File

@@ -0,0 +1,88 @@
"""End-to-end credential isolation proof for multiplex mode (Workstream A).
These exercise the REAL resolution path (runtime_provider, secret scope, MCP
interpolation) rather than mocking it, proving the property that matters: two
profiles with different keys never see each other's, and an unscoped read in
multiplex mode fails closed instead of leaking.
"""
import pytest
from agent import secret_scope as ss
@pytest.fixture(autouse=True)
def _reset(monkeypatch):
ss.set_multiplex_active(False)
yield
ss.set_multiplex_active(False)
class TestRuntimeProviderUsesScope:
"""hermes_cli.runtime_provider._getenv resolves through the secret scope."""
def test_getenv_reads_scope_under_multiplex(self, monkeypatch):
from hermes_cli.runtime_provider import _getenv
monkeypatch.setenv("ANTHROPIC_API_KEY", "sk-global-leak")
ss.set_multiplex_active(True)
tok = ss.set_secret_scope({"ANTHROPIC_API_KEY": "sk-profileA"})
try:
assert _getenv("ANTHROPIC_API_KEY") == "sk-profileA"
finally:
ss.reset_secret_scope(tok)
def test_getenv_two_profiles_isolated(self, monkeypatch):
from hermes_cli.runtime_provider import _getenv
ss.set_multiplex_active(True)
tok_a = ss.set_secret_scope({"OPENAI_API_KEY": "sk-A"})
try:
assert _getenv("OPENAI_API_KEY") == "sk-A"
finally:
ss.reset_secret_scope(tok_a)
tok_b = ss.set_secret_scope({"OPENAI_API_KEY": "sk-B"})
try:
assert _getenv("OPENAI_API_KEY") == "sk-B"
finally:
ss.reset_secret_scope(tok_b)
def test_getenv_fails_closed_unscoped(self, monkeypatch):
from hermes_cli.runtime_provider import _getenv
monkeypatch.setenv("OPENROUTER_API_KEY", "sk-leak")
ss.set_multiplex_active(True)
with pytest.raises(ss.UnscopedSecretError):
_getenv("OPENROUTER_API_KEY")
def test_getenv_global_var_still_reads_environ(self, monkeypatch):
from hermes_cli.runtime_provider import _getenv
monkeypatch.setenv("HERMES_MAX_ITERATIONS", "42")
ss.set_multiplex_active(True)
# global var: no scope needed, no raise
assert _getenv("HERMES_MAX_ITERATIONS") == "42"
class TestMcpInterpolationUsesScope:
"""MCP config ${VAR} interpolation resolves through the secret scope."""
def test_interpolation_reads_scope(self, monkeypatch):
from tools.mcp_tool import _interpolate_env_vars
monkeypatch.setenv("MY_MCP_TOKEN", "global-token")
ss.set_multiplex_active(True)
tok = ss.set_secret_scope({"MY_MCP_TOKEN": "profile-token"})
try:
cfg = {"env": {"TOKEN": "${MY_MCP_TOKEN}"}}
assert _interpolate_env_vars(cfg) == {"env": {"TOKEN": "profile-token"}}
finally:
ss.reset_secret_scope(tok)
def test_interpolation_unset_keeps_placeholder(self, monkeypatch):
from tools.mcp_tool import _interpolate_env_vars
monkeypatch.delenv("UNSET_MCP_VAR", raising=False)
# multiplex off: unset var keeps literal placeholder (legacy behavior)
assert _interpolate_env_vars("${UNSET_MCP_VAR}") == "${UNSET_MCP_VAR}"
def test_interpolation_off_reads_environ(self, monkeypatch):
from tools.mcp_tool import _interpolate_env_vars
monkeypatch.setenv("MY_MCP_TOKEN", "env-token")
# multiplex off: legacy os.environ resolution
assert _interpolate_env_vars("${MY_MCP_TOKEN}") == "env-token"

View File

@@ -0,0 +1,73 @@
"""Phase 1: HTTP-inbound /p/<profile>/ routing for the webhook adapter."""
import pytest
from gateway.config import GatewayConfig, Platform
from gateway.session import SessionSource, build_session_key
class TestSessionSourceProfileField:
def test_profile_roundtrips(self):
s = SessionSource(
platform=Platform.WEBHOOK if hasattr(Platform, "WEBHOOK") else Platform.TELEGRAM,
chat_id="c1",
chat_type="webhook",
profile="coder",
)
restored = SessionSource.from_dict(s.to_dict())
assert restored.profile == "coder"
def test_profile_absent_not_serialized(self):
s = SessionSource(platform=Platform.TELEGRAM, chat_id="c1", chat_type="dm")
assert "profile" not in s.to_dict()
def test_source_profile_drives_session_key_namespace(self):
s = SessionSource(platform=Platform.TELEGRAM, chat_id="99", chat_type="dm")
# build_session_key takes profile explicitly; the adapter passes
# source.profile through. Verify the namespace follows it.
assert build_session_key(s, profile="coder") == "agent:coder:telegram:dm:99"
class TestWebhookProfileResolution:
"""_resolve_request_profile validates the /p/<profile>/ prefix."""
def _adapter(self, multiplex: bool, served=("default", "coder")):
from gateway.platforms.webhook import WebhookAdapter, _PROFILE_REJECTED
class _FakeReq:
def __init__(self, profile):
self.match_info = {"profile": profile} if profile is not None else {}
cfg = GatewayConfig(multiplex_profiles=multiplex)
class _Runner:
config = cfg
# Construct minimally; we only call _resolve_request_profile.
adapter = WebhookAdapter.__new__(WebhookAdapter)
adapter.gateway_runner = _Runner()
return adapter, _FakeReq, _PROFILE_REJECTED, served
def test_no_prefix_returns_none(self):
adapter, Req, _REJ, _ = self._adapter(multiplex=True)
assert adapter._resolve_request_profile(Req(None)) is None
def test_prefix_ignored_when_multiplex_off(self):
adapter, Req, _REJ, _ = self._adapter(multiplex=False)
# Even a bogus profile is ignored (not 404'd) when multiplexing is off.
assert adapter._resolve_request_profile(Req("anything")) is None
def test_known_profile_accepted(self, monkeypatch):
adapter, Req, _REJ, served = self._adapter(multiplex=True)
monkeypatch.setattr(
"hermes_cli.profiles.profiles_to_serve",
lambda multiplex: [(n, None) for n in served],
)
assert adapter._resolve_request_profile(Req("coder")) == "coder"
def test_unknown_profile_rejected(self, monkeypatch):
adapter, Req, REJ, served = self._adapter(multiplex=True)
monkeypatch.setattr(
"hermes_cli.profiles.profiles_to_serve",
lambda multiplex: [(n, None) for n in served],
)
assert adapter._resolve_request_profile(Req("ghost")) is REJ

View File

@@ -0,0 +1,55 @@
"""Phase 4: lifecycle guard + per-profile observability."""
import pytest
class TestServedProfilesStatus:
def test_write_and_read_served_profiles(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
import importlib
import gateway.status as status
importlib.reload(status)
try:
status.write_runtime_status(
gateway_state="running", served_profiles=["default", "coder"]
)
rec = status.read_runtime_status()
assert rec.get("served_profiles") == ["default", "coder"]
finally:
importlib.reload(status)
def test_served_profiles_absent_by_default(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
import importlib
import gateway.status as status
importlib.reload(status)
try:
status.write_runtime_status(gateway_state="running")
rec = status.read_runtime_status()
assert "served_profiles" not in rec
finally:
importlib.reload(status)
class TestNamedProfileMultiplexerGuard:
"""_guard_named_profile_under_multiplexer is inert unless all conditions hold."""
def test_inert_for_default_profile(self, monkeypatch):
from hermes_cli import gateway as gw
monkeypatch.setattr(gw, "_profile_suffix", lambda: "")
# Should return without raising (default profile => guard N/A).
gw._guard_named_profile_under_multiplexer(force=False)
def test_force_bypasses(self, monkeypatch):
from hermes_cli import gateway as gw
# Even if it looks like a named profile, force returns immediately.
monkeypatch.setattr(gw, "_profile_suffix", lambda: "coder")
gw._guard_named_profile_under_multiplexer(force=True)
def test_inert_when_no_default_gateway_running(self, monkeypatch, tmp_path):
from hermes_cli import gateway as gw
monkeypatch.setattr(gw, "_profile_suffix", lambda: "coder")
monkeypatch.setattr(
"hermes_constants.get_default_hermes_root", lambda: tmp_path
)
# No gateway.pid in tmp_path => no running default gateway => no raise.
gw._guard_named_profile_under_multiplexer(force=False)

View File

@@ -0,0 +1,165 @@
"""Phase 0 foundations for multi-profile gateway multiplexing.
Covers the three Phase 0 deliverables:
1. ``gateway.multiplex_profiles`` config flag (default False, round-trips).
2. ``hermes_cli.profiles.profiles_to_serve`` enumeration.
3. Profile-stamped ``build_session_key`` that is BYTE-IDENTICAL when the
flag is off (the orphan-every-session guard) and namespace-segmented when
on, without disturbing the positional key layout downstream parsers rely
on.
"""
import pytest
from unittest.mock import patch
from gateway.config import GatewayConfig, Platform
from gateway.session import SessionSource, SessionStore, build_session_key
def _src(**kw) -> SessionSource:
kw.setdefault("platform", Platform.TELEGRAM)
kw.setdefault("chat_id", "99")
kw.setdefault("chat_type", "dm")
return SessionSource(**kw)
class TestSessionKeyByteIdenticalWhenOff:
"""The non-negotiable guard: with no profile (or 'default'), every key is
byte-for-byte what it was before Phase 0. A diff here orphans every
existing session on upgrade."""
@pytest.mark.parametrize("profile", [None, "default"])
def test_dm_with_chat_id(self, profile):
s = _src(chat_id="99", chat_type="dm")
assert build_session_key(s, profile=profile) == "agent:main:telegram:dm:99"
@pytest.mark.parametrize("profile", [None, "default"])
def test_dm_with_thread(self, profile):
s = _src(chat_id="99", chat_type="dm", thread_id="t1")
assert build_session_key(s, profile=profile) == "agent:main:telegram:dm:99:t1"
@pytest.mark.parametrize("profile", [None, "default"])
def test_dm_without_chat_id_falls_back_to_user(self, profile):
s = _src(chat_id="", chat_type="dm", user_id="jordan")
assert build_session_key(s, profile=profile) == "agent:main:telegram:dm:jordan"
@pytest.mark.parametrize("profile", [None, "default"])
def test_group_per_user(self, profile):
s = _src(platform=Platform.DISCORD, chat_id="g1", chat_type="group", user_id="alice")
assert (
build_session_key(s, profile=profile)
== "agent:main:discord:group:g1:alice"
)
@pytest.mark.parametrize("profile", [None, "default"])
def test_group_shared_when_disabled(self, profile):
s = _src(platform=Platform.DISCORD, chat_id="g1", chat_type="group", user_id="alice")
assert (
build_session_key(s, group_sessions_per_user=False, profile=profile)
== "agent:main:discord:group:g1"
)
class TestSessionKeyNamespacedWhenOn:
"""A named profile occupies the namespace slot, isolating its sessions."""
def test_named_profile_dm(self):
s = _src(chat_id="99", chat_type="dm")
assert build_session_key(s, profile="coder") == "agent:coder:telegram:dm:99"
def test_named_profile_group_per_user(self):
s = _src(platform=Platform.DISCORD, chat_id="g1", chat_type="group", user_id="alice")
assert (
build_session_key(s, profile="coder")
== "agent:coder:discord:group:g1:alice"
)
def test_two_profiles_same_chat_do_not_collide(self):
s = _src(chat_id="99", chat_type="dm")
a = build_session_key(s, profile="default")
b = build_session_key(s, profile="coder")
c = build_session_key(s, profile="writer")
assert a != b != c and a != c
def test_positional_layout_preserved_for_parsers(self):
"""Downstream parsers split on ':' and read parts[2]=platform,
parts[3]=chat_type, parts[4]=chat_id (see qqbot adapter
_parse_gateway_session_key). The profile must occupy parts[1] only."""
s = _src(platform=Platform.DISCORD, chat_id="g1", chat_type="group", user_id="alice")
parts = build_session_key(s, profile="coder").split(":")
assert parts[0] == "agent"
assert parts[1] == "coder" # namespace slot (was always 'main')
assert parts[2] == "discord" # platform — unchanged offset
assert parts[3] == "group" # chat_type — unchanged offset
assert parts[4] == "g1" # chat_id — unchanged offset
def test_default_namespace_layout_matches_named(self):
"""Default and named keys differ ONLY in parts[1]."""
s = _src(platform=Platform.SLACK, chat_id="c1", chat_type="channel", user_id="u1")
d = build_session_key(s, profile="default").split(":")
n = build_session_key(s, profile="coder").split(":")
assert d[0] == n[0] == "agent"
assert d[1] == "main" and n[1] == "coder"
assert d[2:] == n[2:] # everything after the namespace is identical
class TestMultiplexConfigFlag:
"""gateway.multiplex_profiles defaults off and round-trips."""
def test_default_is_false(self):
assert GatewayConfig().multiplex_profiles is False
def test_to_dict_includes_flag(self):
assert GatewayConfig().to_dict()["multiplex_profiles"] is False
def test_from_dict_top_level(self):
cfg = GatewayConfig.from_dict({"multiplex_profiles": True})
assert cfg.multiplex_profiles is True
def test_from_dict_nested_gateway(self):
cfg = GatewayConfig.from_dict({"gateway": {"multiplex_profiles": True}})
assert cfg.multiplex_profiles is True
def test_from_dict_coerces_truthy_string(self):
cfg = GatewayConfig.from_dict({"multiplex_profiles": "true"})
assert cfg.multiplex_profiles is True
def test_roundtrip(self):
cfg = GatewayConfig.from_dict(GatewayConfig(multiplex_profiles=True).to_dict())
assert cfg.multiplex_profiles is True
class TestSessionStoreProfileResolution:
"""SessionStore._generate_session_key honors the flag: legacy namespace
when off, active-profile namespace when on."""
def _store(self, tmp_path, **cfg_kw):
config = GatewayConfig(**cfg_kw)
with patch("gateway.session.SessionStore._ensure_loaded"):
s = SessionStore(sessions_dir=tmp_path, config=config)
s._db = None
s._loaded = True
return s
def test_flag_off_uses_legacy_namespace(self, tmp_path):
store = self._store(tmp_path) # multiplex_profiles defaults False
s = _src(chat_id="99", chat_type="dm")
assert store._generate_session_key(s) == "agent:main:telegram:dm:99"
assert store._generate_session_key(s) == build_session_key(s)
def test_flag_off_resolve_profile_is_none(self, tmp_path):
store = self._store(tmp_path)
assert store._resolve_profile_for_key() is None
def test_flag_on_uses_active_profile_namespace(self, tmp_path):
store = self._store(tmp_path, multiplex_profiles=True)
s = _src(chat_id="99", chat_type="dm")
with patch("hermes_cli.profiles.get_active_profile_name", return_value="coder"):
assert store._generate_session_key(s) == "agent:coder:telegram:dm:99"
def test_flag_on_default_profile_stays_legacy(self, tmp_path):
store = self._store(tmp_path, multiplex_profiles=True)
s = _src(chat_id="99", chat_type="dm")
with patch("hermes_cli.profiles.get_active_profile_name", return_value="default"):
assert store._generate_session_key(s) == "agent:main:telegram:dm:99"

View File

@@ -35,6 +35,7 @@ from hermes_cli.profiles import (
has_bundled_skills_opt_out,
NO_BUNDLED_SKILLS_MARKER,
backfill_profile_envs,
profiles_to_serve,
)
from hermes_cli.config import DEFAULT_CONFIG
@@ -1487,3 +1488,48 @@ class TestEdgeCases:
delete_profile("coder", yes=True)
assert get_active_profile() == "default"
class TestProfilesToServe:
"""profiles_to_serve(multiplex) — the gateway's profile-enumeration chokepoint."""
def test_off_returns_only_active_default(self, profile_env):
serve = profiles_to_serve(multiplex=False)
assert len(serve) == 1
name, home = serve[0]
assert name == "default"
assert home == _get_default_hermes_home()
def test_off_returns_only_active_named(self, profile_env, monkeypatch):
# A named profile's gateway runs with HERMES_HOME pointing at the
# profile dir; get_active_profile_name() infers the name from there.
create_profile("coder", no_alias=True)
monkeypatch.setenv("HERMES_HOME", str(get_profile_dir("coder")))
serve = profiles_to_serve(multiplex=False)
assert len(serve) == 1
assert serve[0][0] == "coder"
assert serve[0][1] == get_profile_dir("coder")
def test_on_returns_default_plus_all_named(self, profile_env):
create_profile("coder", no_alias=True)
create_profile("writer", no_alias=True)
serve = dict(profiles_to_serve(multiplex=True))
assert set(serve) == {"default", "coder", "writer"}
assert serve["default"] == _get_default_hermes_home()
assert serve["coder"] == get_profile_dir("coder")
def test_on_default_always_first(self, profile_env):
create_profile("coder", no_alias=True)
serve = profiles_to_serve(multiplex=True)
assert serve[0][0] == "default"
def test_on_active_profile_does_not_change_set(self, profile_env):
"""Enumeration is independent of which profile is active."""
create_profile("coder", no_alias=True)
set_active_profile("coder")
serve = dict(profiles_to_serve(multiplex=True))
assert set(serve) == {"default", "coder"}
def test_on_no_named_profiles_returns_just_default(self, profile_env):
serve = profiles_to_serve(multiplex=True)
assert [n for n, _ in serve] == ["default"]

View File

@@ -2662,10 +2662,19 @@ def _interrupted_call_result() -> str:
# ---------------------------------------------------------------------------
def _interpolate_env_vars(value):
"""Recursively resolve ``${VAR}`` placeholders from ``os.environ``."""
"""Recursively resolve ``${VAR}`` placeholders.
Resolves from the active profile's secret scope when multiplexing is on
(so an MCP server config's ``${API_KEY}`` picks up the routed profile's
value, not the process-global ``os.environ`` which may hold another
profile's), falling back to ``os.environ`` otherwise. Unset vars keep the
literal ``${VAR}`` placeholder, as before.
"""
from agent.secret_scope import get_secret as _get_secret
if isinstance(value, str):
def _replace(m):
return os.environ.get(m.group(1), m.group(0))
return _get_secret(m.group(1), m.group(0)) or m.group(0)
return _ENV_VAR_PATTERN.sub(_replace, value)
if isinstance(value, dict):
return {k: _interpolate_env_vars(v) for k, v in value.items()}

View File

@@ -56,6 +56,139 @@ research gateway start
That's it — three independent agents, each on its own process, restarting
automatically on crash and on user login.
## Alternative: one gateway for all profiles (multiplexing)
The model above runs **one process per profile**. That is the default and is
the right choice for most setups. But on a host with many profiles — or a
container deployment where one process per profile is operationally heavy — you
can instead run a **single multiplexing gateway**: the default profile's gateway
becomes the sole inbound process and serves messages for *every* profile on the
box.
This is **opt-in** and **off by default**. When it's off, nothing on this page
changes — every behavior below is inert.
### When to prefer multiplexing
- A container/VPS deployment where N supervisor units, N ports, and N PID files
are a burden.
- Many low-traffic profiles that don't each justify a full process.
- You want a single thing to start, monitor, and restart.
Stick with one-process-per-profile when you want hard process-level isolation
between profiles (separate memory footprints, independent crash domains, the
ability to restart one profile without touching the others).
### How to opt in
Set the flag on the **default profile** (it owns the multiplexer) and restart
its gateway:
```bash
hermes config set gateway.multiplex_profiles true
hermes gateway restart
```
Equivalently, in the default profile's `~/.hermes/config.yaml`:
```yaml
gateway:
multiplex_profiles: true
```
(The flag is also accepted as a top-level `multiplex_profiles: true` for
convenience.) On the next start the default gateway enumerates every profile,
brings up each profile's enabled platforms under that profile's own
credentials, and routes each inbound message to the profile it belongs to. Each
turn resolves the routed profile's config, skills, memory, SOUL, **and provider
keys** — credentials are never shared across profiles.
You do **not** run `hermes gateway start` for the secondary profiles — the
default gateway serves them. See the contract changes below.
### What changes when multiplexing is on
Enabling the flag changes how a few things behave. All of these revert the
moment the flag is off.
#### 1. Secondary profiles must not start their own gateway
With a multiplexer running, a named-profile `hermes gateway start` / `run` is a
**hard error**, pointing you back at the multiplexer:
```
The default gateway is running as a profile multiplexer and already serves
profile 'coder'. ...
```
The multiplexer is the single inbound process; a second profile gateway would
double-bind that profile's platforms. Pass `--force` only if you deliberately
want a separate process for that profile (not recommended while the multiplexer
is running). The cross-profile lifecycle wrapper script earlier on this page is
therefore **not** used in multiplex mode — you only manage the default gateway.
#### 2. HTTP-inbound platforms are reached via a `/p/<profile>/` URL prefix
Webhook (and other HTTP-inbound) traffic for a secondary profile arrives on the
default listener under a profile prefix, **not** a second port:
```
# default profile
POST http://host:8644/webhooks/<route>
# the "coder" profile, same listener
POST http://host:8644/p/coder/webhooks/<route>
```
An unknown or unconfigured profile in the prefix returns `404`. Because the one
shared listener already serves every profile this way, a **secondary profile
must not enable a port-binding platform itself** — doing so is a config error
and the gateway refuses to start, naming the profile and platform:
```
Profile 'coder' enables the port-binding platform 'webhook', but
gateway.multiplex_profiles is on. ... Remove platforms.webhook from profile
'coder's config.yaml (configure it only on the default profile).
```
Port-binding platforms covered by this rule: `webhook`, `api_server`,
`msgraph_webhook`, `feishu`, `wecom_callback`, `bluebubbles`, `sms`. Configure
any of these **only on the default profile**; every profile is reachable through
its `/p/<profile>/` prefix.
#### 3. Per-credential platforms still need their own token per profile
Polling/connection platforms (Telegram, Discord, Slack, Matrix, Signal, …) work
fine multiplexed, but each profile that enables one must supply its **own** bot
token — the same token cannot be polled by two profiles at once. If two profiles
configure the same `(platform, token)`, startup fails fast naming both profiles
(see [Token-conflict safety](#token-conflict-safety) — the rule is unchanged,
it's just enforced inside the one process now).
#### 4. Session keys are namespaced by profile
Each profile's sessions live under an `agent:<profile>:…` namespace so two
profiles on the same platform/chat never collide in the shared session store.
The **default** profile keeps the historical `agent:main:…` namespace
byte-for-byte, so existing default-profile sessions are unaffected — no
migration, no orphaned history.
#### 5. One PID/lock and one status surface
There is a single process-level PID and lock (the multiplexer, under the default
home). `hermes status` reports the multiplexer and the profiles it serves;
`hermes status -p <name>` slices to one profile. Each profile still writes its
own `runtime_status.json` under its own home, so existing per-profile readers
keep working.
#### What does **not** change
Per-profile `.env` credential isolation is preserved and, if anything,
stricter: a profile's keys are resolved from its own scope and are never unioned
into a shared environment (this also means subprocesses like MCP servers and
Kanban workers only ever see their own profile's secrets). Kanban,
profile-scoped skills/memory/SOUL, and model routing all behave per-profile
exactly as they do with separate gateways.
## Start, stop, or restart all gateways at once
The CLI ships with single-profile lifecycle commands. To act across every