mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-24 02:43:18 +08:00
Compare commits
3 Commits
salvage/em
...
feat/dashb
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
db827de802 | ||
|
|
ef5b2b3197 | ||
|
|
ab7bda4987 |
109
gateway/drain_control.py
Normal file
109
gateway/drain_control.py
Normal file
@@ -0,0 +1,109 @@
|
||||
"""External drain-control marker contract (dashboard → gateway).
|
||||
|
||||
Task 2.2 of the safe-shutdown plan (decisions.md Q-B, option A): the dashboard
|
||||
has no way to call into a running gateway — there is no HTTP control channel
|
||||
into the gateway process (guardrails: "there is NO external control channel
|
||||
into a running gateway"). Restart/drain is driven only by the gateway reacting
|
||||
to its own inputs: slash commands, process signals, and file markers it writes
|
||||
itself (``.restart_notify.json``).
|
||||
|
||||
So the begin/cancel-drain dashboard endpoint communicates with the running
|
||||
gateway the same way: it writes (or removes) a marker file, and a gateway
|
||||
background watcher reacts to it. This module owns that marker contract so both
|
||||
sides — the dashboard endpoint (writer) and the gateway watcher (reader) —
|
||||
share one definition and can never disagree.
|
||||
|
||||
Contract (presence-based, mirroring ``.restart_notify.json``):
|
||||
|
||||
* begin-drain → write ``{HERMES_HOME}/.drain_request.json`` with
|
||||
``{"action": "drain", "requested_at": <iso>, "principal": <str>}``.
|
||||
* cancel-drain → remove the marker.
|
||||
* The gateway watcher treats **presence** of the marker as "external drain
|
||||
active": flip ``gateway_state -> "draining"`` and stop accepting new turns.
|
||||
Absence means "not draining" (revert to ``running`` if we had flipped it).
|
||||
|
||||
Reading the marker never raises: a malformed/half-written file reads as
|
||||
"present but contentless", which the watcher still treats as drain-active
|
||||
(fail-safe toward quiescing — a corrupt begin marker must not be ignored).
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional
|
||||
|
||||
from hermes_constants import get_hermes_home
|
||||
from utils import atomic_json_write
|
||||
|
||||
_log = logging.getLogger(__name__)
|
||||
|
||||
_DRAIN_REQUEST_FILENAME = ".drain_request.json"


def drain_request_path(home: Optional[Path] = None) -> Path:
    """Locate the drain-request marker file.

    Args:
        home: Directory that holds the marker. When ``None`` the directory is
            obtained from ``get_hermes_home()``.

    Returns:
        ``home`` joined with the marker file name (``.drain_request.json``).
    """
    if home is None:
        home = get_hermes_home()
    return Path(home).joinpath(_DRAIN_REQUEST_FILENAME)
|
||||
|
||||
|
||||
def write_drain_request(
    *, principal: str = "drain-control", home: Optional[Path] = None
) -> dict[str, Any]:
    """Create (or refresh) the begin-drain marker and return its payload.

    The payload records the action (always ``"drain"``), a UTC ISO-8601
    timestamp taken at call time, and the caller-supplied ``principal``. It is
    persisted through ``atomic_json_write`` at the path given by
    ``drain_request_path(home)``.

    Calling this again while a marker already exists simply rewrites the file
    with a newer ``requested_at``.

    Args:
        principal: Label identifying who requested the drain.
        home: Optional override for the directory containing the marker.

    Returns:
        The dictionary that was written.
    """
    requested_at = datetime.now(timezone.utc).isoformat()
    body: dict[str, Any] = {
        "action": "drain",
        "requested_at": requested_at,
        "principal": principal,
    }
    marker = drain_request_path(home)
    atomic_json_write(marker, body)
    return body
|
||||
|
||||
|
||||
def clear_drain_request(*, home: Optional[Path] = None) -> bool:
    """Delete the drain marker, i.e. cancel an external drain.

    Args:
        home: Optional override for the directory containing the marker.

    Returns:
        ``True`` when a marker file was removed by this call. ``False`` when
        there was no marker, or when removal failed with an ``OSError`` (the
        failure is logged as a warning rather than raised).
    """
    marker = drain_request_path(home)
    removed = False
    try:
        marker.unlink()
    except FileNotFoundError:
        # Already absent: cancelling twice is not an error.
        pass
    except OSError as exc:
        _log.warning("drain-control: failed to remove %s: %s", marker, exc)
    else:
        removed = True
    return removed
|
||||
|
||||
|
||||
def drain_requested(*, home: Optional[Path] = None) -> bool:
    """Report whether the begin-drain marker currently exists on disk.

    Only the presence of the file is checked; its content is not read.

    Args:
        home: Optional override for the directory containing the marker.
    """
    marker = drain_request_path(home)
    return marker.exists()
|
||||
|
||||
|
||||
def read_drain_request(*, home: Optional[Path] = None) -> Optional[dict[str, Any]]:
    """Return the marker payload, or ``None`` if the marker cannot be read.

    Never raises for a bad marker file:

    * Missing file -> ``None``.
    * Any other ``OSError`` while reading -> ``None`` (logged as a warning).
    * File present but not valid UTF-8, not valid JSON, too deeply nested to
      parse, or whose top-level JSON value is not an object -> ``{}``.

    Presence itself is reported by :func:`drain_requested`; this function is
    for callers that want the body.

    Args:
        home: Optional override for the directory containing the marker.

    Returns:
        The decoded JSON object, ``{}`` for a present-but-unusable marker, or
        ``None`` when the file is absent/unreadable.
    """
    path = drain_request_path(home)
    try:
        raw = path.read_text(encoding="utf-8")
    except FileNotFoundError:
        return None
    except OSError as e:
        _log.warning("drain-control: failed to read %s: %s", path, e)
        return None
    except UnicodeDecodeError:
        # The file exists but holds bytes that are not UTF-8. That is a
        # "present but contentless" marker, not an error for the caller:
        # UnicodeDecodeError is a ValueError, so the OSError handler above
        # would otherwise let it escape.
        return {}
    try:
        data = json.loads(raw)
    except (ValueError, TypeError, RecursionError):
        # ValueError covers json.JSONDecodeError; RecursionError covers
        # pathologically nested documents.
        return {}
    return data if isinstance(data, dict) else {}
|
||||
117
gateway/run.py
117
gateway/run.py
@@ -2471,6 +2471,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
||||
_restart_drain_timeout: float = DEFAULT_GATEWAY_RESTART_DRAIN_TIMEOUT
|
||||
_exit_code: Optional[int] = None
|
||||
_draining: bool = False
|
||||
_external_drain_active: bool = False
|
||||
_restart_requested: bool = False
|
||||
_restart_task_started: bool = False
|
||||
_restart_detached: bool = False
|
||||
@@ -2531,6 +2532,16 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
||||
self._exit_reason: Optional[str] = None
|
||||
self._exit_code: Optional[int] = None
|
||||
self._draining = False
|
||||
# External (NAS-driven) drain state — distinct from the shutdown
|
||||
# ``_draining`` flag above. Set by ``_drain_control_watcher`` when the
|
||||
# ``.drain_request.json`` marker is present: the gateway flips
|
||||
# ``gateway_state -> draining`` and refuses NEW turns, but the process
|
||||
# does NOT exit (the whole point — quiesce-without-restart, D4a). It is
|
||||
# fully reversible: removing the marker reverts to ``running`` and
|
||||
# re-accepts turns. ``_draining`` (shutdown) is one-way and ends in
|
||||
# process exit; this one is a steady state NAS polls during its
|
||||
# request -> poll -> proceed loop.
|
||||
self._external_drain_active = False
|
||||
self._restart_requested = False
|
||||
# Set by shutdown_signal_handler when a SIGTERM/SIGINT arrived
|
||||
# WITHOUT a planned-stop / takeover marker — i.e. an unexpected
|
||||
@@ -3687,6 +3698,85 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# External drain control (NAS-driven quiesce-without-restart, Phase 2).
|
||||
# The dashboard's begin/cancel-drain endpoint writes/removes the
|
||||
# ``.drain_request.json`` marker (gateway/drain_control.py); this watcher
|
||||
# observes the marker and flips the gateway between accepting and refusing
|
||||
# NEW turns, WITHOUT exiting the process. Reversible by design (D4a): NAS
|
||||
# POSTs begin-drain, polls /api/status until active_agents hits 0, proceeds
|
||||
# with its lifecycle action, then (on cancel/abort) the marker is removed
|
||||
# and the gateway re-accepts turns.
|
||||
# ------------------------------------------------------------------
|
||||
def _enter_external_drain(self) -> None:
    """Engage the external drain if it is not already engaged.

    On the first call this sets ``_external_drain_active``, logs the number
    reported by ``_running_agent_count()``, and writes the ``"draining"``
    runtime status via ``_update_runtime_status``. When the flag is already
    set the call returns without doing anything, so repeated calls are
    harmless.

    Nothing here touches turns that are already running; the flag is what
    other code consults to refuse new ones.
    """
    if not self._external_drain_active:
        self._external_drain_active = True
        in_flight = self._running_agent_count()
        logger.info(
            "External drain ENGAGED (.drain_request.json present) — refusing "
            "new turns; %d in-flight turn(s) will finish. Process stays up.",
            in_flight,
        )
        # Persist the lifecycle state so status readers see the drain.
        self._update_runtime_status("draining")
|
||||
|
||||
def _exit_external_drain(self) -> None:
    """Release the external drain if one is engaged.

    Clears ``_external_drain_active``. The runtime status is written back to
    ``"running"`` only when the gateway is neither in a shutdown drain
    (``_draining``) nor stopped (``_running`` false); otherwise the flag is
    cleared but the status is left alone so a stopping gateway is never
    reported as running again. A call while no external drain is active is a
    no-op.
    """
    if not self._external_drain_active:
        return
    self._external_drain_active = False

    shutting_down = self._draining or not self._running
    if not shutting_down:
        logger.info(
            "External drain RELEASED (.drain_request.json removed) — "
            "re-accepting new turns; gateway_state -> running."
        )
        self._update_runtime_status("running")
        return

    # Shutdown wins: leave the terminal state untouched.
    logger.info(
        "External drain marker cleared during shutdown — not reverting "
        "to running (shutdown takes precedence)."
    )
|
||||
|
||||
async def _drain_control_watcher(self, interval: float = 1.0) -> None:
    """Poll the drain marker and keep the gateway's drain state in step.

    While ``self._running`` is true, each iteration asks
    ``gateway.drain_control.drain_requested()`` whether the marker exists and
    calls ``_enter_external_drain`` (present) or ``_exit_external_drain``
    (absent), then sleeps ``interval`` seconds. The check happens before the
    first sleep, so a marker already on disk is picked up on the first pass.

    Any ``Exception`` raised during a pass is logged at debug level and the
    loop carries on; ``asyncio.CancelledError`` is re-raised so the task can
    be cancelled.

    Args:
        interval: Seconds to sleep between polls.
    """
    from gateway.drain_control import drain_requested

    while self._running:
        try:
            reconcile = (
                self._enter_external_drain
                if drain_requested()
                else self._exit_external_drain
            )
            reconcile()
        except asyncio.CancelledError:
            raise
        except Exception as exc:
            logger.debug("Drain-control watcher tick error: %s", exc, exc_info=True)
        await asyncio.sleep(interval)
|
||||
|
||||
def _update_platform_runtime_status(
|
||||
self,
|
||||
platform: str,
|
||||
@@ -5907,6 +5997,12 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
||||
# idle case where the subagent finishes with no agent turn running.
|
||||
asyncio.create_task(self._async_delegation_watcher())
|
||||
|
||||
# Start background drain-control watcher — reconciles the gateway's
|
||||
# new-turn accept-state with the external ``.drain_request.json`` marker
|
||||
# the dashboard begin/cancel-drain endpoint writes (Phase 2). Honours a
|
||||
# marker that survived a restart on its first tick.
|
||||
asyncio.create_task(self._drain_control_watcher())
|
||||
|
||||
logger.info("Press Ctrl+C to stop")
|
||||
|
||||
return True
|
||||
@@ -8419,6 +8515,27 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
||||
return self._telegram_topic_root_lobby_message()
|
||||
return None
|
||||
|
||||
# ── External-drain new-turn gate (Phase 2) ────────────────────
|
||||
# When NAS has engaged an external drain (.drain_request.json present,
|
||||
# observed by _drain_control_watcher), refuse to START a new turn so
|
||||
# the in-flight set can only fall to zero — eliminating the TOCTOU race
|
||||
# (D4a: stop accepting new turns FIRST, then NAS polls until
|
||||
# active_agents==0). In-flight turns are untouched; this only blocks the
|
||||
# claim of a NEW session slot. Internal/system events (restart-recovery
|
||||
# replays, background-process completions) bypass the gate — they are
|
||||
# not user-initiated new work and must still flow during a drain.
|
||||
# Reversible: once the marker is removed the gate opens again.
|
||||
if self._external_drain_active and not is_internal:
|
||||
logger.info(
|
||||
"Refusing new turn for session %s — external drain active.",
|
||||
_quick_key,
|
||||
)
|
||||
return (
|
||||
"⏳ This agent is draining for a maintenance action and isn't "
|
||||
"accepting new turns right now. It'll be back in a moment — "
|
||||
"please resend shortly."
|
||||
)
|
||||
|
||||
# ── Claim this session before any await ───────────────────────
|
||||
# Between here and _run_agent registering the real AIAgent, there
|
||||
# are numerous await points (hooks, vision enrichment, STT,
|
||||
|
||||
@@ -1741,6 +1741,22 @@ DEFAULT_CONFIG = {
|
||||
"secret": "", # token-signing key; blank → random per-process
|
||||
"session_ttl_seconds": 0, # 0 → plugin default (12h)
|
||||
},
|
||||
# Drain-control service-credential configuration — read by the
|
||||
# bundled ``dashboard_auth/drain`` plugin (the first consumer of the
|
||||
# generic non-interactive token-auth capability). The SECRET itself
|
||||
# is a credential and is NOT configured here: it is provisioned by
|
||||
# nous-account-service at deploy time via the
|
||||
# ``HERMES_DASHBOARD_DRAIN_SECRET`` env var (the .env-is-for-secrets
|
||||
# rule). These are the behavioural knobs only. The plugin is a no-op
|
||||
# unless that env var is set to a >=256-bit secret; a weak secret is
|
||||
# rejected at registration (fail-closed) and the drain endpoint stays
|
||||
# disabled. ``scope`` is the capability label attached to the verified
|
||||
# principal; ``min_secret_chars`` is the entropy bar (url-safe-b64
|
||||
# chars; 43 ~= 256 bits).
|
||||
"drain_auth": {
|
||||
"scope": "drain",
|
||||
"min_secret_chars": 43,
|
||||
},
|
||||
# Public URL override (env: ``HERMES_DASHBOARD_PUBLIC_URL``).
|
||||
# When set, this is the complete authority — scheme + host +
|
||||
# optional path prefix (e.g. ``https://example.com/hermes``) —
|
||||
|
||||
@@ -12,6 +12,7 @@ default. Third parties register their own providers via the plugin hook
|
||||
from hermes_cli.dashboard_auth.base import (
|
||||
DashboardAuthProvider,
|
||||
Session,
|
||||
TokenPrincipal,
|
||||
LoginStart,
|
||||
InvalidCodeError,
|
||||
InvalidCredentialsError,
|
||||
@@ -23,12 +24,14 @@ from hermes_cli.dashboard_auth.registry import (
|
||||
register_provider,
|
||||
get_provider,
|
||||
list_providers,
|
||||
list_token_providers,
|
||||
clear_providers,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"DashboardAuthProvider",
|
||||
"Session",
|
||||
"TokenPrincipal",
|
||||
"LoginStart",
|
||||
"InvalidCodeError",
|
||||
"InvalidCredentialsError",
|
||||
@@ -38,5 +41,6 @@ __all__ = [
|
||||
"register_provider",
|
||||
"get_provider",
|
||||
"list_providers",
|
||||
"list_token_providers",
|
||||
"clear_providers",
|
||||
]
|
||||
|
||||
@@ -47,6 +47,8 @@ class AuditEvent(enum.Enum):
|
||||
SESSION_VERIFY_FAILURE = "session_verify_failure"
|
||||
WS_TICKET_MINTED = "ws_ticket_minted"
|
||||
WS_TICKET_REJECTED = "ws_ticket_rejected"
|
||||
TOKEN_AUTH_SUCCESS = "token_auth_success"
|
||||
TOKEN_AUTH_FAILURE = "token_auth_failure"
|
||||
|
||||
|
||||
def _resolve_log_path() -> Path:
|
||||
|
||||
@@ -25,6 +25,34 @@ class Session:
|
||||
refresh_token: str
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class TokenPrincipal:
    """Identity of a caller that authenticated with a bearer token.

    This is the machine-to-machine counterpart of :class:`Session`: instead of
    a human behind a session cookie, it describes a service that presented a
    token in the ``Authorization`` header of one request. There is no login
    step, cookie, or refresh involved.

    Instances are produced by :meth:`DashboardAuthProvider.verify_token`; the
    token-auth middleware stores the result on
    ``request.state.token_principal`` so handlers can tell who is calling.

    The dataclass is frozen, so instances are immutable and hashable.
    """

    # Stable, opaque identifier for the caller (for example a provider name,
    # service-account id, or agent id).
    principal: str
    # ``name`` of the provider that accepted the token.
    provider: str
    # Capabilities granted to the caller. An empty tuple means the provider
    # vouches for the caller without listing capabilities; routes may still
    # require a specific scope.
    scopes: tuple[str, ...] = ()
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class LoginStart:
|
||||
"""First leg of the OAuth round trip.
|
||||
@@ -131,6 +159,18 @@ class DashboardAuthProvider(ABC):
|
||||
# and are completely unaffected.
|
||||
supports_password: bool = False
|
||||
|
||||
# When True, this provider can verify a non-interactive bearer token
|
||||
# (``verify_token``) presented on a single request by a service-to-service
|
||||
# caller — no login, no cookie, no refresh. This is the generic
|
||||
# API-token capability flag, mirroring ``supports_password``: a route
|
||||
# opts into token auth (see ``token_auth`` middleware seam) and the
|
||||
# gate consults every ``supports_token`` provider in turn until one
|
||||
# recognises the token. OAuth/password providers leave this False and
|
||||
# are completely unaffected. The drain bearer-secret plugin is the
|
||||
# first consumer, but the capability is deliberately generic so any
|
||||
# future machine-credential provider drops in without core changes.
|
||||
supports_token: bool = False
|
||||
|
||||
@abstractmethod
|
||||
def start_login(self, *, redirect_uri: str) -> LoginStart: ...
|
||||
|
||||
@@ -183,6 +223,39 @@ class DashboardAuthProvider(ABC):
|
||||
"complete_password_login)"
|
||||
)
|
||||
|
||||
def verify_token(self, *, token: str) -> "Optional[TokenPrincipal]":
    """Check a bearer token and say who it belongs to.

    Token-auth counterpart of ``verify_session``. Providers that advertise
    ``supports_token = True`` override this; the token-auth seam asks each
    such provider in turn and stops at the first one that returns a
    principal.

    What an override is expected to do:

    * Recognised and accepted token -> return a :class:`TokenPrincipal`.
    * Anything the provider does not recognise (malformed, expired, wrong)
      -> return ``None`` without raising, so the next provider gets a turn.
    * Backing store unavailable, so the provider cannot decide either way
      -> raise ``ProviderError``. The seam keeps trying other providers and
      answers 503 only if nobody accepted the token and at least one
      provider was unreachable.

    Shared-secret comparisons must be constant-time
    (``hmac.compare_digest``) to avoid leaking information through timing.

    Raises:
        NotImplementedError: Always, in this base implementation. A provider
            that turns on ``supports_token`` without overriding this method
            therefore fails loudly instead of accepting every caller.
    """
    provider_cls = type(self).__name__
    message = (
        f"{provider_cls} does not support token auth "
        "(set supports_token = True and override verify_token)"
    )
    raise NotImplementedError(message)
