Compare commits

...

3 Commits

Author SHA1 Message Date
Ben
db827de802 feat(gateway): external drain trigger + accept-gating (begin/cancel + control channel)
Tasks 2.1 + 2.2 + 2.3 of the safe-shutdown plan — the reversible
quiesce-without-restart machinery NAS drives during a lifecycle action (D4a).
These ship together because the endpoint, the control channel, and the gateway
state machine are one coherent slice.

2.2 — control channel (gateway/drain_control.py, new):
The dashboard has no HTTP path into a running gateway (guardrails: "there is NO
external control channel into a running gateway"); restart/drain is driven only
by markers the gateway reacts to. So begin/cancel-drain writes/removes a
presence-based marker .drain_request.json (HERMES_HOME-scoped, atomic write,
never-raises read; a corrupt marker reads as present-contentless → fail-safe
toward quiescing). This is Q-B option A.

2.2 — gateway state machine (gateway/run.py):
- _external_drain_active flag, DISTINCT from the shutdown _draining flag: this
  one does NOT exit the process and is fully reversible.
- _enter_external_drain / _exit_external_drain: idempotent transitions that
  flip gateway_state→draining / →running via _update_runtime_status (preserving
  the live active_agents count). exit refuses to revert to running during a
  real shutdown or after the loop stops (shutdown wins).
- _drain_control_watcher: 1s background task (modelled on _handoff_watcher)
  reconciling accept-state with the marker; honours a marker that survived a
  restart on its first tick. Registered alongside the other watchers in start.
- New-turn accept gate in _handle_message, placed BEFORE the session-slot
  claim: when draining, refuse to START a new turn (so active_agents can only
  fall → no TOCTOU race), while in-flight turns finish untouched. Internal/
  system events (restart-recovery replays, bg-process completions) bypass it.

2.1 — endpoint (hermes_cli/web_server.py):
POST /api/gateway/drain {action: drain|cancel}. Authenticated by the Task-2.0a
token seam (the drain plugin registered this exact path as a token route);
attributes the request to the verified token principal. Begin writes the
marker, cancel removes it — the gateway process owns the actual transition.
Force-override (D6) is NOT here; it maps onto the existing immediate
/api/gateway/restart force path.

Tests (mocked — necessary-not-sufficient; the HARD live gate Q-B is next):
- tests/gateway/test_external_drain_control.py — marker contract (write/clear/
  read/corrupt/atomic), state machine (enter/exit/idempotency/shutdown-wins/
  loop-stopped), watcher reconcile-enter-then-exit, new-turn refusal, and
  in-flight-not-interrupted. 15 tests.
- tests/hermes_cli/test_web_server.py — /api/gateway/drain begin/default-begin/
  cancel/cancel-idempotent/bad-action-400. 6 tests.
- dashboard.drain_auth config section already added in 2.0b commit.

All touched suites green: 301 (gateway+auth) + 9 (web_server endpoints) passed.

Intentionally deferred:
- HARD live-validation gate (Q-B): real isolated `hermes gateway run`, drive a
  real begin-drain marker, prove the 5-point checklist a–e.
- Spec-doc status flip + Phase-2 PR.

Build status: external-drain, restart-drain, status, dashboard-auth, drain-plugin,
token-auth, and web_server-endpoint suites green.
2026-06-22 11:27:44 +10:00
Ben
ef5b2b3197 feat(dashboard-auth): drain shared-bearer-secret provider plugin
Task 2.0b: the concrete shared-bearer-secret auth provider, the FIRST consumer
of the generic token-auth capability (Task 2.0a). Implements decisions.md Q-A.

plugins/dashboard_auth/drain/ (bundled, discovered like dashboard_auth/basic):
- DrainSecretProvider: non-interactive provider, supports_token=True. Verifies
  an inbound Authorization bearer token against a per-agent shared secret with
  hmac.compare_digest (constant-time, no timing oracle) and, on a match,
  vouches for the caller as the "drain-control" principal scoped to "drain".
  The five interactive ABC methods raise NotImplementedError; verify_session
  returns None (stacks harmlessly in the cookie-verify loop).
- assess_secret_strength(): fail-closed entropy gate. Rejects secrets shorter
  than 43 url-safe-b64 chars (~256 bits), with < 16 distinct characters, or
  below 128 bits Shannon entropy — so a weak/structured/repeated secret can
  never be silently accepted. Enforced both at register() (friendly skip
  reason) and in __init__ (raises — defence in depth).
- register(ctx): no-op + skip reason when HERMES_DASHBOARD_DRAIN_SECRET is
  unset; rejects a weak secret fail-closed (drain endpoint stays gated). On a
  strong secret, registers the provider AND opts /api/gateway/drain into the
  generic token-auth seam via register_token_route().

Config: the secret is a CREDENTIAL → carried via HERMES_DASHBOARD_DRAIN_SECRET
(per-agent, provisioned by NAS at deploy). Behavioural knobs only
(dashboard.drain_auth.{scope,min_secret_chars}) live in config.yaml — added to
DEFAULT_CONFIG with the .env-is-for-secrets rationale documented inline.

Tests: tests/plugins/dashboard_auth/test_drain_provider.py — entropy gate
(strong pass; empty/short/repeated/few-distinct/custom-min reject), verify_token
(match → scoped principal, wrong/empty → None, custom scope), protocol
compliance, interactive-methods-raise, and register() (skip-no-secret,
fail-closed-weak-secret, strong-env-secret registers + route opt-in, config
scope + min_secret_chars). 21 new tests; drain + token-auth suites 44 passed.
Verified the plugin is discovered as dashboard_auth/drain alongside basic/nous.

Intentionally deferred:
- The begin/cancel-drain endpoint handler itself — Task 2.1.
- The dashboard→gateway control channel — Task 2.2.

Build status: dashboard-auth + drain-plugin suites green.
2026-06-22 11:11:38 +10:00
Ben
ab7bda4987 feat(dashboard-auth): generic non-interactive API-token capability
Task 2.0a of the safe-shutdown drain-coordination plan. Widens the dashboard
auth framework GENERICALLY to support non-interactive (service-to-service)
bearer-token auth, mirroring the existing supports_password precedent. This is
a reusable capability — any future machine-credential provider plugs in without
core changes (decisions.md Q-C). The drain bearer-secret plugin (Task 2.0b) is
the first consumer, not the definition.

- base.py: add TokenPrincipal dataclass (the token analog of Session) +
  supports_token capability flag + verify_token() on the ABC (default raises
  NotImplementedError so a misconfigured provider fails loud). Contract mirrors
  verify_session stacking: return None for unrecognised tokens (never raise),
  raise ProviderError only on a genuine backing-store outage.
- registry.py: list_token_providers() — the supports_token subset, in
  registration order. Empty when none registered (token routes fail closed).
- token_auth.py (new): route-agnostic seam. Routes opt in via
  register_token_route(exact path); token_auth_middleware owns the auth
  decision for those routes only — authenticate via stacked providers, attach
  request.state.token_principal + token_authenticated, pass through. 401 on
  missing/unrecognised token, 503 when a provider was unreachable, untouched
  passthrough for non-token routes. Fails closed (never open).
- web_server.py: install the seam OUTERMOST (registered last → runs first).
  Both downstream gates (legacy auth_middleware + gated_auth_middleware) honour
  request.state.token_authenticated and skip enforcement, so a token-authed
  service request is never bounced to /login.
- audit.py: TOKEN_AUTH_SUCCESS / TOKEN_AUTH_FAILURE events.

Tests: tests/hermes_cli/test_dashboard_token_auth.py — ABC flag default,
verify_token NotImplementedError, registry filter, bearer extraction
(case-insensitive scheme, malformed/non-bearer → ""), provider stacking
(first-match-wins, unreachable-remembered, unreachable-then-valid, buggy
provider doesn't crash the gate), and the seam's passthrough/401/503/
fail-closed behaviour. 29 new tests; full dashboard-auth suite 169 passed.

Intentionally deferred:
- The concrete shared-bearer-secret provider plugin — Task 2.0b.
- The begin/cancel-drain endpoint that registers itself as a token route —
  Task 2.1.

Build status: dashboard-auth + plugin-hook suites green.
2026-06-22 11:07:56 +10:00
16 changed files with 1657 additions and 0 deletions

109
gateway/drain_control.py Normal file
View File

@@ -0,0 +1,109 @@
"""External drain-control marker contract (dashboard → gateway).
Task 2.2 of the safe-shutdown plan (decisions.md Q-B, option A): the dashboard
has no way to call into a running gateway — there is no HTTP control channel
into the gateway process (guardrails: "there is NO external control channel
into a running gateway"). Restart/drain is driven only by the gateway reacting
to its own inputs: slash commands, process signals, and file markers it writes
itself (``.restart_notify.json``).
So the begin/cancel-drain dashboard endpoint communicates with the running
gateway the same way: it writes (or removes) a marker file, and a gateway
background watcher reacts to it. This module owns that marker contract so both
sides — the dashboard endpoint (writer) and the gateway watcher (reader) —
share one definition and can never disagree.
Contract (presence-based, mirroring ``.restart_notify.json``):
* begin-drain → write ``{HERMES_HOME}/.drain_request.json`` with
``{"action": "drain", "requested_at": <iso>, "principal": <str>}``.
* cancel-drain → remove the marker.
* The gateway watcher treats **presence** of the marker as "external drain
active": flip ``gateway_state -> "draining"`` and stop accepting new turns.
Absence means "not draining" (revert to ``running`` if we had flipped it).
Reading the marker never raises: a malformed/half-written file reads as
"present but contentless", which the watcher still treats as drain-active
(fail-safe toward quiescing — a corrupt begin marker must not be ignored).
"""
from __future__ import annotations
import json
import logging
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional
from hermes_constants import get_hermes_home
from utils import atomic_json_write
_log = logging.getLogger(__name__)
_DRAIN_REQUEST_FILENAME = ".drain_request.json"
def drain_request_path(home: Optional[Path] = None) -> Path:
"""Absolute path to the drain-request marker, respecting HERMES_HOME."""
base = home if home is not None else get_hermes_home()
return Path(base) / _DRAIN_REQUEST_FILENAME
def write_drain_request(
*, principal: str = "drain-control", home: Optional[Path] = None
) -> dict[str, Any]:
"""Write the begin-drain marker. Returns the payload written.
Atomic write so the gateway watcher never reads a half-written file.
Idempotent: re-writing while a drain is already in progress just refreshes
``requested_at`` (harmless — the watcher keys off presence, not content).
"""
payload = {
"action": "drain",
"requested_at": datetime.now(timezone.utc).isoformat(),
"principal": principal,
}
atomic_json_write(drain_request_path(home), payload)
return payload
def clear_drain_request(*, home: Optional[Path] = None) -> bool:
"""Remove the drain marker (cancel-drain). Returns True if one existed.
Best-effort: a missing file is not an error (cancel is idempotent).
"""
path = drain_request_path(home)
try:
path.unlink()
return True
except FileNotFoundError:
return False
except OSError as e:
_log.warning("drain-control: failed to remove %s: %s", path, e)
return False
def drain_requested(*, home: Optional[Path] = None) -> bool:
"""True iff the begin-drain marker is present (external drain active)."""
return drain_request_path(home).exists()
def read_drain_request(*, home: Optional[Path] = None) -> Optional[dict[str, Any]]:
"""Return the marker payload, or ``None`` if absent.
A present-but-unparseable marker returns ``{}`` (truthy-presence preserved
via :func:`drain_requested`; callers that need the body get an empty dict
rather than an exception). Never raises.
"""
path = drain_request_path(home)
try:
raw = path.read_text(encoding="utf-8")
except FileNotFoundError:
return None
except OSError as e:
_log.warning("drain-control: failed to read %s: %s", path, e)
return None
try:
data = json.loads(raw)
except (ValueError, TypeError):
return {}
return data if isinstance(data, dict) else {}

View File

@@ -2471,6 +2471,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
_restart_drain_timeout: float = DEFAULT_GATEWAY_RESTART_DRAIN_TIMEOUT
_exit_code: Optional[int] = None
_draining: bool = False
_external_drain_active: bool = False
_restart_requested: bool = False
_restart_task_started: bool = False
_restart_detached: bool = False
@@ -2531,6 +2532,16 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
self._exit_reason: Optional[str] = None
self._exit_code: Optional[int] = None
self._draining = False
# External (NAS-driven) drain state — distinct from the shutdown
# ``_draining`` flag above. Set by ``_drain_control_watcher`` when the
# ``.drain_request.json`` marker is present: the gateway flips
# ``gateway_state -> draining`` and refuses NEW turns, but the process
# does NOT exit (the whole point — quiesce-without-restart, D4a). It is
# fully reversible: removing the marker reverts to ``running`` and
# re-accepts turns. ``_draining`` (shutdown) is one-way and ends in
# process exit; this one is a steady state NAS polls during its
# request -> poll -> proceed loop.
self._external_drain_active = False
self._restart_requested = False
# Set by shutdown_signal_handler when a SIGTERM/SIGINT arrived
# WITHOUT a planned-stop / takeover marker — i.e. an unexpected
@@ -3687,6 +3698,85 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
except Exception:
pass
# ------------------------------------------------------------------
# External drain control (NAS-driven quiesce-without-restart, Phase 2).
# The dashboard's begin/cancel-drain endpoint writes/removes the
# ``.drain_request.json`` marker (gateway/drain_control.py); this watcher
# observes the marker and flips the gateway between accepting and refusing
# NEW turns, WITHOUT exiting the process. Reversible by design (D4a): NAS
# POSTs begin-drain, polls /api/status until active_agents hits 0, proceeds
# with its lifecycle action, then (on cancel/abort) the marker is removed
# and the gateway re-accepts turns.
# ------------------------------------------------------------------
def _enter_external_drain(self) -> None:
"""Begin external drain: stop accepting new turns, flip state.
Idempotent — re-entering while already draining is a no-op beyond a
best-effort status re-write. In-flight turns are NOT interrupted (the
whole point is to let them finish); only NEW turns are refused.
"""
if self._external_drain_active:
return
self._external_drain_active = True
logger.info(
"External drain ENGAGED (.drain_request.json present) — refusing "
"new turns; %d in-flight turn(s) will finish. Process stays up.",
self._running_agent_count(),
)
# Flip the persisted lifecycle state so /api/status.gateway_busy /
# gateway_drainable track the drain. Preserve active_agents (the
# read-merge keeps the live count); only the state changes.
self._update_runtime_status("draining")
def _exit_external_drain(self) -> None:
"""Cancel external drain: revert state, re-accept new turns.
Idempotent. Only reverts to ``running`` when we are actually mid-drain
AND not also shutting down (a real shutdown ``_draining`` must win —
never resurrect a stopping gateway to ``running``).
"""
if not self._external_drain_active:
return
self._external_drain_active = False
if self._draining or not self._running:
# A shutdown drain is in progress / the loop has stopped — do not
# clobber the terminal state back to running.
logger.info(
"External drain marker cleared during shutdown — not reverting "
"to running (shutdown takes precedence)."
)
return
logger.info(
"External drain RELEASED (.drain_request.json removed) — "
"re-accepting new turns; gateway_state -> running."
)
self._update_runtime_status("running")
async def _drain_control_watcher(self, interval: float = 1.0) -> None:
"""Background task: reconcile gateway accept-state with the drain marker.
Polls ``.drain_request.json`` (presence-based contract,
gateway/drain_control.py). Marker present -> ``_enter_external_drain``;
marker absent -> ``_exit_external_drain``. The 1s cadence bounds the
observe-the-marker latency the live-validation gate checks (point a).
Reconciles once at startup so a marker that survived a restart is
honoured immediately. Best-effort: any tick error is logged and the
loop continues (a transient stat() failure must not wedge the gateway).
"""
from gateway.drain_control import drain_requested
while self._running:
try:
if drain_requested():
self._enter_external_drain()
else:
self._exit_external_drain()
except asyncio.CancelledError:
raise
except Exception as exc:
logger.debug("Drain-control watcher tick error: %s", exc, exc_info=True)
await asyncio.sleep(interval)
def _update_platform_runtime_status(
self,
platform: str,
@@ -5907,6 +5997,12 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
# idle case where the subagent finishes with no agent turn running.
asyncio.create_task(self._async_delegation_watcher())
# Start background drain-control watcher — reconciles the gateway's
# new-turn accept-state with the external ``.drain_request.json`` marker
# the dashboard begin/cancel-drain endpoint writes (Phase 2). Honours a
# marker that survived a restart on its first tick.
asyncio.create_task(self._drain_control_watcher())
logger.info("Press Ctrl+C to stop")
return True
@@ -8419,6 +8515,27 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
return self._telegram_topic_root_lobby_message()
return None
# ── External-drain new-turn gate (Phase 2) ────────────────────
# When NAS has engaged an external drain (.drain_request.json present,
# observed by _drain_control_watcher), refuse to START a new turn so
# the in-flight set can only fall to zero — eliminating the TOCTOU race
# (D4a: stop accepting new turns FIRST, then NAS polls until
# active_agents==0). In-flight turns are untouched; this only blocks the
# claim of a NEW session slot. Internal/system events (restart-recovery
# replays, background-process completions) bypass the gate — they are
# not user-initiated new work and must still flow during a drain.
# Reversible: once the marker is removed the gate opens again.
if self._external_drain_active and not is_internal:
logger.info(
"Refusing new turn for session %s — external drain active.",
_quick_key,
)
return (
"⏳ This agent is draining for a maintenance action and isn't "
"accepting new turns right now. It'll be back in a moment — "
"please resend shortly."
)
# ── Claim this session before any await ───────────────────────
# Between here and _run_agent registering the real AIAgent, there
# are numerous await points (hooks, vision enrichment, STT,

View File

@@ -1741,6 +1741,22 @@ DEFAULT_CONFIG = {
"secret": "", # token-signing key; blank → random per-process
"session_ttl_seconds": 0, # 0 → plugin default (12h)
},
# Drain-control service-credential configuration — read by the
# bundled ``dashboard_auth/drain`` plugin (the first consumer of the
# generic non-interactive token-auth capability). The SECRET itself
# is a credential and is NOT configured here: it is provisioned by
# nous-account-service at deploy time via the
# ``HERMES_DASHBOARD_DRAIN_SECRET`` env var (the .env-is-for-secrets
# rule). These are the behavioural knobs only. The plugin is a no-op
# unless that env var is set to a >=256-bit secret; a weak secret is
# rejected at registration (fail-closed) and the drain endpoint stays
# disabled. ``scope`` is the capability label attached to the verified
# principal; ``min_secret_chars`` is the entropy bar (url-safe-b64
# chars; 43 ~= 256 bits).
"drain_auth": {
"scope": "drain",
"min_secret_chars": 43,
},
# Public URL override (env: ``HERMES_DASHBOARD_PUBLIC_URL``).
# When set, this is the complete authority — scheme + host +
# optional path prefix (e.g. ``https://example.com/hermes``) —

View File

@@ -12,6 +12,7 @@ default. Third parties register their own providers via the plugin hook
from hermes_cli.dashboard_auth.base import (
DashboardAuthProvider,
Session,
TokenPrincipal,
LoginStart,
InvalidCodeError,
InvalidCredentialsError,
@@ -23,12 +24,14 @@ from hermes_cli.dashboard_auth.registry import (
register_provider,
get_provider,
list_providers,
list_token_providers,
clear_providers,
)
__all__ = [
"DashboardAuthProvider",
"Session",
"TokenPrincipal",
"LoginStart",
"InvalidCodeError",
"InvalidCredentialsError",
@@ -38,5 +41,6 @@ __all__ = [
"register_provider",
"get_provider",
"list_providers",
"list_token_providers",
"clear_providers",
]

View File

@@ -47,6 +47,8 @@ class AuditEvent(enum.Enum):
SESSION_VERIFY_FAILURE = "session_verify_failure"
WS_TICKET_MINTED = "ws_ticket_minted"
WS_TICKET_REJECTED = "ws_ticket_rejected"
TOKEN_AUTH_SUCCESS = "token_auth_success"
TOKEN_AUTH_FAILURE = "token_auth_failure"
def _resolve_log_path() -> Path:

View File

@@ -25,6 +25,34 @@ class Session:
refresh_token: str
@dataclass(frozen=True)
class TokenPrincipal:
"""A verified non-interactive (service-to-service) caller.
The token analog of :class:`Session`. Where a ``Session`` represents an
interactive human identity behind a session cookie, a ``TokenPrincipal``
represents a machine/service caller that authenticated by presenting a
bearer token in the ``Authorization`` request header on a single
request — no login, no cookie, no refresh.
Returned by :meth:`DashboardAuthProvider.verify_token` and attached to
``request.state.token_principal`` by the token-auth middleware seam so a
route handler can see *who* called it.
Fields:
* ``principal`` — stable identifier for the caller (e.g. the provider
name, a service account id, or an agent id). Opaque to the seam.
* ``provider`` — the ``name`` of the provider that verified the token.
* ``scopes`` — capability strings this principal is authorised for.
Empty tuple means "unscoped" (the provider vouches for the caller but
attaches no capability list); a route MAY enforce a required scope.
"""
principal: str
provider: str
scopes: tuple[str, ...] = ()
@dataclass(frozen=True)
class LoginStart:
"""First leg of the OAuth round trip.
@@ -131,6 +159,18 @@ class DashboardAuthProvider(ABC):
# and are completely unaffected.
supports_password: bool = False
# When True, this provider can verify a non-interactive bearer token
# (``verify_token``) presented on a single request by a service-to-service
# caller — no login, no cookie, no refresh. This is the generic
# API-token capability flag, mirroring ``supports_password``: a route
# opts into token auth (see ``token_auth`` middleware seam) and the
# gate consults every ``supports_token`` provider in turn until one
# recognises the token. OAuth/password providers leave this False and
# are completely unaffected. The drain bearer-secret plugin is the
# first consumer, but the capability is deliberately generic so any
# future machine-credential provider drops in without core changes.
supports_token: bool = False
@abstractmethod
def start_login(self, *, redirect_uri: str) -> LoginStart: ...
@@ -183,6 +223,39 @@ class DashboardAuthProvider(ABC):
"complete_password_login)"
)
def verify_token(self, *, token: str) -> "Optional[TokenPrincipal]":
"""Verify a non-interactive bearer token; return its principal.
The token analog of ``verify_session``. Only consulted when
``supports_token`` is True. Called by the ``token_auth`` middleware
seam for every request to a token-authable route, in registration
order, until one provider returns a non-None principal.
Contract (mirrors ``verify_session`` stacking semantics):
* Return a :class:`TokenPrincipal` if this provider recognises and
accepts the token.
* Return ``None`` for a token this provider does NOT recognise —
never raise, so the seam can fall through to the next provider.
A malformed/expired/wrong token is "not recognised" → ``None``.
* Raise ``ProviderError`` ONLY for a genuine backing-store outage
(the provider can neither confirm nor deny). The seam treats this
like ``verify_session``: remember it, keep trying other providers,
and surface 503 only if NO provider accepts the token AND at least
one was unreachable.
Implementations MUST use a constant-time comparison
(``hmac.compare_digest``) when matching a shared secret so the
endpoint isn't a timing oracle.
The default raises ``NotImplementedError`` so a provider that sets
``supports_token`` but forgets to implement this fails loudly rather
than silently accepting every caller.
"""
raise NotImplementedError(
f"{type(self).__name__} does not support token auth "
"(set supports_token = True and override verify_token)"
)
def assert_protocol_compliance(cls: type) -> None:
"""Raise ``TypeError`` if ``cls`` doesn't fully implement the provider protocol.

View File

@@ -181,6 +181,13 @@ async def gated_auth_middleware(
if not getattr(request.app.state, "auth_required", False):
return await call_next(request)
# A request already authenticated by the token-auth seam (a service caller
# on a registered token route) carries ``token_authenticated`` — it is NOT
# a cookie session and must not be bounced to /login. Pass it through; the
# seam already attached ``request.state.token_principal``.
if getattr(request.state, "token_authenticated", False):
return await call_next(request)
path = request.url.path
if _path_is_public(path):
return await call_next(request)

View File

@@ -52,6 +52,20 @@ def list_providers() -> List[DashboardAuthProvider]:
return list(_providers.values())
def list_token_providers() -> List[DashboardAuthProvider]:
"""Registered providers that support non-interactive token auth.
The subset of ``list_providers()`` whose ``supports_token`` flag is True,
in registration order. The ``token_auth`` middleware seam consults these
(and only these) when a token-authable route is hit, so OAuth/password-only
providers are never asked to ``verify_token``. Returns an empty list when
no token provider is registered — a token-authable route then fails
closed (401), never open.
"""
with _lock:
return [p for p in _providers.values() if getattr(p, "supports_token", False)]
def clear_providers() -> None:
"""Test-only: drop all registrations."""
with _lock:

View File

@@ -0,0 +1,194 @@
"""Route-agnostic non-interactive (bearer-token) auth seam for the dashboard.
This is the generic API-token capability (decisions.md Q-C): a reusable seam
that ANY service-to-service / machine-credential provider plugs into, NOT a
drain-specific hook. The drain bearer-secret plugin is merely the first
consumer.
How it fits the existing auth framework:
* The interactive gate (``gated_auth_middleware``) authenticates a human
via a session cookie on every non-public route. A service caller has no
cookie — it presents a bearer token in the ``Authorization`` header on a
single request. That is what this seam verifies.
* A route opts in by registering its exact path via
:func:`register_token_route`. Only registered paths are token-authable;
everything else is untouched, so this can never accidentally widen the
auth surface of an existing route.
* :func:`token_auth_middleware` runs OUTERMOST (installed last in
``web_server.py``). For a token route it fully owns the auth decision:
authenticate via the stacked token providers, attach the verified
:class:`~hermes_cli.dashboard_auth.base.TokenPrincipal` to
``request.state.token_principal`` + set ``request.state.token_authenticated``,
and pass through; otherwise reject (401 unauthenticated, or 503 when a
provider's backing store was unreachable). The downstream cookie/session
gates honour ``token_authenticated`` and skip enforcement, so a
token-authed service request is never bounced to ``/login``.
* Fails closed: a token route with no registered token provider, no token,
or an unrecognised token gets 401 — never an open pass-through.
Provider stacking mirrors ``verify_session``: each ``supports_token`` provider
is consulted in registration order until one returns a principal. A provider
that doesn't recognise the token returns ``None`` and the seam moves on; a
provider whose backing store is unreachable raises ``ProviderError``, which the
seam remembers and surfaces as 503 only if NO provider accepts the token.
"""
from __future__ import annotations
import logging
import threading
from typing import Awaitable, Callable, Optional, Tuple
from fastapi import Request
from fastapi.responses import JSONResponse, Response
from hermes_cli.dashboard_auth import list_token_providers
from hermes_cli.dashboard_auth.audit import AuditEvent, audit_log
from hermes_cli.dashboard_auth.base import ProviderError, TokenPrincipal
_log = logging.getLogger(__name__)
# Exact paths that accept non-interactive bearer-token auth. A route registers
# itself here at import/startup; the seam only acts on registered paths.
_token_routes: set[str] = set()
_lock = threading.Lock()
def register_token_route(path: str) -> None:
"""Mark ``path`` (exact match) as token-authable.
Idempotent. Call at module import / app setup so the seam knows which
routes to guard. Registering a route does NOT make it public — it makes
it authenticate by token instead of by session cookie.
"""
with _lock:
_token_routes.add(path)
def is_token_route(path: str) -> bool:
"""True if ``path`` was registered as token-authable (exact match)."""
with _lock:
return path in _token_routes
def clear_token_routes() -> None:
"""Test-only: drop all registered token routes."""
with _lock:
_token_routes.clear()
def _client_ip(request: Request) -> str:
fwd = request.headers.get("x-forwarded-for", "")
if fwd:
return fwd.split(",")[0].strip()
return request.client.host if request.client else ""
def extract_bearer_token(request: Request) -> str:
"""Return the bearer token from the ``Authorization`` header, or "".
Accepts ``<scheme> <token>`` where scheme is "bearer" (case-insensitive).
Returns an empty string for a missing/malformed header or a non-bearer
scheme — the caller treats "" as "no token presented".
"""
auth = request.headers.get("authorization", "")
parts = auth.split(" ", 1)
if len(parts) == 2 and parts[0].strip().lower() == "bearer":
return parts[1].strip()
return ""
def authenticate_token(
request: Request,
) -> Tuple[Optional[TokenPrincipal], Optional[str]]:
"""Try every token provider against the request's bearer token.
Returns ``(principal, unreachable_provider_name)``:
* ``(TokenPrincipal, None)`` — a provider recognised and accepted the token.
* ``(None, None)`` — no token, or no provider recognised it (reject 401).
* ``(None, name)`` — no provider accepted it AND at least one provider's
backing store was unreachable (the caller surfaces 503, not 401, so a
transient outage doesn't read as "bad credentials").
Never raises: a provider ``ProviderError`` is caught and remembered.
"""
token = extract_bearer_token(request)
if not token:
return None, None
unreachable: Optional[str] = None
for provider in list_token_providers():
try:
principal = provider.verify_token(token=token)
except ProviderError as e:
_log.warning(
"dashboard-auth: token provider %r unreachable during verify: %s",
provider.name, e,
)
if unreachable is None:
unreachable = provider.name
continue
except Exception as e: # noqa: BLE001 — a buggy provider must not 500 the gate
_log.warning(
"dashboard-auth: token provider %r raised during verify: %s",
provider.name, e,
)
continue
if principal is not None:
return principal, None
return None, unreachable
async def token_auth_middleware(
request: Request,
call_next: Callable[[Request], Awaitable[Response]],
) -> Response:
"""Outermost auth seam for token-authable routes.
No-op pass-through for any path not registered via
:func:`register_token_route`. For a registered path, token auth is the
only accepted scheme:
* valid token → attach principal + ``token_authenticated`` flag, pass through.
* unreachable → 503 (provider backing store down; not "bad credentials").
* otherwise → 401 unauthenticated.
Runs before the cookie/session gates (installed last in ``web_server.py``).
The cookie gates honour ``request.state.token_authenticated`` and skip
enforcement, so a token-authed request is never redirected to ``/login``.
"""
path = request.url.path
if not is_token_route(path):
return await call_next(request)
principal, unreachable = authenticate_token(request)
if principal is not None:
request.state.token_principal = principal
request.state.token_authenticated = True
return await call_next(request)
if unreachable:
audit_log(
AuditEvent.TOKEN_AUTH_FAILURE,
provider=unreachable,
reason="provider_unreachable",
path=path,
ip=_client_ip(request),
)
return JSONResponse(
{"detail": f"Auth provider {unreachable!r} unreachable"},
status_code=503,
)
audit_log(
AuditEvent.TOKEN_AUTH_FAILURE,
reason="no_provider_recognises_token",
path=path,
ip=_client_ip(request),
)
return JSONResponse(
{"error": "unauthenticated", "detail": "Unauthorized"},
status_code=401,
)

View File

@@ -468,6 +468,11 @@ async def _dashboard_auth_gate(request: Request, call_next):
@app.middleware("http")
async def auth_middleware(request: Request, call_next):
"""Require the session token on all /api/ routes except the public list."""
# A request already authenticated by the token-auth seam (a service caller
# presenting a bearer token on a registered token route) carries
# ``token_authenticated`` — never bounce it through the cookie/session gate.
if getattr(request.state, "token_authenticated", False):
return await call_next(request)
# When the OAuth gate is active, cookie-based auth (gated_auth_middleware
# above) is authoritative. The legacy _SESSION_TOKEN path is loopback-only
# and is skipped here so the gate's session attachment isn't overridden.
@@ -483,6 +488,20 @@ async def auth_middleware(request: Request, call_next):
return await call_next(request)
@app.middleware("http")
async def _token_auth_seam(request: Request, call_next):
"""Outermost auth seam: non-interactive bearer-token auth for opted-in routes.
Registered LAST so it runs FIRST (Starlette middleware is outermost-last).
A registered token route is fully owned here — authenticate by token,
attach the principal + ``token_authenticated`` flag, and let the downstream
cookie/session gates skip enforcement. Non-token routes pass straight
through untouched.
"""
from hermes_cli.dashboard_auth.token_auth import token_auth_middleware
return await token_auth_middleware(request, call_next)
# ---------------------------------------------------------------------------
# Config schema — auto-generated from DEFAULT_CONFIG
# ---------------------------------------------------------------------------
@@ -2481,6 +2500,71 @@ async def restart_gateway(profile: Optional[str] = None):
}
@app.post("/api/gateway/drain")
async def gateway_drain(request: Request):
"""Begin or cancel an external (NAS-driven) gateway drain.
Authenticated by the non-interactive token-auth seam: the
``dashboard_auth/drain`` plugin registers this exact path as a token route
and verifies the ``Authorization`` bearer secret. If that plugin isn't
active (no ``HERMES_DASHBOARD_DRAIN_SECRET``), the route is NOT a token
route, so on a gated bind the cookie gate handles it (a browser session can
still drive it from the dashboard) and on a loopback bind the legacy
session-token gate applies — either way it is never unauthenticated on a
network-exposed bind.
Body: ``{"action": "drain"}`` (begin) or ``{"action": "cancel"}`` (cancel).
Begin writes the ``.drain_request.json`` marker the gateway's
``_drain_control_watcher`` observes (flip to ``draining`` + refuse new
turns); cancel removes it (revert to ``running`` + re-accept). Idempotent
on both sides. This endpoint only writes/removes the marker — the gateway
process owns the actual state transition (there is no HTTP control channel
into the running gateway; the marker IS the channel, decisions.md Q-B).
The force-override (D6: "unless a user commands it") is NOT here — an
immediate, drain-skipping action maps onto the existing
``POST /api/gateway/restart`` force path, which supersedes a drain.
"""
from gateway.drain_control import (
clear_drain_request,
drain_requested,
write_drain_request,
)
try:
body = await request.json()
except Exception:
body = {}
action = str((body or {}).get("action", "drain")).strip().lower()
# Attribute the request to the verified token principal when present
# (token-auth seam attaches it); fall back to a generic label otherwise.
principal_obj = getattr(request.state, "token_principal", None)
principal = getattr(principal_obj, "principal", None) or "dashboard"
if action == "cancel":
existed = clear_drain_request()
_log.info("Gateway drain CANCEL requested by %s (existed=%s)", principal, existed)
return {"ok": True, "action": "cancel", "was_draining": existed}
if action != "drain":
raise HTTPException(
status_code=400,
detail=f"Unknown drain action {action!r}; expected 'drain' or 'cancel'",
)
payload = write_drain_request(principal=str(principal))
_log.info("Gateway drain BEGIN requested by %s", principal)
return {
"ok": True,
"action": "drain",
"requested_at": payload["requested_at"],
# Echo so a caller polling /api/status knows the marker is now set;
# the gateway watcher flips gateway_state -> draining within ~1s.
"draining": drain_requested(),
}
@app.post("/api/hermes/update")
async def update_hermes():
"""Kick off ``hermes update`` in the background."""

View File

@@ -0,0 +1,290 @@
"""DrainSecretProvider — shared-bearer-secret auth for the drain-control endpoint.
Task 2.0b of the safe-shutdown plan, and the FIRST consumer of the generic
non-interactive token-auth capability added in Task 2.0a
(``supports_token`` / ``verify_token`` on the ``DashboardAuthProvider`` ABC +
the route-agnostic ``token_auth`` middleware seam).
What it is
----------
A service-to-service auth provider. ``nous-account-service`` (NAS) provisions a
**per-agent unique** shared secret into each deployed agent's environment; this
provider verifies an inbound ``Authorization`` bearer token against that secret
with a constant-time compare and, on a match, vouches for the caller as the
``drain-control`` principal. It is NOT an interactive identity provider — there
is no login, cookie, session, or refresh. It implements ONLY the token
capability (``supports_token = True`` + ``verify_token``); the five interactive
ABC methods raise ``NotImplementedError``.
Why a plugin (not an ad-hoc header check on the drain route)
------------------------------------------------------------
Decisions.md Q-A: the drain credential MUST be a real auth plugin in the
dashboard auth framework, not a bolt-on. Q-C: the framework widening that
hosts it is generic (Task 2.0a) and this plugin is merely its first consumer.
Security properties (decisions.md Q-A)
--------------------------------------
* **Per-agent unique secret** — each agent gets a distinct secret; a leak's
blast radius is one agent.
* **Entropy gate at registration** — a weak/short/low-entropy secret fails
CLOSED at load (the plugin declines to register and records a skip reason);
it is never silently accepted. Bar: >= 256 bits of entropy / >= 43
url-safe-base64 chars, and the value must not be obviously structured
(all-one-character, too few distinct characters).
* **Constant-time compare** — ``hmac.compare_digest`` on the request path, so
the endpoint is not a timing oracle.
Configuration
-------------
The secret is a CREDENTIAL, so it is carried via an env var (the ``.env``-is-
for-secrets-only rule), provisioned by NAS at deploy time (Phase 3):
HERMES_DASHBOARD_DRAIN_SECRET # the per-agent shared secret (>=43 url-safe-b64 chars)
Behavioural knobs live in config.yaml (canonical surface):
dashboard:
drain_auth:
scope: drain # capability label attached to the principal
min_secret_chars: 43 # entropy bar (optional; default 43 ~= 256 bits)
When ``HERMES_DASHBOARD_DRAIN_SECRET`` is unset, the plugin is a no-op (records
a skip reason) — agents that don't want NAS-driven drain just don't set it.
"""
from __future__ import annotations
import hmac
import logging
import math
import os
from collections import Counter
from typing import Optional
from hermes_cli.dashboard_auth import (
DashboardAuthProvider,
LoginStart,
Session,
TokenPrincipal,
)
logger = logging.getLogger(__name__)
# Default entropy bar: 43 url-safe-base64 chars ~= 256 bits. token_urlsafe(32)
# produces 43 chars, so a correctly-provisioned secret clears this exactly.
_DEFAULT_MIN_SECRET_CHARS = 43
# A secret must contain at least this many DISTINCT characters — rejects
# degenerate values like "aaaa..." that are long but trivially low-entropy.
_MIN_DISTINCT_CHARS = 16
# Shannon entropy floor (bits) over the secret's characters — a second,
# distribution-aware guard on top of the length + distinct-count checks.
_MIN_SHANNON_BITS = 128.0
# The path the begin/cancel-drain endpoint lives on. Registered as a
# token-authable route by ``register()`` so the generic seam guards it. Kept
# here (not imported from web_server) to avoid a heavy import at plugin load.
DRAIN_ROUTE_PATH = "/api/gateway/drain"
LAST_SKIP_REASON: str = ""
def _shannon_bits(value: str) -> float:
"""Total Shannon entropy (bits) of ``value`` over its character distribution.
H = len * sum(-p_i * log2(p_i)). A long string drawn from a wide alphabet
scores high; a long run of one character scores ~0.
"""
if not value:
return 0.0
counts = Counter(value)
n = len(value)
per_char = -sum((c / n) * math.log2(c / n) for c in counts.values())
return per_char * n
def assess_secret_strength(
secret: str, *, min_chars: int = _DEFAULT_MIN_SECRET_CHARS
) -> Optional[str]:
"""Return a rejection reason if ``secret`` is too weak, else ``None``.
Fail-closed entropy gate (decisions.md Q-A). Checks, in order:
* length >= ``min_chars`` (default 43 url-safe-b64 chars ~= 256 bits),
* at least ``_MIN_DISTINCT_CHARS`` distinct characters,
* Shannon entropy >= ``_MIN_SHANNON_BITS`` bits.
A ``None`` return means the secret passes. Any string return is a
human-readable reason the caller logs + records as the skip reason.
"""
if not secret:
return "secret is empty"
if len(secret) < min_chars:
return (
f"secret too short: {len(secret)} chars (need >= {min_chars}; "
"use a >=256-bit value, e.g. `python -c \"import secrets; "
"print(secrets.token_urlsafe(32))\"`)"
)
distinct = len(set(secret))
if distinct < _MIN_DISTINCT_CHARS:
return (
f"secret has only {distinct} distinct characters (need >= "
f"{_MIN_DISTINCT_CHARS}); looks structured/low-entropy"
)
bits = _shannon_bits(secret)
if bits < _MIN_SHANNON_BITS:
return (
f"secret entropy too low: {bits:.0f} bits (need >= "
f"{_MIN_SHANNON_BITS:.0f}); looks structured/repeated"
)
return None
class DrainSecretProvider(DashboardAuthProvider):
"""Non-interactive shared-bearer-secret provider for drain control."""
name = "drain-secret"
display_name = "Drain Control (service credential)"
supports_token = True
def __init__(self, *, secret: str, scope: str = "drain") -> None:
# Defence in depth: construction also enforces the entropy bar, so a
# caller that bypasses register()'s check still can't build a weak
# provider. register() does the friendly skip-reason path; this raises.
reason = assess_secret_strength(secret)
if reason is not None:
raise ValueError(f"drain secret rejected: {reason}")
self._secret = secret
self._scope = scope or "drain"
# ---- token capability (the only thing this provider implements) --------
def verify_token(self, *, token: str) -> Optional[TokenPrincipal]:
"""Constant-time compare against the per-agent shared secret.
Returns a ``drain-control`` principal on an exact match, else ``None``
(the generic seam falls through / fails closed). Uses
``hmac.compare_digest`` so a wrong token can't be recovered by timing.
"""
if not token:
return None
if hmac.compare_digest(token.encode("utf-8"), self._secret.encode("utf-8")):
return TokenPrincipal(
principal="drain-control",
provider=self.name,
scopes=(self._scope,),
)
return None
# ---- interactive methods: unsupported (service credential only) --------
def start_login(self, *, redirect_uri: str) -> LoginStart:
raise NotImplementedError(
"DrainSecretProvider is a non-interactive service credential; "
"there is no login flow."
)
def complete_login(
self, *, code: str, state: str, code_verifier: str, redirect_uri: str
) -> Session:
raise NotImplementedError(
"DrainSecretProvider is a non-interactive service credential."
)
def verify_session(self, *, access_token: str) -> Optional[Session]:
# Not a cookie-session provider — it never mints a Session, so it can
# never recognise a session cookie. Return None (don't raise) so it
# stacks harmlessly in the cookie-verify loop.
return None
def refresh_session(self, *, refresh_token: str) -> Session:
raise NotImplementedError(
"DrainSecretProvider is a non-interactive service credential."
)
def revoke_session(self, *, refresh_token: str) -> None:
return None
# ---------------------------------------------------------------------------
# Plugin entry point
# ---------------------------------------------------------------------------
def _load_config_drain_auth_section() -> dict:
"""Return ``dashboard.drain_auth`` from config.yaml, or ``{}``."""
try:
from hermes_cli.config import cfg_get, load_config
cfg = load_config()
except Exception as exc: # noqa: BLE001 — broad catch is intentional
logger.debug(
"dashboard-auth-drain: load_config() raised %s; "
"falling back to env-only configuration",
exc,
)
return {}
section = cfg_get(cfg, "dashboard", "drain_auth", default=None)
return section if isinstance(section, dict) else {}
def register(ctx) -> None:
"""Plugin entry — registers DrainSecretProvider when a strong secret is set.
No-op (records a skip reason) when ``HERMES_DASHBOARD_DRAIN_SECRET`` is
unset or fails the entropy gate. On success, also registers the
begin/cancel-drain route as token-authable via the generic seam.
"""
global LAST_SKIP_REASON
LAST_SKIP_REASON = ""
secret = os.environ.get("HERMES_DASHBOARD_DRAIN_SECRET", "").strip()
if not secret:
LAST_SKIP_REASON = (
"HERMES_DASHBOARD_DRAIN_SECRET is not set. Set a per-agent "
">=256-bit secret (e.g. `python -c \"import secrets; "
"print(secrets.token_urlsafe(32))\"`) to enable NAS-driven drain "
"coordination; leave it unset to disable the drain endpoint."
)
logger.debug("dashboard-auth-drain: %s", LAST_SKIP_REASON)
return
section = _load_config_drain_auth_section()
scope = str(section.get("scope", "drain") or "drain").strip() or "drain"
try:
min_chars = int(section.get("min_secret_chars", _DEFAULT_MIN_SECRET_CHARS))
except (TypeError, ValueError):
min_chars = _DEFAULT_MIN_SECRET_CHARS
reason = assess_secret_strength(secret, min_chars=min_chars)
if reason is not None:
LAST_SKIP_REASON = (
f"HERMES_DASHBOARD_DRAIN_SECRET rejected — {reason}. "
"The drain endpoint stays disabled (fail-closed)."
)
logger.warning("dashboard-auth-drain: %s", LAST_SKIP_REASON)
return
try:
provider = DrainSecretProvider(secret=secret, scope=scope)
except ValueError as exc:
LAST_SKIP_REASON = f"DrainSecretProvider construction failed: {exc}"
logger.warning("dashboard-auth-drain: %s", LAST_SKIP_REASON)
return
ctx.register_dashboard_auth_provider(provider)
# Opt the begin/cancel-drain endpoint into the generic token-auth seam so
# the dashboard's interactive cookie gate doesn't bounce NAS's bearer call.
try:
from hermes_cli.dashboard_auth.token_auth import register_token_route
register_token_route(DRAIN_ROUTE_PATH)
except Exception as exc: # noqa: BLE001 — seam import must not crash plugin load
logger.warning(
"dashboard-auth-drain: could not register token route %s: %s",
DRAIN_ROUTE_PATH, exc,
)
logger.info(
"dashboard-auth-drain: registered drain service-credential provider "
"(scope=%s, route=%s)",
scope, DRAIN_ROUTE_PATH,
)

View File

@@ -0,0 +1,7 @@
name: drain
version: 1.0.0
description: "Dashboard auth provider — non-interactive shared-bearer-secret for the gateway drain-control endpoint. The first consumer of the generic token-auth capability (supports_token/verify_token). nous-account-service provisions a per-agent unique secret via HERMES_DASHBOARD_DRAIN_SECRET; this provider verifies an inbound Authorization bearer token against it with a constant-time compare and registers /api/gateway/drain as token-authable. Fails CLOSED: a weak/short/low-entropy secret (< 256 bits) is rejected at registration and the endpoint stays disabled. No-op when the env var is unset. Behavioural knobs (scope, min_secret_chars) live under dashboard.drain_auth in config.yaml."
author: NousResearch
kind: backend
requires_env:
- HERMES_DASHBOARD_DRAIN_SECRET

View File

@@ -0,0 +1,196 @@
"""Tests for the external drain-control marker contract + gateway state machine.
Task 2.2/2.3. Two layers:
* drain_control.py — the presence-based marker contract (write/clear/read,
HERMES_HOME-scoped, never-raises).
* GatewayRunner enter/exit/watcher + the new-turn accept gate — the
reversible state machine driven by the marker.
Mocked tests are necessary-not-sufficient here (the HARD live-validation gate,
Q-B, exercises a real `hermes gateway run`); these lock the unit contract.
"""
from __future__ import annotations
import asyncio
from pathlib import Path
from unittest.mock import MagicMock
import pytest
import gateway.drain_control as dc
from gateway.run import GatewayRunner
from gateway.platforms.base import MessageEvent, MessageType
from tests.gateway.restart_test_helpers import make_restart_runner, make_restart_source
# ---------------------------------------------------------------------------
# Marker contract (drain_control.py)
# ---------------------------------------------------------------------------
@pytest.fixture
def home(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
return tmp_path
class TestMarkerContract:
def test_absent_by_default(self, home):
assert dc.drain_requested() is False
assert dc.read_drain_request() is None
def test_write_then_present(self, home):
payload = dc.write_drain_request(principal="nas")
assert dc.drain_requested() is True
assert payload["action"] == "drain"
assert payload["principal"] == "nas"
body = dc.read_drain_request()
assert body is not None and body["principal"] == "nas"
def test_clear_removes(self, home):
dc.write_drain_request()
assert dc.clear_drain_request() is True
assert dc.drain_requested() is False
# idempotent: clearing again is a no-op, returns False
assert dc.clear_drain_request() is False
def test_path_respects_hermes_home(self, home):
assert dc.drain_request_path() == home / ".drain_request.json"
def test_corrupt_marker_reads_as_present_contentless(self, home):
# A half-written / malformed marker must still count as "drain active"
# (fail-safe toward quiescing).
dc.drain_request_path().write_text("{not valid json", encoding="utf-8")
assert dc.drain_requested() is True
assert dc.read_drain_request() == {}
def test_write_is_atomic_json(self, home):
dc.write_drain_request(principal="x")
import json
data = json.loads(dc.drain_request_path().read_text())
assert data["action"] == "drain"
# ---------------------------------------------------------------------------
# Gateway state machine (enter / exit / idempotency)
# ---------------------------------------------------------------------------
def _drain_runner():
runner, adapter = make_restart_runner()
runner._external_drain_active = False
# Bind the real methods under test.
runner._enter_external_drain = GatewayRunner._enter_external_drain.__get__(
runner, GatewayRunner
)
runner._exit_external_drain = GatewayRunner._exit_external_drain.__get__(
runner, GatewayRunner
)
return runner, adapter
class TestDrainStateMachine:
def test_enter_sets_flag_and_flips_state(self):
runner, _ = _drain_runner()
runner._enter_external_drain()
assert runner._external_drain_active is True
runner._update_runtime_status.assert_called_with("draining")
def test_enter_idempotent(self):
runner, _ = _drain_runner()
runner._enter_external_drain()
runner._update_runtime_status.reset_mock()
runner._enter_external_drain() # second call — no-op
runner._update_runtime_status.assert_not_called()
def test_exit_reverts_to_running(self):
runner, _ = _drain_runner()
runner._enter_external_drain()
runner._update_runtime_status.reset_mock()
runner._exit_external_drain()
assert runner._external_drain_active is False
runner._update_runtime_status.assert_called_with("running")
def test_exit_idempotent_when_not_draining(self):
runner, _ = _drain_runner()
runner._exit_external_drain() # never entered — no-op
runner._update_runtime_status.assert_not_called()
def test_exit_during_shutdown_does_not_revert_to_running(self):
runner, _ = _drain_runner()
runner._enter_external_drain()
runner._update_runtime_status.reset_mock()
# A shutdown drain is now in progress — exit must NOT resurrect running.
runner._draining = True
runner._exit_external_drain()
assert runner._external_drain_active is False
runner._update_runtime_status.assert_not_called()
def test_exit_when_loop_stopped_does_not_revert(self):
runner, _ = _drain_runner()
runner._enter_external_drain()
runner._update_runtime_status.reset_mock()
runner._running = False
runner._exit_external_drain()
runner._update_runtime_status.assert_not_called()
# ---------------------------------------------------------------------------
# Watcher reconciliation
# ---------------------------------------------------------------------------
class TestDrainWatcher:
@pytest.mark.asyncio
async def test_watcher_enters_then_exits_with_marker(self, home):
runner, _ = _drain_runner()
runner._drain_control_watcher = GatewayRunner._drain_control_watcher.__get__(
runner, GatewayRunner
)
# Drive a few ticks manually rather than spinning the loop.
dc.write_drain_request()
task = asyncio.create_task(runner._drain_control_watcher(interval=0.02))
await asyncio.sleep(0.06)
assert runner._external_drain_active is True
dc.clear_drain_request()
await asyncio.sleep(0.06)
assert runner._external_drain_active is False
runner._running = False
await asyncio.sleep(0.04)
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
# ---------------------------------------------------------------------------
# New-turn accept gate
# ---------------------------------------------------------------------------
class TestNewTurnGate:
@pytest.mark.asyncio
async def test_new_turn_refused_during_external_drain(self):
runner, _ = _drain_runner()
runner._external_drain_active = True
event = MessageEvent(
text="hello",
message_type=MessageType.TEXT,
source=make_restart_source(),
message_id="m1",
)
result = await runner._handle_message(event)
assert result is not None
assert "draining" in result.lower()
@pytest.mark.asyncio
async def test_in_flight_turn_not_interrupted_by_drain(self):
# Entering drain must NOT touch the running-agents set.
runner, _ = _drain_runner()
sentinel = MagicMock()
runner._running_agents["k"] = sentinel
runner._enter_external_drain()
assert runner._running_agents.get("k") is sentinel
sentinel.interrupt.assert_not_called()

View File

@@ -0,0 +1,316 @@
"""Contract tests for the generic non-interactive (bearer-token) auth seam.
Covers Task 2.0a: the reusable token-auth capability in the dashboard auth
framework — NOT the drain plugin (that's 2.0b/2.1). Asserts the ABC capability
flag, the registry filter, bearer extraction, provider stacking (verify_token),
and the route-agnostic middleware seam's fail-closed / 503 / pass-through
behaviour.
"""
from __future__ import annotations
import asyncio
from typing import Optional
import pytest
from hermes_cli.dashboard_auth import (
DashboardAuthProvider,
LoginStart,
Session,
TokenPrincipal,
clear_providers,
list_token_providers,
register_provider,
)
from hermes_cli.dashboard_auth.base import ProviderError
from hermes_cli.dashboard_auth import token_auth
# --------------------------------------------------------------------------
# Test doubles
# --------------------------------------------------------------------------
class _OAuthOnly(DashboardAuthProvider):
"""A pure interactive provider — never token-authable."""
name = "oauth-only"
display_name = "OAuth Only"
def start_login(self, *, redirect_uri):
return LoginStart(redirect_url="x", cookie_payload={})
def complete_login(self, *, code, state, code_verifier, redirect_uri):
return Session("u", "e", "n", "o", self.name, 0, "a", "r")
def verify_session(self, *, access_token):
return None
def refresh_session(self, *, refresh_token):
return Session("u", "e", "n", "o", self.name, 0, "a", "r")
def revoke_session(self, *, refresh_token):
return None
class _TokenProvider(_OAuthOnly):
"""A token provider that accepts exactly one secret."""
name = "tok"
display_name = "Token Provider"
supports_token = True
def __init__(self, *, secret: str = "good-secret", scopes=("drain",)):
self._secret = secret
self._scopes = tuple(scopes)
def verify_token(self, *, token: str) -> Optional[TokenPrincipal]:
if token == self._secret:
return TokenPrincipal(
principal=self.name, provider=self.name, scopes=self._scopes
)
return None
class _UnreachableTokenProvider(_OAuthOnly):
name = "tok-down"
display_name = "Unreachable Token Provider"
supports_token = True
def verify_token(self, *, token: str) -> Optional[TokenPrincipal]:
raise ProviderError("backing store down")
class _BuggyTokenProvider(_OAuthOnly):
name = "tok-buggy"
display_name = "Buggy Token Provider"
supports_token = True
def verify_token(self, *, token: str) -> Optional[TokenPrincipal]:
raise RuntimeError("kaboom")
# --------------------------------------------------------------------------
# Fixtures
# --------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def _isolated_state():
clear_providers()
token_auth.clear_token_routes()
yield
clear_providers()
token_auth.clear_token_routes()
class _FakeURL:
def __init__(self, path):
self.path = path
class _FakeClient:
host = "1.2.3.4"
class _FakeRequest:
"""Minimal Request stand-in for the seam (no real Starlette needed)."""
def __init__(self, path="/api/gateway/drain", headers=None):
self.url = _FakeURL(path)
self.headers = headers or {}
self.client = _FakeClient()
class _State:
pass
self.state = _State()
def _run(coro):
return asyncio.run(coro)
# --------------------------------------------------------------------------
# ABC + registry
# --------------------------------------------------------------------------
def test_oauth_provider_defaults_supports_token_false():
assert _OAuthOnly().supports_token is False
def test_oauth_provider_verify_token_raises_not_implemented():
with pytest.raises(NotImplementedError):
_OAuthOnly().verify_token(token="x")
def test_list_token_providers_filters_to_supports_token():
register_provider(_OAuthOnly())
register_provider(_TokenProvider())
names = [p.name for p in list_token_providers()]
assert names == ["tok"]
def test_list_token_providers_empty_when_none_registered():
register_provider(_OAuthOnly())
assert list_token_providers() == []
# --------------------------------------------------------------------------
# Bearer extraction
# --------------------------------------------------------------------------
@pytest.mark.parametrize(
"header,expected",
[
("Bearer abc123", "abc123"),
("bearer abc123", "abc123"),
("BEARER abc123", "abc123"),
("Bearer spaced ", "spaced"),
("Basic abc123", ""),
("abc123", ""),
("", ""),
],
)
def test_extract_bearer_token(header, expected):
req = _FakeRequest(headers={"authorization": header} if header else {})
assert token_auth.extract_bearer_token(req) == expected
# --------------------------------------------------------------------------
# authenticate_token (provider stacking)
# --------------------------------------------------------------------------
def test_authenticate_token_accepts_valid():
register_provider(_TokenProvider(secret="good-secret"))
req = _FakeRequest(headers={"authorization": "Bearer good-secret"})
principal, unreachable = token_auth.authenticate_token(req)
assert unreachable is None
assert principal is not None
assert principal.provider == "tok"
assert principal.scopes == ("drain",)
def test_authenticate_token_rejects_wrong_secret():
register_provider(_TokenProvider(secret="good-secret"))
req = _FakeRequest(headers={"authorization": "Bearer wrong"})
principal, unreachable = token_auth.authenticate_token(req)
assert principal is None
assert unreachable is None
def test_authenticate_token_no_token_returns_none():
register_provider(_TokenProvider())
req = _FakeRequest(headers={})
principal, unreachable = token_auth.authenticate_token(req)
assert principal is None and unreachable is None
def test_authenticate_token_stacks_first_match_wins():
register_provider(_TokenProvider(secret="aaa"))
second = _TokenProvider(secret="bbb")
second.name = "tok2"
register_provider(second)
req = _FakeRequest(headers={"authorization": "Bearer bbb"})
principal, _ = token_auth.authenticate_token(req)
assert principal is not None and principal.provider == "tok2"
def test_authenticate_token_unreachable_remembered():
register_provider(_UnreachableTokenProvider())
req = _FakeRequest(headers={"authorization": "Bearer anything"})
principal, unreachable = token_auth.authenticate_token(req)
assert principal is None
assert unreachable == "tok-down"
def test_authenticate_token_unreachable_then_valid_provider_wins():
register_provider(_UnreachableTokenProvider())
register_provider(_TokenProvider(secret="good"))
req = _FakeRequest(headers={"authorization": "Bearer good"})
principal, unreachable = token_auth.authenticate_token(req)
# A later provider accepting the token beats the earlier outage.
assert principal is not None and principal.provider == "tok"
assert unreachable is None
def test_authenticate_token_buggy_provider_does_not_crash():
register_provider(_BuggyTokenProvider())
register_provider(_TokenProvider(secret="good"))
req = _FakeRequest(headers={"authorization": "Bearer good"})
principal, unreachable = token_auth.authenticate_token(req)
assert principal is not None and principal.provider == "tok"
# --------------------------------------------------------------------------
# Middleware seam (route-agnostic)
# --------------------------------------------------------------------------
async def _call_next_ok(request):
from fastapi.responses import JSONResponse
return JSONResponse({"ok": True}, status_code=200)
def test_seam_passthrough_for_unregistered_route():
register_provider(_TokenProvider())
req = _FakeRequest(path="/api/something-else")
resp = _run(token_auth.token_auth_middleware(req, _call_next_ok))
assert resp.status_code == 200
assert getattr(req.state, "token_authenticated", False) is False
def test_seam_accepts_valid_token_on_registered_route():
register_provider(_TokenProvider(secret="good"))
token_auth.register_token_route("/api/gateway/drain")
req = _FakeRequest(
path="/api/gateway/drain",
headers={"authorization": "Bearer good"},
)
resp = _run(token_auth.token_auth_middleware(req, _call_next_ok))
assert resp.status_code == 200
assert req.state.token_authenticated is True
assert req.state.token_principal.provider == "tok"
def test_seam_rejects_missing_token_401():
register_provider(_TokenProvider())
token_auth.register_token_route("/api/gateway/drain")
req = _FakeRequest(path="/api/gateway/drain", headers={})
resp = _run(token_auth.token_auth_middleware(req, _call_next_ok))
assert resp.status_code == 401
def test_seam_rejects_wrong_token_401():
register_provider(_TokenProvider(secret="good"))
token_auth.register_token_route("/api/gateway/drain")
req = _FakeRequest(
path="/api/gateway/drain", headers={"authorization": "Bearer bad"}
)
resp = _run(token_auth.token_auth_middleware(req, _call_next_ok))
assert resp.status_code == 401
def test_seam_fails_closed_when_no_token_provider():
# Route registered but NO supports_token provider → 401, never open.
register_provider(_OAuthOnly())
token_auth.register_token_route("/api/gateway/drain")
req = _FakeRequest(
path="/api/gateway/drain", headers={"authorization": "Bearer anything"}
)
resp = _run(token_auth.token_auth_middleware(req, _call_next_ok))
assert resp.status_code == 401
def test_seam_503_on_provider_unreachable():
register_provider(_UnreachableTokenProvider())
token_auth.register_token_route("/api/gateway/drain")
req = _FakeRequest(
path="/api/gateway/drain", headers={"authorization": "Bearer x"}
)
resp = _run(token_auth.token_auth_middleware(req, _call_next_ok))
assert resp.status_code == 503

View File

@@ -249,6 +249,50 @@ class TestWebServerEndpoints:
assert "active_sessions" in data
assert data["can_update_hermes"] is True
def test_gateway_drain_begin_writes_marker(self):
from gateway import drain_control
resp = self.client.post("/api/gateway/drain", json={"action": "drain"})
assert resp.status_code == 200
data = resp.json()
assert data["ok"] is True and data["action"] == "drain"
assert data["draining"] is True
assert drain_control.drain_requested() is True
# cleanup
drain_control.clear_drain_request()
def test_gateway_drain_defaults_to_begin(self):
from gateway import drain_control
resp = self.client.post("/api/gateway/drain", json={})
assert resp.status_code == 200
assert resp.json()["action"] == "drain"
assert drain_control.drain_requested() is True
drain_control.clear_drain_request()
def test_gateway_drain_cancel_removes_marker(self):
from gateway import drain_control
drain_control.write_drain_request()
resp = self.client.post("/api/gateway/drain", json={"action": "cancel"})
assert resp.status_code == 200
data = resp.json()
assert data["ok"] is True and data["action"] == "cancel"
assert data["was_draining"] is True
assert drain_control.drain_requested() is False
def test_gateway_drain_cancel_idempotent(self):
from gateway import drain_control
resp = self.client.post("/api/gateway/drain", json={"action": "cancel"})
assert resp.status_code == 200
assert resp.json()["was_draining"] is False
assert drain_control.drain_requested() is False
def test_gateway_drain_bad_action_400(self):
resp = self.client.post("/api/gateway/drain", json={"action": "explode"})
assert resp.status_code == 400
def test_get_status_hides_update_capability_in_managed_runtime(self, monkeypatch):
import hermes_cli.web_server as web_server

View File

@@ -0,0 +1,184 @@
"""Tests for the DrainSecretProvider plugin (non-interactive bearer secret).
Task 2.0b. Loads the bundled drain plugin module directly and exercises:
* the entropy gate (assess_secret_strength) — fail-closed on weak secrets,
* constant-time verify_token returning a scoped TokenPrincipal,
* the register(ctx) entry point's env/config resolution, skip reasons, and
token-route registration.
"""
from __future__ import annotations
import secrets
from unittest.mock import MagicMock
import pytest
import plugins.dashboard_auth.drain as drain_plugin
from hermes_cli.dashboard_auth import TokenPrincipal, assert_protocol_compliance
from hermes_cli.dashboard_auth import token_auth
@pytest.fixture(scope="module")
def drain():
return drain_plugin
@pytest.fixture(autouse=True)
def _clean_env_and_routes(monkeypatch):
monkeypatch.delenv("HERMES_DASHBOARD_DRAIN_SECRET", raising=False)
token_auth.clear_token_routes()
yield
token_auth.clear_token_routes()
def _strong_secret() -> str:
# token_urlsafe(32) → 43 url-safe-b64 chars ≈ 256 bits.
return secrets.token_urlsafe(32)
# ---------------------------------------------------------------------------
# Entropy gate
# ---------------------------------------------------------------------------
class TestEntropyGate:
def test_strong_secret_passes(self, drain):
assert drain.assess_secret_strength(_strong_secret()) is None
def test_empty_rejected(self, drain):
assert drain.assess_secret_strength("") is not None
def test_too_short_rejected(self, drain):
# 42 chars — one under the 43-char bar.
assert drain.assess_secret_strength("a1B2c3" * 7) is not None
def test_long_but_repeated_rejected(self, drain):
# 60 chars, one distinct character → low distinct count + low entropy.
assert drain.assess_secret_strength("a" * 60) is not None
def test_long_but_few_distinct_rejected(self, drain):
# 60 chars cycling through only 4 distinct characters.
assert drain.assess_secret_strength("abcd" * 15) is not None
def test_custom_min_chars_enforced(self, drain):
s = _strong_secret() # 43 chars
assert drain.assess_secret_strength(s, min_chars=999) is not None
# ---------------------------------------------------------------------------
# Provider behaviour
# ---------------------------------------------------------------------------
class TestProvider:
def test_protocol_compliance(self, drain):
assert_protocol_compliance(drain.DrainSecretProvider)
def test_supports_token_flag(self, drain):
p = drain.DrainSecretProvider(secret=_strong_secret())
assert p.supports_token is True
def test_verify_token_accepts_matching_secret(self, drain):
s = _strong_secret()
p = drain.DrainSecretProvider(secret=s, scope="drain")
principal = p.verify_token(token=s)
assert isinstance(principal, TokenPrincipal)
assert principal.principal == "drain-control"
assert principal.provider == "drain-secret"
assert principal.scopes == ("drain",)
def test_verify_token_rejects_wrong_secret(self, drain):
p = drain.DrainSecretProvider(secret=_strong_secret())
assert p.verify_token(token=_strong_secret()) is None
def test_verify_token_rejects_empty(self, drain):
p = drain.DrainSecretProvider(secret=_strong_secret())
assert p.verify_token(token="") is None
def test_custom_scope_attached(self, drain):
s = _strong_secret()
p = drain.DrainSecretProvider(secret=s, scope="lifecycle")
assert p.verify_token(token=s).scopes == ("lifecycle",)
def test_construction_rejects_weak_secret(self, drain):
with pytest.raises(ValueError):
drain.DrainSecretProvider(secret="weak")
def test_verify_session_returns_none_not_raises(self, drain):
# Stacks harmlessly in the cookie-verify loop.
p = drain.DrainSecretProvider(secret=_strong_secret())
assert p.verify_session(access_token="anything") is None
def test_interactive_methods_raise(self, drain):
p = drain.DrainSecretProvider(secret=_strong_secret())
with pytest.raises(NotImplementedError):
p.start_login(redirect_uri="r")
with pytest.raises(NotImplementedError):
p.complete_login(code="c", state="s", code_verifier="v", redirect_uri="r")
with pytest.raises(NotImplementedError):
p.refresh_session(refresh_token="r")
# ---------------------------------------------------------------------------
# register() entry point
# ---------------------------------------------------------------------------
class TestRegister:
def test_skips_when_no_secret(self, drain, monkeypatch):
monkeypatch.setattr(drain, "_load_config_drain_auth_section", lambda: {})
ctx = MagicMock()
drain.register(ctx)
ctx.register_dashboard_auth_provider.assert_not_called()
assert "HERMES_DASHBOARD_DRAIN_SECRET" in drain.LAST_SKIP_REASON
assert not token_auth.is_token_route(drain.DRAIN_ROUTE_PATH)
def test_skips_and_fails_closed_on_weak_secret(self, drain, monkeypatch):
monkeypatch.setenv("HERMES_DASHBOARD_DRAIN_SECRET", "tooweak")
monkeypatch.setattr(drain, "_load_config_drain_auth_section", lambda: {})
ctx = MagicMock()
drain.register(ctx)
ctx.register_dashboard_auth_provider.assert_not_called()
assert "rejected" in drain.LAST_SKIP_REASON
# fail-closed: the route is NOT token-authable, so it stays gated.
assert not token_auth.is_token_route(drain.DRAIN_ROUTE_PATH)
def test_registers_with_strong_env_secret(self, drain, monkeypatch):
s = _strong_secret()
monkeypatch.setenv("HERMES_DASHBOARD_DRAIN_SECRET", s)
monkeypatch.setattr(drain, "_load_config_drain_auth_section", lambda: {})
ctx = MagicMock()
drain.register(ctx)
ctx.register_dashboard_auth_provider.assert_called_once()
provider = ctx.register_dashboard_auth_provider.call_args.args[0]
assert isinstance(provider, drain.DrainSecretProvider)
assert provider.verify_token(token=s) is not None
assert drain.LAST_SKIP_REASON == ""
# The drain endpoint is now token-authable.
assert token_auth.is_token_route(drain.DRAIN_ROUTE_PATH)
def test_config_scope_applied(self, drain, monkeypatch):
s = _strong_secret()
monkeypatch.setenv("HERMES_DASHBOARD_DRAIN_SECRET", s)
monkeypatch.setattr(
drain, "_load_config_drain_auth_section", lambda: {"scope": "lifecycle"}
)
ctx = MagicMock()
drain.register(ctx)
provider = ctx.register_dashboard_auth_provider.call_args.args[0]
assert provider.verify_token(token=s).scopes == ("lifecycle",)
def test_config_min_secret_chars_can_reject_otherwise_ok_secret(
self, drain, monkeypatch
):
s = _strong_secret() # 43 chars — fine by default, too short at 999
monkeypatch.setenv("HERMES_DASHBOARD_DRAIN_SECRET", s)
monkeypatch.setattr(
drain,
"_load_config_drain_auth_section",
lambda: {"min_secret_chars": 999},
)
ctx = MagicMock()
drain.register(ctx)
ctx.register_dashboard_auth_provider.assert_not_called()
assert "rejected" in drain.LAST_SKIP_REASON