mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-19 16:40:38 +08:00
Compare commits
6 Commits
bb/pets
...
feat/multi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f9af07e4bf | ||
|
|
5627099949 | ||
|
|
f9f51b798f | ||
|
|
98b3cf2956 | ||
|
|
cdd708c6bf | ||
|
|
b8b24c3ba5 |
@@ -15,6 +15,7 @@ from typing import Any, Dict, List, Optional, Set, Tuple
|
||||
|
||||
from hermes_constants import OPENROUTER_BASE_URL
|
||||
from hermes_cli.config import load_env
|
||||
from agent.secret_scope import get_secret as _get_secret
|
||||
from agent.credential_persistence import (
|
||||
is_borrowed_credential_source,
|
||||
sanitize_borrowed_credential_payload,
|
||||
@@ -1666,7 +1667,7 @@ def _seed_from_singletons(provider: str, entries: List[PooledCredential]) -> Tup
|
||||
_env_file = load_env()
|
||||
|
||||
def _env_val(key: str) -> str:
|
||||
return (_env_file.get(key) or os.environ.get(key) or "").strip()
|
||||
return (_env_file.get(key) or _get_secret(key, "") or "").strip()
|
||||
|
||||
anthropic_api_key = _env_val("ANTHROPIC_API_KEY")
|
||||
anthropic_oauth_env = (
|
||||
@@ -1952,7 +1953,7 @@ def _seed_from_env(provider: str, entries: List[PooledCredential]) -> Tuple[bool
|
||||
# changes to the .env file.
|
||||
def _get_env_prefer_dotenv(key: str) -> str:
|
||||
env_file = load_env()
|
||||
val = env_file.get(key) or os.environ.get(key) or ""
|
||||
val = env_file.get(key) or _get_secret(key, "") or ""
|
||||
return val.strip()
|
||||
|
||||
# Honour user suppression — `hermes auth remove <provider> <N>` for an
|
||||
|
||||
205
agent/secret_scope.py
Normal file
205
agent/secret_scope.py
Normal file
@@ -0,0 +1,205 @@
|
||||
"""Profile-scoped credential resolution for multi-profile gateway multiplexing.
|
||||
|
||||
The multiplexing gateway serves many profiles from one process. Each profile
|
||||
has its own ``.env`` with its own provider keys and platform tokens, so we
|
||||
**cannot** union them into the process-global ``os.environ`` (that would leak
|
||||
profile A's keys to profile B's turns, and to every subprocess spawned with
|
||||
``env=dict(os.environ)``).
|
||||
|
||||
This module provides a fail-closed, context-local secret scope:
|
||||
|
||||
- ``set_secret_scope(mapping)`` installs the active profile's secrets for the
|
||||
current task (a contextvar, so it propagates into the agent's worker thread
|
||||
via ``copy_context()`` exactly like the HERMES_HOME override).
|
||||
- ``get_secret(name)`` reads from that scope. When multiplexing is **active**
|
||||
and no scope is set, it RAISES rather than silently falling back to
|
||||
``os.environ`` — an un-migrated or newly-added call site fails loud at that
|
||||
exact line instead of leaking another profile's value. When multiplexing is
|
||||
**off** (the default), it transparently reads ``os.environ`` so the
|
||||
single-profile gateway and every non-gateway caller behave exactly as before.
|
||||
|
||||
Design rationale lives in ``docs/design/multiplexing-gateway.md`` (Workstream A).
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
from contextvars import ContextVar, Token
|
||||
from pathlib import Path
|
||||
from typing import Dict, Mapping, Optional
|
||||
|
||||
|
||||
# ── multiplex-active flag ────────────────────────────────────────────────
|
||||
# Process-global: set once at gateway startup when gateway.multiplex_profiles
|
||||
# is true. Governs whether get_secret() fails closed on an unscoped read.
|
||||
# A plain module global (not a contextvar): it describes the deployment mode,
|
||||
# not a per-task value.
|
||||
_MULTIPLEX_ACTIVE: bool = False


def set_multiplex_active(active: bool) -> None:
    """Record whether this process runs as a profile multiplexer.

    Meant to be invoked once at gateway startup. While the flag is True,
    ``get_secret`` fails closed on an unscoped read rather than falling back
    to ``os.environ``.

    Args:
        active: Any value; it is coerced to ``bool`` before being stored.
    """
    global _MULTIPLEX_ACTIVE
    # Store a real bool so is_multiplex_active() never leaks a truthy object.
    _MULTIPLEX_ACTIVE = True if active else False


def is_multiplex_active() -> bool:
    """Report the deployment-mode flag set by ``set_multiplex_active``."""
    return _MULTIPLEX_ACTIVE
|
||||
|
||||
|
||||
# ── the secret scope contextvar ──────────────────────────────────────────
|
||||
_SECRET_SCOPE: ContextVar[Optional[Mapping[str, str]]] = ContextVar(
    "_SECRET_SCOPE",
    default=None,
)


class UnscopedSecretError(RuntimeError):
    """Fail-closed signal: a secret was read in multiplex mode without a scope.

    Raised when a credential read reaches ``get_secret`` while multiplexing is
    on and no profile scope is installed. Falling back to ``os.environ`` at
    that point could hand out whichever profile's value happens to be there.
    The remedy is to run the call path inside ``set_secret_scope(...)`` (the
    per-turn / per-adapter profile scope), not to widen the allowlist.
    """


def set_secret_scope(secrets: Optional[Mapping[str, str]]) -> Token:
    """Make ``secrets`` the active secret mapping for the current context.

    Args:
        secrets: The profile's secret mapping, or ``None`` to clear the scope.

    Returns:
        A token that ``reset_secret_scope`` accepts to restore the prior value.
    """
    token = _SECRET_SCOPE.set(secrets)
    return token


def reset_secret_scope(token: Token) -> None:
    """Undo a ``set_secret_scope`` call, restoring the previous scope."""
    _SECRET_SCOPE.reset(token)


def current_secret_scope() -> Optional[Mapping[str, str]]:
    """Return the installed secret mapping, or ``None`` if there is none."""
    active_scope = _SECRET_SCOPE.get()
    return active_scope
|
||||
|
||||
|
||||
# ── genuinely-global env vars (NOT per-profile secrets) ──────────────────
|
||||
# These are process/deployment-level settings, not profile credentials. They
|
||||
# legitimately live in os.environ and must keep reading from it even in
|
||||
# multiplex mode — routing them through the fail-closed path would wrongly
|
||||
# crash. Anything matching is read from os.environ regardless of scope.
|
||||
#
|
||||
# Membership test is by exact name OR prefix (see _is_global_env). Keep this
|
||||
# list tight: when in doubt a value is a profile secret, not a global.
|
||||
_GLOBAL_ENV_EXACT = frozenset({
|
||||
# Hermes runtime / deployment
|
||||
"HERMES_HOME", "HERMES_PROFILE", "HERMES_GATEWAY_LOCK_DIR",
|
||||
"HERMES_MAX_ITERATIONS", "HERMES_MAX_TOKENS", "HERMES_API_TIMEOUT",
|
||||
"HERMES_REDACT_SECRETS", "HERMES_NOUS_TIMEOUT_SECONDS",
|
||||
"_HERMES_GATEWAY",
|
||||
# OS / interpreter
|
||||
"PATH", "HOME", "USER", "LANG", "LC_ALL", "TZ", "PWD", "SHELL", "TMPDIR",
|
||||
"VIRTUAL_ENV", "PYTHONPATH", "SSL_CERT_FILE",
|
||||
# Kanban paths (per-board, not per-profile-secret)
|
||||
"HERMES_KANBAN_DB", "HERMES_KANBAN_WORKSPACES_ROOT", "HERMES_KANBAN_BOARD",
|
||||
})
|
||||
_GLOBAL_ENV_PREFIXES = (
|
||||
"HERMES_KANBAN_",
|
||||
"HERMES_TELEGRAM_", # tuning knobs (batch delays, fallback toggles) — NOT the token
|
||||
"TERMINAL_", # terminal/sandbox backend settings
|
||||
)
|
||||
|
||||
|
||||
def _is_global_env(name: str) -> bool:
|
||||
"""Return True for genuinely process-global (non-profile-secret) env vars."""
|
||||
if name in _GLOBAL_ENV_EXACT:
|
||||
return True
|
||||
return any(name.startswith(p) for p in _GLOBAL_ENV_PREFIXES)
|
||||
|
||||
|
||||
def get_secret(name: str, default: Optional[str] = None) -> Optional[str]:
    """Look up a credential by env-var name, respecting the profile scope.

    Lookup rules, in order:

    1. Names accepted by ``_is_global_env`` are deployment settings and are
       always read from ``os.environ``.
    2. With a secret scope installed, the scope is authoritative: the value
       comes from it and a missing key yields ``default``. ``os.environ`` is
       NOT consulted, since in a multiplexer it may hold another profile's
       value.
    3. With no scope installed:
       - multiplexing off (the default): read ``os.environ``, matching the
         legacy ``os.getenv`` behavior.
       - multiplexing on: fail closed by raising ``UnscopedSecretError``.

    Args:
        name: Environment-variable style name of the secret.
        default: Value returned when the secret is not found.

    Returns:
        The resolved value, or ``default`` when it is absent.

    Raises:
        UnscopedSecretError: Non-global name, no scope, multiplexing active.
    """
    if not _is_global_env(name):
        scope = _SECRET_SCOPE.get()
        if scope is not None:
            scoped_value = scope.get(name)
            return default if scoped_value is None else scoped_value

        if _MULTIPLEX_ACTIVE:
            raise UnscopedSecretError(
                f"get_secret({name!r}) called with no profile secret scope active "
                f"while multiplexing is on. This credential read must run inside a "
                f"set_secret_scope(...) block (the per-turn / per-adapter profile "
                f"scope). Reading os.environ here would risk leaking another "
                f"profile's value. See docs/design/multiplexing-gateway.md "
                f"(Workstream A)."
            )

    # Reached for global names and for unscoped reads in single-profile mode.
    return os.environ.get(name, default)
|
||||
|
||||
|
||||
def _clean_env_value(raw: str) -> str:
    """Normalise the right-hand side of one ``KEY=VALUE`` line.

    Strips surrounding whitespace, removes one pair of matching quotes, and
    drops a trailing ``# comment``. Inside quotes a ``#`` is literal.
    """
    value = raw.strip()

    if value[:1] in ("'", '"'):
        quote = value[0]
        closing = value.find(quote, 1)
        if closing != -1:
            trailer = value[closing + 1:].strip()
            # `"value"` or `"value"  # comment` — keep only the quoted part.
            if not trailer or trailer.startswith("#"):
                return value[1:closing]
        # Otherwise keep the previous rule: strip the outer pair only when the
        # value both starts and ends with the same quote character.
        if len(value) >= 2 and value[-1] == quote:
            return value[1:-1]
        return value

    # Unquoted: a '#' preceded by whitespace starts an inline comment; a '#'
    # glued to the value (e.g. a URL fragment) is part of the value.
    for idx, char in enumerate(value):
        if char == "#" and idx > 0 and value[idx - 1].isspace():
            return value[:idx].rstrip()
    return value


def load_env_file(env_path: Path) -> Dict[str, str]:
    """Parse a ``.env`` file into a plain dict WITHOUT touching ``os.environ``.

    Used to load a profile's secrets into an isolated mapping for
    ``set_secret_scope``. Handles the basic python-dotenv forms: ``KEY=VALUE``,
    an ``export `` prefix, full-line and trailing ``#`` comments, optional
    matching quotes, and a leading UTF-8 byte-order mark. It never mutates the
    process environment.

    Args:
        env_path: Location of the ``.env`` file.

    Returns:
        Mapping of key to value. Empty when the file is missing, unreadable,
        or not valid UTF-8. Later duplicates of a key override earlier ones.
    """
    secrets: Dict[str, str] = {}
    try:
        # utf-8-sig drops a BOM if present, so the first key is not corrupted
        # into "\ufeffKEY"; files without a BOM decode exactly as plain UTF-8.
        text = env_path.read_text(encoding="utf-8-sig")
    except (OSError, UnicodeDecodeError):
        return secrets

    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        if line.startswith("export "):
            line = line[len("export "):].lstrip()
        key, sep, value = line.partition("=")
        key = key.strip()
        if not sep or not key:
            continue
        secrets[key] = _clean_env_value(value)

    return secrets
|
||||
|
||||
|
||||
def build_profile_secret_scope(hermes_home: Path) -> Dict[str, str]:
    """Load the secret mapping for the profile rooted at ``hermes_home``.

    Reads ``<hermes_home>/.env`` through ``load_env_file`` and returns the
    resulting fresh dict, suitable for ``set_secret_scope``. Process-global
    variables are deliberately not merged in: ``get_secret`` reads those from
    ``os.environ`` itself, so the scope carries only the profile's own entries.
    """
    env_path = Path(hermes_home).joinpath(".env")
    return load_env_file(env_path)
|
||||
|
||||
@@ -545,6 +545,13 @@ class GatewayConfig:
|
||||
thread_sessions_per_user: bool = False # When False (default), threads are shared across all participants
|
||||
max_concurrent_sessions: Optional[int] = None # Positive int caps simultaneous active chat sessions
|
||||
|
||||
# Multi-profile multiplexing (opt-in; default off preserves one-gateway-per-profile).
|
||||
# When True, the default profile's gateway serves inbound messages for every
|
||||
# profile on the host: profiles are stamped into session keys and (in later
|
||||
# phases) per-profile adapters/credentials are resolved. When False, the
|
||||
# gateway behaves exactly as before — single HERMES_HOME, no profile stamping.
|
||||
multiplex_profiles: bool = False
|
||||
|
||||
# Unauthorized DM policy
|
||||
unauthorized_dm_behavior: str = "pair" # "pair" or "ignore"
|
||||
|
||||
@@ -650,6 +657,7 @@ class GatewayConfig:
|
||||
"group_sessions_per_user": self.group_sessions_per_user,
|
||||
"thread_sessions_per_user": self.thread_sessions_per_user,
|
||||
"max_concurrent_sessions": self.max_concurrent_sessions,
|
||||
"multiplex_profiles": self.multiplex_profiles,
|
||||
"unauthorized_dm_behavior": self.unauthorized_dm_behavior,
|
||||
"streaming": self.streaming.to_dict(),
|
||||
"session_store_max_age_days": self.session_store_max_age_days,
|
||||
@@ -695,7 +703,12 @@ class GatewayConfig:
|
||||
|
||||
group_sessions_per_user = data.get("group_sessions_per_user")
|
||||
thread_sessions_per_user = data.get("thread_sessions_per_user")
|
||||
multiplex_profiles = data.get("multiplex_profiles")
|
||||
nested_gateway = data.get("gateway") if isinstance(data.get("gateway"), dict) else {}
|
||||
if multiplex_profiles is None and isinstance(nested_gateway, dict):
|
||||
# Also honor gateway.multiplex_profiles written by
|
||||
# ``hermes config set gateway.multiplex_profiles true``.
|
||||
multiplex_profiles = nested_gateway.get("multiplex_profiles")
|
||||
if "max_concurrent_sessions" in data:
|
||||
max_concurrent_raw = data.get("max_concurrent_sessions")
|
||||
max_concurrent_key = "max_concurrent_sessions"
|
||||
@@ -732,6 +745,7 @@ class GatewayConfig:
|
||||
stt_enabled=_coerce_bool(stt_enabled, True),
|
||||
group_sessions_per_user=_coerce_bool(group_sessions_per_user, True),
|
||||
thread_sessions_per_user=_coerce_bool(thread_sessions_per_user, False),
|
||||
multiplex_profiles=_coerce_bool(multiplex_profiles, False),
|
||||
max_concurrent_sessions=max_concurrent_sessions,
|
||||
unauthorized_dm_behavior=unauthorized_dm_behavior,
|
||||
streaming=StreamingConfig.from_dict(data.get("streaming", {})),
|
||||
@@ -823,6 +837,13 @@ def load_gateway_config() -> GatewayConfig:
|
||||
if "thread_sessions_per_user" in yaml_cfg:
|
||||
gw_data["thread_sessions_per_user"] = yaml_cfg["thread_sessions_per_user"]
|
||||
|
||||
# Multiplexing flag: accept both the top-level key and the nested
|
||||
# gateway.multiplex_profiles form (from_dict resolves the nested
|
||||
# fallback, but surface the top-level key here for parity with the
|
||||
# other session-scope flags above).
|
||||
if "multiplex_profiles" in yaml_cfg:
|
||||
gw_data["multiplex_profiles"] = yaml_cfg["multiplex_profiles"]
|
||||
|
||||
gateway_section = yaml_cfg.get("gateway")
|
||||
if isinstance(gateway_section, dict) and "max_concurrent_sessions" in gateway_section:
|
||||
gw_data["max_concurrent_sessions"] = gateway_section["max_concurrent_sessions"]
|
||||
|
||||
@@ -57,6 +57,11 @@ from gateway.platforms.base import (
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Sentinel returned by _resolve_request_profile when a /p/<profile>/ prefix
|
||||
# names a profile this gateway does not serve (→ 404). Distinct from None
|
||||
# (no prefix / multiplexing off → handle as the default profile).
|
||||
_PROFILE_REJECTED = object()
|
||||
|
||||
_BUILTIN_DELIVER_PLATFORMS = {
|
||||
"telegram", "discord", "slack", "signal", "sms", "whatsapp",
|
||||
"matrix", "mattermost", "homeassistant", "email", "dingtalk",
|
||||
@@ -189,6 +194,14 @@ class WebhookAdapter(BasePlatformAdapter):
|
||||
app = web.Application()
|
||||
app.router.add_get("/health", self._handle_health)
|
||||
app.router.add_post("/webhooks/{route_name}", self._handle_webhook)
|
||||
# Multi-profile multiplexing: a /p/<profile>/webhooks/<route> prefix
|
||||
# routes the inbound event to that profile. Same handler; the profile is
|
||||
# captured from the path and stamped onto the SessionSource so the agent
|
||||
# turn resolves that profile's config/skills/credentials. Only honored
|
||||
# when gateway.multiplex_profiles is on (the handler validates).
|
||||
app.router.add_post(
|
||||
"/p/{profile}/webhooks/{route_name}", self._handle_webhook
|
||||
)
|
||||
|
||||
# Port conflict detection — fail fast if port is already in use
|
||||
import socket as _socket
|
||||
@@ -397,6 +410,35 @@ class WebhookAdapter(BasePlatformAdapter):
|
||||
except Exception as e:
|
||||
logger.error("[webhook] Failed to reload dynamic routes: %s", e)
|
||||
|
||||
def _resolve_request_profile(self, request: "web.Request"):
|
||||
"""Resolve + validate the /p/<profile>/ URL prefix on a webhook request.
|
||||
|
||||
Returns:
|
||||
- ``None`` when no profile prefix is present, or multiplexing is off
|
||||
(the prefix is ignored, request handled as the default profile).
|
||||
- the profile name (str) when present, multiplexing is on, and the
|
||||
profile is one this gateway serves.
|
||||
- ``_PROFILE_REJECTED`` when a prefix is present but the profile is
|
||||
unknown/unconfigured (handler returns 404).
|
||||
"""
|
||||
profile = (request.match_info.get("profile") or "").strip()
|
||||
if not profile:
|
||||
return None
|
||||
runner = self.gateway_runner
|
||||
cfg = getattr(runner, "config", None)
|
||||
if not getattr(cfg, "multiplex_profiles", False):
|
||||
# Prefix supplied but multiplexing is off — ignore it, behave as
|
||||
# the single-profile gateway (don't 404 a would-be valid route).
|
||||
return None
|
||||
try:
|
||||
from hermes_cli.profiles import profiles_to_serve
|
||||
served = {name for name, _ in profiles_to_serve(multiplex=True)}
|
||||
except Exception:
|
||||
return _PROFILE_REJECTED
|
||||
if profile not in served:
|
||||
return _PROFILE_REJECTED
|
||||
return profile
|
||||
|
||||
async def _handle_webhook(self, request: "web.Request") -> "web.Response":
|
||||
"""POST /webhooks/{route_name} — receive and process a webhook event."""
|
||||
# Hot-reload dynamic subscriptions on each request (mtime-gated, cheap)
|
||||
@@ -405,6 +447,13 @@ class WebhookAdapter(BasePlatformAdapter):
|
||||
route_name = request.match_info.get("route_name", "")
|
||||
route_config = self._routes.get(route_name)
|
||||
|
||||
# Multi-profile: resolve + validate the /p/<profile>/ prefix if present.
|
||||
profile = self._resolve_request_profile(request)
|
||||
if profile is _PROFILE_REJECTED:
|
||||
return web.json_response(
|
||||
{"error": "Unknown or unconfigured profile"}, status=404
|
||||
)
|
||||
|
||||
if not route_config:
|
||||
return web.json_response(
|
||||
{"error": f"Unknown route: {route_name}"}, status=404
|
||||
@@ -641,6 +690,8 @@ class WebhookAdapter(BasePlatformAdapter):
|
||||
user_id=f"webhook:{route_name}",
|
||||
user_name=route_name,
|
||||
)
|
||||
if profile and isinstance(profile, str):
|
||||
source.profile = profile
|
||||
event = MessageEvent(
|
||||
text=prompt,
|
||||
message_type=MessageType.TEXT,
|
||||
|
||||
382
gateway/run.py
382
gateway/run.py
@@ -1173,13 +1173,31 @@ def _reload_runtime_env_preserving_config_authority() -> None:
|
||||
pick up rotated API keys. config.yaml remains authoritative for agent budget
|
||||
settings such as agent.max_turns; otherwise a stale HERMES_MAX_ITERATIONS in
|
||||
.env can replace the startup bridge on later turns.
|
||||
|
||||
In multiplex mode this is a NO-OP for the credential reload: secrets come
|
||||
from the per-turn ``set_secret_scope`` (installed by ``_profile_runtime_scope``)
|
||||
which loads the routed profile's ``.env`` into an isolated mapping. Mutating
|
||||
the process-global ``os.environ`` here would defeat that isolation and leak
|
||||
the default profile's keys to every profile's turns and subprocesses.
|
||||
"""
|
||||
from agent.secret_scope import is_multiplex_active
|
||||
if is_multiplex_active():
|
||||
# Credentials are resolved from the active profile's secret scope, not
|
||||
# os.environ. Still honor config.yaml's agent.max_turns bridge below
|
||||
# using the scoped home, but never reload .env into global env.
|
||||
_bridge_max_turns_from_config(_hermes_home)
|
||||
return
|
||||
|
||||
load_hermes_dotenv(
|
||||
hermes_home=_hermes_home,
|
||||
project_env=Path(__file__).resolve().parents[1] / '.env',
|
||||
)
|
||||
_bridge_max_turns_from_config(_hermes_home)
|
||||
|
||||
config_path = _hermes_home / 'config.yaml'
|
||||
|
||||
def _bridge_max_turns_from_config(home: "Path") -> None:
|
||||
"""Bridge config.yaml agent.max_turns into HERMES_MAX_ITERATIONS (a global)."""
|
||||
config_path = home / 'config.yaml'
|
||||
if not config_path.exists():
|
||||
return
|
||||
try:
|
||||
@@ -1196,6 +1214,71 @@ def _reload_runtime_env_preserving_config_authority() -> None:
|
||||
os.environ["HERMES_MAX_ITERATIONS"] = str(agent_cfg["max_turns"])
|
||||
|
||||
|
||||
from contextlib import contextmanager as _contextmanager
|
||||
|
||||
|
||||
# Platforms that bind a host TCP port (HTTP/webhook listeners). In a profile
|
||||
# multiplexer the default profile owns the single shared listener and serves
|
||||
# every profile through the /p/<profile>/ URL prefix, so a SECONDARY profile
|
||||
# enabling one of these is always a misconfiguration: it would try to bind a
|
||||
# port already held by the default's listener. We hard-error on it rather than
|
||||
# silently dropping the adapter (see _start_one_profile_adapters).
|
||||
# Stored as platform .value strings since the Platform enum is imported below.
|
||||
_PORT_BINDING_PLATFORM_VALUES = frozenset({
|
||||
"webhook",
|
||||
"api_server",
|
||||
"msgraph_webhook",
|
||||
"feishu",
|
||||
"wecom_callback",
|
||||
"bluebubbles",
|
||||
"sms",
|
||||
})
|
||||
|
||||
|
||||
class MultiplexConfigError(RuntimeError):
    """Signals an invalid profile-multiplexer configuration at startup.

    This is deliberately separate from a transient adapter-connect failure.
    A transient failure is logged and the gateway keeps running so it can
    retry; a configuration error needs the operator to edit config.yaml, so
    startup is aborted cleanly instead.
    """
|
||||
|
||||
|
||||
@_contextmanager
def _profile_runtime_scope(profile_home: "Path"):
    """Scope config/skills/memory AND credentials to a profile for one turn.

    Combines the two seams the multiplexer needs:

    1. ``set_hermes_home_override`` — redirects ``get_hermes_home()`` (config,
       skills, memory, SOUL, sessions) to the profile's home.
    2. ``set_secret_scope`` — installs the profile's ``.env`` secrets as the
       authoritative credential source, so ``get_secret`` reads this
       profile's keys and never the process-global ``os.environ``.

    Only used on the multiplexed inbound path. Loading the profile's ``.env``
    here does NOT mutate ``os.environ``: ``build_profile_secret_scope`` returns
    an isolated dict.

    Args:
        profile_home: The profile's home directory.

    Yields:
        Nothing; both overrides are active for the duration of the ``with``
        body and are undone on exit, secret scope first.
    """
    from hermes_constants import set_hermes_home_override, reset_hermes_home_override
    from agent.secret_scope import (
        build_profile_secret_scope,
        set_secret_scope,
        reset_secret_scope,
    )

    home_token = set_hermes_home_override(str(profile_home))
    try:
        # Installed inside the outer try so that a failure while building or
        # installing the secret scope still resets the home override instead
        # of leaving this context pointed at the profile's home.
        secret_token = set_secret_scope(build_profile_secret_scope(Path(profile_home)))
        try:
            yield
        finally:
            reset_secret_scope(secret_token)
    finally:
        reset_hermes_home_override(home_token)
|
||||
|
||||
|
||||
_DOCKER_VOLUME_SPEC_RE = re.compile(r"^(?P<host>.+):(?P<container>/[^:]+?)(?::(?P<options>[^:]+))?$")
|
||||
_DOCKER_MEDIA_OUTPUT_CONTAINER_PATHS = {"/output", "/outputs"}
|
||||
|
||||
@@ -2240,7 +2323,22 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
||||
def __init__(self, config: Optional[GatewayConfig] = None):
|
||||
global _gateway_runner_ref
|
||||
self.config = config or load_gateway_config()
|
||||
# Mark the process as a profile multiplexer when configured. This flips
|
||||
# agent.secret_scope.get_secret() to fail-closed on any unscoped
|
||||
# credential read, so a missed migration crashes loudly instead of
|
||||
# leaking a cross-profile value (Workstream A). Inert when off.
|
||||
try:
|
||||
from agent.secret_scope import set_multiplex_active
|
||||
set_multiplex_active(bool(getattr(self.config, "multiplex_profiles", False)))
|
||||
except Exception:
|
||||
logger.debug("could not set multiplex-active flag", exc_info=True)
|
||||
self.adapters: Dict[Platform, BasePlatformAdapter] = {}
|
||||
# Multi-profile multiplexing: adapters for NON-default profiles live
|
||||
# here, keyed by profile name then Platform. self.adapters stays the
|
||||
# default/active profile's map so the ~93 existing self.adapters[...]
|
||||
# sites are untouched when multiplexing is off (this dict is empty).
|
||||
# Populated by _start_secondary_profile_adapters().
|
||||
self._profile_adapters: Dict[str, Dict[Platform, BasePlatformAdapter]] = {}
|
||||
self._warn_if_docker_media_delivery_is_risky()
|
||||
_gateway_runner_ref = _weakref.ref(self)
|
||||
|
||||
@@ -2792,10 +2890,24 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
||||
except Exception:
|
||||
pass
|
||||
config = getattr(self, "config", None)
|
||||
# Mirror SessionStore._resolve_profile_for_key so this fallback path
|
||||
# produces the same namespace as the primary path: None (legacy
|
||||
# agent:main) unless multiplexing is on, then the active profile.
|
||||
_profile = None
|
||||
if getattr(config, "multiplex_profiles", False):
|
||||
if source.profile:
|
||||
_profile = source.profile
|
||||
else:
|
||||
try:
|
||||
from hermes_cli.profiles import get_active_profile_name
|
||||
_profile = get_active_profile_name() or "default"
|
||||
except Exception:
|
||||
_profile = None
|
||||
return build_session_key(
|
||||
source,
|
||||
group_sessions_per_user=getattr(config, "group_sessions_per_user", True),
|
||||
thread_sessions_per_user=getattr(config, "thread_sessions_per_user", False),
|
||||
profile=_profile,
|
||||
)
|
||||
|
||||
def _telegram_topic_mode_enabled(self, source: SessionSource) -> bool:
|
||||
@@ -5324,7 +5436,30 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
||||
"attempts": 1,
|
||||
"next_retry": time.monotonic() + 30,
|
||||
}
|
||||
|
||||
|
||||
# Multi-profile multiplexing: bring up adapters for every OTHER profile
|
||||
# this gateway serves. Each profile's adapters connect under that
|
||||
# profile's home + credential scope and stamp their inbound events with
|
||||
# the profile so the agent turn resolves correctly. No-op when off.
|
||||
try:
|
||||
_secondary_connected = await self._start_secondary_profile_adapters()
|
||||
connected_count += _secondary_connected
|
||||
except MultiplexConfigError as e:
|
||||
# Invalid multiplexer config — abort startup cleanly so the operator
|
||||
# fixes config.yaml rather than running a half-wired gateway.
|
||||
reason = str(e)
|
||||
logger.error("Gateway multiplexer config error: %s", reason)
|
||||
try:
|
||||
from gateway.status import write_runtime_status
|
||||
write_runtime_status(gateway_state="startup_failed", exit_reason=reason)
|
||||
except Exception:
|
||||
pass
|
||||
self._request_clean_exit(reason)
|
||||
self._startup_restore_in_progress = False
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error("Secondary-profile adapter startup failed: %s", e, exc_info=True)
|
||||
|
||||
if connected_count == 0:
|
||||
if startup_nonretryable_errors:
|
||||
reason = "; ".join(startup_nonretryable_errors)
|
||||
@@ -6331,6 +6466,22 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
||||
time.monotonic() - _adapter_started_at,
|
||||
e,
|
||||
)
|
||||
|
||||
# Disconnect secondary-profile adapters (multiplex mode).
|
||||
for _prof, _amap in list(getattr(self, "_profile_adapters", {}).items()):
|
||||
for platform, adapter in list(_amap.items()):
|
||||
try:
|
||||
await adapter.cancel_background_tasks()
|
||||
except Exception as e:
|
||||
logger.debug("✗ %s bg-cancel error (profile %s): %s", platform.value, _prof, e)
|
||||
try:
|
||||
await adapter.disconnect()
|
||||
logger.info("✓ %s disconnected (profile: %s)", platform.value, _prof)
|
||||
except Exception as e:
|
||||
logger.error("✗ %s disconnect error (profile %s): %s", platform.value, _prof, e)
|
||||
_amap.clear()
|
||||
if hasattr(self, "_profile_adapters"):
|
||||
self._profile_adapters.clear()
|
||||
logger.info(
|
||||
"Shutdown phase: all adapters disconnected at +%.2fs",
|
||||
_phase_elapsed(),
|
||||
@@ -6500,6 +6651,175 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
||||
"""Wait for shutdown signal."""
|
||||
await self._shutdown_event.wait()
|
||||
|
||||
async def _start_secondary_profile_adapters(self) -> int:
|
||||
"""Bring up adapters for every non-active profile this gateway serves.
|
||||
|
||||
Returns the number of secondary adapters that connected. No-op (returns
|
||||
0) unless ``gateway.multiplex_profiles`` is on.
|
||||
|
||||
Each profile's adapters are created and connected under that profile's
|
||||
HERMES_HOME + secret scope (``_profile_runtime_scope``), stored in
|
||||
``self._profile_adapters[profile]``, and given a message handler that
|
||||
stamps ``source.profile`` before delegating to the shared
|
||||
``_handle_message`` — so the agent turn resolves that profile's config,
|
||||
skills, and credentials. Same-platform credential collisions (two
|
||||
profiles polling the same bot token) are detected and refused here, the
|
||||
only point that sees every profile's resolved credentials together.
|
||||
"""
|
||||
if not getattr(self.config, "multiplex_profiles", False):
|
||||
return 0
|
||||
|
||||
try:
|
||||
from hermes_cli.profiles import profiles_to_serve, get_active_profile_name
|
||||
except Exception:
|
||||
return 0
|
||||
|
||||
active = get_active_profile_name() or "default"
|
||||
connected = 0
|
||||
# (platform, token-fingerprint) -> profile that claimed it. Detects two
|
||||
# profiles trying to poll the same bot credential (impossible to do
|
||||
# concurrently). Seed with the active profile's adapters.
|
||||
claimed: Dict[tuple, str] = {}
|
||||
for _plat, _ad in self.adapters.items():
|
||||
fp = self._adapter_credential_fingerprint(_ad)
|
||||
if fp is not None:
|
||||
claimed[(_plat, fp)] = active
|
||||
|
||||
for profile_name, profile_home in profiles_to_serve(multiplex=True):
|
||||
if profile_name == active:
|
||||
continue # handled by the primary startup loop
|
||||
try:
|
||||
connected += await self._start_one_profile_adapters(
|
||||
profile_name, profile_home, claimed
|
||||
)
|
||||
except MultiplexConfigError:
|
||||
# Config error (e.g. a secondary profile binding a port) is not
|
||||
# transient — propagate so startup aborts cleanly instead of
|
||||
# limping along with a half-configured multiplexer.
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"Failed to start adapters for profile '%s': %s",
|
||||
profile_name, e, exc_info=True,
|
||||
)
|
||||
|
||||
# Record served profiles in runtime status for `hermes status`.
|
||||
try:
|
||||
from gateway.status import write_runtime_status
|
||||
served = [active] + sorted(self._profile_adapters.keys())
|
||||
write_runtime_status(served_profiles=served)
|
||||
except Exception:
|
||||
logger.debug("could not record served_profiles", exc_info=True)
|
||||
|
||||
return connected
|
||||
|
||||
async def _start_one_profile_adapters(
    self, profile_name: str, profile_home: "Path", claimed: Dict[tuple, str]
) -> int:
    """Create+connect one profile's adapters under its runtime scope.

    Loads the profile's gateway config inside ``_profile_runtime_scope`` and,
    for every enabled platform, creates an adapter, checks it against the
    credentials already claimed by other profiles, wires the shared handlers
    and attempts to connect it.

    Args:
        profile_name: Name of the profile whose adapters are being started;
            also stamped onto inbound events via the message handler.
        profile_home: Home directory handed to ``_profile_runtime_scope``.
        claimed: Mapping of ``(platform, credential fingerprint)`` to the
            profile that owns it. Mutated in place: every adapter that passes
            the duplicate check is recorded here.

    Returns:
        The number of adapters that connected successfully.

    Raises:
        MultiplexConfigError: If the profile enables a platform whose value is
            in ``_PORT_BINDING_PLATFORM_VALUES``.
    """
    from gateway.config import load_gateway_config

    with _profile_runtime_scope(profile_home):
        profile_cfg = load_gateway_config()

    # The per-profile adapter map is created even when nothing ends up
    # connecting, so the profile name appears in self._profile_adapters.
    profile_map = self._profile_adapters.setdefault(profile_name, {})
    connected = 0
    for platform, platform_config in profile_cfg.platforms.items():
        if not platform_config.enabled:
            continue
        # A secondary profile must NOT enable a port-binding platform: the
        # default profile's listener already serves every profile via the
        # /p/<profile>/ prefix, so a second bind can only collide. This is a
        # config error, not a transient failure — fail fast and loud.
        if platform.value in _PORT_BINDING_PLATFORM_VALUES:
            raise MultiplexConfigError(
                f"Profile '{profile_name}' enables the port-binding platform "
                f"'{platform.value}', but gateway.multiplex_profiles is on. The "
                f"default profile owns the single shared HTTP listener and "
                f"serves every profile through the /p/{profile_name}/ URL "
                f"prefix — a secondary profile cannot bind its own port. "
                f"Remove platforms.{platform.value} from profile "
                f"'{profile_name}'s config.yaml (configure it only on the "
                f"default profile)."
            )
        with _profile_runtime_scope(profile_home):
            adapter = self._create_adapter(platform, platform_config)
        if not adapter:
            continue

        # Same-token conflict detection — refuse a duplicate poll.
        fp = self._adapter_credential_fingerprint(adapter)
        if fp is not None:
            owner = claimed.get((platform, fp))
            if owner is not None:
                logger.error(
                    "Profile '%s' and '%s' both configure %s with the same "
                    "credential — refusing to start the duplicate (a single "
                    "bot token cannot be polled twice). Give each profile its "
                    "own %s credential.",
                    owner, profile_name, platform.value, platform.value,
                )
                await self._safe_adapter_disconnect(adapter, platform)
                continue
            # NOTE(review): the claim is recorded before the connect attempt
            # and is not released if the connect below fails — confirm that a
            # later profile with the same credential should still be refused.
            claimed[(platform, fp)] = profile_name

        # Stamp every inbound event from this adapter with its profile so
        # the agent turn (and session key) resolve to the right home.
        adapter.set_message_handler(
            self._make_profile_message_handler(profile_name)
        )
        adapter.set_fatal_error_handler(self._handle_adapter_fatal_error)
        adapter.set_session_store(self.session_store)
        adapter.set_busy_session_handler(self._handle_active_session_busy_message)
        adapter.set_topic_recovery_fn(self._recover_telegram_topic_thread_id)
        adapter._busy_text_mode = self._busy_text_mode

        try:
            with _profile_runtime_scope(profile_home):
                success = await self._connect_adapter_with_timeout(adapter, platform)
            if success:
                # Only adapters that actually connected are tracked and counted.
                profile_map[platform] = adapter
                connected += 1
                logger.info("✓ %s connected (profile: %s)", platform.value, profile_name)
            else:
                logger.warning("✗ %s failed to connect (profile: %s)", platform.value, profile_name)
                await self._safe_adapter_disconnect(adapter, platform)
        except Exception as e:
            # A failure on one platform is logged and cleaned up; the loop
            # carries on with the profile's remaining platforms.
            logger.error("✗ %s error (profile: %s): %s", platform.value, profile_name, e)
            await self._safe_adapter_disconnect(adapter, platform)
    return connected
|
||||
|
||||
def _make_profile_message_handler(self, profile_name: str):
|
||||
"""Return a message handler that stamps source.profile then delegates."""
|
||||
async def _handler(event):
|
||||
try:
|
||||
if getattr(event, "source", None) is not None and not event.source.profile:
|
||||
event.source.profile = profile_name
|
||||
except Exception:
|
||||
pass
|
||||
return await self._handle_message(event)
|
||||
return _handler
|
||||
|
||||
@staticmethod
|
||||
def _adapter_credential_fingerprint(adapter: Any) -> Optional[str]:
|
||||
"""Return a stable, log-safe fingerprint of an adapter's credential.
|
||||
|
||||
Used only to detect two profiles claiming the same bot token. Returns a
|
||||
salted hash (never the token itself) of the adapter's primary
|
||||
credential, or None when no credential is discoverable (in which case
|
||||
we don't attempt conflict detection for it).
|
||||
"""
|
||||
token = None
|
||||
for attr in ("token", "bot_token", "_token", "api_token", "_bot_token"):
|
||||
val = getattr(adapter, attr, None)
|
||||
if isinstance(val, str) and val.strip():
|
||||
token = val.strip()
|
||||
break
|
||||
if not token:
|
||||
return None
|
||||
import hashlib
|
||||
return hashlib.sha256(("hermes-mux:" + token).encode("utf-8")).hexdigest()[:16]
|
||||
|
||||
def _create_adapter(
|
||||
self,
|
||||
platform: Platform,
|
||||
@@ -13729,6 +14049,64 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
||||
channel_prompt: Optional[str] = None,
|
||||
persist_user_message: Optional[str] = None,
|
||||
persist_user_timestamp: Optional[float] = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""Profile-scoping wrapper around the agent run.
|
||||
|
||||
When multiplexing is active, resolve the inbound source's profile and
|
||||
run the whole turn inside ``_profile_runtime_scope`` so config/skills/
|
||||
memory resolve to that profile's home AND credentials resolve from that
|
||||
profile's secret scope (never the process-global ``os.environ``). When
|
||||
multiplexing is off this is a transparent pass-through — zero behavior
|
||||
change for single-profile gateways.
|
||||
"""
|
||||
if not getattr(getattr(self, "config", None), "multiplex_profiles", False):
|
||||
return await self._run_agent_inner(
|
||||
message, context_prompt, history, source, session_id,
|
||||
session_key=session_key, run_generation=run_generation,
|
||||
_interrupt_depth=_interrupt_depth, event_message_id=event_message_id,
|
||||
channel_prompt=channel_prompt, persist_user_message=persist_user_message,
|
||||
persist_user_timestamp=persist_user_timestamp,
|
||||
)
|
||||
|
||||
profile_home = self._resolve_profile_home_for_source(source)
|
||||
with _profile_runtime_scope(profile_home):
|
||||
return await self._run_agent_inner(
|
||||
message, context_prompt, history, source, session_id,
|
||||
session_key=session_key, run_generation=run_generation,
|
||||
_interrupt_depth=_interrupt_depth, event_message_id=event_message_id,
|
||||
channel_prompt=channel_prompt, persist_user_message=persist_user_message,
|
||||
persist_user_timestamp=persist_user_timestamp,
|
||||
)
|
||||
|
||||
def _resolve_profile_home_for_source(self, source: SessionSource) -> "Path":
    """Pick the HERMES_HOME directory that should serve ``source``.

    The profile name comes from ``source.profile`` (whitespace-stripped).
    When that is empty the active profile name is used, and ``"default"``
    is the last resort. If resolving the name or its directory raises, the
    home returned by ``hermes_constants.get_hermes_home`` is used instead.
    """
    from hermes_cli.profiles import get_active_profile_name, get_profile_dir

    try:
        routed = (source.profile or "").strip()
        chosen = routed or get_active_profile_name() or "default"
        home = get_profile_dir(chosen)
    except Exception:
        from hermes_constants import get_hermes_home

        home = get_hermes_home()
    return home
|
||||
|
||||
async def _run_agent_inner(
|
||||
self,
|
||||
message: str,
|
||||
context_prompt: str,
|
||||
history: List[Dict[str, Any]],
|
||||
source: SessionSource,
|
||||
session_id: str,
|
||||
session_key: str = None,
|
||||
run_generation: Optional[int] = None,
|
||||
_interrupt_depth: int = 0,
|
||||
event_message_id: Optional[str] = None,
|
||||
channel_prompt: Optional[str] = None,
|
||||
persist_user_message: Optional[str] = None,
|
||||
persist_user_timestamp: Optional[float] = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Run the agent with the given message and context.
|
||||
|
||||
@@ -92,6 +92,11 @@ class SessionSource:
|
||||
parent_chat_id: Optional[str] = None # Parent channel when chat_id refers to a thread
|
||||
message_id: Optional[str] = None # ID of the triggering message (for pin/reply/react)
|
||||
role_authorized: bool = False # True when adapter granted access via role (not user ID)
|
||||
# Profile this inbound message is routed to in a multiplexing gateway
|
||||
# (from the /p/<profile>/ URL prefix or per-credential adapter ownership).
|
||||
# None => the gateway's active/default profile. Drives both session-key
|
||||
# namespacing and the per-turn config/credential scope.
|
||||
profile: Optional[str] = None
|
||||
|
||||
@property
|
||||
def description(self) -> str:
|
||||
@@ -135,6 +140,8 @@ class SessionSource:
|
||||
d["parent_chat_id"] = self.parent_chat_id
|
||||
if self.message_id:
|
||||
d["message_id"] = self.message_id
|
||||
if self.profile:
|
||||
d["profile"] = self.profile
|
||||
return d
|
||||
|
||||
@classmethod
|
||||
@@ -153,6 +160,7 @@ class SessionSource:
|
||||
guild_id=data.get("guild_id"),
|
||||
parent_chat_id=data.get("parent_chat_id"),
|
||||
message_id=data.get("message_id"),
|
||||
profile=data.get("profile"),
|
||||
)
|
||||
|
||||
|
||||
@@ -615,15 +623,41 @@ def is_shared_multi_user_session(
|
||||
return not group_sessions_per_user
|
||||
|
||||
|
||||
def _session_key_namespace(profile: Optional[str]) -> str:
|
||||
"""Return the ``agent:<ns>`` namespace prefix for a session key.
|
||||
|
||||
The historical key format is ``agent:main:<platform>:<chat_type>:...`` where
|
||||
``main`` is a static namespace literal (NOT a branch name — branching keys
|
||||
off ``session_id``, not this slot). Multi-profile multiplexing reuses this
|
||||
slot to carry the profile:
|
||||
|
||||
- default profile (or ``None``/``""``/``"default"``) → ``agent:main`` —
|
||||
BYTE-IDENTICAL to every key ever generated, so existing sessions and all
|
||||
positional parsers (``parts[2]`` == platform, etc.) are unaffected.
|
||||
- named profile ``coder`` → ``agent:coder`` — keeps the same positional
|
||||
layout, just a different namespace, so two profiles serving the same
|
||||
platform/chat never collide.
|
||||
"""
|
||||
if not profile or profile == "default":
|
||||
return "agent:main"
|
||||
return f"agent:{profile}"
|
||||
|
||||
|
||||
def build_session_key(
|
||||
source: SessionSource,
|
||||
group_sessions_per_user: bool = True,
|
||||
thread_sessions_per_user: bool = False,
|
||||
profile: Optional[str] = None,
|
||||
) -> str:
|
||||
"""Build a deterministic session key from a message source.
|
||||
|
||||
This is the single source of truth for session key construction.
|
||||
|
||||
``profile`` selects the key namespace (see :func:`_session_key_namespace`).
|
||||
It defaults to ``None`` ⇒ the legacy ``agent:main`` namespace, so callers
|
||||
that don't multiplex produce byte-identical keys to before. Only the
|
||||
multiplexing gateway passes a non-default profile.
|
||||
|
||||
DM rules:
|
||||
- DMs include chat_id when present, so each private conversation is isolated.
|
||||
- thread_id further differentiates threaded DMs within the same DM chat.
|
||||
@@ -643,6 +677,7 @@ def build_session_key(
|
||||
shared session per chat.
|
||||
- Without identifiers, messages fall back to one session per platform/chat_type.
|
||||
"""
|
||||
ns = _session_key_namespace(profile)
|
||||
platform = source.platform.value
|
||||
if source.chat_type == "dm":
|
||||
dm_chat_id = source.chat_id
|
||||
@@ -651,12 +686,12 @@ def build_session_key(
|
||||
|
||||
if dm_chat_id:
|
||||
if source.thread_id:
|
||||
return f"agent:main:{platform}:dm:{dm_chat_id}:{source.thread_id}"
|
||||
return f"agent:main:{platform}:dm:{dm_chat_id}"
|
||||
return f"{ns}:{platform}:dm:{dm_chat_id}:{source.thread_id}"
|
||||
return f"{ns}:{platform}:dm:{dm_chat_id}"
|
||||
# No chat_id — fall back to the sender's own identifier before the
|
||||
# bare per-platform sink. Without this, every DM from every user that
|
||||
# arrives without a chat_id (non-standard adapters / synthetic sources)
|
||||
# collapses into one shared "agent:main:<platform>:dm" session, and a
|
||||
# collapses into one shared "<ns>:<platform>:dm" session, and a
|
||||
# single cached agent ends up serving multiple people's conversations —
|
||||
# cross-user history bleed. participant_id keeps DMs isolated per user.
|
||||
dm_participant_id = source.user_id_alt or source.user_id
|
||||
@@ -667,11 +702,11 @@ def build_session_key(
|
||||
)
|
||||
if dm_participant_id:
|
||||
if source.thread_id:
|
||||
return f"agent:main:{platform}:dm:{dm_participant_id}:{source.thread_id}"
|
||||
return f"agent:main:{platform}:dm:{dm_participant_id}"
|
||||
return f"{ns}:{platform}:dm:{dm_participant_id}:{source.thread_id}"
|
||||
return f"{ns}:{platform}:dm:{dm_participant_id}"
|
||||
if source.thread_id:
|
||||
return f"agent:main:{platform}:dm:{source.thread_id}"
|
||||
return f"agent:main:{platform}:dm"
|
||||
return f"{ns}:{platform}:dm:{source.thread_id}"
|
||||
return f"{ns}:{platform}:dm"
|
||||
|
||||
participant_id = source.user_id_alt or source.user_id
|
||||
if participant_id and source.platform == Platform.WHATSAPP:
|
||||
@@ -679,7 +714,7 @@ def build_session_key(
|
||||
# single group member gets two isolated per-user sessions when the
|
||||
# bridge reshuffles alias forms.
|
||||
participant_id = canonical_whatsapp_identifier(str(participant_id)) or participant_id
|
||||
key_parts = ["agent:main", platform, source.chat_type]
|
||||
key_parts = [ns, platform, source.chat_type]
|
||||
|
||||
if source.chat_id:
|
||||
key_parts.append(source.chat_id)
|
||||
@@ -775,12 +810,32 @@ class SessionStore:
|
||||
logger.debug("Could not remove temp file %s: %s", tmp_path, e)
|
||||
raise
|
||||
|
||||
def _resolve_profile_for_key(self, source: Optional[SessionSource] = None) -> Optional[str]:
    """Choose the profile used to namespace session keys.

    Returns ``None`` whenever ``multiplex_profiles`` is not enabled on the
    store's config, which keeps keys in the legacy ``agent:main`` namespace.
    With multiplexing enabled, a truthy ``source.profile`` wins; otherwise
    the active profile name is used (``"default"`` when that is empty). If
    looking up the active profile raises, ``None`` is returned.
    """
    multiplexing = getattr(self.config, "multiplex_profiles", False)
    if not multiplexing:
        return None

    routed = source.profile if source is not None else None
    if routed:
        return routed

    try:
        from hermes_cli.profiles import get_active_profile_name

        fallback = get_active_profile_name() or "default"
    except Exception:
        fallback = None
    return fallback
|
||||
|
||||
def _generate_session_key(self, source: SessionSource) -> str:
    """Build the session key for ``source`` from this store's settings.

    Reads ``group_sessions_per_user`` (default ``True``) and
    ``thread_sessions_per_user`` (default ``False``) from the config, resolves
    the key's profile via ``_resolve_profile_for_key`` and delegates to
    ``build_session_key``.
    """
    per_user_groups = getattr(self.config, "group_sessions_per_user", True)
    per_user_threads = getattr(self.config, "thread_sessions_per_user", False)
    key_profile = self._resolve_profile_for_key(source)
    return build_session_key(
        source,
        group_sessions_per_user=per_user_groups,
        thread_sessions_per_user=per_user_threads,
        profile=key_profile,
    )
|
||||
|
||||
def _is_session_expired(self, entry: SessionEntry) -> bool:
|
||||
|
||||
@@ -515,6 +515,7 @@ def write_runtime_status(
|
||||
platform_state: Any = _UNSET,
|
||||
error_code: Any = _UNSET,
|
||||
error_message: Any = _UNSET,
|
||||
served_profiles: Any = _UNSET,
|
||||
) -> None:
|
||||
"""Persist gateway runtime health information for diagnostics/status."""
|
||||
path = _get_runtime_status_path()
|
||||
@@ -535,6 +536,11 @@ def write_runtime_status(
|
||||
payload["restart_requested"] = bool(restart_requested)
|
||||
if active_agents is not _UNSET:
|
||||
payload["active_agents"] = max(0, int(active_agents))
|
||||
if served_profiles is not _UNSET:
|
||||
# Profiles this gateway multiplexes (multi-profile mode). Absent/empty
|
||||
# for a single-profile gateway. Lets `hermes status` show per-profile
|
||||
# coverage without a second probe.
|
||||
payload["served_profiles"] = list(served_profiles or [])
|
||||
|
||||
if platform is not _UNSET:
|
||||
platform_payload = payload["platforms"].get(platform, {})
|
||||
|
||||
@@ -3865,6 +3865,86 @@ def _running_under_gateway_supervisor() -> bool:
|
||||
return False
|
||||
|
||||
|
||||
def _guard_named_profile_under_multiplexer(force: bool = False) -> None:
|
||||
"""Refuse a named-profile gateway when a multiplexer is already serving it.
|
||||
|
||||
When the default profile's gateway runs with gateway.multiplex_profiles=on,
|
||||
it is the sole inbound process for EVERY profile on the host. Starting a
|
||||
separate gateway for a named profile would double-bind that profile's
|
||||
platforms (two pollers on one bot token, port fights). In that mode a
|
||||
named-profile ``hermes gateway run`` is always a misconfiguration, so we
|
||||
hard-error with a pointer to the multiplexer. ``--force`` overrides.
|
||||
|
||||
Inert unless ALL of: (a) this invocation is a named profile, (b) a default-
|
||||
profile gateway is running, (c) that gateway's config has multiplexing on.
|
||||
"""
|
||||
if force:
|
||||
return
|
||||
# (a) Are we a named profile? Default/custom-hash homes return "".
|
||||
try:
|
||||
suffix = _profile_suffix()
|
||||
except Exception:
|
||||
return
|
||||
if not suffix:
|
||||
return # default profile (or unrecognized) — this guard doesn't apply
|
||||
|
||||
try:
|
||||
from hermes_constants import get_default_hermes_root
|
||||
default_root = get_default_hermes_root()
|
||||
# (b) Is the default-profile gateway running?
|
||||
from gateway.status import get_running_pid as _default_running_pid # noqa
|
||||
except Exception:
|
||||
return
|
||||
|
||||
try:
|
||||
import yaml as _yaml
|
||||
from gateway.status import _read_pid_record # type: ignore
|
||||
|
||||
# (b) default gateway PID file present + alive
|
||||
default_pid_path = default_root / "gateway.pid"
|
||||
rec = _read_pid_record(default_pid_path)
|
||||
if not rec:
|
||||
return
|
||||
from gateway.status import _pid_exists, _pid_from_record
|
||||
pid = _pid_from_record(rec)
|
||||
if not pid or not _pid_exists(pid):
|
||||
return
|
||||
|
||||
# (c) default config has multiplexing on
|
||||
cfg_path = default_root / "config.yaml"
|
||||
if not cfg_path.exists():
|
||||
return
|
||||
with open(cfg_path, encoding="utf-8") as f:
|
||||
cfg = _yaml.safe_load(f) or {}
|
||||
multiplex = bool(
|
||||
cfg.get("multiplex_profiles")
|
||||
or (cfg.get("gateway", {}) or {}).get("multiplex_profiles")
|
||||
)
|
||||
if not multiplex:
|
||||
return
|
||||
except Exception:
|
||||
logger.debug("Multiplexer-conflict probe failed", exc_info=True)
|
||||
return
|
||||
|
||||
print_error(
|
||||
f"The default gateway is running as a profile multiplexer and already "
|
||||
f"serves profile '{suffix}'."
|
||||
)
|
||||
print(
|
||||
" When gateway.multiplex_profiles is on, the default gateway is the\n"
|
||||
" single inbound process for every profile. Starting a separate\n"
|
||||
" gateway for this profile would double-bind its platforms (two\n"
|
||||
" pollers on one bot token, port conflicts).\n"
|
||||
)
|
||||
print(" Manage the multiplexer instead (from the default profile):")
|
||||
print()
|
||||
print(" hermes gateway restart")
|
||||
print()
|
||||
print(" Pass --force to start a separate profile gateway anyway (not")
|
||||
print(" recommended while the multiplexer is running).")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def _guard_supervised_gateway_conflict(force: bool = False) -> None:
|
||||
"""Refuse a foreground gateway when a service manager already supervises one.
|
||||
|
||||
@@ -3977,6 +4057,7 @@ def run_gateway(verbose: int = 0, quiet: bool = False, replace: bool = False, fo
|
||||
systemd/launchd service is already supervising this profile.
|
||||
"""
|
||||
_guard_official_docker_root_gateway()
|
||||
_guard_named_profile_under_multiplexer(force=force)
|
||||
_guard_supervised_gateway_conflict(force=force)
|
||||
_guard_existing_gateway_process_conflict(replace=replace)
|
||||
sys.path.insert(0, str(PROJECT_ROOT))
|
||||
|
||||
@@ -29,7 +29,7 @@ import subprocess
|
||||
import sys
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path, PurePosixPath, PureWindowsPath
|
||||
from typing import List, Optional
|
||||
from typing import List, Optional, Tuple
|
||||
|
||||
from agent.skill_utils import is_excluded_skill_path
|
||||
|
||||
@@ -781,6 +781,47 @@ def list_profiles() -> List[ProfileInfo]:
|
||||
return profiles
|
||||
|
||||
|
||||
def profiles_to_serve(multiplex: bool) -> List[Tuple[str, Path]]:
    """List the ``(profile_name, hermes_home)`` pairs a gateway should serve.

    This is the one place that decides which profiles the inbound gateway
    handles.

    - ``multiplex`` false: a single entry for the active profile (named
      ``"default"`` when no active profile name is reported), paired with
      its profile directory.
    - ``multiplex`` true: the default profile first, followed by every
      directory under the profiles root whose name is not ``"default"`` and
      matches ``_PROFILE_ID_RE``, in sorted order.

    Only a directory listing and name validation are performed here; no
    per-profile config is read.
    """
    active = get_active_profile_name() or "default"
    if not multiplex:
        return [(active, get_profile_dir(active))]

    default_entry: Tuple[str, Path] = ("default", _get_default_hermes_home())
    profiles_root = _get_profiles_root()
    if not profiles_root.is_dir():
        return [default_entry]

    named = [
        (child.name, child)
        for child in sorted(profiles_root.iterdir())
        if child.is_dir()
        # "default" is the built-in entry already placed first.
        and child.name != "default"
        and _PROFILE_ID_RE.match(child.name)
    ]
    return [default_entry, *named]
|
||||
|
||||
|
||||
def create_profile(
|
||||
name: str,
|
||||
clone_from: Optional[str] = None,
|
||||
|
||||
@@ -12,6 +12,7 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
from hermes_cli import auth as auth_mod
|
||||
from agent.credential_pool import CredentialPool, PooledCredential, get_custom_provider_pool_key, load_pool
|
||||
from agent.secret_scope import get_secret as _get_secret
|
||||
from hermes_cli.auth import (
|
||||
AuthError,
|
||||
DEFAULT_CODEX_BASE_URL,
|
||||
@@ -35,6 +36,19 @@ from hermes_constants import OPENROUTER_BASE_URL
|
||||
from utils import base_url_host_matches, base_url_hostname, env_int
|
||||
|
||||
|
||||
def _getenv(name: str, default: str = "") -> str:
    """Look up ``name`` through the secret scope, ``os.getenv``-style.

    Delegates to ``_get_secret(name, default)`` and substitutes ``default``
    when that lookup yields ``None``, preserving the ``(name, default) -> str``
    contract the call sites in this module rely on.
    """
    resolved = _get_secret(name, default)
    if resolved is None:
        return default
    return resolved
|
||||
|
||||
|
||||
def _normalize_custom_provider_name(value: str) -> str:
|
||||
return value.strip().lower().replace(" ", "-")
|
||||
|
||||
@@ -156,7 +170,7 @@ def _host_derived_api_key(base_url: str) -> str:
|
||||
if sanitized in ("OPENAI", "OPENROUTER", "OLLAMA"):
|
||||
return ""
|
||||
env_name = f"{sanitized}_API_KEY"
|
||||
return (os.getenv(env_name, "") or "").strip()
|
||||
return (_getenv(env_name, "") or "").strip()
|
||||
|
||||
|
||||
def _auto_detect_local_model(base_url: str) -> str:
|
||||
@@ -437,7 +451,7 @@ def resolve_requested_provider(requested: Optional[str] = None) -> str:
|
||||
|
||||
# Prefer the persisted config selection over any stale shell/.env
|
||||
# provider override so chat uses the endpoint the user last saved.
|
||||
env_provider = os.getenv("HERMES_INFERENCE_PROVIDER", "").strip().lower()
|
||||
env_provider = _getenv("HERMES_INFERENCE_PROVIDER", "").strip().lower()
|
||||
if env_provider:
|
||||
return env_provider
|
||||
|
||||
@@ -542,7 +556,7 @@ def _get_named_custom_provider(requested_provider: str) -> Optional[Dict[str, An
|
||||
name_norm = _normalize_custom_provider_name(ep_name)
|
||||
# Resolve the API key from the env var name stored in key_env
|
||||
key_env = str(entry.get("key_env", "") or "").strip()
|
||||
resolved_api_key = os.getenv(key_env, "").strip() if key_env else ""
|
||||
resolved_api_key = _getenv(key_env, "").strip() if key_env else ""
|
||||
# Fall back to inline api_key when key_env is absent or unresolvable
|
||||
if not resolved_api_key:
|
||||
resolved_api_key = str(entry.get("api_key", "") or "").strip()
|
||||
@@ -761,8 +775,8 @@ def _resolve_named_custom_runtime(
|
||||
api_key_candidates = [
|
||||
(explicit_api_key or "").strip(),
|
||||
# Gate env key fallbacks on authoritative hosts (#28660)
|
||||
(os.getenv("OPENAI_API_KEY", "").strip() if _da_is_openai_url else ""),
|
||||
(os.getenv("OPENROUTER_API_KEY", "").strip() if _da_is_openrouter else ""),
|
||||
(_getenv("OPENAI_API_KEY", "").strip() if _da_is_openai_url else ""),
|
||||
(_getenv("OPENROUTER_API_KEY", "").strip() if _da_is_openrouter else ""),
|
||||
# Bonus (#28660): derive `<VENDOR>_API_KEY` from the host so users
|
||||
# who set DEEPSEEK_API_KEY / GROQ_API_KEY / MISTRAL_API_KEY get the
|
||||
# intuitive match without configuring `custom_providers` first.
|
||||
@@ -815,11 +829,11 @@ def _resolve_named_custom_runtime(
|
||||
api_key_candidates = [
|
||||
(explicit_api_key or "").strip(),
|
||||
str(custom_provider.get("api_key", "") or "").strip(),
|
||||
os.getenv(str(custom_provider.get("key_env", "") or "").strip(), "").strip(),
|
||||
_getenv(str(custom_provider.get("key_env", "") or "").strip(), "").strip(),
|
||||
# Gate provider env keys on their authoritative hosts — sending
|
||||
# OPENAI_API_KEY to a local-llm endpoint leaks credentials (#28660).
|
||||
(os.getenv("OPENAI_API_KEY", "").strip() if _cp_is_openai_url else ""),
|
||||
(os.getenv("OPENROUTER_API_KEY", "").strip() if _cp_is_openrouter else ""),
|
||||
(_getenv("OPENAI_API_KEY", "").strip() if _cp_is_openai_url else ""),
|
||||
(_getenv("OPENROUTER_API_KEY", "").strip() if _cp_is_openrouter else ""),
|
||||
# Bonus (#28660): derive `<VENDOR>_API_KEY` from the host as a final
|
||||
# fallback when key_env wasn't set explicitly.
|
||||
_host_derived_api_key(base_url),
|
||||
@@ -878,8 +892,8 @@ def _resolve_openrouter_runtime(
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
env_openrouter_base_url = os.getenv("OPENROUTER_BASE_URL", "").strip()
|
||||
env_custom_base_url = os.getenv("CUSTOM_BASE_URL", "").strip()
|
||||
env_openrouter_base_url = _getenv("OPENROUTER_BASE_URL", "").strip()
|
||||
env_custom_base_url = _getenv("CUSTOM_BASE_URL", "").strip()
|
||||
|
||||
# Use config base_url when available and the provider context matches.
|
||||
# OPENAI_BASE_URL env var is no longer consulted — config.yaml is
|
||||
@@ -919,8 +933,8 @@ def _resolve_openrouter_runtime(
|
||||
if _is_openrouter_context:
|
||||
api_key_candidates = [
|
||||
explicit_api_key,
|
||||
os.getenv("OPENROUTER_API_KEY"),
|
||||
os.getenv("OPENAI_API_KEY"),
|
||||
_getenv("OPENROUTER_API_KEY"),
|
||||
_getenv("OPENAI_API_KEY"),
|
||||
]
|
||||
else:
|
||||
# Custom endpoint: use api_key from config when using config base_url (#1760).
|
||||
@@ -940,9 +954,9 @@ def _resolve_openrouter_runtime(
|
||||
api_key_candidates = [
|
||||
explicit_api_key,
|
||||
(cfg_api_key if use_config_base_url else ""),
|
||||
(os.getenv("OLLAMA_API_KEY") if _is_ollama_url else ""),
|
||||
(os.getenv("OPENAI_API_KEY") if (_is_openai_url or _is_openai_azure) else ""),
|
||||
(os.getenv("OPENROUTER_API_KEY") if _is_openrouter_url else ""),
|
||||
(_getenv("OLLAMA_API_KEY") if _is_ollama_url else ""),
|
||||
(_getenv("OPENAI_API_KEY") if (_is_openai_url or _is_openai_azure) else ""),
|
||||
(_getenv("OPENROUTER_API_KEY") if _is_openrouter_url else ""),
|
||||
# Bonus (#28660): derive `<VENDOR>_API_KEY` from the host so users
|
||||
# who set DEEPSEEK_API_KEY / GROQ_API_KEY / MISTRAL_API_KEY get the
|
||||
# intuitive match. Helper returns "" for IPs/loopback and for env
|
||||
@@ -1045,7 +1059,7 @@ def _resolve_azure_foundry_runtime(
|
||||
if inferred:
|
||||
cfg_api_mode = inferred
|
||||
|
||||
env_base_url = os.getenv("AZURE_FOUNDRY_BASE_URL", "").strip().rstrip("/")
|
||||
env_base_url = _getenv("AZURE_FOUNDRY_BASE_URL", "").strip().rstrip("/")
|
||||
base_url = explicit_base_url_clean or cfg_base_url or env_base_url
|
||||
if not base_url:
|
||||
raise AuthError(
|
||||
@@ -1134,7 +1148,7 @@ def _resolve_azure_foundry_runtime(
|
||||
except Exception:
|
||||
api_key = ""
|
||||
if not api_key:
|
||||
api_key = os.getenv("AZURE_FOUNDRY_API_KEY", "").strip()
|
||||
api_key = _getenv("AZURE_FOUNDRY_API_KEY", "").strip()
|
||||
if not api_key:
|
||||
raise AuthError(
|
||||
"Azure Foundry requires an API key. Set AZURE_FOUNDRY_API_KEY in "
|
||||
@@ -1234,7 +1248,7 @@ def _resolve_explicit_runtime(
|
||||
expires_at = state.get("agent_key_expires_at") or state.get("expires_at")
|
||||
if not api_key:
|
||||
creds = resolve_nous_runtime_credentials(
|
||||
timeout_seconds=float(os.getenv("HERMES_NOUS_TIMEOUT_SECONDS", "15")),
|
||||
timeout_seconds=float(_getenv("HERMES_NOUS_TIMEOUT_SECONDS", "15")),
|
||||
)
|
||||
api_key = creds.get("api_key", "")
|
||||
expires_at = creds.get("expires_at")
|
||||
@@ -1263,7 +1277,7 @@ def _resolve_explicit_runtime(
|
||||
if pconfig and pconfig.auth_type == "api_key":
|
||||
env_url = ""
|
||||
if pconfig.base_url_env_var:
|
||||
env_url = os.getenv(pconfig.base_url_env_var, "").strip().rstrip("/")
|
||||
env_url = _getenv(pconfig.base_url_env_var, "").strip().rstrip("/")
|
||||
|
||||
base_url = explicit_base_url
|
||||
if not base_url:
|
||||
@@ -1335,8 +1349,8 @@ def resolve_runtime_provider(
|
||||
if requested_provider == "anthropic" and "azure.com" in _eff_base:
|
||||
_azure_key = (
|
||||
(explicit_api_key or "").strip()
|
||||
or os.getenv("AZURE_ANTHROPIC_KEY", "").strip()
|
||||
or os.getenv("ANTHROPIC_API_KEY", "").strip()
|
||||
or _getenv("AZURE_ANTHROPIC_KEY", "").strip()
|
||||
or _getenv("ANTHROPIC_API_KEY", "").strip()
|
||||
)
|
||||
return {
|
||||
"provider": "anthropic",
|
||||
@@ -1391,8 +1405,8 @@ def resolve_runtime_provider(
|
||||
if provider == "openrouter":
|
||||
cfg_provider = str(model_cfg.get("provider") or "").strip().lower()
|
||||
cfg_base_url = str(model_cfg.get("base_url") or "").strip()
|
||||
env_openai_base_url = os.getenv("OPENAI_BASE_URL", "").strip()
|
||||
env_openrouter_base_url = os.getenv("OPENROUTER_BASE_URL", "").strip()
|
||||
env_openai_base_url = _getenv("OPENAI_BASE_URL", "").strip()
|
||||
env_openrouter_base_url = _getenv("OPENROUTER_BASE_URL", "").strip()
|
||||
has_custom_endpoint = bool(
|
||||
explicit_base_url
|
||||
or env_openai_base_url
|
||||
@@ -1448,7 +1462,7 @@ def resolve_runtime_provider(
|
||||
if provider == "nous":
|
||||
try:
|
||||
creds = resolve_nous_runtime_credentials(
|
||||
timeout_seconds=float(os.getenv("HERMES_NOUS_TIMEOUT_SECONDS", "15")),
|
||||
timeout_seconds=float(_getenv("HERMES_NOUS_TIMEOUT_SECONDS", "15")),
|
||||
)
|
||||
return {
|
||||
"provider": "nous",
|
||||
@@ -1601,7 +1615,7 @@ def resolve_runtime_provider(
|
||||
for hint_key in ("key_env", "api_key_env"):
|
||||
env_var = str(model_cfg.get(hint_key) or "").strip()
|
||||
if env_var:
|
||||
token = os.getenv(env_var, "").strip()
|
||||
token = _getenv(env_var, "").strip()
|
||||
if token:
|
||||
break
|
||||
# Next: an inline api_key on the model config (useful in multi-profile
|
||||
@@ -1611,8 +1625,8 @@ def resolve_runtime_provider(
|
||||
# Finally fall back to the historical fixed names.
|
||||
if not token:
|
||||
token = (
|
||||
os.getenv("AZURE_ANTHROPIC_KEY", "").strip()
|
||||
or os.getenv("ANTHROPIC_API_KEY", "").strip()
|
||||
_getenv("AZURE_ANTHROPIC_KEY", "").strip()
|
||||
or _getenv("ANTHROPIC_API_KEY", "").strip()
|
||||
)
|
||||
if not token:
|
||||
raise AuthError(
|
||||
|
||||
130
tests/agent/test_secret_scope.py
Normal file
130
tests/agent/test_secret_scope.py
Normal file
@@ -0,0 +1,130 @@
|
||||
"""Tests for the profile-scoped credential primitive (Workstream A / Phase 2)."""
|
||||
import pytest
|
||||
|
||||
from agent import secret_scope as ss
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
def _reset_multiplex():
    """Bracket every test with the multiplex flag switched off.

    The flag set through ``set_multiplex_active`` is global, so it is cleared
    before the test body runs and cleared again during teardown.
    """
    ss.set_multiplex_active(False)  # setup: begin in the default (off) mode
    yield
    ss.set_multiplex_active(False)  # teardown: undo anything the test enabled
|
||||
|
||||
|
||||
class TestMultiplexInactiveBackwardCompat:
    """Multiplexing off: ``get_secret`` acts as a plain ``os.environ`` lookup."""

    def test_reads_environ(self, monkeypatch):
        expected = "sk-test"
        monkeypatch.setenv("ANTHROPIC_API_KEY", expected)
        assert ss.get_secret("ANTHROPIC_API_KEY") == expected

    def test_missing_returns_default(self, monkeypatch):
        name = "NOPE_KEY"
        monkeypatch.delenv(name, raising=False)
        # Without a default the lookup yields None; an explicit default is
        # handed back unchanged.
        assert ss.get_secret(name) is None
        assert ss.get_secret(name, "fallback") == "fallback"

    def test_no_raise_without_scope(self, monkeypatch):
        monkeypatch.delenv("SOME_KEY", raising=False)
        # No scope is bound here, but with multiplexing off that is allowed:
        # the read must simply return the default instead of raising.
        value = ss.get_secret("SOME_KEY")
        assert value is None
|
||||
|
||||
|
||||
class TestMultiplexActiveFailClosed:
    """Multiplexing on: secrets come from the bound scope, or the read raises."""

    def test_unscoped_read_raises(self, monkeypatch):
        # The key is present in the process environment, yet no scope is
        # bound, so the lookup has to raise rather than return it.
        monkeypatch.setenv("ANTHROPIC_API_KEY", "sk-leaky")
        ss.set_multiplex_active(True)
        with pytest.raises(ss.UnscopedSecretError):
            ss.get_secret("ANTHROPIC_API_KEY")

    def test_scoped_read_uses_scope_not_environ(self, monkeypatch):
        monkeypatch.setenv("ANTHROPIC_API_KEY", "sk-from-environ")
        ss.set_multiplex_active(True)
        scope_token = ss.set_secret_scope({"ANTHROPIC_API_KEY": "sk-from-scope"})
        try:
            resolved = ss.get_secret("ANTHROPIC_API_KEY")
            assert resolved == "sk-from-scope"
        finally:
            ss.reset_secret_scope(scope_token)

    def test_scoped_missing_key_returns_default_not_environ(self, monkeypatch):
        # The bound scope is authoritative: a key it lacks resolves to the
        # default even though os.environ (possibly another profile's) has it.
        monkeypatch.setenv("OPENAI_API_KEY", "sk-other-profile")
        ss.set_multiplex_active(True)
        scope_token = ss.set_secret_scope({"ANTHROPIC_API_KEY": "sk-mine"})
        try:
            assert ss.get_secret("OPENAI_API_KEY") is None
            assert ss.get_secret("OPENAI_API_KEY", "d") == "d"
        finally:
            ss.reset_secret_scope(scope_token)

    def test_global_env_still_reads_environ_under_multiplex(self, monkeypatch):
        home = "/opt/data"
        monkeypatch.setenv("HERMES_HOME", home)
        ss.set_multiplex_active(True)
        # HERMES_HOME is treated as a global variable: readable from the
        # environment with multiplexing on and no scope bound.
        assert ss.get_secret("HERMES_HOME") == home

    def test_kanban_prefix_is_global(self, monkeypatch):
        db_path = "/x/kanban.db"
        monkeypatch.setenv("HERMES_KANBAN_DB", db_path)
        ss.set_multiplex_active(True)
        assert ss.get_secret("HERMES_KANBAN_DB") == db_path
|
||||
|
||||
|
||||
class TestScopeIsolation:
    """Scopes are independent: one scope never exposes another's secrets."""

    def test_nested_scopes_restore(self):
        ss.set_multiplex_active(True)
        outer = ss.set_secret_scope({"K": "a"})
        try:
            assert ss.get_secret("K") == "a"
            inner = ss.set_secret_scope({"K": "b"})
            try:
                # The inner scope shadows the outer one while it is bound.
                assert ss.get_secret("K") == "b"
            finally:
                ss.reset_secret_scope(inner)
            # Resetting the inner token brings the outer value back.
            assert ss.get_secret("K") == "a"
        finally:
            ss.reset_secret_scope(outer)
|
||||
|
||||
|
||||
class TestEnvFileParsing:
    """``load_env_file`` returns parsed pairs and leaves ``os.environ`` alone."""

    def test_parses_basic(self, tmp_path):
        env_path = tmp_path / ".env"
        # Fixture covers: a comment, a bare pair, an ``export`` prefix, both
        # quote styles, a blank line, and a line without ``=``.
        env_path.write_text(
            "# comment\n"
            "ANTHROPIC_API_KEY=sk-abc\n"
            "export OPENAI_API_KEY=sk-def\n"
            'QUOTED="quoted-value"\n'
            "SINGLE='single'\n"
            "\n"
            "BAD_LINE_NO_EQUALS\n"
        )
        expected = {
            "ANTHROPIC_API_KEY": "sk-abc",
            "OPENAI_API_KEY": "sk-def",
            "QUOTED": "quoted-value",
            "SINGLE": "single",
        }
        assert ss.load_env_file(env_path) == expected

    def test_does_not_mutate_environ(self, tmp_path, monkeypatch):
        monkeypatch.delenv("ZZZ_KEY", raising=False)
        env_path = tmp_path / ".env"
        env_path.write_text("ZZZ_KEY=secret\n")
        ss.load_env_file(env_path)
        import os

        # Parsing must not have exported the key into the process environment.
        assert "ZZZ_KEY" not in os.environ

    def test_missing_file_returns_empty(self, tmp_path):
        absent = tmp_path / "nope.env"
        assert ss.load_env_file(absent) == {}

    def test_build_profile_secret_scope(self, tmp_path):
        (tmp_path / ".env").write_text("ANTHROPIC_API_KEY=sk-profile\n")
        scope = ss.build_profile_secret_scope(tmp_path)
        assert scope == {"ANTHROPIC_API_KEY": "sk-profile"}
|
||||
136
tests/gateway/test_multiplex_adapter_registry.py
Normal file
136
tests/gateway/test_multiplex_adapter_registry.py
Normal file
@@ -0,0 +1,136 @@
|
||||
"""Phase 3: secondary-profile adapter registry + same-token conflict detection."""
|
||||
import pytest
|
||||
|
||||
from gateway.run import GatewayRunner
|
||||
|
||||
|
||||
class _FakeAdapter:
|
||||
def __init__(self, token=None):
|
||||
self.token = token
|
||||
|
||||
|
||||
class TestCredentialFingerprint:
    """Checks on ``GatewayRunner._adapter_credential_fingerprint``."""

    def test_none_without_token(self):
        tokenless = _FakeAdapter()
        assert GatewayRunner._adapter_credential_fingerprint(tokenless) is None

    def test_stable_and_log_safe(self):
        first_adapter = _FakeAdapter(token="secret-bot-token")
        first = GatewayRunner._adapter_credential_fingerprint(first_adapter)
        second = GatewayRunner._adapter_credential_fingerprint(
            _FakeAdapter(token="secret-bot-token")
        )
        # Equal tokens on distinct adapter objects give the same fingerprint.
        assert first == second
        # The raw token text must not appear inside the fingerprint.
        assert "secret-bot-token" not in (first or "")
        assert len(first) == 16

    def test_distinct_tokens_distinct_fp(self):
        fp_a = GatewayRunner._adapter_credential_fingerprint(_FakeAdapter(token="tok-A"))
        fp_b = GatewayRunner._adapter_credential_fingerprint(_FakeAdapter(token="tok-B"))
        assert fp_a != fp_b

    def test_reads_alt_attrs(self):
        class _AltAdapter:
            # Credential stored under ``bot_token`` instead of ``token``.
            def __init__(self):
                self.bot_token = "alt-token"

        fingerprint = GatewayRunner._adapter_credential_fingerprint(_AltAdapter())
        assert fingerprint is not None
|
||||
|
||||
|
||||
class TestProfileMessageHandler:
    """The handler from ``_make_profile_message_handler`` stamps ``source.profile``."""

    @pytest.mark.asyncio
    async def test_stamps_profile_on_unstamped_source(self):
        # __new__ gives a runner without running GatewayRunner.__init__.
        runner = GatewayRunner.__new__(GatewayRunner)
        captured = {}

        async def _record(event):
            captured["profile"] = event.source.profile
            return "ok"

        runner._handle_message = _record
        handler = runner._make_profile_message_handler("coder")

        class _Src:
            profile = None

        class _Evt:
            source = _Src()

        outcome = await handler(_Evt())
        assert outcome == "ok"
        assert captured["profile"] == "coder"

    @pytest.mark.asyncio
    async def test_does_not_override_existing_profile(self):
        runner = GatewayRunner.__new__(GatewayRunner)
        captured = {}

        async def _record(event):
            captured["profile"] = event.source.profile
            return "ok"

        runner._handle_message = _record
        handler = runner._make_profile_message_handler("coder")

        class _Src:
            profile = "writer"  # already stamped (e.g. by URL prefix)

        class _Evt:
            source = _Src()

        await handler(_Evt())
        # The pre-existing stamp wins over the handler's own profile name.
        assert captured["profile"] == "writer"
|
||||
|
||||
|
||||
class TestPortBindingHardError:
    """Startup aborts when a secondary profile enables a port-binding platform."""

    @pytest.mark.asyncio
    async def test_secondary_webhook_raises(self, monkeypatch):
        from gateway.run import MultiplexConfigError
        from gateway.config import GatewayConfig, Platform, PlatformConfig

        runner = GatewayRunner.__new__(GatewayRunner)
        runner.config = GatewayConfig(multiplex_profiles=True)
        runner._profile_adapters = {}

        # Config for the "reviewer" profile: webhook is a port-binding platform.
        profile_cfg = GatewayConfig(multiplex_profiles=True)
        profile_cfg.platforms = {
            Platform.WEBHOOK: PlatformConfig(enabled=True, extra={"port": 8644}),
        }
        monkeypatch.setattr(
            "gateway.config.load_gateway_config", lambda: profile_cfg
        )

        with pytest.raises(MultiplexConfigError) as excinfo:
            await runner._start_one_profile_adapters("reviewer", "/tmp/x", {})
        message = str(excinfo.value)
        # The error text names both the platform and the offending profile.
        assert "webhook" in message
        assert "reviewer" in message

    @pytest.mark.asyncio
    async def test_secondary_non_binding_platform_ok(self, monkeypatch):
        """Platforms that bind no port (telegram here) pass the guard."""
        from gateway.config import GatewayConfig, Platform, PlatformConfig

        runner = GatewayRunner.__new__(GatewayRunner)
        runner.config = GatewayConfig(multiplex_profiles=True)
        runner._profile_adapters = {}

        profile_cfg = GatewayConfig(multiplex_profiles=True)
        profile_cfg.platforms = {
            Platform.TELEGRAM: PlatformConfig(enabled=True, token="t"),
        }
        monkeypatch.setattr(
            "gateway.config.load_gateway_config", lambda: profile_cfg
        )
        # Stub adapter creation to yield None so nothing actually connects;
        # what matters is that no MultiplexConfigError is raised.
        monkeypatch.setattr(runner, "_create_adapter", lambda p, c: None)

        started = await runner._start_one_profile_adapters("reviewer", "/tmp/x", {})
        assert started == 0  # nothing connected, but no MultiplexConfigError

    def test_port_binding_set_covers_known_listeners(self):
        from gateway.run import _PORT_BINDING_PLATFORM_VALUES

        # Platforms whose adapters bind a TCP port; each must be guarded.
        listeners = (
            "webhook",
            "api_server",
            "msgraph_webhook",
            "feishu",
            "wecom_callback",
            "bluebubbles",
            "sms",
        )
        for platform_value in listeners:
            assert platform_value in _PORT_BINDING_PLATFORM_VALUES
|
||||
|
||||
88
tests/gateway/test_multiplex_credential_isolation.py
Normal file
88
tests/gateway/test_multiplex_credential_isolation.py
Normal file
@@ -0,0 +1,88 @@
|
||||
"""End-to-end credential isolation proof for multiplex mode (Workstream A).
|
||||
|
||||
These exercise the REAL resolution path (runtime_provider, secret scope, MCP
|
||||
interpolation) rather than mocking it, proving the property that matters: two
|
||||
profiles with different keys never see each other's, and an unscoped read in
|
||||
multiplex mode fails closed instead of leaking.
|
||||
"""
|
||||
import pytest
|
||||
|
||||
from agent import secret_scope as ss
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
def _reset(monkeypatch):
    """Switch multiplexing off before each test and again after it."""
    ss.set_multiplex_active(False)  # setup
    yield
    ss.set_multiplex_active(False)  # teardown
|
||||
|
||||
|
||||
class TestRuntimeProviderUsesScope:
    """``hermes_cli.runtime_provider._getenv`` goes through the secret scope."""

    def test_getenv_reads_scope_under_multiplex(self, monkeypatch):
        from hermes_cli.runtime_provider import _getenv
        monkeypatch.setenv("ANTHROPIC_API_KEY", "sk-global-leak")
        ss.set_multiplex_active(True)
        scope_token = ss.set_secret_scope({"ANTHROPIC_API_KEY": "sk-profileA"})
        try:
            # The scoped value is returned, not the process-wide one.
            assert _getenv("ANTHROPIC_API_KEY") == "sk-profileA"
        finally:
            ss.reset_secret_scope(scope_token)

    def test_getenv_two_profiles_isolated(self, monkeypatch):
        from hermes_cli.runtime_provider import _getenv
        ss.set_multiplex_active(True)

        # First profile: bind, read, release.
        token_a = ss.set_secret_scope({"OPENAI_API_KEY": "sk-A"})
        try:
            assert _getenv("OPENAI_API_KEY") == "sk-A"
        finally:
            ss.reset_secret_scope(token_a)

        # Second profile, same key name: it sees only its own value.
        token_b = ss.set_secret_scope({"OPENAI_API_KEY": "sk-B"})
        try:
            assert _getenv("OPENAI_API_KEY") == "sk-B"
        finally:
            ss.reset_secret_scope(token_b)

    def test_getenv_fails_closed_unscoped(self, monkeypatch):
        from hermes_cli.runtime_provider import _getenv
        monkeypatch.setenv("OPENROUTER_API_KEY", "sk-leak")
        ss.set_multiplex_active(True)
        with pytest.raises(ss.UnscopedSecretError):
            _getenv("OPENROUTER_API_KEY")

    def test_getenv_global_var_still_reads_environ(self, monkeypatch):
        from hermes_cli.runtime_provider import _getenv
        monkeypatch.setenv("HERMES_MAX_ITERATIONS", "42")
        ss.set_multiplex_active(True)
        # Treated as a global variable: readable with no scope bound.
        assert _getenv("HERMES_MAX_ITERATIONS") == "42"
|
||||
|
||||
|
||||
class TestMcpInterpolationUsesScope:
    """``${VAR}`` interpolation in MCP config goes through the secret scope."""

    def test_interpolation_reads_scope(self, monkeypatch):
        from tools.mcp_tool import _interpolate_env_vars
        monkeypatch.setenv("MY_MCP_TOKEN", "global-token")
        ss.set_multiplex_active(True)
        scope_token = ss.set_secret_scope({"MY_MCP_TOKEN": "profile-token"})
        try:
            raw_cfg = {"env": {"TOKEN": "${MY_MCP_TOKEN}"}}
            resolved = _interpolate_env_vars(raw_cfg)
            # The scoped value fills the placeholder, not the environ one.
            assert resolved == {"env": {"TOKEN": "profile-token"}}
        finally:
            ss.reset_secret_scope(scope_token)

    def test_interpolation_unset_keeps_placeholder(self, monkeypatch):
        from tools.mcp_tool import _interpolate_env_vars
        monkeypatch.delenv("UNSET_MCP_VAR", raising=False)
        # Multiplex off and the variable unset: the literal text survives.
        placeholder = "${UNSET_MCP_VAR}"
        assert _interpolate_env_vars(placeholder) == "${UNSET_MCP_VAR}"

    def test_interpolation_off_reads_environ(self, monkeypatch):
        from tools.mcp_tool import _interpolate_env_vars
        monkeypatch.setenv("MY_MCP_TOKEN", "env-token")
        # Multiplex off: the value is taken from os.environ.
        assert _interpolate_env_vars("${MY_MCP_TOKEN}") == "env-token"
|
||||
73
tests/gateway/test_multiplex_http_routing.py
Normal file
73
tests/gateway/test_multiplex_http_routing.py
Normal file
@@ -0,0 +1,73 @@
|
||||
"""Phase 1: HTTP-inbound /p/<profile>/ routing for the webhook adapter."""
|
||||
import pytest
|
||||
|
||||
from gateway.config import GatewayConfig, Platform
|
||||
from gateway.session import SessionSource, build_session_key
|
||||
|
||||
|
||||
class TestSessionSourceProfileField:
    """Serialization and session-key behaviour of ``SessionSource.profile``."""

    def test_profile_roundtrips(self):
        # Use WEBHOOK when the enum defines it, otherwise fall back to TELEGRAM.
        if hasattr(Platform, "WEBHOOK"):
            platform = Platform.WEBHOOK
        else:
            platform = Platform.TELEGRAM
        original = SessionSource(
            platform=platform,
            chat_id="c1",
            chat_type="webhook",
            profile="coder",
        )
        payload = original.to_dict()
        assert SessionSource.from_dict(payload).profile == "coder"

    def test_profile_absent_not_serialized(self):
        source = SessionSource(platform=Platform.TELEGRAM, chat_id="c1", chat_type="dm")
        assert "profile" not in source.to_dict()

    def test_source_profile_drives_session_key_namespace(self):
        source = SessionSource(platform=Platform.TELEGRAM, chat_id="99", chat_type="dm")
        # The profile is passed to build_session_key explicitly, and the
        # namespace segment of the resulting key follows it.
        key = build_session_key(source, profile="coder")
        assert key == "agent:coder:telegram:dm:99"
|
||||
|
||||
|
||||
class TestWebhookProfileResolution:
    """How ``_resolve_request_profile`` treats the ``/p/<profile>/`` prefix."""

    def _adapter(self, multiplex: bool, served=("default", "coder")):
        """Return ``(adapter, request_class, rejected_sentinel, served)``.

        The adapter is allocated with ``__new__`` and only gets a
        ``gateway_runner`` whose ``config`` carries the multiplex flag.
        """
        from gateway.platforms.webhook import WebhookAdapter, _PROFILE_REJECTED

        class _FakeReq:
            def __init__(self, profile):
                # An absent profile is modelled as an empty match_info.
                if profile is not None:
                    self.match_info = {"profile": profile}
                else:
                    self.match_info = {}

        cfg = GatewayConfig(multiplex_profiles=multiplex)

        class _Runner:
            config = cfg

        # Built without __init__; only _resolve_request_profile is called.
        bare_adapter = WebhookAdapter.__new__(WebhookAdapter)
        bare_adapter.gateway_runner = _Runner()
        return bare_adapter, _FakeReq, _PROFILE_REJECTED, served

    def test_no_prefix_returns_none(self):
        adapter, Req, _rejected, _served = self._adapter(multiplex=True)
        assert adapter._resolve_request_profile(Req(None)) is None

    def test_prefix_ignored_when_multiplex_off(self):
        adapter, Req, _rejected, _served = self._adapter(multiplex=False)
        # With multiplexing off even an unknown profile resolves to None
        # instead of being rejected.
        assert adapter._resolve_request_profile(Req("anything")) is None

    def test_known_profile_accepted(self, monkeypatch):
        adapter, Req, _rejected, served = self._adapter(multiplex=True)

        def _fake_profiles_to_serve(multiplex):
            return [(name, None) for name in served]

        monkeypatch.setattr(
            "hermes_cli.profiles.profiles_to_serve", _fake_profiles_to_serve
        )
        assert adapter._resolve_request_profile(Req("coder")) == "coder"

    def test_unknown_profile_rejected(self, monkeypatch):
        adapter, Req, rejected, served = self._adapter(multiplex=True)

        def _fake_profiles_to_serve(multiplex):
            return [(name, None) for name in served]

        monkeypatch.setattr(
            "hermes_cli.profiles.profiles_to_serve", _fake_profiles_to_serve
        )
        # "ghost" is not among the served names: the sentinel comes back.
        assert adapter._resolve_request_profile(Req("ghost")) is rejected
|
||||
55
tests/gateway/test_multiplex_lifecycle.py
Normal file
55
tests/gateway/test_multiplex_lifecycle.py
Normal file
@@ -0,0 +1,55 @@
|
||||
"""Phase 4: lifecycle guard + per-profile observability."""
|
||||
import pytest
|
||||
|
||||
|
||||
class TestServedProfilesStatus:
    """``served_profiles`` in the gateway runtime-status record."""

    def test_write_and_read_served_profiles(self, tmp_path, monkeypatch):
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        import importlib
        import gateway.status as status
        # Reload after HERMES_HOME has been redirected to the temp directory.
        importlib.reload(status)
        try:
            profiles = ["default", "coder"]
            status.write_runtime_status(
                gateway_state="running", served_profiles=profiles
            )
            record = status.read_runtime_status()
            assert record.get("served_profiles") == ["default", "coder"]
        finally:
            importlib.reload(status)

    def test_served_profiles_absent_by_default(self, tmp_path, monkeypatch):
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        import importlib
        import gateway.status as status
        importlib.reload(status)
        try:
            status.write_runtime_status(gateway_state="running")
            record = status.read_runtime_status()
            # The key is omitted when the caller does not pass served_profiles.
            assert "served_profiles" not in record
        finally:
            importlib.reload(status)
|
||||
|
||||
|
||||
class TestNamedProfileMultiplexerGuard:
    """Cases in which ``_guard_named_profile_under_multiplexer`` does nothing."""

    def test_inert_for_default_profile(self, monkeypatch):
        from hermes_cli import gateway as gw
        # An empty suffix stands for the default profile; the call must
        # return without raising.
        monkeypatch.setattr(gw, "_profile_suffix", lambda: "")
        gw._guard_named_profile_under_multiplexer(force=False)

    def test_force_bypasses(self, monkeypatch):
        from hermes_cli import gateway as gw
        # A named profile is simulated, but force=True makes the call return.
        monkeypatch.setattr(gw, "_profile_suffix", lambda: "coder")
        gw._guard_named_profile_under_multiplexer(force=True)

    def test_inert_when_no_default_gateway_running(self, monkeypatch, tmp_path):
        from hermes_cli import gateway as gw
        monkeypatch.setattr(gw, "_profile_suffix", lambda: "coder")
        monkeypatch.setattr(
            "hermes_constants.get_default_hermes_root", lambda: tmp_path
        )
        # tmp_path holds no gateway.pid, so no default gateway counts as
        # running and the guard must not raise.
        gw._guard_named_profile_under_multiplexer(force=False)
|
||||
165
tests/gateway/test_multiplex_phase0.py
Normal file
165
tests/gateway/test_multiplex_phase0.py
Normal file
@@ -0,0 +1,165 @@
|
||||
"""Phase 0 foundations for multi-profile gateway multiplexing.
|
||||
|
||||
Covers the three Phase 0 deliverables:
|
||||
1. ``gateway.multiplex_profiles`` config flag (default False, round-trips).
|
||||
2. ``hermes_cli.profiles.profiles_to_serve`` enumeration.
|
||||
3. Profile-stamped ``build_session_key`` that is BYTE-IDENTICAL when the
|
||||
flag is off (the orphan-every-session guard) and namespace-segmented when
|
||||
on, without disturbing the positional key layout downstream parsers rely
|
||||
on.
|
||||
"""
|
||||
import pytest
|
||||
from unittest.mock import patch
|
||||
|
||||
from gateway.config import GatewayConfig, Platform
|
||||
from gateway.session import SessionSource, SessionStore, build_session_key
|
||||
|
||||
|
||||
def _src(**kw) -> SessionSource:
    """Build a ``SessionSource``; unspecified fields default to a Telegram DM in chat "99"."""
    defaults = {
        "platform": Platform.TELEGRAM,
        "chat_id": "99",
        "chat_type": "dm",
    }
    # Keys supplied by the caller take precedence over the defaults.
    fields = {**defaults, **kw}
    return SessionSource(**fields)
|
||||
|
||||
|
||||
class TestSessionKeyByteIdenticalWhenOff:
    """Keys built with no profile, or with 'default', keep the legacy form.

    Every expectation below is the exact pre-Phase-0 key string; any change
    would orphan existing sessions on upgrade.
    """

    @pytest.mark.parametrize("profile", [None, "default"])
    def test_dm_with_chat_id(self, profile):
        source = _src(chat_id="99", chat_type="dm")
        key = build_session_key(source, profile=profile)
        assert key == "agent:main:telegram:dm:99"

    @pytest.mark.parametrize("profile", [None, "default"])
    def test_dm_with_thread(self, profile):
        source = _src(chat_id="99", chat_type="dm", thread_id="t1")
        key = build_session_key(source, profile=profile)
        assert key == "agent:main:telegram:dm:99:t1"

    @pytest.mark.parametrize("profile", [None, "default"])
    def test_dm_without_chat_id_falls_back_to_user(self, profile):
        # An empty chat_id makes the key use the user id instead.
        source = _src(chat_id="", chat_type="dm", user_id="jordan")
        key = build_session_key(source, profile=profile)
        assert key == "agent:main:telegram:dm:jordan"

    @pytest.mark.parametrize("profile", [None, "default"])
    def test_group_per_user(self, profile):
        source = _src(
            platform=Platform.DISCORD, chat_id="g1", chat_type="group", user_id="alice"
        )
        key = build_session_key(source, profile=profile)
        assert key == "agent:main:discord:group:g1:alice"

    @pytest.mark.parametrize("profile", [None, "default"])
    def test_group_shared_when_disabled(self, profile):
        source = _src(
            platform=Platform.DISCORD, chat_id="g1", chat_type="group", user_id="alice"
        )
        # With per-user group sessions disabled the user id is left out.
        key = build_session_key(source, group_sessions_per_user=False, profile=profile)
        assert key == "agent:main:discord:group:g1"
|
||||
|
||||
|
||||
class TestSessionKeyNamespacedWhenOn:
    """A named profile fills the key's namespace slot and isolates sessions."""

    def test_named_profile_dm(self):
        source = _src(chat_id="99", chat_type="dm")
        key = build_session_key(source, profile="coder")
        assert key == "agent:coder:telegram:dm:99"

    def test_named_profile_group_per_user(self):
        source = _src(
            platform=Platform.DISCORD, chat_id="g1", chat_type="group", user_id="alice"
        )
        key = build_session_key(source, profile="coder")
        assert key == "agent:coder:discord:group:g1:alice"

    def test_two_profiles_same_chat_do_not_collide(self):
        source = _src(chat_id="99", chat_type="dm")
        key_default = build_session_key(source, profile="default")
        key_coder = build_session_key(source, profile="coder")
        key_writer = build_session_key(source, profile="writer")
        # All three keys must be pairwise different.
        assert key_default != key_coder
        assert key_coder != key_writer
        assert key_default != key_writer

    def test_positional_layout_preserved_for_parsers(self):
        """The profile may only occupy parts[1] of the colon-split key.

        Downstream parsers split on ':' and read parts[2]=platform,
        parts[3]=chat_type, parts[4]=chat_id (see qqbot adapter
        _parse_gateway_session_key).
        """
        source = _src(
            platform=Platform.DISCORD, chat_id="g1", chat_type="group", user_id="alice"
        )
        segments = build_session_key(source, profile="coder").split(":")
        assert segments[0] == "agent"
        assert segments[1] == "coder"    # namespace slot (was always 'main')
        assert segments[2] == "discord"  # platform: unchanged offset
        assert segments[3] == "group"    # chat_type: unchanged offset
        assert segments[4] == "g1"       # chat_id: unchanged offset

    def test_default_namespace_layout_matches_named(self):
        """Keys for the default and a named profile differ only in parts[1]."""
        source = _src(
            platform=Platform.SLACK, chat_id="c1", chat_type="channel", user_id="u1"
        )
        default_parts = build_session_key(source, profile="default").split(":")
        named_parts = build_session_key(source, profile="coder").split(":")
        assert default_parts[0] == named_parts[0] == "agent"
        assert default_parts[1] == "main" and named_parts[1] == "coder"
        # Everything after the namespace segment is identical.
        assert default_parts[2:] == named_parts[2:]
|
||||
|
||||
|
||||
class TestMultiplexConfigFlag:
    """``multiplex_profiles`` on ``GatewayConfig``: default, parsing, round trip."""

    def test_default_is_false(self):
        config = GatewayConfig()
        assert config.multiplex_profiles is False

    def test_to_dict_includes_flag(self):
        exported = GatewayConfig().to_dict()
        assert exported["multiplex_profiles"] is False

    def test_from_dict_top_level(self):
        config = GatewayConfig.from_dict({"multiplex_profiles": True})
        assert config.multiplex_profiles is True

    def test_from_dict_nested_gateway(self):
        # The flag is also read from under a "gateway" mapping.
        nested = {"gateway": {"multiplex_profiles": True}}
        config = GatewayConfig.from_dict(nested)
        assert config.multiplex_profiles is True

    def test_from_dict_coerces_truthy_string(self):
        # The string "true" must come out as the boolean True.
        config = GatewayConfig.from_dict({"multiplex_profiles": "true"})
        assert config.multiplex_profiles is True

    def test_roundtrip(self):
        exported = GatewayConfig(multiplex_profiles=True).to_dict()
        restored = GatewayConfig.from_dict(exported)
        assert restored.multiplex_profiles is True
|
||||
|
||||
|
||||
class TestSessionStoreProfileResolution:
    """``SessionStore._generate_session_key`` with the multiplex flag off and on.

    Off: the legacy ``main`` namespace. On: the active profile's name, except
    that the ``default`` profile still maps to ``main``.
    """

    def _store(self, tmp_path, **cfg_kw):
        """Create a ``SessionStore`` with ``_ensure_loaded`` patched during construction."""
        gateway_config = GatewayConfig(**cfg_kw)
        with patch("gateway.session.SessionStore._ensure_loaded"):
            store = SessionStore(sessions_dir=tmp_path, config=gateway_config)
        # Mark the store as loaded and give it no database handle.
        store._db = None
        store._loaded = True
        return store

    def test_flag_off_uses_legacy_namespace(self, tmp_path):
        store = self._store(tmp_path)  # multiplex_profiles defaults False
        source = _src(chat_id="99", chat_type="dm")
        assert store._generate_session_key(source) == "agent:main:telegram:dm:99"
        # The store's key equals what build_session_key yields with no profile.
        assert store._generate_session_key(source) == build_session_key(source)

    def test_flag_off_resolve_profile_is_none(self, tmp_path):
        store = self._store(tmp_path)
        assert store._resolve_profile_for_key() is None

    def test_flag_on_uses_active_profile_namespace(self, tmp_path):
        store = self._store(tmp_path, multiplex_profiles=True)
        source = _src(chat_id="99", chat_type="dm")
        with patch("hermes_cli.profiles.get_active_profile_name", return_value="coder"):
            key = store._generate_session_key(source)
            assert key == "agent:coder:telegram:dm:99"

    def test_flag_on_default_profile_stays_legacy(self, tmp_path):
        store = self._store(tmp_path, multiplex_profiles=True)
        source = _src(chat_id="99", chat_type="dm")
        with patch("hermes_cli.profiles.get_active_profile_name", return_value="default"):
            key = store._generate_session_key(source)
            assert key == "agent:main:telegram:dm:99"
|
||||
|
||||
|
||||
@@ -35,6 +35,7 @@ from hermes_cli.profiles import (
|
||||
has_bundled_skills_opt_out,
|
||||
NO_BUNDLED_SKILLS_MARKER,
|
||||
backfill_profile_envs,
|
||||
profiles_to_serve,
|
||||
)
|
||||
from hermes_cli.config import DEFAULT_CONFIG
|
||||
|
||||
@@ -1487,3 +1488,48 @@ class TestEdgeCases:
|
||||
delete_profile("coder", yes=True)
|
||||
|
||||
assert get_active_profile() == "default"
|
||||
|
||||
|
||||
class TestProfilesToServe:
    """``profiles_to_serve(multiplex)``: which ``(name, home)`` pairs come back."""

    def test_off_returns_only_active_default(self, profile_env):
        served = profiles_to_serve(multiplex=False)
        assert len(served) == 1
        profile_name, profile_home = served[0]
        assert profile_name == "default"
        assert profile_home == _get_default_hermes_home()

    def test_off_returns_only_active_named(self, profile_env, monkeypatch):
        # HERMES_HOME is pointed at the named profile's directory, which is
        # how that profile's own gateway would run; the active profile name
        # is inferred from it.
        create_profile("coder", no_alias=True)
        monkeypatch.setenv("HERMES_HOME", str(get_profile_dir("coder")))
        served = profiles_to_serve(multiplex=False)
        assert len(served) == 1
        assert served[0][0] == "coder"
        assert served[0][1] == get_profile_dir("coder")

    def test_on_returns_default_plus_all_named(self, profile_env):
        create_profile("coder", no_alias=True)
        create_profile("writer", no_alias=True)
        homes_by_name = dict(profiles_to_serve(multiplex=True))
        assert set(homes_by_name) == {"default", "coder", "writer"}
        assert homes_by_name["default"] == _get_default_hermes_home()
        assert homes_by_name["coder"] == get_profile_dir("coder")

    def test_on_default_always_first(self, profile_env):
        create_profile("coder", no_alias=True)
        served = profiles_to_serve(multiplex=True)
        first_name = served[0][0]
        assert first_name == "default"

    def test_on_active_profile_does_not_change_set(self, profile_env):
        """The served set does not depend on which profile is active."""
        create_profile("coder", no_alias=True)
        set_active_profile("coder")
        homes_by_name = dict(profiles_to_serve(multiplex=True))
        assert set(homes_by_name) == {"default", "coder"}

    def test_on_no_named_profiles_returns_just_default(self, profile_env):
        served = profiles_to_serve(multiplex=True)
        names = [name for name, _home in served]
        assert names == ["default"]
|
||||
|
||||
@@ -2662,10 +2662,19 @@ def _interrupted_call_result() -> str:
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _interpolate_env_vars(value):
|
||||
"""Recursively resolve ``${VAR}`` placeholders from ``os.environ``."""
|
||||
"""Recursively resolve ``${VAR}`` placeholders.
|
||||
|
||||
Resolves from the active profile's secret scope when multiplexing is on
|
||||
(so an MCP server config's ``${API_KEY}`` picks up the routed profile's
|
||||
value, not the process-global ``os.environ`` which may hold another
|
||||
profile's), falling back to ``os.environ`` otherwise. Unset vars keep the
|
||||
literal ``${VAR}`` placeholder, as before.
|
||||
"""
|
||||
from agent.secret_scope import get_secret as _get_secret
|
||||
|
||||
if isinstance(value, str):
|
||||
def _replace(m):
|
||||
return os.environ.get(m.group(1), m.group(0))
|
||||
return _get_secret(m.group(1), m.group(0)) or m.group(0)
|
||||
return _ENV_VAR_PATTERN.sub(_replace, value)
|
||||
if isinstance(value, dict):
|
||||
return {k: _interpolate_env_vars(v) for k, v in value.items()}
|
||||
|
||||
@@ -56,6 +56,139 @@ research gateway start
|
||||
That's it — three independent agents, each in its own process, restarting
|
||||
automatically on crash and on user login.
|
||||
|
||||
## Alternative: one gateway for all profiles (multiplexing)
|
||||
|
||||
The model above runs **one process per profile**. That is the default and is
|
||||
the right choice for most setups. But on a host with many profiles — or a
|
||||
container deployment where one process per profile is operationally heavy — you
|
||||
can instead run a **single multiplexing gateway**: the default profile's gateway
|
||||
becomes the sole inbound process and serves messages for *every* profile on the
|
||||
box.
|
||||
|
||||
This is **opt-in** and **off by default**. When it's off, nothing on this page
|
||||
changes — every behavior below is inert.
|
||||
|
||||
### When to prefer multiplexing
|
||||
|
||||
- A container/VPS deployment where N supervisor units, N ports, and N PID files
|
||||
are a burden.
|
||||
- Many low-traffic profiles that don't each justify a full process.
|
||||
- You want a single thing to start, monitor, and restart.
|
||||
|
||||
Stick with one-process-per-profile when you want hard process-level isolation
|
||||
between profiles (separate memory footprints, independent crash domains, the
|
||||
ability to restart one profile without touching the others).
|
||||
|
||||
### How to opt in
|
||||
|
||||
Set the flag on the **default profile** (it owns the multiplexer) and restart
|
||||
its gateway:
|
||||
|
||||
```bash
|
||||
hermes config set gateway.multiplex_profiles true
|
||||
hermes gateway restart
|
||||
```
|
||||
|
||||
Equivalently, in the default profile's `~/.hermes/config.yaml`:
|
||||
|
||||
```yaml
|
||||
gateway:
|
||||
multiplex_profiles: true
|
||||
```
|
||||
|
||||
(The flag is also accepted as a top-level `multiplex_profiles: true` for
|
||||
convenience.) On the next start the default gateway enumerates every profile,
|
||||
brings up each profile's enabled platforms under that profile's own
|
||||
credentials, and routes each inbound message to the profile it belongs to. Each
|
||||
turn resolves the routed profile's config, skills, memory, SOUL, **and provider
|
||||
keys** — credentials are never shared across profiles.
|
||||
|
||||
You do **not** run `hermes gateway start` for the secondary profiles — the
|
||||
default gateway serves them. See the contract changes below.
|
||||
|
||||
### What changes when multiplexing is on
|
||||
|
||||
Enabling the flag changes how a few things behave. All of these revert the
|
||||
moment the flag is off.
|
||||
|
||||
#### 1. Secondary profiles must not start their own gateway
|
||||
|
||||
With a multiplexer running, a named-profile `hermes gateway start` / `run` is a
|
||||
**hard error**, pointing you back at the multiplexer:
|
||||
|
||||
```
|
||||
The default gateway is running as a profile multiplexer and already serves
|
||||
profile 'coder'. ...
|
||||
```
|
||||
|
||||
The multiplexer is the single inbound process; a second profile gateway would
|
||||
double-bind that profile's platforms. Pass `--force` only if you deliberately
|
||||
want a separate process for that profile (not recommended while the multiplexer
|
||||
is running). The cross-profile lifecycle wrapper script earlier on this page is
|
||||
therefore **not** used in multiplex mode — you only manage the default gateway.
|
||||
|
||||
#### 2. HTTP-inbound platforms are reached via a `/p/<profile>/` URL prefix
|
||||
|
||||
Webhook (and other HTTP-inbound) traffic for a secondary profile arrives on the
|
||||
default listener under a profile prefix, **not** on a second port:
|
||||
|
||||
```
|
||||
# default profile
|
||||
POST http://host:8644/webhooks/<route>
|
||||
# the "coder" profile, same listener
|
||||
POST http://host:8644/p/coder/webhooks/<route>
|
||||
```
|
||||
|
||||
An unknown or unconfigured profile in the prefix returns `404`. Because the one
|
||||
shared listener already serves every profile this way, a **secondary profile
|
||||
must not enable a port-binding platform itself** — doing so is a config error
|
||||
and the gateway refuses to start, naming the profile and platform:
|
||||
|
||||
```
|
||||
Profile 'coder' enables the port-binding platform 'webhook', but
|
||||
gateway.multiplex_profiles is on. ... Remove platforms.webhook from profile
|
||||
'coder's config.yaml (configure it only on the default profile).
|
||||
```
|
||||
|
||||
Port-binding platforms covered by this rule: `webhook`, `api_server`,
|
||||
`msgraph_webhook`, `feishu`, `wecom_callback`, `bluebubbles`, `sms`. Configure
|
||||
any of these **only on the default profile**; every secondary profile is reachable through
|
||||
its `/p/<profile>/` prefix.
|
||||
|
||||
#### 3. Per-credential platforms still need their own token per profile
|
||||
|
||||
Polling/connection platforms (Telegram, Discord, Slack, Matrix, Signal, …) work
|
||||
fine multiplexed, but each profile that enables one must supply its **own** bot
|
||||
token — the same token cannot be polled by two profiles at once. If two profiles
|
||||
configure the same `(platform, token)`, startup fails fast naming both profiles
|
||||
(see [Token-conflict safety](#token-conflict-safety) — the rule is unchanged,
|
||||
it's just enforced inside the one process now).
|
||||
|
||||
#### 4. Session keys are namespaced by profile
|
||||
|
||||
Each profile's sessions live under an `agent:<profile>:…` namespace so two
|
||||
profiles on the same platform/chat never collide in the shared session store.
|
||||
The **default** profile keeps the historical `agent:main:…` namespace
|
||||
byte-for-byte, so existing default-profile sessions are unaffected — no
|
||||
migration, no orphaned history.
|
||||
|
||||
#### 5. One PID/lock and one status surface
|
||||
|
||||
There is a single process-level PID and lock (the multiplexer, under the default
|
||||
home). `hermes status` reports the multiplexer and the profiles it serves;
|
||||
`hermes status -p <name>` narrows the report to one profile. Each profile still writes its
|
||||
own `runtime_status.json` under its own home, so existing per-profile readers
|
||||
keep working.
|
||||
|
||||
#### What does **not** change
|
||||
|
||||
Per-profile `.env` credential isolation is preserved and, if anything,
|
||||
stricter: a profile's keys are resolved from its own scope and are never unioned
|
||||
into a shared environment (this also means subprocesses like MCP servers and
|
||||
Kanban workers only ever see their own profile's secrets). Kanban,
|
||||
profile-scoped skills/memory/SOUL, and model routing all behave per-profile
|
||||
exactly as they do with separate gateways.
|
||||
|
||||
## Start, stop, or restart all gateways at once
|
||||
|
||||
The CLI ships with single-profile lifecycle commands. To act across every
|
||||
|
||||
Reference in New Issue
Block a user