|
||||
|
||||
|
||||
def assert_protocol_compliance(cls: type) -> None:
|
||||
"""Raise ``TypeError`` if ``cls`` doesn't fully implement the provider protocol.
|
||||
|
||||
@@ -181,6 +181,13 @@ async def gated_auth_middleware(
|
||||
if not getattr(request.app.state, "auth_required", False):
|
||||
return await call_next(request)
|
||||
|
||||
# A request already authenticated by the token-auth seam (a service caller
|
||||
# on a registered token route) carries ``token_authenticated`` — it is NOT
|
||||
# a cookie session and must not be bounced to /login. Pass it through; the
|
||||
# seam already attached ``request.state.token_principal``.
|
||||
if getattr(request.state, "token_authenticated", False):
|
||||
return await call_next(request)
|
||||
|
||||
path = request.url.path
|
||||
if _path_is_public(path):
|
||||
return await call_next(request)
|
||||
|
||||
@@ -52,6 +52,20 @@ def list_providers() -> List[DashboardAuthProvider]:
|
||||
return list(_providers.values())
|
||||
|
||||
|
||||
def list_token_providers() -> List[DashboardAuthProvider]:
    """Return the registered providers whose ``supports_token`` flag is truthy.

    The registry is walked under ``_lock`` in its own iteration order and a
    new list is returned, so callers may use it after the lock is released.
    Providers without a ``supports_token`` attribute are treated as not
    supporting token auth. The result is empty when no such provider is
    registered.
    """
    token_capable: List[DashboardAuthProvider] = []
    with _lock:
        for candidate in _providers.values():
            if getattr(candidate, "supports_token", False):
                token_capable.append(candidate)
    return token_capable
|
||||
|
||||
|
||||
def clear_providers() -> None:
|
||||
"""Test-only: drop all registrations."""
|
||||
with _lock:
|
||||
|
||||
194
hermes_cli/dashboard_auth/token_auth.py
Normal file
194
hermes_cli/dashboard_auth/token_auth.py
Normal file
@@ -0,0 +1,194 @@
|
||||
"""Route-agnostic non-interactive (bearer-token) auth seam for the dashboard.
|
||||
|
||||
This is the generic API-token capability (decisions.md Q-C): a reusable seam
|
||||
that ANY service-to-service / machine-credential provider plugs into, NOT a
|
||||
drain-specific hook. The drain bearer-secret plugin is merely the first
|
||||
consumer.
|
||||
|
||||
How it fits the existing auth framework:
|
||||
|
||||
* The interactive gate (``gated_auth_middleware``) authenticates a human
|
||||
via a session cookie on every non-public route. A service caller has no
|
||||
cookie — it presents a bearer token in the ``Authorization`` header on a
|
||||
single request. That is what this seam verifies.
|
||||
|
||||
* A route opts in by registering its exact path via
|
||||
:func:`register_token_route`. Only registered paths are token-authable;
|
||||
everything else is untouched, so this can never accidentally widen the
|
||||
auth surface of an existing route.
|
||||
|
||||
* :func:`token_auth_middleware` runs OUTERMOST (installed last in
|
||||
``web_server.py``). For a token route it fully owns the auth decision:
|
||||
authenticate via the stacked token providers, attach the verified
|
||||
:class:`~hermes_cli.dashboard_auth.base.TokenPrincipal` to
|
||||
``request.state.token_principal`` + set ``request.state.token_authenticated``,
|
||||
and pass through; otherwise reject (401 unauthenticated, or 503 when a
|
||||
provider's backing store was unreachable). The downstream cookie/session
|
||||
gates honour ``token_authenticated`` and skip enforcement, so a
|
||||
token-authed service request is never bounced to ``/login``.
|
||||
|
||||
* Fails closed: a token route with no registered token provider, no token,
|
||||
or an unrecognised token gets 401 — never an open pass-through.
|
||||
|
||||
Provider stacking mirrors ``verify_session``: each ``supports_token`` provider
|
||||
is consulted in registration order until one returns a principal. A provider
|
||||
that doesn't recognise the token returns ``None`` and the seam moves on; a
|
||||
provider whose backing store is unreachable raises ``ProviderError``, which the
|
||||
seam remembers and surfaces as 503 only if NO provider accepts the token.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import threading
|
||||
from typing import Awaitable, Callable, Optional, Tuple
|
||||
|
||||
from fastapi import Request
|
||||
from fastapi.responses import JSONResponse, Response
|
||||
|
||||
from hermes_cli.dashboard_auth import list_token_providers
|
||||
from hermes_cli.dashboard_auth.audit import AuditEvent, audit_log
|
||||
from hermes_cli.dashboard_auth.base import ProviderError, TokenPrincipal
|
||||
|
||||
_log = logging.getLogger(__name__)
|
||||
|
||||
# Exact paths that accept non-interactive bearer-token auth. A route registers
|
||||
# itself here at import/startup; the seam only acts on registered paths.
|
||||
_token_routes: set[str] = set()
|
||||
_lock = threading.Lock()
|
||||
|
||||
|
||||
def register_token_route(path: str) -> None:
    """Add ``path`` to the set of routes that authenticate by bearer token.

    Matching is by exact string. Registering the same path again has no
    further effect because the registry is a set. Registration does not make
    the route public; it switches the route from cookie auth to token auth.

    Args:
        path: Exact request path to guard with token auth.
    """
    _lock.acquire()
    try:
        _token_routes.add(path)
    finally:
        _lock.release()
|
||||
|
||||
|
||||
def is_token_route(path: str) -> bool:
    """Tell whether ``path`` (compared exactly) is a registered token route."""
    with _lock:
        registered = path in _token_routes
    return registered
|
||||
|
||||
|
||||
def clear_token_routes() -> None:
    """Forget every registered token route (intended for tests)."""
    _lock.acquire()
    try:
        _token_routes.clear()
    finally:
        _lock.release()
|
||||
|
||||
|
||||
def _client_ip(request: Request) -> str:
    """Best-effort client address for audit records.

    Uses the first comma-separated entry of the ``X-Forwarded-For`` header
    when that header is non-empty; otherwise the connection peer's host, or
    ``""`` when the request carries no client information.

    NOTE(review): the header value is taken as-is from the request, so it is
    only as trustworthy as whatever sits in front of this server.
    """
    forwarded = request.headers.get("x-forwarded-for", "")
    if not forwarded:
        peer = request.client
        return peer.host if peer else ""
    first_hop, _, _ = forwarded.partition(",")
    return first_hop.strip()
|
||||
|
||||
|
||||
def extract_bearer_token(request: Request) -> str:
    """Pull the bearer credential out of the ``Authorization`` header.

    The header must look like ``<scheme> <token>`` with the scheme equal to
    ``bearer`` ignoring case. Surrounding whitespace on the token is removed.

    Returns:
        The token, or ``""`` when the header is absent, has no space-separated
        credential, or uses a different scheme. Callers treat ``""`` as "no
        token presented".
    """
    header = request.headers.get("authorization", "")
    scheme, separator, credential = header.partition(" ")
    if not separator:
        # No space at all: either empty or a bare scheme with no credential.
        return ""
    if scheme.strip().lower() != "bearer":
        return ""
    return credential.strip()
|
||||
|
||||
|
||||
def authenticate_token(
    request: Request,
) -> Tuple[Optional[TokenPrincipal], Optional[str]]:
    """Run the request's bearer token past each token provider.

    Providers come from ``list_token_providers()`` and are asked in the order
    that function returns them; the first non-``None`` principal wins.

    Returns:
        A ``(principal, unreachable_provider_name)`` pair:

        * ``(TokenPrincipal, None)`` when some provider accepted the token.
        * ``(None, None)`` when no token was presented or nobody recognised
          it (the caller answers 401).
        * ``(None, name)`` when nobody accepted it and at least one provider
          raised ``ProviderError``; ``name`` is the first such provider (the
          caller answers 503 so an outage is not reported as bad credentials).

    Exceptions raised by a provider's ``verify_token`` are caught here:
    ``ProviderError`` is remembered as "unreachable", any other ``Exception``
    is logged and that provider is skipped.
    """
    bearer = extract_bearer_token(request)
    if not bearer:
        return None, None

    first_unreachable: Optional[str] = None
    for candidate in list_token_providers():
        try:
            verified = candidate.verify_token(token=bearer)
        except ProviderError as exc:
            _log.warning(
                "dashboard-auth: token provider %r unreachable during verify: %s",
                candidate.name, exc,
            )
            # Only the first outage is reported back to the caller.
            if first_unreachable is None:
                first_unreachable = candidate.name
        except Exception as exc:  # noqa: BLE001 — a buggy provider must not 500 the gate
            _log.warning(
                "dashboard-auth: token provider %r raised during verify: %s",
                candidate.name, exc,
            )
        else:
            if verified is not None:
                return verified, None
    return None, first_unreachable
|
||||
|
||||
|
||||
async def token_auth_middleware(
    request: Request,
    call_next: Callable[[Request], Awaitable[Response]],
) -> Response:
    """Authenticate requests to registered token routes by bearer token.

    Paths not registered via :func:`register_token_route` are passed straight
    to ``call_next``. For a registered path the outcome of
    :func:`authenticate_token` decides the response:

    * Principal returned -> it is stored on ``request.state.token_principal``,
      ``request.state.token_authenticated`` is set to ``True``, and the
      request continues down the stack.
    * No principal but a provider was unreachable -> 503 JSON response and a
      ``TOKEN_AUTH_FAILURE`` audit record with reason
      ``provider_unreachable``.
    * Otherwise -> 401 JSON response with a ``WWW-Authenticate: Bearer``
      challenge and a ``TOKEN_AUTH_FAILURE`` audit record with reason
      ``no_provider_recognises_token``.

    Args:
        request: Incoming request.
        call_next: Continuation that invokes the rest of the middleware stack.

    Returns:
        The downstream response, or the 401/503 rejection built here.
    """
    path = request.url.path
    if not is_token_route(path):
        return await call_next(request)

    principal, unreachable = authenticate_token(request)
    if principal is not None:
        request.state.token_principal = principal
        request.state.token_authenticated = True
        return await call_next(request)

    if unreachable:
        audit_log(
            AuditEvent.TOKEN_AUTH_FAILURE,
            provider=unreachable,
            reason="provider_unreachable",
            path=path,
            ip=_client_ip(request),
        )
        return JSONResponse(
            {"detail": f"Auth provider {unreachable!r} unreachable"},
            status_code=503,
        )

    audit_log(
        AuditEvent.TOKEN_AUTH_FAILURE,
        reason="no_provider_recognises_token",
        path=path,
        ip=_client_ip(request),
    )
    return JSONResponse(
        {"error": "unauthenticated", "detail": "Unauthorized"},
        status_code=401,
        # A 401 must tell the client which scheme to use (RFC 9110 §15.5.2).
        headers={"WWW-Authenticate": "Bearer"},
    )
|
||||
@@ -468,6 +468,11 @@ async def _dashboard_auth_gate(request: Request, call_next):
|
||||
@app.middleware("http")
|
||||
async def auth_middleware(request: Request, call_next):
|
||||
"""Require the session token on all /api/ routes except the public list."""
|
||||
# A request already authenticated by the token-auth seam (a service caller
|
||||
# presenting a bearer token on a registered token route) carries
|
||||
# ``token_authenticated`` — never bounce it through the cookie/session gate.
|
||||
if getattr(request.state, "token_authenticated", False):
|
||||
return await call_next(request)
|
||||
# When the OAuth gate is active, cookie-based auth (gated_auth_middleware
|
||||
# above) is authoritative. The legacy _SESSION_TOKEN path is loopback-only
|
||||
# and is skipped here so the gate's session attachment isn't overridden.
|
||||
@@ -483,6 +488,20 @@ async def auth_middleware(request: Request, call_next):
|
||||
return await call_next(request)
|
||||
|
||||
|
||||
@app.middleware("http")
async def _token_auth_seam(request: Request, call_next):
    """HTTP middleware that hands every request to the token-auth seam.

    Delegates to ``hermes_cli.dashboard_auth.token_auth.token_auth_middleware``
    (imported on each call), which authenticates registered token routes by
    bearer token and lets all other paths through unchanged.

    This is meant to be the last middleware registered so that it runs before
    the cookie/session gates, which skip enforcement once
    ``request.state.token_authenticated`` is set.
    """
    from hermes_cli.dashboard_auth.token_auth import token_auth_middleware

    response = await token_auth_middleware(request, call_next)
    return response
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Config schema — auto-generated from DEFAULT_CONFIG
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -2481,6 +2500,71 @@ async def restart_gateway(profile: Optional[str] = None):
|
||||
}
|
||||
|
||||
|
||||
@app.post("/api/gateway/drain")
async def gateway_drain(request: Request):
    """Begin or cancel an external (NAS-driven) gateway drain.

    Authenticated by the non-interactive token-auth seam: the
    ``dashboard_auth/drain`` plugin registers this exact path as a token route
    and verifies the ``Authorization`` bearer secret. If that plugin isn't
    active (no ``HERMES_DASHBOARD_DRAIN_SECRET``), the route is NOT a token
    route, so on a gated bind the cookie gate handles it (a browser session can
    still drive it from the dashboard) and on a loopback bind the legacy
    session-token gate applies — either way it is never unauthenticated on a
    network-exposed bind.

    Body: ``{"action": "drain"}`` (begin) or ``{"action": "cancel"}`` (cancel).
    A missing, empty, or unparseable body is treated as a begin-drain request.

    Begin writes the ``.drain_request.json`` marker the gateway's
    ``_drain_control_watcher`` observes (flip to ``draining`` + refuse new
    turns); cancel removes it (revert to ``running`` + re-accept). Idempotent
    on both sides. This endpoint only writes/removes the marker — the gateway
    process owns the actual state transition (there is no HTTP control channel
    into the running gateway; the marker IS the channel, decisions.md Q-B).

    The force-override (D6: "unless a user commands it") is NOT here — an
    immediate, drain-skipping action maps onto the existing
    ``POST /api/gateway/restart`` force path, which supersedes a drain.

    Returns:
        A JSON-serialisable dict. For cancel: ``ok``, ``action`` and
        ``was_draining``. For begin: ``ok``, ``action``, ``requested_at`` and
        ``draining``.

    Raises:
        HTTPException: 400 when the body is JSON but not an object, or when
            ``action`` is neither ``"drain"`` nor ``"cancel"``.
    """
    from gateway.drain_control import (
        clear_drain_request,
        drain_requested,
        write_drain_request,
    )

    try:
        body = await request.json()
    except Exception:
        # No body / invalid JSON: fall back to the default action ("drain").
        body = {}
    # Falsy payloads (``null``, ``[]``, ``""``, ``0``) keep the same default.
    body = body or {}
    if not isinstance(body, dict):
        # A truthy non-object payload (e.g. ``["drain"]`` or ``"cancel"``) has
        # no ``.get`` and previously surfaced as an unhandled 500; it is a
        # client error, so report it as one.
        raise HTTPException(
            status_code=400,
            detail='Drain request body must be a JSON object, e.g. {"action": "drain"}',
        )
    action = str(body.get("action", "drain")).strip().lower()

    # Attribute the request to the verified token principal when present
    # (token-auth seam attaches it); fall back to a generic label otherwise.
    principal_obj = getattr(request.state, "token_principal", None)
    principal = getattr(principal_obj, "principal", None) or "dashboard"

    if action == "cancel":
        existed = clear_drain_request()
        _log.info("Gateway drain CANCEL requested by %s (existed=%s)", principal, existed)
        return {"ok": True, "action": "cancel", "was_draining": existed}

    if action != "drain":
        raise HTTPException(
            status_code=400,
            detail=f"Unknown drain action {action!r}; expected 'drain' or 'cancel'",
        )

    payload = write_drain_request(principal=str(principal))
    _log.info("Gateway drain BEGIN requested by %s", principal)
    return {
        "ok": True,
        "action": "drain",
        "requested_at": payload["requested_at"],
        # Echo so a caller polling /api/status knows the marker is now set;
        # the gateway watcher flips gateway_state -> draining within ~1s.
        "draining": drain_requested(),
    }
|
||||
|
||||
|
||||
@app.post("/api/hermes/update")
|
||||
async def update_hermes():
|
||||
"""Kick off ``hermes update`` in the background."""
|
||||
|
||||
290
plugins/dashboard_auth/drain/__init__.py
Normal file
290
plugins/dashboard_auth/drain/__init__.py
Normal file
@@ -0,0 +1,290 @@
|
||||
"""DrainSecretProvider — shared-bearer-secret auth for the drain-control endpoint.
|
||||
|
||||
Task 2.0b of the safe-shutdown plan, and the FIRST consumer of the generic
|
||||
non-interactive token-auth capability added in Task 2.0a
|
||||
(``supports_token`` / ``verify_token`` on the ``DashboardAuthProvider`` ABC +
|
||||
the route-agnostic ``token_auth`` middleware seam).
|
||||
|
||||
What it is
|
||||
----------
|
||||
A service-to-service auth provider. ``nous-account-service`` (NAS) provisions a
|
||||
**per-agent unique** shared secret into each deployed agent's environment; this
|
||||
provider verifies an inbound ``Authorization`` bearer token against that secret
|
||||
with a constant-time compare and, on a match, vouches for the caller as the
|
||||
``drain-control`` principal. It is NOT an interactive identity provider — there
|
||||
is no login, cookie, session, or refresh. It implements ONLY the token
|
||||
capability (``supports_token = True`` + ``verify_token``); the five interactive
|
||||
ABC methods raise ``NotImplementedError`` or, for ``verify_session`` and ``revoke_session``, return ``None``.
|
||||
|
||||
Why a plugin (not an ad-hoc header check on the drain route)
|
||||
------------------------------------------------------------
|
||||
Decisions.md Q-A: the drain credential MUST be a real auth plugin in the
|
||||
dashboard auth framework, not a bolt-on. Q-C: the framework widening that
|
||||
hosts it is generic (Task 2.0a) and this plugin is merely its first consumer.
|
||||
|
||||
Security properties (decisions.md Q-A)
|
||||
--------------------------------------
|
||||
* **Per-agent unique secret** — each agent gets a distinct secret; a leak's
|
||||
blast radius is one agent.
|
||||
* **Entropy gate at registration** — a weak/short/low-entropy secret fails
|
||||
CLOSED at load (the plugin declines to register and records a skip reason);
|
||||
it is never silently accepted. Bar: >= 256 bits of entropy / >= 43
|
||||
url-safe-base64 chars, and the value must not be obviously structured
|
||||
(all-one-character, too few distinct characters).
|
||||
* **Constant-time compare** — ``hmac.compare_digest`` on the request path, so
|
||||
the endpoint is not a timing oracle.
|
||||
|
||||
Configuration
|
||||
-------------
|
||||
The secret is a CREDENTIAL, so it is carried via an env var (the ``.env``-is-
|
||||
for-secrets-only rule), provisioned by NAS at deploy time (Phase 3):
|
||||
|
||||
HERMES_DASHBOARD_DRAIN_SECRET # the per-agent shared secret (>=43 url-safe-b64 chars)
|
||||
|
||||
Behavioural knobs live in config.yaml (canonical surface):
|
||||
|
||||
dashboard:
|
||||
drain_auth:
|
||||
scope: drain # capability label attached to the principal
|
||||
min_secret_chars: 43 # entropy bar (optional; default 43 ~= 256 bits)
|
||||
|
||||
When ``HERMES_DASHBOARD_DRAIN_SECRET`` is unset, the plugin is a no-op (records
|
||||
a skip reason) — agents that don't want NAS-driven drain just don't set it.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import hmac
|
||||
import logging
|
||||
import math
|
||||
import os
|
||||
from collections import Counter
|
||||
from typing import Optional
|
||||
|
||||
from hermes_cli.dashboard_auth import (
|
||||
DashboardAuthProvider,
|
||||
LoginStart,
|
||||
Session,
|
||||
TokenPrincipal,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)

# Length floor for the secret. 43 url-safe-base64 characters carry roughly
# 256 bits; ``secrets.token_urlsafe(32)`` emits exactly 43 characters, so a
# correctly provisioned secret sits right on this bar.
_DEFAULT_MIN_SECRET_CHARS = 43

# Minimum number of DISTINCT characters. Catches values that are long enough
# but degenerate, such as "aaaa...".
_MIN_DISTINCT_CHARS = 16

# Floor (in bits) for the Shannon entropy computed over the secret's own
# character distribution; a distribution-aware check layered on top of the
# length and distinct-count checks.
_MIN_SHANNON_BITS = 128.0

# Path of the begin/cancel-drain endpoint. ``register()`` registers it as a
# token-authable route so the generic seam guards it. Declared here instead
# of being imported from web_server to avoid a heavy import at plugin load.
DRAIN_ROUTE_PATH = "/api/gateway/drain"

# Reason the most recent ``register()`` call declined to register the
# provider; reset to the empty string at the start of every call.
LAST_SKIP_REASON: str = ""
|
||||
|
||||
|
||||
def _shannon_bits(value: str) -> float:
|
||||
"""Total Shannon entropy (bits) of ``value`` over its character distribution.
|
||||
|
||||
H = len * sum(-p_i * log2(p_i)). A long string drawn from a wide alphabet
|
||||
scores high; a long run of one character scores ~0.
|
||||
"""
|
||||
if not value:
|
||||
return 0.0
|
||||
counts = Counter(value)
|
||||
n = len(value)
|
||||
per_char = -sum((c / n) * math.log2(c / n) for c in counts.values())
|
||||
return per_char * n
|
||||
|
||||
|
||||
def assess_secret_strength(
    secret: str, *, min_chars: int = _DEFAULT_MIN_SECRET_CHARS
) -> Optional[str]:
    """Explain why ``secret`` is too weak, or return ``None`` if it passes.

    This is the fail-closed entropy gate (decisions.md Q-A). The checks run in
    this order and the first failure wins:

    * the secret is non-empty,
    * its length is at least ``min_chars``,
    * it has at least ``_MIN_DISTINCT_CHARS`` distinct characters,
    * its Shannon entropy is at least ``_MIN_SHANNON_BITS`` bits.

    Args:
        secret: Candidate shared secret.
        min_chars: Minimum acceptable length in characters.

    Returns:
        ``None`` when every check passes, otherwise a human-readable reason
        for the caller to log and record as the skip reason.
    """
    if not secret:
        return "secret is empty"

    length = len(secret)
    if length < min_chars:
        return (
            f"secret too short: {length} chars (need >= {min_chars}; "
            "use a >=256-bit value, e.g. `python -c \"import secrets; "
            "print(secrets.token_urlsafe(32))\"`)"
        )

    distinct = len(set(secret))
    if distinct < _MIN_DISTINCT_CHARS:
        return (
            f"secret has only {distinct} distinct characters (need >= "
            f"{_MIN_DISTINCT_CHARS}); looks structured/low-entropy"
        )

    bits = _shannon_bits(secret)
    if bits < _MIN_SHANNON_BITS:
        return (
            f"secret entropy too low: {bits:.0f} bits (need >= "
            f"{_MIN_SHANNON_BITS:.0f}); looks structured/repeated"
        )

    return None
|
||||
|
||||
|
||||
class DrainSecretProvider(DashboardAuthProvider):
    """Service credential provider: one shared bearer secret, no login flow."""

    name = "drain-secret"
    display_name = "Drain Control (service credential)"
    supports_token = True

    def __init__(self, *, secret: str, scope: str = "drain") -> None:
        """Store the secret and scope after re-checking the entropy bar.

        The strength check is repeated here (with the default length bar) as
        defence in depth, so a caller that skips ``register()``'s own check
        still cannot build a provider around a weak secret.

        Raises:
            ValueError: ``secret`` fails ``assess_secret_strength``.
        """
        rejection = assess_secret_strength(secret)
        if rejection is not None:
            raise ValueError(f"drain secret rejected: {rejection}")
        self._secret = secret
        # An empty/falsy scope falls back to the default label.
        self._scope = scope if scope else "drain"

    # ---- token capability (the only thing this provider implements) --------

    def verify_token(self, *, token: str) -> Optional[TokenPrincipal]:
        """Check ``token`` against the stored secret in constant time.

        Returns a ``drain-control`` principal carrying this provider's name
        and scope when the token matches exactly. Returns ``None`` for an
        empty token or a mismatch. The comparison goes through
        ``hmac.compare_digest`` so the result does not leak how much of a
        wrong token matched.
        """
        if not token:
            return None
        presented = token.encode("utf-8")
        expected = self._secret.encode("utf-8")
        if not hmac.compare_digest(presented, expected):
            return None
        return TokenPrincipal(
            principal="drain-control",
            provider=self.name,
            scopes=(self._scope,),
        )

    # ---- interactive methods: unsupported (service credential only) --------

    def start_login(self, *, redirect_uri: str) -> LoginStart:
        """Unsupported: always raises ``NotImplementedError``."""
        raise NotImplementedError(
            "DrainSecretProvider is a non-interactive service credential; "
            "there is no login flow."
        )

    def complete_login(
        self, *, code: str, state: str, code_verifier: str, redirect_uri: str
    ) -> Session:
        """Unsupported: always raises ``NotImplementedError``."""
        raise NotImplementedError(
            "DrainSecretProvider is a non-interactive service credential."
        )

    def verify_session(self, *, access_token: str) -> Optional[Session]:
        """Always return ``None``.

        This provider never mints a ``Session``, so it has no session cookie
        to recognise. It returns ``None`` instead of raising so that it can
        sit harmlessly in the cookie-verify loop alongside other providers.
        """
        return None

    def refresh_session(self, *, refresh_token: str) -> Session:
        """Unsupported: always raises ``NotImplementedError``."""
        raise NotImplementedError(
            "DrainSecretProvider is a non-interactive service credential."
        )

    def revoke_session(self, *, refresh_token: str) -> None:
        """Do nothing; there is no session to revoke."""
        return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Plugin entry point
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _load_config_drain_auth_section() -> dict:
    """Fetch the ``dashboard.drain_auth`` mapping from config.yaml.

    Returns ``{}`` when importing or loading the config raises, or when the
    section is missing or is not a dict.
    """
    try:
        from hermes_cli.config import cfg_get, load_config

        cfg = load_config()
    except Exception as exc:  # noqa: BLE001 — broad catch is intentional
        logger.debug(
            "dashboard-auth-drain: load_config() raised %s; "
            "falling back to env-only configuration",
            exc,
        )
        return {}

    drain_auth = cfg_get(cfg, "dashboard", "drain_auth", default=None)
    if isinstance(drain_auth, dict):
        return drain_auth
    return {}
|
||||
|
||||
|
||||
def register(ctx) -> None:
    """Plugin entry point: register the drain provider if the secret is strong.

    Reads ``HERMES_DASHBOARD_DRAIN_SECRET`` (stripped of surrounding
    whitespace). When it is unset/empty, fails the strength check, or the
    provider constructor rejects it, nothing is registered and the reason is
    stored in ``LAST_SKIP_REASON``. Otherwise the provider is registered on
    ``ctx`` and the drain route is registered as a token route; a failure of
    that second step is logged as a warning and does not propagate.

    Args:
        ctx: Plugin context exposing ``register_dashboard_auth_provider``.
    """
    global LAST_SKIP_REASON
    LAST_SKIP_REASON = ""

    raw_secret = os.environ.get("HERMES_DASHBOARD_DRAIN_SECRET", "")
    secret = raw_secret.strip()
    if not secret:
        LAST_SKIP_REASON = (
            "HERMES_DASHBOARD_DRAIN_SECRET is not set. Set a per-agent "
            ">=256-bit secret (e.g. `python -c \"import secrets; "
            "print(secrets.token_urlsafe(32))\"`) to enable NAS-driven drain "
            "coordination; leave it unset to disable the drain endpoint."
        )
        logger.debug("dashboard-auth-drain: %s", LAST_SKIP_REASON)
        return

    # Behavioural knobs from config.yaml; both fall back to defaults.
    drain_cfg = _load_config_drain_auth_section()
    configured_scope = drain_cfg.get("scope", "drain") or "drain"
    scope = str(configured_scope).strip() or "drain"
    try:
        min_chars = int(drain_cfg.get("min_secret_chars", _DEFAULT_MIN_SECRET_CHARS))
    except (TypeError, ValueError):
        # Non-numeric setting: use the default bar.
        min_chars = _DEFAULT_MIN_SECRET_CHARS

    weakness = assess_secret_strength(secret, min_chars=min_chars)
    if weakness is not None:
        LAST_SKIP_REASON = (
            f"HERMES_DASHBOARD_DRAIN_SECRET rejected — {weakness}. "
            "The drain endpoint stays disabled (fail-closed)."
        )
        logger.warning("dashboard-auth-drain: %s", LAST_SKIP_REASON)
        return

    try:
        provider = DrainSecretProvider(secret=secret, scope=scope)
    except ValueError as exc:
        # The constructor re-checks strength with its own default bar.
        LAST_SKIP_REASON = f"DrainSecretProvider construction failed: {exc}"
        logger.warning("dashboard-auth-drain: %s", LAST_SKIP_REASON)
        return

    ctx.register_dashboard_auth_provider(provider)

    # Opt the begin/cancel-drain endpoint into the generic token-auth seam so
    # the dashboard's interactive cookie gate doesn't bounce NAS's bearer call.
    try:
        from hermes_cli.dashboard_auth.token_auth import register_token_route

        register_token_route(DRAIN_ROUTE_PATH)
    except Exception as exc:  # noqa: BLE001 — seam import must not crash plugin load
        logger.warning(
            "dashboard-auth-drain: could not register token route %s: %s",
            DRAIN_ROUTE_PATH, exc,
        )

    logger.info(
        "dashboard-auth-drain: registered drain service-credential provider "
        "(scope=%s, route=%s)",
        scope, DRAIN_ROUTE_PATH,
    )
|
||||
7
plugins/dashboard_auth/drain/plugin.yaml
Normal file
7
plugins/dashboard_auth/drain/plugin.yaml
Normal file
@@ -0,0 +1,7 @@
|
||||
name: drain
|
||||
version: 1.0.0
|
||||
description: "Dashboard auth provider — non-interactive shared-bearer-secret for the gateway drain-control endpoint. The first consumer of the generic token-auth capability (supports_token/verify_token). nous-account-service provisions a per-agent unique secret via HERMES_DASHBOARD_DRAIN_SECRET; this provider verifies an inbound Authorization bearer token against it with a constant-time compare and registers /api/gateway/drain as token-authable. Fails CLOSED: a weak/short/low-entropy secret (< 256 bits) is rejected at registration and the endpoint stays disabled. No-op when the env var is unset. Behavioural knobs (scope, min_secret_chars) live under dashboard.drain_auth in config.yaml."
|
||||
author: NousResearch
|
||||
kind: backend
|
||||
requires_env:
|
||||
- HERMES_DASHBOARD_DRAIN_SECRET
|
||||
196
tests/gateway/test_external_drain_control.py
Normal file
196
tests/gateway/test_external_drain_control.py
Normal file
@@ -0,0 +1,196 @@
|
||||
"""Tests for the external drain-control marker contract + gateway state machine.
|
||||
|
||||
Task 2.2/2.3. Two layers:
|
||||
* drain_control.py — the presence-based marker contract (write/clear/read,
|
||||
HERMES_HOME-scoped, never-raises).
|
||||
* GatewayRunner enter/exit/watcher + the new-turn accept gate — the
|
||||
reversible state machine driven by the marker.
|
||||
|
||||
Mocked tests are necessary-not-sufficient here (the HARD live-validation gate,
|
||||
Q-B, exercises a real `hermes gateway run`); these lock the unit contract.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
import gateway.drain_control as dc
|
||||
from gateway.run import GatewayRunner
|
||||
from gateway.platforms.base import MessageEvent, MessageType
|
||||
from tests.gateway.restart_test_helpers import make_restart_runner, make_restart_source
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Marker contract (drain_control.py)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.fixture
def home(tmp_path, monkeypatch):
    """Set ``HERMES_HOME`` to the per-test temp directory and return that path."""
    hermes_home = tmp_path
    monkeypatch.setenv("HERMES_HOME", str(hermes_home))
    return hermes_home
|
||||
|
||||
|
||||
class TestMarkerContract:
    """Unit contract for the drain marker file helpers in ``drain_control``."""

    def test_absent_by_default(self, home):
        """With no marker written, nothing is requested and nothing is read."""
        assert dc.drain_requested() is False
        assert dc.read_drain_request() is None

    def test_write_then_present(self, home):
        """Writing returns the payload and makes the marker readable."""
        written = dc.write_drain_request(principal="nas")
        assert dc.drain_requested() is True
        assert written["action"] == "drain"
        assert written["principal"] == "nas"
        stored = dc.read_drain_request()
        assert stored is not None and stored["principal"] == "nas"

    def test_clear_removes(self, home):
        """Clearing removes the marker and reports whether one existed."""
        dc.write_drain_request()
        assert dc.clear_drain_request() is True
        assert dc.drain_requested() is False
        # idempotent: clearing again is a no-op, returns False
        assert dc.clear_drain_request() is False

    def test_path_respects_hermes_home(self, home):
        """The marker path lives directly under ``HERMES_HOME``."""
        expected = home / ".drain_request.json"
        assert dc.drain_request_path() == expected

    def test_corrupt_marker_reads_as_present_contentless(self, home):
        """A malformed marker still counts as present and reads as ``{}``."""
        # A half-written / malformed marker must still count as "drain active"
        # (fail-safe toward quiescing).
        marker = dc.drain_request_path()
        marker.write_text("{not valid json", encoding="utf-8")
        assert dc.drain_requested() is True
        assert dc.read_drain_request() == {}

    def test_write_is_atomic_json(self, home):
        """The marker on disk parses as JSON with the expected action."""
        dc.write_drain_request(principal="x")
        import json

        on_disk = json.loads(dc.drain_request_path().read_text())
        assert on_disk["action"] == "drain"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Gateway state machine (enter / exit / idempotency)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _drain_runner():
    """Build a restart-test runner with the real drain enter/exit methods bound.

    Returns the ``(runner, adapter)`` pair from ``make_restart_runner`` with
    ``_external_drain_active`` initialised to ``False``.
    """
    runner, adapter = make_restart_runner()
    runner._external_drain_active = False
    # Bind the real methods under test.
    for method_name in ("_enter_external_drain", "_exit_external_drain"):
        unbound = getattr(GatewayRunner, method_name)
        setattr(runner, method_name, unbound.__get__(runner, GatewayRunner))
    return runner, adapter
|
||||
|
||||
|
||||
class TestDrainStateMachine:
    """Enter/exit behaviour of the external-drain state machine."""

    def test_enter_sets_flag_and_flips_state(self):
        """Entering sets the flag and reports ``draining``."""
        runner = _drain_runner()[0]
        runner._enter_external_drain()
        assert runner._external_drain_active is True
        runner._update_runtime_status.assert_called_with("draining")

    def test_enter_idempotent(self):
        """A second enter does not touch the runtime status again."""
        runner = _drain_runner()[0]
        runner._enter_external_drain()
        runner._update_runtime_status.reset_mock()
        runner._enter_external_drain()  # second call — no-op
        runner._update_runtime_status.assert_not_called()

    def test_exit_reverts_to_running(self):
        """Exiting after an enter clears the flag and reports ``running``."""
        runner = _drain_runner()[0]
        runner._enter_external_drain()
        runner._update_runtime_status.reset_mock()
        runner._exit_external_drain()
        assert runner._external_drain_active is False
        runner._update_runtime_status.assert_called_with("running")

    def test_exit_idempotent_when_not_draining(self):
        """Exiting without a prior enter does nothing."""
        runner = _drain_runner()[0]
        runner._exit_external_drain()  # never entered — no-op
        runner._update_runtime_status.assert_not_called()

    def test_exit_during_shutdown_does_not_revert_to_running(self):
        """Exit clears the flag but leaves status alone while ``_draining``."""
        runner = _drain_runner()[0]
        runner._enter_external_drain()
        runner._update_runtime_status.reset_mock()
        # A shutdown drain is now in progress — exit must NOT resurrect running.
        runner._draining = True
        runner._exit_external_drain()
        assert runner._external_drain_active is False
        runner._update_runtime_status.assert_not_called()

    def test_exit_when_loop_stopped_does_not_revert(self):
        """Exit leaves status alone once ``_running`` is false."""
        runner = _drain_runner()[0]
        runner._enter_external_drain()
        runner._update_runtime_status.reset_mock()
        runner._running = False
        runner._exit_external_drain()
        runner._update_runtime_status.assert_not_called()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Watcher reconciliation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestDrainWatcher:
    """The background watcher reconciles runner state with the marker file."""

    @pytest.mark.asyncio
    async def test_watcher_enters_then_exits_with_marker(self, home):
        """Marker present -> drain active; marker removed -> drain inactive."""
        runner, _ = _drain_runner()
        runner._drain_control_watcher = GatewayRunner._drain_control_watcher.__get__(
            runner, GatewayRunner
        )
        # Run the real watcher as a background task with a short interval and
        # give it a few ticks to observe each marker change.
        dc.write_drain_request()
        task = asyncio.create_task(runner._drain_control_watcher(interval=0.02))
        try:
            await asyncio.sleep(0.06)
            assert runner._external_drain_active is True
            dc.clear_drain_request()
            await asyncio.sleep(0.06)
            assert runner._external_drain_active is False
            runner._running = False
            await asyncio.sleep(0.04)
        finally:
            # Always stop the watcher, including when an assertion above
            # fails, so no pending task is left behind on the event loop.
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# New-turn accept gate
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestNewTurnGate:
    """New turns are refused during an external drain; running ones are kept."""

    @pytest.mark.asyncio
    async def test_new_turn_refused_during_external_drain(self):
        """A message handled while draining gets a reply mentioning draining."""
        runner, _ = _drain_runner()
        runner._external_drain_active = True
        incoming = MessageEvent(
            text="hello",
            message_type=MessageType.TEXT,
            source=make_restart_source(),
            message_id="m1",
        )
        reply = await runner._handle_message(incoming)
        assert reply is not None
        assert "draining" in reply.lower()

    @pytest.mark.asyncio
    async def test_in_flight_turn_not_interrupted_by_drain(self):
        """Entering drain leaves an already-running agent registered and alone."""
        # Entering drain must NOT touch the running-agents set.
        runner, _ = _drain_runner()
        in_flight = MagicMock()
        runner._running_agents["k"] = in_flight
        runner._enter_external_drain()
        assert runner._running_agents.get("k") is in_flight
        in_flight.interrupt.assert_not_called()
|
||||
316
tests/hermes_cli/test_dashboard_token_auth.py
Normal file
316
tests/hermes_cli/test_dashboard_token_auth.py
Normal file
@@ -0,0 +1,316 @@
|
||||
"""Contract tests for the generic non-interactive (bearer-token) auth seam.
|
||||
|
||||
Covers Task 2.0a: the reusable token-auth capability in the dashboard auth
|
||||
framework — NOT the drain plugin (that's 2.0b/2.1). Asserts the ABC capability
|
||||
flag, the registry filter, bearer extraction, provider stacking (verify_token),
|
||||
and the route-agnostic middleware seam's fail-closed / 503 / pass-through
|
||||
behaviour.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from typing import Optional
|
||||
|
||||
import pytest
|
||||
|
||||
from hermes_cli.dashboard_auth import (
|
||||
DashboardAuthProvider,
|
||||
LoginStart,
|
||||
Session,
|
||||
TokenPrincipal,
|
||||
clear_providers,
|
||||
list_token_providers,
|
||||
register_provider,
|
||||
)
|
||||
from hermes_cli.dashboard_auth.base import ProviderError
|
||||
from hermes_cli.dashboard_auth import token_auth
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# Test doubles
|
||||
# --------------------------------------------------------------------------
|
||||
|
||||
|
||||
class _OAuthOnly(DashboardAuthProvider):
    """Interactive-only test double; it does not opt into token auth."""

    name = "oauth-only"
    display_name = "OAuth Only"

    def start_login(self, *, redirect_uri):
        """Return a canned ``LoginStart``."""
        return LoginStart(redirect_url="x", cookie_payload={})

    def complete_login(self, *, code, state, code_verifier, redirect_uri):
        """Return a canned ``Session`` attributed to this provider."""
        return Session("u", "e", "n", "o", self.name, 0, "a", "r")

    def verify_session(self, *, access_token):
        """Recognise no session."""
        return None

    def refresh_session(self, *, refresh_token):
        """Return a canned ``Session`` attributed to this provider."""
        return Session("u", "e", "n", "o", self.name, 0, "a", "r")

    def revoke_session(self, *, refresh_token):
        """Do nothing."""
        return None
|
||||
|
||||
|
||||
class _TokenProvider(_OAuthOnly):
    """Token-capable test double that accepts exactly one secret."""

    name = "tok"
    display_name = "Token Provider"
    supports_token = True

    def __init__(self, *, secret: str = "good-secret", scopes=("drain",)):
        """Remember the accepted secret and the scopes to hand out."""
        self._secret = secret
        self._scopes = tuple(scopes)

    def verify_token(self, *, token: str) -> Optional[TokenPrincipal]:
        """Return a principal when ``token`` equals the secret, else ``None``."""
        if token != self._secret:
            return None
        return TokenPrincipal(
            principal=self.name, provider=self.name, scopes=self._scopes
        )
|
||||
|
||||
|
||||
class _UnreachableTokenProvider(_OAuthOnly):
    """Token-capable test double whose verification raises ``ProviderError``."""

    name = "tok-down"
    display_name = "Unreachable Token Provider"
    supports_token = True

    def verify_token(self, *, token: str) -> Optional[TokenPrincipal]:
        """Simulate an outage: always raise ``ProviderError``."""
        raise ProviderError("backing store down")
|
||||
|
||||
|
||||
class _BuggyTokenProvider(_OAuthOnly):
    """Token-capable test double whose verification raises ``RuntimeError``."""

    name = "tok-buggy"
    display_name = "Buggy Token Provider"
    supports_token = True

    def verify_token(self, *, token: str) -> Optional[TokenPrincipal]:
        """Simulate a provider bug: always raise ``RuntimeError``."""
        raise RuntimeError("kaboom")
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# Fixtures
|
||||
# --------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
def _isolated_state():
    """Clear registered providers and token routes before and after each test."""

    def _reset():
        clear_providers()
        token_auth.clear_token_routes()

    _reset()
    yield
    _reset()
|
||||
|
||||
|
||||
class _FakeURL:
|
||||
def __init__(self, path):
|
||||
self.path = path
|
||||
|
||||
|
||||
class _FakeClient:
|
||||
host = "1.2.3.4"
|
||||
|
||||
|
||||
class _FakeRequest:
    """Minimal Request stand-in for the seam (no real Starlette needed)."""

    def __init__(self, path="/api/gateway/drain", headers=None):
        """Populate ``url``, ``headers``, ``client`` and an empty ``state``."""
        self.url = _FakeURL(path)
        self.headers = headers if headers else {}
        self.client = _FakeClient()

        # A bare object the code under test can hang attributes on.
        class _State:
            pass

        self.state = _State()
|
||||
|
||||
|
||||
def _run(coro):
|
||||
return asyncio.run(coro)
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# ABC + registry
|
||||
# --------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_oauth_provider_defaults_supports_token_false():
    """A provider that does not opt in reports ``supports_token`` as False."""
    provider = _OAuthOnly()
    assert provider.supports_token is False
|
||||
|
||||
|
||||
def test_oauth_provider_verify_token_raises_not_implemented():
    """The inherited ``verify_token`` raises ``NotImplementedError``."""
    provider = _OAuthOnly()
    with pytest.raises(NotImplementedError):
        provider.verify_token(token="x")
|
||||
|
||||
|
||||
def test_list_token_providers_filters_to_supports_token():
    """Only providers with ``supports_token`` set are listed."""
    for provider in (_OAuthOnly(), _TokenProvider()):
        register_provider(provider)
    listed_names = [p.name for p in list_token_providers()]
    assert listed_names == ["tok"]
|
||||
|
||||
|
||||
def test_list_token_providers_empty_when_none_registered():
    """With only an interactive provider registered, the list is empty."""
    register_provider(_OAuthOnly())
    token_providers = list_token_providers()
    assert token_providers == []
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# Bearer extraction
|
||||
# --------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "header,expected",
    [
        ("Bearer abc123", "abc123"),
        ("bearer abc123", "abc123"),
        ("BEARER abc123", "abc123"),
        ("Bearer spaced ", "spaced"),
        ("Basic abc123", ""),
        ("abc123", ""),
        ("", ""),
    ],
)
def test_extract_bearer_token(header, expected):
    """Bearer extraction ignores scheme case and yields "" for non-bearer input."""
    # An empty header value is modelled as the header being absent.
    headers = {"authorization": header} if header else {}
    req = _FakeRequest(headers=headers)
    assert token_auth.extract_bearer_token(req) == expected
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# authenticate_token (provider stacking)
|
||||
# --------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_authenticate_token_accepts_valid():
    """A matching bearer token yields that provider's principal and scopes."""
    register_provider(_TokenProvider(secret="good-secret"))
    req = _FakeRequest(headers={"authorization": "Bearer good-secret"})
    result = token_auth.authenticate_token(req)
    principal, unreachable = result
    assert unreachable is None
    assert principal is not None
    assert principal.provider == "tok"
    assert principal.scopes == ("drain",)
|
||||
|
||||
|
||||
def test_authenticate_token_rejects_wrong_secret():
    """A non-matching bearer token yields no principal and no outage."""
    register_provider(_TokenProvider(secret="good-secret"))
    req = _FakeRequest(headers={"authorization": "Bearer wrong"})
    result = token_auth.authenticate_token(req)
    principal, unreachable = result
    assert principal is None
    assert unreachable is None
|
||||
|
||||
|
||||
def test_authenticate_token_no_token_returns_none():
    """A request without an Authorization header yields ``(None, None)``."""
    register_provider(_TokenProvider())
    req = _FakeRequest(headers={})
    principal, unreachable = token_auth.authenticate_token(req)
    assert principal is None
    assert unreachable is None
|
||||
|
||||
|
||||
def test_authenticate_token_stacks_first_match_wins():
    """With two providers stacked, the one whose secret matches authenticates."""
    first = _TokenProvider(secret="aaa")
    second = _TokenProvider(secret="bbb")
    second.name = "tok2"
    register_provider(first)
    register_provider(second)
    req = _FakeRequest(headers={"authorization": "Bearer bbb"})
    principal, _ = token_auth.authenticate_token(req)
    assert principal is not None and principal.provider == "tok2"
|
||||
|
||||
|
||||
def test_authenticate_token_unreachable_remembered():
    """An unreachable provider is reported by name when nobody authenticates.

    Expects no principal and ``unreachable == "tok-down"``.
    """
    register_provider(_UnreachableTokenProvider())
    request = _FakeRequest(headers={"authorization": "Bearer anything"})

    outcome = token_auth.authenticate_token(request)
    principal, unreachable = outcome

    assert principal is None
    assert unreachable == "tok-down"
|
||||
|
||||
|
||||
def test_authenticate_token_unreachable_then_valid_provider_wins():
    """A later provider that accepts the token overrides an earlier outage."""
    register_provider(_UnreachableTokenProvider())
    register_provider(_TokenProvider(secret="good"))
    request = _FakeRequest(headers={"authorization": "Bearer good"})

    outcome = token_auth.authenticate_token(request)
    principal, unreachable = outcome

    # A later provider accepting the token beats the earlier outage.
    assert principal is not None
    assert principal.provider == "tok"
    assert unreachable is None
|
||||
|
||||
|
||||
def test_authenticate_token_buggy_provider_does_not_crash():
    """A buggy provider in the stack must not stop a later one from matching."""
    register_provider(_BuggyTokenProvider())
    register_provider(_TokenProvider(secret="good"))
    request = _FakeRequest(headers={"authorization": "Bearer good"})

    # Only the principal is checked here; the second value is unpacked to
    # keep the two-tuple shape of the result exercised.
    principal, _unreachable = token_auth.authenticate_token(request)

    assert principal is not None
    assert principal.provider == "tok"
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------
|
||||
# Middleware seam (route-agnostic)
|
||||
# --------------------------------------------------------------------------
|
||||
|
||||
|
||||
async def _call_next_ok(request):
    """Stand-in downstream handler: always answers 200 with ``{"ok": True}``.

    ``request`` is accepted to match the ``call_next`` calling convention and
    is not inspected.
    """
    # Function-scope import, kept local to this helper.
    from fastapi.responses import JSONResponse

    payload = {"ok": True}
    return JSONResponse(payload, status_code=200)
|
||||
|
||||
|
||||
def test_seam_passthrough_for_unregistered_route():
    """A path not registered as a token route passes straight through.

    The downstream 200 is returned and ``token_authenticated`` is not set to
    a truthy value on the request state.
    """
    register_provider(_TokenProvider())
    request = _FakeRequest(path="/api/something-else")

    response = _run(token_auth.token_auth_middleware(request, _call_next_ok))

    assert response.status_code == 200
    assert getattr(request.state, "token_authenticated", False) is False
|
||||
|
||||
|
||||
def test_seam_accepts_valid_token_on_registered_route():
    """A valid bearer token on a registered token route reaches the handler.

    The middleware must also record the result on ``request.state``: the
    ``token_authenticated`` flag and the authenticated principal.
    """
    register_provider(_TokenProvider(secret="good"))
    token_auth.register_token_route("/api/gateway/drain")

    auth_headers = {"authorization": "Bearer good"}
    request = _FakeRequest(path="/api/gateway/drain", headers=auth_headers)

    response = _run(token_auth.token_auth_middleware(request, _call_next_ok))

    assert response.status_code == 200
    assert request.state.token_authenticated is True
    assert request.state.token_principal.provider == "tok"
|
||||
|
||||
|
||||
def test_seam_rejects_missing_token_401():
    """A registered token route without any Authorization header gets 401."""
    register_provider(_TokenProvider())
    token_auth.register_token_route("/api/gateway/drain")
    request = _FakeRequest(path="/api/gateway/drain", headers={})

    response = _run(token_auth.token_auth_middleware(request, _call_next_ok))

    assert response.status_code == 401
|
||||
|
||||
|
||||
def test_seam_rejects_wrong_token_401():
    """A registered token route with a non-matching bearer token gets 401."""
    register_provider(_TokenProvider(secret="good"))
    token_auth.register_token_route("/api/gateway/drain")

    bad_headers = {"authorization": "Bearer bad"}
    request = _FakeRequest(path="/api/gateway/drain", headers=bad_headers)

    response = _run(token_auth.token_auth_middleware(request, _call_next_ok))

    assert response.status_code == 401
|
||||
|
||||
|
||||
def test_seam_fails_closed_when_no_token_provider():
    """With only a non-token provider registered, the token route answers 401."""
    # Route registered but NO supports_token provider → 401, never open.
    register_provider(_OAuthOnly())
    token_auth.register_token_route("/api/gateway/drain")

    any_headers = {"authorization": "Bearer anything"}
    request = _FakeRequest(path="/api/gateway/drain", headers=any_headers)

    response = _run(token_auth.token_auth_middleware(request, _call_next_ok))

    assert response.status_code == 401
|
||||
|
||||
|
||||
def test_seam_503_on_provider_unreachable():
    """When the only token provider is unreachable, the middleware answers 503."""
    register_provider(_UnreachableTokenProvider())
    token_auth.register_token_route("/api/gateway/drain")

    headers = {"authorization": "Bearer x"}
    request = _FakeRequest(path="/api/gateway/drain", headers=headers)

    response = _run(token_auth.token_auth_middleware(request, _call_next_ok))

    assert response.status_code == 503
|
||||
@@ -249,6 +249,50 @@ class TestWebServerEndpoints:
|
||||
assert "active_sessions" in data
|
||||
assert data["can_update_hermes"] is True
|
||||
|
||||
def test_gateway_drain_begin_writes_marker(self):
    """POST ``{"action": "drain"}`` reports draining and sets the drain request.

    Cleanup runs in a ``finally`` block so that a failing assertion cannot
    leave the drain request set and leak state into later tests.
    """
    from gateway import drain_control

    try:
        resp = self.client.post("/api/gateway/drain", json={"action": "drain"})
        assert resp.status_code == 200

        data = resp.json()
        assert data["ok"] is True and data["action"] == "drain"
        assert data["draining"] is True
        assert drain_control.drain_requested() is True
    finally:
        # Always undo the request, even when an assertion above fails.
        drain_control.clear_drain_request()
|
||||
|
||||
def test_gateway_drain_defaults_to_begin(self):
    """An empty JSON body is treated as a begin-drain request.

    Cleanup runs in a ``finally`` block so that a failing assertion cannot
    leave the drain request set and leak state into later tests.
    """
    from gateway import drain_control

    try:
        resp = self.client.post("/api/gateway/drain", json={})
        assert resp.status_code == 200
        assert resp.json()["action"] == "drain"
        assert drain_control.drain_requested() is True
    finally:
        # Always undo the request, even when an assertion above fails.
        drain_control.clear_drain_request()
|
||||
|
||||
def test_gateway_drain_cancel_removes_marker(self):
    """Cancelling an active drain request clears it and reports ``was_draining``."""
    from gateway import drain_control

    # Arrange: start from an already-requested drain.
    drain_control.write_drain_request()

    response = self.client.post("/api/gateway/drain", json={"action": "cancel"})
    assert response.status_code == 200

    body = response.json()
    assert body["ok"] is True
    assert body["action"] == "cancel"
    assert body["was_draining"] is True
    assert drain_control.drain_requested() is False
|
||||
|
||||
def test_gateway_drain_cancel_idempotent(self):
    """Cancelling with no drain request pending still succeeds.

    The response reports ``was_draining`` as ``False`` and no drain request
    is left behind.
    """
    from gateway import drain_control

    response = self.client.post("/api/gateway/drain", json={"action": "cancel"})

    assert response.status_code == 200
    body = response.json()
    assert body["was_draining"] is False
    assert drain_control.drain_requested() is False
|
||||
|
||||
def test_gateway_drain_bad_action_400(self):
    """An unrecognised ``action`` value is rejected with HTTP 400."""
    bad_payload = {"action": "explode"}
    response = self.client.post("/api/gateway/drain", json=bad_payload)
    assert response.status_code == 400
|
||||
|
||||
def test_get_status_hides_update_capability_in_managed_runtime(self, monkeypatch):
|
||||
import hermes_cli.web_server as web_server
|
||||
|
||||
|
||||
184
tests/plugins/dashboard_auth/test_drain_provider.py
Normal file
184
tests/plugins/dashboard_auth/test_drain_provider.py
Normal file
@@ -0,0 +1,184 @@
|
||||
"""Tests for the DrainSecretProvider plugin (non-interactive bearer secret).
|
||||
|
||||
Task 2.0b. Loads the bundled drain plugin module directly and exercises:
|
||||
* the entropy gate (assess_secret_strength) — fail-closed on weak secrets,
|
||||
* constant-time verify_token returning a scoped TokenPrincipal,
|
||||
* the register(ctx) entry point's env/config resolution, skip reasons, and
|
||||
token-route registration.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import secrets
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
import plugins.dashboard_auth.drain as drain_plugin
|
||||
from hermes_cli.dashboard_auth import TokenPrincipal, assert_protocol_compliance
|
||||
from hermes_cli.dashboard_auth import token_auth
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
def drain():
    """Module-scoped fixture handing tests the imported drain plugin module."""
    plugin_module = drain_plugin
    return plugin_module
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
def _clean_env_and_routes(monkeypatch):
    """Isolate every test from ambient drain-secret env and token routes.

    Before the test: drop ``HERMES_DASHBOARD_DRAIN_SECRET`` (if set) and clear
    registered token routes. After the test: clear the token routes again.
    """
    secret_env_var = "HERMES_DASHBOARD_DRAIN_SECRET"
    monkeypatch.delenv(secret_env_var, raising=False)
    token_auth.clear_token_routes()

    yield

    token_auth.clear_token_routes()
|
||||
|
||||
|
||||
def _strong_secret() -> str:
|
||||
# token_urlsafe(32) → 43 url-safe-b64 chars ≈ 256 bits.
|
||||
return secrets.token_urlsafe(32)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Entropy gate
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestEntropyGate:
    """Checks on ``assess_secret_strength``.

    In these tests a ``None`` result means the secret was accepted; any other
    result means it was rejected.
    """

    def test_strong_secret_passes(self, drain):
        """A freshly generated strong secret is accepted."""
        verdict = drain.assess_secret_strength(_strong_secret())
        assert verdict is None

    def test_empty_rejected(self, drain):
        """The empty string is rejected."""
        verdict = drain.assess_secret_strength("")
        assert verdict is not None

    def test_too_short_rejected(self, drain):
        """A 42-character secret is rejected."""
        # 42 chars — one under the 43-char bar.
        # NOTE(review): this input also has only 6 distinct characters, so it
        # may be rejected by a distinct-character/entropy check rather than
        # the length check — confirm which gate this is meant to exercise.
        candidate = "a1B2c3" * 7
        assert drain.assess_secret_strength(candidate) is not None

    def test_long_but_repeated_rejected(self, drain):
        """Length alone is not enough: one repeated character is rejected."""
        # 60 chars, one distinct character → low distinct count + low entropy.
        candidate = "a" * 60
        assert drain.assess_secret_strength(candidate) is not None

    def test_long_but_few_distinct_rejected(self, drain):
        """A long secret built from only four characters is rejected."""
        # 60 chars cycling through only 4 distinct characters.
        candidate = "abcd" * 15
        assert drain.assess_secret_strength(candidate) is not None

    def test_custom_min_chars_enforced(self, drain):
        """A caller-supplied ``min_chars`` can reject an otherwise strong secret."""
        secret = _strong_secret()  # 43 chars
        verdict = drain.assess_secret_strength(secret, min_chars=999)
        assert verdict is not None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Provider behaviour
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestProvider:
    """Behaviour of ``DrainSecretProvider`` instances."""

    def test_protocol_compliance(self, drain):
        """The provider class passes the shared protocol-compliance check."""
        provider_cls = drain.DrainSecretProvider
        assert_protocol_compliance(provider_cls)

    def test_supports_token_flag(self, drain):
        """The provider advertises token support."""
        provider = drain.DrainSecretProvider(secret=_strong_secret())
        assert provider.supports_token is True

    def test_verify_token_accepts_matching_secret(self, drain):
        """The configured secret verifies to a fully populated principal."""
        secret = _strong_secret()
        provider = drain.DrainSecretProvider(secret=secret, scope="drain")

        principal = provider.verify_token(token=secret)

        assert isinstance(principal, TokenPrincipal)
        assert principal.principal == "drain-control"
        assert principal.provider == "drain-secret"
        assert principal.scopes == ("drain",)

    def test_verify_token_rejects_wrong_secret(self, drain):
        """A different (independently generated) secret does not verify."""
        provider = drain.DrainSecretProvider(secret=_strong_secret())
        other_secret = _strong_secret()
        assert provider.verify_token(token=other_secret) is None

    def test_verify_token_rejects_empty(self, drain):
        """An empty token does not verify."""
        provider = drain.DrainSecretProvider(secret=_strong_secret())
        assert provider.verify_token(token="") is None

    def test_custom_scope_attached(self, drain):
        """A custom ``scope`` is the single scope on the returned principal."""
        secret = _strong_secret()
        provider = drain.DrainSecretProvider(secret=secret, scope="lifecycle")
        principal = provider.verify_token(token=secret)
        assert principal.scopes == ("lifecycle",)

    def test_construction_rejects_weak_secret(self, drain):
        """Constructing the provider with a weak secret raises ``ValueError``."""
        with pytest.raises(ValueError):
            drain.DrainSecretProvider(secret="weak")

    def test_verify_session_returns_none_not_raises(self, drain):
        """``verify_session`` returns ``None`` rather than raising."""
        # Stacks harmlessly in the cookie-verify loop.
        provider = drain.DrainSecretProvider(secret=_strong_secret())
        assert provider.verify_session(access_token="anything") is None

    def test_interactive_methods_raise(self, drain):
        """The interactive login/refresh methods raise ``NotImplementedError``."""
        provider = drain.DrainSecretProvider(secret=_strong_secret())

        # Checked in the same order as the individual calls would be made.
        interactive_calls = (
            lambda: provider.start_login(redirect_uri="r"),
            lambda: provider.complete_login(
                code="c", state="s", code_verifier="v", redirect_uri="r"
            ),
            lambda: provider.refresh_session(refresh_token="r"),
        )
        for invoke in interactive_calls:
            with pytest.raises(NotImplementedError):
                invoke()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# register() entry point
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestRegister:
    """Behaviour of the plugin's ``register(ctx)`` entry point."""

    @staticmethod
    def _invoke_register(drain, monkeypatch, config_section, env_secret=None):
        """Run ``drain.register`` against a mock context and return that context.

        Optionally sets ``HERMES_DASHBOARD_DRAIN_SECRET`` first, then patches
        the plugin's config loader to return a fresh copy of *config_section*.
        """
        if env_secret is not None:
            monkeypatch.setenv("HERMES_DASHBOARD_DRAIN_SECRET", env_secret)
        monkeypatch.setattr(
            drain,
            "_load_config_drain_auth_section",
            lambda: dict(config_section),
        )
        ctx = MagicMock()
        drain.register(ctx)
        return ctx

    def test_skips_when_no_secret(self, drain, monkeypatch):
        """Without a secret nothing is registered and the skip reason names the env var."""
        ctx = self._invoke_register(drain, monkeypatch, {})

        ctx.register_dashboard_auth_provider.assert_not_called()
        assert "HERMES_DASHBOARD_DRAIN_SECRET" in drain.LAST_SKIP_REASON
        assert not token_auth.is_token_route(drain.DRAIN_ROUTE_PATH)

    def test_skips_and_fails_closed_on_weak_secret(self, drain, monkeypatch):
        """A weak env secret is rejected and the drain route is not token-authable."""
        ctx = self._invoke_register(drain, monkeypatch, {}, env_secret="tooweak")

        ctx.register_dashboard_auth_provider.assert_not_called()
        assert "rejected" in drain.LAST_SKIP_REASON
        # fail-closed: the route is NOT token-authable, so it stays gated.
        assert not token_auth.is_token_route(drain.DRAIN_ROUTE_PATH)

    def test_registers_with_strong_env_secret(self, drain, monkeypatch):
        """A strong env secret registers a working provider and the token route."""
        secret = _strong_secret()
        ctx = self._invoke_register(drain, monkeypatch, {}, env_secret=secret)

        ctx.register_dashboard_auth_provider.assert_called_once()
        registered = ctx.register_dashboard_auth_provider.call_args.args[0]
        assert isinstance(registered, drain.DrainSecretProvider)
        assert registered.verify_token(token=secret) is not None
        assert drain.LAST_SKIP_REASON == ""
        # The drain endpoint is now token-authable.
        assert token_auth.is_token_route(drain.DRAIN_ROUTE_PATH)

    def test_config_scope_applied(self, drain, monkeypatch):
        """A ``scope`` from the config section ends up on verified principals."""
        secret = _strong_secret()
        ctx = self._invoke_register(
            drain, monkeypatch, {"scope": "lifecycle"}, env_secret=secret
        )

        registered = ctx.register_dashboard_auth_provider.call_args.args[0]
        principal = registered.verify_token(token=secret)
        assert principal.scopes == ("lifecycle",)

    def test_config_min_secret_chars_can_reject_otherwise_ok_secret(
        self, drain, monkeypatch
    ):
        """A configured ``min_secret_chars`` can reject a default-acceptable secret."""
        secret = _strong_secret()  # 43 chars — fine by default, too short at 999
        ctx = self._invoke_register(
            drain, monkeypatch, {"min_secret_chars": 999}, env_secret=secret
        )

        ctx.register_dashboard_auth_provider.assert_not_called()
        assert "rejected" in drain.LAST_SKIP_REASON
|
||||
Reference in New Issue
Block a user