Compare commits

...

11 Commits

Author SHA1 Message Date
Teknium
e8c0f26202 refactor: simplify credential pool code — extract helpers, collapse extras, dedup patterns
- _load_config_safe(): replace 4 identical try/except/import blocks
- _iter_custom_providers(): shared generator for custom provider iteration
- PooledCredential.extra dict: collapse 11 round-trip-only fields
  (token_type, scope, client_id, portal_base_url, obtained_at,
  expires_in, agent_key_id, agent_key_expires_in, agent_key_reused,
  agent_key_obtained_at, tls) into a single extra dict with
  __getattr__ for backward-compatible access
- _available_entries(): shared exhaustion-check between select and peek
- Dedup anthropic OAuth seeding (hermes_pkce + claude_code identical)
- SimpleNamespace replaces class _Args boilerplate in auth_commands
- _try_resolve_from_custom_pool(): shared pool-check in runtime_provider

Net -17 lines. All 383 targeted tests pass.
2026-03-31 03:01:52 -07:00
Teknium
8b4e0c7ae0 chore: remove excalidraw diagram from repo (external link only) 2026-03-31 02:42:32 -07:00
Teknium
71ba4cf39d docs: add comprehensive credential pool documentation
- New page: website/docs/user-guide/features/credential-pools.md
  Full guide covering quick start, CLI commands, rotation strategies,
  error recovery, custom endpoint pools, auto-discovery, thread safety,
  architecture, and storage format.
- Updated fallback-providers.md to reference credential pools as the
  first layer of resilience (same-provider rotation before cross-provider)
- Added hermes auth to CLI commands reference with usage examples
- Added credential_pool_strategies to configuration guide
2026-03-31 02:32:45 -07:00
Teknium
09c065b94b fix(tests): update setup wizard pool tests for unified select_provider_and_model flow
The setup wizard now delegates to select_provider_and_model() instead
of using its own prompt_choice-based provider picker. Tests needed:
- Mock select_provider_and_model as no-op (provider pre-written to config)
- Call _stub_tts BEFORE custom prompt_choice mock (it overwrites it)
- Pre-write model.provider to config so the pool step is reached
2026-03-31 02:23:47 -07:00
Teknium
596bbc9ec3 docs: add Excalidraw diagram of full credential pool flow
Comprehensive architecture diagram showing:
- Credential sources (env vars, auth.json OAuth, config.yaml, CLI)
- Pool storage and auto-seeding
- Runtime resolution paths (registry, custom, OpenRouter)
- Error recovery (429 retry-then-rotate, 402 immediate, 401 refresh)
- CLI management commands and strategy configuration

Open at: https://excalidraw.com/#json=2Ycqhqpi6f12E_3ITyiwh,c7u9jSt5BwrmiVzHGbm87g
2026-03-31 02:08:48 -07:00
Teknium
7ec943a883 feat(auth): support custom endpoint credential pools keyed by provider name
Custom OpenAI-compatible endpoints all share provider='custom', making
the provider-keyed pool useless. Now pools for custom endpoints are
keyed by 'custom:<normalized_name>' where the name comes from the
custom_providers config list (auto-generated from URL hostname).

- Pool key format: 'custom:together.ai', 'custom:local-(localhost:8080)'
- load_pool('custom:name') seeds from custom_providers api_key AND
  model.api_key when base_url matches
- hermes auth add/list now shows custom endpoints alongside registry
  providers
- _resolve_openrouter_runtime and _resolve_named_custom_runtime check
  pool before falling back to single config key
- 6 new tests covering custom pool keying, seeding, and listing
2026-03-31 02:04:58 -07:00
Teknium
15f3229f13 fix(tests): update runtime_provider tests for config.yaml source of truth (#4165)
Tests were using OPENAI_BASE_URL env var which is no longer consulted
after #4165. Updated to use model config (provider, base_url, api_key)
which is the new single source of truth for custom endpoint URLs.
2026-03-31 01:47:28 -07:00
Teknium
ae698c3195 feat(auth): add interactive mode for bare 'hermes auth' command
When 'hermes auth' is called without a subcommand, it now launches an
interactive wizard that:

1. Shows full credential pool status across all providers
2. Offers a menu: add, remove, reset cooldowns, set strategy
3. For OAuth-capable providers (anthropic, nous, openai-codex), the
   add flow explicitly asks 'API key or OAuth login?' — making it
   clear that both auth types are supported for the same provider
4. Strategy picker shows all 4 options (fill_first, round_robin,
   least_used, random) with the current selection marked
5. Remove flow shows entries with indices for easy selection

The subcommand paths (hermes auth add/list/remove/reset) still work
exactly as before for scripted/non-interactive use.
2026-03-31 01:44:31 -07:00
Teknium
7be2fa5492 feat(auth): add thread safety, least_used strategy, and request counting
- Add threading.Lock to CredentialPool for gateway thread safety
  (concurrent requests from multiple gateway sessions could race on
  pool state mutations without this)
- Add 'least_used' rotation strategy that selects the credential
  with the lowest request_count, distributing load more evenly
- Add request_count field to PooledCredential for usage tracking
- Add mark_used() method to increment per-credential request counts
- Wrap select(), mark_exhausted_and_rotate(), and try_refresh_current()
  with lock acquisition
- Add tests: least_used selection, mark_used counting, concurrent
  thread safety (4 threads × 20 selects with no corruption)
2026-03-31 01:44:31 -07:00
Teknium
b34f893906 fix(tests): prevent pool auto-seeding from host env in credential pool tests
Tests for non-pool Anthropic paths and auth remove were failing when
host env vars (ANTHROPIC_API_KEY) or file-backed OAuth credentials
were present. The pool auto-seeding picked these up, causing unexpected
pool entries in tests.

- Mock _select_pool_entry in auxiliary_client OAuth flag tests
- Clear Anthropic env vars and mock _seed_from_singletons in auth remove test
2026-03-31 01:44:31 -07:00
kshitijk4poor
b60a43dd7e feat(auth): add same-provider credential pools and rotation UX
Add same-provider credential pooling so Hermes can rotate across
multiple credentials for a single provider, recover from exhausted
credentials without jumping providers immediately, and configure
that behavior directly in hermes setup.

- agent/credential_pool.py: persisted per-provider credential pools
- hermes auth add/list/remove/reset CLI commands
- 429/402/401 recovery with pool rotation in run_agent.py
- Setup wizard integration for pool strategy configuration
- Auto-seeding from env vars and existing OAuth state

Co-authored-by: kshitijk4poor <82637225+kshitijk4poor@users.noreply.github.com>
Salvaged from PR #2647
2026-03-31 01:44:31 -07:00
24 changed files with 4757 additions and 201 deletions

View File

@@ -307,74 +307,89 @@ def is_claude_code_token_valid(creds: Dict[str, Any]) -> bool:
return now_ms < (expires_at - 60_000)
def _refresh_oauth_token(creds: Dict[str, Any]) -> Optional[str]:
"""Attempt to refresh an expired Claude Code OAuth token.
Uses the same token endpoint and client_id as Claude Code / OpenCode.
Only works for credentials that have a refresh token (from claude /login
or claude setup-token with OAuth flow).
Tries the new platform.claude.com endpoint first (Claude Code >=2.1.81),
then falls back to console.anthropic.com for older tokens.
Returns the new access token, or None if refresh fails.
"""
def refresh_anthropic_oauth_pure(refresh_token: str, *, use_json: bool = False) -> Dict[str, Any]:
"""Refresh an Anthropic OAuth token without mutating local credential files."""
import time
import urllib.parse
import urllib.request
if not refresh_token:
raise ValueError("refresh_token is required")
client_id = "9d1c250a-e61b-44d9-88ed-5944d1962f5e"
if use_json:
data = json.dumps({
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"client_id": client_id,
}).encode()
content_type = "application/json"
else:
data = urllib.parse.urlencode({
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"client_id": client_id,
}).encode()
content_type = "application/x-www-form-urlencoded"
token_endpoints = [
"https://platform.claude.com/v1/oauth/token",
"https://console.anthropic.com/v1/oauth/token",
]
last_error = None
for endpoint in token_endpoints:
req = urllib.request.Request(
endpoint,
data=data,
headers={
"Content-Type": content_type,
"User-Agent": f"claude-cli/{_get_claude_code_version()} (external, cli)",
},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=10) as resp:
result = json.loads(resp.read().decode())
except Exception as exc:
last_error = exc
logger.debug("Anthropic token refresh failed at %s: %s", endpoint, exc)
continue
access_token = result.get("access_token", "")
if not access_token:
raise ValueError("Anthropic refresh response was missing access_token")
next_refresh = result.get("refresh_token", refresh_token)
expires_in = result.get("expires_in", 3600)
return {
"access_token": access_token,
"refresh_token": next_refresh,
"expires_at_ms": int(time.time() * 1000) + (expires_in * 1000),
}
if last_error is not None:
raise last_error
raise ValueError("Anthropic token refresh failed")
def _refresh_oauth_token(creds: Dict[str, Any]) -> Optional[str]:
"""Attempt to refresh an expired Claude Code OAuth token."""
refresh_token = creds.get("refreshToken", "")
if not refresh_token:
logger.debug("No refresh token available — cannot refresh")
return None
# Client ID used by Claude Code's OAuth flow
CLIENT_ID = "9d1c250a-e61b-44d9-88ed-5944d1962f5e"
# Anthropic migrated OAuth from console.anthropic.com to platform.claude.com
# (Claude Code v2.1.81+). Try new endpoint first, fall back to old.
token_endpoints = [
"https://platform.claude.com/v1/oauth/token",
"https://console.anthropic.com/v1/oauth/token",
]
payload = json.dumps({
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"client_id": CLIENT_ID,
}).encode()
headers = {
"Content-Type": "application/json",
"User-Agent": f"claude-cli/{_get_claude_code_version()} (external, cli)",
}
for endpoint in token_endpoints:
req = urllib.request.Request(
endpoint, data=payload, headers=headers, method="POST",
try:
refreshed = refresh_anthropic_oauth_pure(refresh_token, use_json=False)
_write_claude_code_credentials(
refreshed["access_token"],
refreshed["refresh_token"],
refreshed["expires_at_ms"],
)
try:
with urllib.request.urlopen(req, timeout=10) as resp:
result = json.loads(resp.read().decode())
new_access = result.get("access_token", "")
new_refresh = result.get("refresh_token", refresh_token)
expires_in = result.get("expires_in", 3600)
if new_access:
new_expires_ms = int(time.time() * 1000) + (expires_in * 1000)
# Parse scopes from refresh response — Claude Code >=2.1.81
# requires a "scopes" field in the credential store and checks
# for "user:inference" before accepting the token as valid.
scope_str = result.get("scope", "")
scopes = scope_str.split() if scope_str else None
_write_claude_code_credentials(
new_access, new_refresh, new_expires_ms, scopes=scopes,
)
logger.debug("Refreshed Claude Code OAuth token via %s", endpoint)
return new_access
except Exception as e:
logger.debug("Token refresh failed at %s: %s", endpoint, e)
return None
logger.debug("Successfully refreshed Claude Code OAuth token")
return refreshed["access_token"]
except Exception as e:
logger.debug("Failed to refresh Claude Code token: %s", e)
return None
def _write_claude_code_credentials(
@@ -570,10 +585,208 @@ def run_oauth_setup_token() -> Optional[str]:
return None
# ── Hermes-native PKCE OAuth flow ────────────────────────────────────────
# Mirrors the flow used by Claude Code, pi-ai, and OpenCode.
# Stores credentials in ~/.hermes/.anthropic_oauth.json (our own file).
_OAUTH_CLIENT_ID = "9d1c250a-e61b-44d9-88ed-5944d1962f5e"
_OAUTH_TOKEN_URL = "https://console.anthropic.com/v1/oauth/token"
_OAUTH_REDIRECT_URI = "https://console.anthropic.com/oauth/code/callback"
_OAUTH_SCOPES = "org:create_api_key user:profile user:inference"
_HERMES_OAUTH_FILE = get_hermes_home() / ".anthropic_oauth.json"
def _generate_pkce() -> tuple:
"""Generate PKCE code_verifier and code_challenge (S256)."""
import base64
import hashlib
import secrets
verifier = base64.urlsafe_b64encode(secrets.token_bytes(32)).rstrip(b"=").decode()
challenge = base64.urlsafe_b64encode(
hashlib.sha256(verifier.encode()).digest()
).rstrip(b"=").decode()
return verifier, challenge
def run_hermes_oauth_login_pure() -> Optional[Dict[str, Any]]:
"""Run Hermes-native OAuth PKCE flow and return credential state."""
import time
import webbrowser
verifier, challenge = _generate_pkce()
params = {
"code": "true",
"client_id": _OAUTH_CLIENT_ID,
"response_type": "code",
"redirect_uri": _OAUTH_REDIRECT_URI,
"scope": _OAUTH_SCOPES,
"code_challenge": challenge,
"code_challenge_method": "S256",
"state": verifier,
}
from urllib.parse import urlencode
auth_url = f"https://claude.ai/oauth/authorize?{urlencode(params)}"
print()
print("Authorize Hermes with your Claude Pro/Max subscription.")
print()
print("╭─ Claude Pro/Max Authorization ────────────────────╮")
print("│ │")
print("│ Open this link in your browser: │")
print("╰───────────────────────────────────────────────────╯")
print()
print(f" {auth_url}")
print()
try:
webbrowser.open(auth_url)
print(" (Browser opened automatically)")
except Exception:
pass
print()
print("After authorizing, you'll see a code. Paste it below.")
print()
try:
auth_code = input("Authorization code: ").strip()
except (KeyboardInterrupt, EOFError):
return None
if not auth_code:
print("No code entered.")
return None
splits = auth_code.split("#")
code = splits[0]
state = splits[1] if len(splits) > 1 else ""
try:
import urllib.request
exchange_data = json.dumps({
"grant_type": "authorization_code",
"client_id": _OAUTH_CLIENT_ID,
"code": code,
"state": state,
"redirect_uri": _OAUTH_REDIRECT_URI,
"code_verifier": verifier,
}).encode()
req = urllib.request.Request(
_OAUTH_TOKEN_URL,
data=exchange_data,
headers={
"Content-Type": "application/json",
"User-Agent": f"claude-cli/{_get_claude_code_version()} (external, cli)",
},
method="POST",
)
with urllib.request.urlopen(req, timeout=15) as resp:
result = json.loads(resp.read().decode())
except Exception as e:
print(f"Token exchange failed: {e}")
return None
access_token = result.get("access_token", "")
refresh_token = result.get("refresh_token", "")
expires_in = result.get("expires_in", 3600)
if not access_token:
print("No access token in response.")
return None
expires_at_ms = int(time.time() * 1000) + (expires_in * 1000)
return {
"access_token": access_token,
"refresh_token": refresh_token,
"expires_at_ms": expires_at_ms,
}
def run_hermes_oauth_login() -> Optional[str]:
"""Run Hermes-native OAuth PKCE flow for Claude Pro/Max subscription.
Opens a browser to claude.ai for authorization, prompts for the code,
exchanges it for tokens, and stores them in ~/.hermes/.anthropic_oauth.json.
Returns the access token on success, None on failure.
"""
result = run_hermes_oauth_login_pure()
if not result:
return None
access_token = result["access_token"]
refresh_token = result["refresh_token"]
expires_at_ms = result["expires_at_ms"]
_save_hermes_oauth_credentials(access_token, refresh_token, expires_at_ms)
_write_claude_code_credentials(access_token, refresh_token, expires_at_ms)
print("Authentication successful!")
return access_token
def _save_hermes_oauth_credentials(access_token: str, refresh_token: str, expires_at_ms: int) -> None:
"""Save OAuth credentials to ~/.hermes/.anthropic_oauth.json."""
data = {
"accessToken": access_token,
"refreshToken": refresh_token,
"expiresAt": expires_at_ms,
}
try:
_HERMES_OAUTH_FILE.parent.mkdir(parents=True, exist_ok=True)
_HERMES_OAUTH_FILE.write_text(json.dumps(data, indent=2), encoding="utf-8")
_HERMES_OAUTH_FILE.chmod(0o600)
except (OSError, IOError) as e:
logger.debug("Failed to save Hermes OAuth credentials: %s", e)
def read_hermes_oauth_credentials() -> Optional[Dict[str, Any]]:
"""Read Hermes-managed OAuth credentials from ~/.hermes/.anthropic_oauth.json."""
if _HERMES_OAUTH_FILE.exists():
try:
data = json.loads(_HERMES_OAUTH_FILE.read_text(encoding="utf-8"))
if data.get("accessToken"):
return data
except (json.JSONDecodeError, OSError, IOError) as e:
logger.debug("Failed to read Hermes OAuth credentials: %s", e)
return None
def refresh_hermes_oauth_token() -> Optional[str]:
"""Refresh the Hermes-managed OAuth token using the stored refresh token.
Returns the new access token, or None if refresh fails.
"""
creds = read_hermes_oauth_credentials()
if not creds or not creds.get("refreshToken"):
return None
try:
refreshed = refresh_anthropic_oauth_pure(
creds["refreshToken"],
use_json=True,
)
_save_hermes_oauth_credentials(
refreshed["access_token"],
refreshed["refresh_token"],
refreshed["expires_at_ms"],
)
_write_claude_code_credentials(
refreshed["access_token"],
refreshed["refresh_token"],
refreshed["expires_at_ms"],
)
logger.debug("Successfully refreshed Hermes OAuth token")
return refreshed["access_token"]
except Exception as e:
logger.debug("Failed to refresh Hermes OAuth token: %s", e)
return None
# ---------------------------------------------------------------------------
@@ -1106,4 +1319,4 @@ def normalize_anthropic_response(
reasoning_details=None,
),
finish_reason,
)
)

View File

@@ -47,6 +47,7 @@ from typing import Any, Dict, List, Optional, Tuple
from openai import OpenAI
from agent.credential_pool import load_pool
from hermes_cli.config import get_hermes_home
from hermes_constants import OPENROUTER_BASE_URL
@@ -96,6 +97,45 @@ _CODEX_AUX_MODEL = "gpt-5.2-codex"
_CODEX_AUX_BASE_URL = "https://chatgpt.com/backend-api/codex"
def _select_pool_entry(provider: str) -> Tuple[bool, Optional[Any]]:
"""Return (pool_exists_for_provider, selected_entry)."""
try:
pool = load_pool(provider)
except Exception as exc:
logger.debug("Auxiliary client: could not load pool for %s: %s", provider, exc)
return False, None
if not pool or not pool.has_credentials():
return False, None
try:
return True, pool.select()
except Exception as exc:
logger.debug("Auxiliary client: could not select pool entry for %s: %s", provider, exc)
return True, None
def _pool_runtime_api_key(entry: Any) -> str:
if entry is None:
return ""
# Use the PooledCredential.runtime_api_key property which handles
# provider-specific fallback (e.g. agent_key for nous).
key = getattr(entry, "runtime_api_key", None) or getattr(entry, "access_token", "")
return str(key or "").strip()
def _pool_runtime_base_url(entry: Any, fallback: str = "") -> str:
if entry is None:
return str(fallback or "").strip().rstrip("/")
# runtime_base_url handles provider-specific logic (e.g. nous prefers inference_base_url).
# Fall back through inference_base_url and base_url for non-PooledCredential entries.
url = (
getattr(entry, "runtime_base_url", None)
or getattr(entry, "inference_base_url", None)
or getattr(entry, "base_url", None)
or fallback
)
return str(url or "").strip().rstrip("/")
# ── Codex Responses → chat.completions adapter ─────────────────────────────
# All auxiliary consumers call client.chat.completions.create(**kwargs) and
# read response.choices[0].message.content. This adapter translates those
@@ -439,6 +479,22 @@ def _read_nous_auth() -> Optional[dict]:
Returns the provider state dict if Nous is active with tokens,
otherwise None.
"""
pool_present, entry = _select_pool_entry("nous")
if pool_present:
if entry is None:
return None
return {
"access_token": getattr(entry, "access_token", ""),
"refresh_token": getattr(entry, "refresh_token", None),
"agent_key": getattr(entry, "agent_key", None),
"inference_base_url": _pool_runtime_base_url(entry, _NOUS_DEFAULT_BASE_URL),
"portal_base_url": getattr(entry, "portal_base_url", None),
"client_id": getattr(entry, "client_id", None),
"scope": getattr(entry, "scope", None),
"token_type": getattr(entry, "token_type", "Bearer"),
"source": "pool",
}
try:
if not _AUTH_JSON_PATH.is_file():
return None
@@ -467,6 +523,11 @@ def _nous_base_url() -> str:
def _read_codex_access_token() -> Optional[str]:
"""Read a valid, non-expired Codex OAuth access token from Hermes auth store."""
pool_present, entry = _select_pool_entry("openai-codex")
if pool_present:
token = _pool_runtime_api_key(entry)
return token or None
try:
from hermes_cli.auth import _read_codex_tokens
data = _read_codex_tokens()
@@ -513,6 +574,24 @@ def _resolve_api_key_provider() -> Tuple[Optional[OpenAI], Optional[str]]:
if provider_id == "anthropic":
return _try_anthropic()
pool_present, entry = _select_pool_entry(provider_id)
if pool_present:
api_key = _pool_runtime_api_key(entry)
if not api_key:
continue
base_url = _pool_runtime_base_url(entry, pconfig.inference_base_url) or pconfig.inference_base_url
model = _API_KEY_PROVIDER_AUX_MODELS.get(provider_id, "default")
logger.debug("Auxiliary text client: %s (%s) via pool", pconfig.name, model)
extra = {}
if "api.kimi.com" in base_url.lower():
extra["default_headers"] = {"User-Agent": "KimiCLI/1.0"}
elif "api.githubcopilot.com" in base_url.lower():
from hermes_cli.models import copilot_default_headers
extra["default_headers"] = copilot_default_headers()
return OpenAI(api_key=api_key, base_url=base_url, **extra), model
creds = resolve_api_key_provider_credentials(provider_id)
api_key = str(creds.get("api_key", "")).strip()
if not api_key:
@@ -562,6 +641,16 @@ def _get_auxiliary_env_override(task: str, suffix: str) -> Optional[str]:
def _try_openrouter() -> Tuple[Optional[OpenAI], Optional[str]]:
pool_present, entry = _select_pool_entry("openrouter")
if pool_present:
or_key = _pool_runtime_api_key(entry)
if not or_key:
return None, None
base_url = _pool_runtime_base_url(entry, OPENROUTER_BASE_URL) or OPENROUTER_BASE_URL
logger.debug("Auxiliary client: OpenRouter via pool")
return OpenAI(api_key=or_key, base_url=base_url,
default_headers=_OR_HEADERS), _OPENROUTER_MODEL
or_key = os.getenv("OPENROUTER_API_KEY")
if not or_key:
return None, None
@@ -577,9 +666,13 @@ def _try_nous() -> Tuple[Optional[OpenAI], Optional[str]]:
global auxiliary_is_nous
auxiliary_is_nous = True
logger.debug("Auxiliary client: Nous Portal")
model = "gemini-3-flash" if nous.get("source") == "pool" else _NOUS_MODEL
return (
OpenAI(api_key=_nous_api_key(nous), base_url=_nous_base_url()),
_NOUS_MODEL,
OpenAI(
api_key=_nous_api_key(nous),
base_url=str(nous.get("inference_base_url") or _nous_base_url()).rstrip("/"),
),
model,
)
@@ -655,11 +748,19 @@ def _try_custom_endpoint() -> Tuple[Optional[OpenAI], Optional[str]]:
def _try_codex() -> Tuple[Optional[Any], Optional[str]]:
codex_token = _read_codex_access_token()
if not codex_token:
return None, None
pool_present, entry = _select_pool_entry("openai-codex")
if pool_present:
codex_token = _pool_runtime_api_key(entry)
if not codex_token:
return None, None
base_url = _pool_runtime_base_url(entry, _CODEX_AUX_BASE_URL) or _CODEX_AUX_BASE_URL
else:
codex_token = _read_codex_access_token()
if not codex_token:
return None, None
base_url = _CODEX_AUX_BASE_URL
logger.debug("Auxiliary client: Codex OAuth (%s via Responses API)", _CODEX_AUX_MODEL)
real_client = OpenAI(api_key=codex_token, base_url=_CODEX_AUX_BASE_URL)
real_client = OpenAI(api_key=codex_token, base_url=base_url)
return CodexAuxiliaryClient(real_client, _CODEX_AUX_MODEL), _CODEX_AUX_MODEL
@@ -669,14 +770,21 @@ def _try_anthropic() -> Tuple[Optional[Any], Optional[str]]:
except ImportError:
return None, None
token = resolve_anthropic_token()
pool_present, entry = _select_pool_entry("anthropic")
if pool_present:
if entry is None:
return None, None
token = _pool_runtime_api_key(entry)
else:
entry = None
token = resolve_anthropic_token()
if not token:
return None, None
# Allow base URL override from config.yaml model.base_url, but only
# when the configured provider is anthropic — otherwise a non-Anthropic
# base_url (e.g. Codex endpoint) would leak into Anthropic requests.
base_url = _ANTHROPIC_DEFAULT_BASE_URL
base_url = _pool_runtime_base_url(entry, _ANTHROPIC_DEFAULT_BASE_URL) if pool_present else _ANTHROPIC_DEFAULT_BASE_URL
try:
from hermes_cli.config import load_config
cfg = load_config()

844
agent/credential_pool.py Normal file
View File

@@ -0,0 +1,844 @@
"""Persistent multi-credential pool for same-provider failover."""
from __future__ import annotations
import logging
import random
import threading
import time
import uuid
import os
from dataclasses import dataclass, fields, replace
from typing import Any, Dict, List, Optional, Set, Tuple
from hermes_constants import OPENROUTER_BASE_URL
import hermes_cli.auth as auth_mod
from hermes_cli.auth import (
ACCESS_TOKEN_REFRESH_SKEW_SECONDS,
CODEX_ACCESS_TOKEN_REFRESH_SKEW_SECONDS,
DEFAULT_AGENT_KEY_MIN_TTL_SECONDS,
PROVIDER_REGISTRY,
_agent_key_is_usable,
_codex_access_token_is_expiring,
_decode_jwt_claims,
_is_expiring,
_load_auth_store,
_load_provider_state,
read_credential_pool,
write_credential_pool,
)
logger = logging.getLogger(__name__)
def _load_config_safe() -> Optional[dict]:
"""Load config.yaml, returning None on any error."""
try:
from hermes_cli.config import load_config
return load_config()
except Exception:
return None
# --- Status and type constants ---
STATUS_OK = "ok"
STATUS_EXHAUSTED = "exhausted"
AUTH_TYPE_OAUTH = "oauth"
AUTH_TYPE_API_KEY = "api_key"
SOURCE_MANUAL = "manual"
STRATEGY_FILL_FIRST = "fill_first"
STRATEGY_ROUND_ROBIN = "round_robin"
STRATEGY_RANDOM = "random"
STRATEGY_LEAST_USED = "least_used"
SUPPORTED_POOL_STRATEGIES = {
STRATEGY_FILL_FIRST,
STRATEGY_ROUND_ROBIN,
STRATEGY_RANDOM,
STRATEGY_LEAST_USED,
}
# Cooldown before retrying an exhausted credential.
# 429 (rate-limited) cools down faster since quotas reset frequently.
# 402 (billing/quota) and other codes use a longer default.
EXHAUSTED_TTL_429_SECONDS = 60 * 60 # 1 hour
EXHAUSTED_TTL_DEFAULT_SECONDS = 24 * 60 * 60 # 24 hours
# Pool key prefix for custom OpenAI-compatible endpoints.
# Custom endpoints all share provider='custom' but are keyed by their
# custom_providers name: 'custom:<normalized_name>'.
CUSTOM_POOL_PREFIX = "custom:"
# Fields that are only round-tripped through JSON — never used for logic as attributes.
_EXTRA_KEYS = frozenset({
"token_type", "scope", "client_id", "portal_base_url", "obtained_at",
"expires_in", "agent_key_id", "agent_key_expires_in", "agent_key_reused",
"agent_key_obtained_at", "tls",
})
@dataclass
class PooledCredential:
provider: str
id: str
label: str
auth_type: str
priority: int
source: str
access_token: str
refresh_token: Optional[str] = None
last_status: Optional[str] = None
last_status_at: Optional[float] = None
last_error_code: Optional[int] = None
base_url: Optional[str] = None
expires_at: Optional[str] = None
expires_at_ms: Optional[int] = None
last_refresh: Optional[str] = None
inference_base_url: Optional[str] = None
agent_key: Optional[str] = None
agent_key_expires_at: Optional[str] = None
request_count: int = 0
extra: Dict[str, Any] = None # type: ignore[assignment]
def __post_init__(self):
if self.extra is None:
self.extra = {}
def __getattr__(self, name: str):
if name in _EXTRA_KEYS:
return self.extra.get(name)
raise AttributeError(f"'{type(self).__name__}' object has no attribute {name!r}")
@classmethod
def from_dict(cls, provider: str, payload: Dict[str, Any]) -> "PooledCredential":
field_names = {f.name for f in fields(cls) if f.name != "provider"}
data = {k: payload.get(k) for k in field_names if k in payload}
extra = {k: payload[k] for k in _EXTRA_KEYS if k in payload and payload[k] is not None}
data["extra"] = extra
data.setdefault("id", uuid.uuid4().hex[:6])
data.setdefault("label", payload.get("source", provider))
data.setdefault("auth_type", AUTH_TYPE_API_KEY)
data.setdefault("priority", 0)
data.setdefault("source", SOURCE_MANUAL)
data.setdefault("access_token", "")
return cls(provider=provider, **data)
def to_dict(self) -> Dict[str, Any]:
_ALWAYS_EMIT = {"last_status", "last_status_at", "last_error_code"}
result: Dict[str, Any] = {}
for field_def in fields(self):
if field_def.name in ("provider", "extra"):
continue
value = getattr(self, field_def.name)
if value is not None or field_def.name in _ALWAYS_EMIT:
result[field_def.name] = value
for k, v in self.extra.items():
if v is not None:
result[k] = v
return result
@property
def runtime_api_key(self) -> str:
if self.provider == "nous":
return str(self.agent_key or self.access_token or "")
return str(self.access_token or "")
@property
def runtime_base_url(self) -> Optional[str]:
if self.provider == "nous":
return self.inference_base_url or self.base_url
return self.base_url
def label_from_token(token: str, fallback: str) -> str:
claims = _decode_jwt_claims(token)
for key in ("email", "preferred_username", "upn"):
value = claims.get(key)
if isinstance(value, str) and value.strip():
return value.strip()
return fallback
def _next_priority(entries: List[PooledCredential]) -> int:
return max((entry.priority for entry in entries), default=-1) + 1
def _is_manual_source(source: str) -> bool:
normalized = (source or "").strip().lower()
return normalized == SOURCE_MANUAL or normalized.startswith(f"{SOURCE_MANUAL}:")
def _exhausted_ttl(error_code: Optional[int]) -> int:
"""Return cooldown seconds based on the HTTP status that caused exhaustion."""
if error_code == 429:
return EXHAUSTED_TTL_429_SECONDS
return EXHAUSTED_TTL_DEFAULT_SECONDS
def _normalize_custom_pool_name(name: str) -> str:
"""Normalize a custom provider name for use as a pool key suffix."""
return name.strip().lower().replace(" ", "-")
def _iter_custom_providers(config: Optional[dict] = None):
"""Yield (normalized_name, entry_dict) for each valid custom_providers entry."""
if config is None:
config = _load_config_safe()
if config is None:
return
custom_providers = config.get("custom_providers")
if not isinstance(custom_providers, list):
return
for entry in custom_providers:
if not isinstance(entry, dict):
continue
name = entry.get("name")
if not isinstance(name, str):
continue
yield _normalize_custom_pool_name(name), entry
def get_custom_provider_pool_key(base_url: str) -> Optional[str]:
"""Look up the custom_providers list in config.yaml and return 'custom:<name>' for a matching base_url.
Returns None if no match is found.
"""
if not base_url:
return None
normalized_url = base_url.strip().rstrip("/")
for norm_name, entry in _iter_custom_providers():
entry_url = str(entry.get("base_url") or "").strip().rstrip("/")
if entry_url and entry_url == normalized_url:
return f"{CUSTOM_POOL_PREFIX}{norm_name}"
return None
def list_custom_pool_providers() -> List[str]:
"""Return all 'custom:*' pool keys that have entries in auth.json."""
pool_data = read_credential_pool(None)
return sorted(
key for key in pool_data
if key.startswith(CUSTOM_POOL_PREFIX)
and isinstance(pool_data.get(key), list)
and pool_data[key]
)
def _get_custom_provider_config(pool_key: str) -> Optional[Dict[str, Any]]:
"""Return the custom_providers config entry matching a pool key like 'custom:together.ai'."""
if not pool_key.startswith(CUSTOM_POOL_PREFIX):
return None
suffix = pool_key[len(CUSTOM_POOL_PREFIX):]
for norm_name, entry in _iter_custom_providers():
if norm_name == suffix:
return entry
return None
def get_pool_strategy(provider: str) -> str:
"""Return the configured selection strategy for a provider."""
config = _load_config_safe()
if config is None:
return STRATEGY_FILL_FIRST
strategies = config.get("credential_pool_strategies")
if not isinstance(strategies, dict):
return STRATEGY_FILL_FIRST
strategy = str(strategies.get(provider, "") or "").strip().lower()
if strategy in SUPPORTED_POOL_STRATEGIES:
return strategy
return STRATEGY_FILL_FIRST
class CredentialPool:
def __init__(self, provider: str, entries: List[PooledCredential]):
self.provider = provider
self._entries = sorted(entries, key=lambda entry: entry.priority)
self._current_id: Optional[str] = None
self._strategy = get_pool_strategy(provider)
self._lock = threading.Lock()
def has_credentials(self) -> bool:
return bool(self._entries)
def entries(self) -> List[PooledCredential]:
return list(self._entries)
def current(self) -> Optional[PooledCredential]:
if not self._current_id:
return None
return next((entry for entry in self._entries if entry.id == self._current_id), None)
def _replace_entry(self, old: PooledCredential, new: PooledCredential) -> None:
"""Swap an entry in-place by id, preserving sort order."""
for idx, entry in enumerate(self._entries):
if entry.id == old.id:
self._entries[idx] = new
return
def _persist(self) -> None:
write_credential_pool(
self.provider,
[entry.to_dict() for entry in self._entries],
)
def _mark_exhausted(self, entry: PooledCredential, status_code: Optional[int]) -> PooledCredential:
updated = replace(
entry,
last_status=STATUS_EXHAUSTED,
last_status_at=time.time(),
last_error_code=status_code,
)
self._replace_entry(entry, updated)
self._persist()
return updated
def _refresh_entry(self, entry: PooledCredential, *, force: bool) -> Optional[PooledCredential]:
if entry.auth_type != AUTH_TYPE_OAUTH or not entry.refresh_token:
if force:
self._mark_exhausted(entry, None)
return None
try:
if self.provider == "anthropic":
from agent.anthropic_adapter import refresh_anthropic_oauth_pure
refreshed = refresh_anthropic_oauth_pure(
entry.refresh_token,
use_json=entry.source.endswith("hermes_pkce"),
)
updated = replace(
entry,
access_token=refreshed["access_token"],
refresh_token=refreshed["refresh_token"],
expires_at_ms=refreshed["expires_at_ms"],
)
elif self.provider == "openai-codex":
refreshed = auth_mod.refresh_codex_oauth_pure(
entry.access_token,
entry.refresh_token,
)
updated = replace(
entry,
access_token=refreshed["access_token"],
refresh_token=refreshed["refresh_token"],
last_refresh=refreshed.get("last_refresh"),
)
elif self.provider == "nous":
nous_state = {
"access_token": entry.access_token,
"refresh_token": entry.refresh_token,
"client_id": entry.client_id,
"portal_base_url": entry.portal_base_url,
"inference_base_url": entry.inference_base_url,
"token_type": entry.token_type,
"scope": entry.scope,
"obtained_at": entry.obtained_at,
"expires_at": entry.expires_at,
"agent_key": entry.agent_key,
"agent_key_expires_at": entry.agent_key_expires_at,
"tls": entry.tls,
}
refreshed = auth_mod.refresh_nous_oauth_from_state(
nous_state,
min_key_ttl_seconds=DEFAULT_AGENT_KEY_MIN_TTL_SECONDS,
force_refresh=force,
force_mint=force,
)
# Apply returned fields: dataclass fields via replace, extras via dict update
field_updates = {}
extra_updates = dict(entry.extra)
_field_names = {f.name for f in fields(entry)}
for k, v in refreshed.items():
if k in _field_names:
field_updates[k] = v
elif k in _EXTRA_KEYS:
extra_updates[k] = v
updated = replace(entry, extra=extra_updates, **field_updates)
else:
return entry
except Exception as exc:
logger.debug("Credential refresh failed for %s/%s: %s", self.provider, entry.id, exc)
self._mark_exhausted(entry, None)
return None
updated = replace(updated, last_status=STATUS_OK, last_status_at=None, last_error_code=None)
self._replace_entry(entry, updated)
self._persist()
return updated
def _entry_needs_refresh(self, entry: PooledCredential) -> bool:
if entry.auth_type != AUTH_TYPE_OAUTH:
return False
if self.provider == "anthropic":
if entry.expires_at_ms is None:
return False
return int(entry.expires_at_ms) <= int(time.time() * 1000) + 120_000
if self.provider == "openai-codex":
return _codex_access_token_is_expiring(
entry.access_token,
CODEX_ACCESS_TOKEN_REFRESH_SKEW_SECONDS,
)
if self.provider == "nous":
# Nous refresh/mint can require network access and should happen when
# runtime credentials are actually resolved, not merely when the pool
# is enumerated for listing, migration, or selection.
return False
return False
def mark_used(self, entry_id: Optional[str] = None) -> None:
"""Increment request_count for tracking. Used by least_used strategy."""
target_id = entry_id or self._current_id
if not target_id:
return
with self._lock:
for idx, entry in enumerate(self._entries):
if entry.id == target_id:
self._entries[idx] = replace(entry, request_count=entry.request_count + 1)
return
def select(self) -> Optional[PooledCredential]:
with self._lock:
return self._select_unlocked()
def _available_entries(self, *, clear_expired: bool = False, refresh: bool = False) -> List[PooledCredential]:
"""Return entries not currently in exhaustion cooldown.
When *clear_expired* is True, entries whose cooldown has elapsed are
reset to STATUS_OK and persisted. When *refresh* is True, entries
that need a token refresh are refreshed (skipped on failure).
"""
now = time.time()
cleared_any = False
available: List[PooledCredential] = []
for entry in self._entries:
if entry.last_status == STATUS_EXHAUSTED:
ttl = _exhausted_ttl(entry.last_error_code)
if entry.last_status_at and now - entry.last_status_at < ttl:
continue
if clear_expired:
cleared = replace(entry, last_status=STATUS_OK, last_status_at=None, last_error_code=None)
self._replace_entry(entry, cleared)
entry = cleared
cleared_any = True
if refresh and self._entry_needs_refresh(entry):
refreshed = self._refresh_entry(entry, force=False)
if refreshed is None:
continue
entry = refreshed
available.append(entry)
if cleared_any:
self._persist()
return available
def _select_unlocked(self) -> Optional[PooledCredential]:
available = self._available_entries(clear_expired=True, refresh=True)
if not available:
self._current_id = None
return None
if self._strategy == STRATEGY_RANDOM:
entry = random.choice(available)
self._current_id = entry.id
return entry
if self._strategy == STRATEGY_LEAST_USED and len(available) > 1:
entry = min(available, key=lambda e: e.request_count)
self._current_id = entry.id
return entry
if self._strategy == STRATEGY_ROUND_ROBIN and len(available) > 1:
entry = available[0]
rotated = [candidate for candidate in self._entries if candidate.id != entry.id]
rotated.append(replace(entry, priority=len(self._entries) - 1))
self._entries = [replace(candidate, priority=idx) for idx, candidate in enumerate(rotated)]
self._persist()
self._current_id = entry.id
return self.current() or entry
entry = available[0]
self._current_id = entry.id
return entry
def peek(self) -> Optional[PooledCredential]:
current = self.current()
if current is not None:
return current
available = self._available_entries()
return available[0] if available else None
def mark_exhausted_and_rotate(self, *, status_code: Optional[int]) -> Optional[PooledCredential]:
with self._lock:
entry = self.current() or self._select_unlocked()
if entry is None:
return None
self._mark_exhausted(entry, status_code)
self._current_id = None
return self._select_unlocked()
def try_refresh_current(self) -> Optional[PooledCredential]:
with self._lock:
return self._try_refresh_current_unlocked()
def _try_refresh_current_unlocked(self) -> Optional[PooledCredential]:
entry = self.current()
if entry is None:
return None
refreshed = self._refresh_entry(entry, force=True)
if refreshed is not None:
self._current_id = refreshed.id
return refreshed
def reset_statuses(self) -> int:
count = 0
new_entries = []
for entry in self._entries:
if entry.last_status or entry.last_status_at or entry.last_error_code:
new_entries.append(replace(entry, last_status=None, last_status_at=None, last_error_code=None))
count += 1
else:
new_entries.append(entry)
if count:
self._entries = new_entries
self._persist()
return count
def remove_index(self, index: int) -> Optional[PooledCredential]:
if index < 1 or index > len(self._entries):
return None
removed = self._entries.pop(index - 1)
self._entries = [
replace(entry, priority=new_priority)
for new_priority, entry in enumerate(self._entries)
]
self._persist()
if self._current_id == removed.id:
self._current_id = None
return removed
def add_entry(self, entry: PooledCredential) -> PooledCredential:
entry = replace(entry, priority=_next_priority(self._entries))
self._entries.append(entry)
self._persist()
return entry
def _upsert_entry(entries: List[PooledCredential], provider: str, source: str, payload: Dict[str, Any]) -> bool:
existing_idx = None
for idx, entry in enumerate(entries):
if entry.source == source:
existing_idx = idx
break
if existing_idx is None:
payload.setdefault("id", uuid.uuid4().hex[:6])
payload.setdefault("priority", _next_priority(entries))
payload.setdefault("label", payload.get("label") or source)
entries.append(PooledCredential.from_dict(provider, payload))
return True
existing = entries[existing_idx]
field_updates = {}
extra_updates = {}
_field_names = {f.name for f in fields(existing)}
for key, value in payload.items():
if key in {"id", "priority"} or value is None:
continue
if key == "label" and existing.label:
continue
if key in _field_names:
if getattr(existing, key) != value:
field_updates[key] = value
elif key in _EXTRA_KEYS:
if existing.extra.get(key) != value:
extra_updates[key] = value
if field_updates or extra_updates:
if extra_updates:
field_updates["extra"] = {**existing.extra, **extra_updates}
entries[existing_idx] = replace(existing, **field_updates)
return True
return False
def _normalize_pool_priorities(provider: str, entries: List[PooledCredential]) -> bool:
if provider != "anthropic":
return False
source_rank = {
"env:ANTHROPIC_TOKEN": 0,
"env:CLAUDE_CODE_OAUTH_TOKEN": 1,
"hermes_pkce": 2,
"claude_code": 3,
"env:ANTHROPIC_API_KEY": 4,
}
manual_entries = sorted(
(entry for entry in entries if _is_manual_source(entry.source)),
key=lambda entry: entry.priority,
)
seeded_entries = sorted(
(entry for entry in entries if not _is_manual_source(entry.source)),
key=lambda entry: (
source_rank.get(entry.source, len(source_rank)),
entry.priority,
entry.label,
),
)
ordered = [*manual_entries, *seeded_entries]
id_to_idx = {entry.id: idx for idx, entry in enumerate(entries)}
changed = False
for new_priority, entry in enumerate(ordered):
if entry.priority != new_priority:
entries[id_to_idx[entry.id]] = replace(entry, priority=new_priority)
changed = True
return changed
def _seed_from_singletons(provider: str, entries: List[PooledCredential]) -> Tuple[bool, Set[str]]:
changed = False
active_sources: Set[str] = set()
auth_store = _load_auth_store()
if provider == "anthropic":
from agent.anthropic_adapter import read_claude_code_credentials, read_hermes_oauth_credentials
for source_name, creds in (
("hermes_pkce", read_hermes_oauth_credentials()),
("claude_code", read_claude_code_credentials()),
):
if creds and creds.get("accessToken"):
active_sources.add(source_name)
changed |= _upsert_entry(
entries,
provider,
source_name,
{
"source": source_name,
"auth_type": AUTH_TYPE_OAUTH,
"access_token": creds.get("accessToken", ""),
"refresh_token": creds.get("refreshToken"),
"expires_at_ms": creds.get("expiresAt"),
"label": label_from_token(creds.get("accessToken", ""), source_name),
},
)
elif provider == "nous":
state = _load_provider_state(auth_store, "nous")
if state:
active_sources.add("device_code")
changed |= _upsert_entry(
entries,
provider,
"device_code",
{
"source": "device_code",
"auth_type": AUTH_TYPE_OAUTH,
"access_token": state.get("access_token", ""),
"refresh_token": state.get("refresh_token"),
"expires_at": state.get("expires_at"),
"token_type": state.get("token_type"),
"scope": state.get("scope"),
"client_id": state.get("client_id"),
"portal_base_url": state.get("portal_base_url"),
"inference_base_url": state.get("inference_base_url"),
"agent_key": state.get("agent_key"),
"agent_key_expires_at": state.get("agent_key_expires_at"),
"tls": state.get("tls") if isinstance(state.get("tls"), dict) else None,
"label": label_from_token(state.get("access_token", ""), "device_code"),
},
)
elif provider == "openai-codex":
state = _load_provider_state(auth_store, "openai-codex")
tokens = state.get("tokens") if isinstance(state, dict) else None
if isinstance(tokens, dict) and tokens.get("access_token"):
active_sources.add("device_code")
changed |= _upsert_entry(
entries,
provider,
"device_code",
{
"source": "device_code",
"auth_type": AUTH_TYPE_OAUTH,
"access_token": tokens.get("access_token", ""),
"refresh_token": tokens.get("refresh_token"),
"base_url": "https://chatgpt.com/backend-api/codex",
"last_refresh": state.get("last_refresh"),
"label": label_from_token(tokens.get("access_token", ""), "device_code"),
},
)
return changed, active_sources
def _seed_from_env(provider: str, entries: List[PooledCredential]) -> Tuple[bool, Set[str]]:
changed = False
active_sources: Set[str] = set()
if provider == "openrouter":
token = os.getenv("OPENROUTER_API_KEY", "").strip()
if token:
source = "env:OPENROUTER_API_KEY"
active_sources.add(source)
changed |= _upsert_entry(
entries,
provider,
source,
{
"source": source,
"auth_type": AUTH_TYPE_API_KEY,
"access_token": token,
"base_url": OPENROUTER_BASE_URL,
"label": "OPENROUTER_API_KEY",
},
)
return changed, active_sources
pconfig = PROVIDER_REGISTRY.get(provider)
if not pconfig or pconfig.auth_type != AUTH_TYPE_API_KEY:
return changed, active_sources
env_url = ""
if pconfig.base_url_env_var:
env_url = os.getenv(pconfig.base_url_env_var, "").strip().rstrip("/")
env_vars = list(pconfig.api_key_env_vars)
if provider == "anthropic":
env_vars = [
"ANTHROPIC_TOKEN",
"CLAUDE_CODE_OAUTH_TOKEN",
"ANTHROPIC_API_KEY",
]
for env_var in env_vars:
token = os.getenv(env_var, "").strip()
if not token:
continue
source = f"env:{env_var}"
active_sources.add(source)
auth_type = AUTH_TYPE_OAUTH if provider == "anthropic" and not token.startswith("sk-ant-api") else AUTH_TYPE_API_KEY
base_url = env_url or pconfig.inference_base_url
changed |= _upsert_entry(
entries,
provider,
source,
{
"source": source,
"auth_type": auth_type,
"access_token": token,
"base_url": base_url,
"label": env_var,
},
)
return changed, active_sources
def _prune_stale_seeded_entries(entries: List[PooledCredential], active_sources: Set[str]) -> bool:
retained = [
entry
for entry in entries
if _is_manual_source(entry.source)
or entry.source in active_sources
or not (
entry.source.startswith("env:")
or entry.source in {"claude_code", "hermes_pkce"}
)
]
if len(retained) == len(entries):
return False
entries[:] = retained
return True
def _seed_custom_pool(pool_key: str, entries: List[PooledCredential]) -> Tuple[bool, Set[str]]:
"""Seed a custom endpoint pool from custom_providers config and model config."""
changed = False
active_sources: Set[str] = set()
# Seed from the custom_providers config entry's api_key field
cp_config = _get_custom_provider_config(pool_key)
if cp_config:
api_key = str(cp_config.get("api_key") or "").strip()
base_url = str(cp_config.get("base_url") or "").strip().rstrip("/")
name = str(cp_config.get("name") or "").strip()
if api_key:
source = f"config:{name}"
active_sources.add(source)
changed |= _upsert_entry(
entries,
pool_key,
source,
{
"source": source,
"auth_type": AUTH_TYPE_API_KEY,
"access_token": api_key,
"base_url": base_url,
"label": name or source,
},
)
# Seed from model.api_key if model.provider=='custom' and model.base_url matches
try:
config = _load_config_safe()
model_cfg = config.get("model") if config else None
if isinstance(model_cfg, dict):
model_provider = str(model_cfg.get("provider") or "").strip().lower()
model_base_url = str(model_cfg.get("base_url") or "").strip().rstrip("/")
model_api_key = ""
for k in ("api_key", "api"):
v = model_cfg.get(k)
if isinstance(v, str) and v.strip():
model_api_key = v.strip()
break
if model_provider == "custom" and model_base_url and model_api_key:
# Check if this model's base_url matches our custom provider
matched_key = get_custom_provider_pool_key(model_base_url)
if matched_key == pool_key:
source = "model_config"
active_sources.add(source)
changed |= _upsert_entry(
entries,
pool_key,
source,
{
"source": source,
"auth_type": AUTH_TYPE_API_KEY,
"access_token": model_api_key,
"base_url": model_base_url,
"label": "model_config",
},
)
except Exception:
pass
return changed, active_sources
def load_pool(provider: str) -> CredentialPool:
provider = (provider or "").strip().lower()
raw_entries = read_credential_pool(provider)
entries = [PooledCredential.from_dict(provider, payload) for payload in raw_entries]
if provider.startswith(CUSTOM_POOL_PREFIX):
# Custom endpoint pool — seed from custom_providers config and model config
custom_changed, custom_sources = _seed_custom_pool(provider, entries)
changed = custom_changed
changed |= _prune_stale_seeded_entries(entries, custom_sources)
else:
singleton_changed, singleton_sources = _seed_from_singletons(provider, entries)
env_changed, env_sources = _seed_from_env(provider, entries)
changed = singleton_changed or env_changed
changed |= _prune_stale_seeded_entries(entries, singleton_sources | env_sources)
changed |= _normalize_pool_priorities(provider, entries)
if changed:
write_credential_pool(
provider,
[entry.to_dict() for entry in sorted(entries, key=lambda item: item.priority)],
)
return CredentialPool(provider, entries)

4
cli.py
View File

@@ -1955,6 +1955,7 @@ class HermesCLI:
resolved_api_mode = runtime.get("api_mode", self.api_mode)
resolved_acp_command = runtime.get("command")
resolved_acp_args = list(runtime.get("args") or [])
resolved_credential_pool = runtime.get("credential_pool")
if not isinstance(api_key, str) or not api_key:
# Custom / local endpoints (llama.cpp, ollama, vLLM, etc.) often
# don't require authentication. When a base_url IS configured but
@@ -1987,6 +1988,7 @@ class HermesCLI:
self.api_mode = resolved_api_mode
self.acp_command = resolved_acp_command
self.acp_args = resolved_acp_args
self._credential_pool = resolved_credential_pool
self._provider_source = runtime.get("source")
self.api_key = api_key
self.base_url = base_url
@@ -2088,6 +2090,7 @@ class HermesCLI:
"api_mode": self.api_mode,
"command": self.acp_command,
"args": list(self.acp_args or []),
"credential_pool": getattr(self, "_credential_pool", None),
}
effective_model = model_override or self.model
self.agent = AIAgent(
@@ -2098,6 +2101,7 @@ class HermesCLI:
api_mode=runtime.get("api_mode"),
acp_command=runtime.get("command"),
acp_args=runtime.get("args"),
credential_pool=runtime.get("credential_pool"),
max_iterations=self.max_turns,
enabled_toolsets=self.enabled_toolsets,
verbose_logging=self.verbose,

View File

@@ -298,6 +298,7 @@ def _resolve_runtime_agent_kwargs() -> dict:
"api_mode": runtime.get("api_mode"),
"command": runtime.get("command"),
"args": list(runtime.get("args") or []),
"credential_pool": runtime.get("credential_pool"),
}

View File

@@ -545,7 +545,11 @@ def _load_auth_store(auth_file: Optional[Path] = None) -> Dict[str, Any]:
except Exception:
return {"version": AUTH_STORE_VERSION, "providers": {}}
if isinstance(raw, dict) and isinstance(raw.get("providers"), dict):
if isinstance(raw, dict) and (
isinstance(raw.get("providers"), dict)
or isinstance(raw.get("credential_pool"), dict)
):
raw.setdefault("providers", {})
return raw
# Migrate from PR's "systems" format if present
@@ -613,6 +617,30 @@ def _save_provider_state(auth_store: Dict[str, Any], provider_id: str, state: Di
auth_store["active_provider"] = provider_id
def read_credential_pool(provider_id: Optional[str] = None) -> Dict[str, Any]:
"""Return the persisted credential pool, or one provider slice."""
auth_store = _load_auth_store()
pool = auth_store.get("credential_pool")
if not isinstance(pool, dict):
pool = {}
if provider_id is None:
return dict(pool)
provider_entries = pool.get(provider_id)
return list(provider_entries) if isinstance(provider_entries, list) else []
def write_credential_pool(provider_id: str, entries: List[Dict[str, Any]]) -> Path:
"""Persist one provider's credential pool under auth.json."""
with _auth_store_lock():
auth_store = _load_auth_store()
pool = auth_store.get("credential_pool")
if not isinstance(pool, dict):
pool = {}
auth_store["credential_pool"] = pool
pool[provider_id] = list(entries)
return _save_auth_store(auth_store)
def get_provider_auth_state(provider_id: str) -> Optional[Dict[str, Any]]:
"""Return persisted auth state for a provider, or None."""
auth_store = _load_auth_store()
@@ -638,10 +666,25 @@ def clear_provider_auth(provider_id: Optional[str] = None) -> bool:
return False
providers = auth_store.get("providers", {})
if target not in providers:
return False
if not isinstance(providers, dict):
providers = {}
auth_store["providers"] = providers
del providers[target]
pool = auth_store.get("credential_pool")
if not isinstance(pool, dict):
pool = {}
auth_store["credential_pool"] = pool
cleared = False
if target in providers:
del providers[target]
cleared = True
if target in pool:
del pool[target]
cleared = True
if not cleared:
return False
if auth_store.get("active_provider") == target:
auth_store["active_provider"] = None
_save_auth_store(auth_store)
@@ -898,15 +941,14 @@ def _save_codex_tokens(tokens: Dict[str, str], last_refresh: str = None) -> None
_save_auth_store(auth_store)
def _refresh_codex_auth_tokens(
tokens: Dict[str, str],
timeout_seconds: float,
) -> Dict[str, str]:
"""Refresh Codex access token using the refresh token.
Saves the new tokens to Hermes auth store automatically.
"""
refresh_token = tokens.get("refresh_token")
def refresh_codex_oauth_pure(
access_token: str,
refresh_token: str,
*,
timeout_seconds: float = 20.0,
) -> Dict[str, Any]:
"""Refresh Codex OAuth tokens without mutating Hermes auth state."""
del access_token # Access token is only used by callers to decide whether to refresh.
if not isinstance(refresh_token, str) or not refresh_token.strip():
raise AuthError(
"Codex auth is missing refresh_token. Run `hermes login` to re-authenticate.",
@@ -961,8 +1003,8 @@ def _refresh_codex_auth_tokens(
relogin_required=True,
) from exc
access_token = refresh_payload.get("access_token")
if not isinstance(access_token, str) or not access_token.strip():
refreshed_access = refresh_payload.get("access_token")
if not isinstance(refreshed_access, str) or not refreshed_access.strip():
raise AuthError(
"Codex token refresh response was missing access_token.",
provider="openai-codex",
@@ -970,11 +1012,33 @@ def _refresh_codex_auth_tokens(
relogin_required=True,
)
updated_tokens = dict(tokens)
updated_tokens["access_token"] = access_token.strip()
updated = {
"access_token": refreshed_access.strip(),
"refresh_token": refresh_token.strip(),
"last_refresh": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
}
next_refresh = refresh_payload.get("refresh_token")
if isinstance(next_refresh, str) and next_refresh.strip():
updated_tokens["refresh_token"] = next_refresh.strip()
updated["refresh_token"] = next_refresh.strip()
return updated
def _refresh_codex_auth_tokens(
tokens: Dict[str, str],
timeout_seconds: float,
) -> Dict[str, str]:
"""Refresh Codex access token using the refresh token.
Saves the new tokens to Hermes auth store automatically.
"""
refreshed = refresh_codex_oauth_pure(
str(tokens.get("access_token", "") or ""),
str(tokens.get("refresh_token", "") or ""),
timeout_seconds=timeout_seconds,
)
updated_tokens = dict(tokens)
updated_tokens["access_token"] = refreshed["access_token"]
updated_tokens["refresh_token"] = refreshed["refresh_token"]
_save_codex_tokens(updated_tokens)
return updated_tokens
@@ -1313,6 +1377,122 @@ def _agent_key_is_usable(state: Dict[str, Any], min_ttl_seconds: int) -> bool:
return not _is_expiring(state.get("agent_key_expires_at"), min_ttl_seconds)
def refresh_nous_oauth_pure(
access_token: str,
refresh_token: str,
client_id: str,
portal_base_url: str,
inference_base_url: str,
*,
token_type: str = "Bearer",
scope: str = DEFAULT_NOUS_SCOPE,
obtained_at: Optional[str] = None,
expires_at: Optional[str] = None,
agent_key: Optional[str] = None,
agent_key_expires_at: Optional[str] = None,
min_key_ttl_seconds: int = DEFAULT_AGENT_KEY_MIN_TTL_SECONDS,
timeout_seconds: float = 15.0,
insecure: Optional[bool] = None,
ca_bundle: Optional[str] = None,
force_refresh: bool = False,
force_mint: bool = False,
) -> Dict[str, Any]:
"""Refresh Nous OAuth state without mutating auth.json."""
state: Dict[str, Any] = {
"access_token": access_token,
"refresh_token": refresh_token,
"client_id": client_id or DEFAULT_NOUS_CLIENT_ID,
"portal_base_url": (portal_base_url or DEFAULT_NOUS_PORTAL_URL).rstrip("/"),
"inference_base_url": (inference_base_url or DEFAULT_NOUS_INFERENCE_URL).rstrip("/"),
"token_type": token_type or "Bearer",
"scope": scope or DEFAULT_NOUS_SCOPE,
"obtained_at": obtained_at,
"expires_at": expires_at,
"agent_key": agent_key,
"agent_key_expires_at": agent_key_expires_at,
"tls": {
"insecure": bool(insecure),
"ca_bundle": ca_bundle,
},
}
verify = _resolve_verify(insecure=insecure, ca_bundle=ca_bundle, auth_state=state)
timeout = httpx.Timeout(timeout_seconds if timeout_seconds else 15.0)
with httpx.Client(timeout=timeout, headers={"Accept": "application/json"}, verify=verify) as client:
if force_refresh or _is_expiring(state.get("expires_at"), ACCESS_TOKEN_REFRESH_SKEW_SECONDS):
refreshed = _refresh_access_token(
client=client,
portal_base_url=state["portal_base_url"],
client_id=state["client_id"],
refresh_token=state["refresh_token"],
)
now = datetime.now(timezone.utc)
access_ttl = _coerce_ttl_seconds(refreshed.get("expires_in"))
state["access_token"] = refreshed["access_token"]
state["refresh_token"] = refreshed.get("refresh_token") or state["refresh_token"]
state["token_type"] = refreshed.get("token_type") or state.get("token_type") or "Bearer"
state["scope"] = refreshed.get("scope") or state.get("scope")
refreshed_url = _optional_base_url(refreshed.get("inference_base_url"))
if refreshed_url:
state["inference_base_url"] = refreshed_url
state["obtained_at"] = now.isoformat()
state["expires_in"] = access_ttl
state["expires_at"] = datetime.fromtimestamp(
now.timestamp() + access_ttl, tz=timezone.utc
).isoformat()
if force_mint or not _agent_key_is_usable(state, max(60, int(min_key_ttl_seconds))):
mint_payload = _mint_agent_key(
client=client,
portal_base_url=state["portal_base_url"],
access_token=state["access_token"],
min_ttl_seconds=min_key_ttl_seconds,
)
now = datetime.now(timezone.utc)
state["agent_key"] = mint_payload.get("api_key")
state["agent_key_id"] = mint_payload.get("key_id")
state["agent_key_expires_at"] = mint_payload.get("expires_at")
state["agent_key_expires_in"] = mint_payload.get("expires_in")
state["agent_key_reused"] = bool(mint_payload.get("reused", False))
state["agent_key_obtained_at"] = now.isoformat()
minted_url = _optional_base_url(mint_payload.get("inference_base_url"))
if minted_url:
state["inference_base_url"] = minted_url
return state
def refresh_nous_oauth_from_state(
state: Dict[str, Any],
*,
min_key_ttl_seconds: int = DEFAULT_AGENT_KEY_MIN_TTL_SECONDS,
timeout_seconds: float = 15.0,
force_refresh: bool = False,
force_mint: bool = False,
) -> Dict[str, Any]:
"""Refresh Nous OAuth from a state dict. Thin wrapper around refresh_nous_oauth_pure."""
tls = state.get("tls") or {}
return refresh_nous_oauth_pure(
state.get("access_token", ""),
state.get("refresh_token", ""),
state.get("client_id", "hermes-cli"),
state.get("portal_base_url", DEFAULT_NOUS_PORTAL_URL),
state.get("inference_base_url", DEFAULT_NOUS_INFERENCE_URL),
token_type=state.get("token_type", "Bearer"),
scope=state.get("scope", DEFAULT_NOUS_SCOPE),
obtained_at=state.get("obtained_at"),
expires_at=state.get("expires_at"),
agent_key=state.get("agent_key"),
agent_key_expires_at=state.get("agent_key_expires_at"),
min_key_ttl_seconds=min_key_ttl_seconds,
timeout_seconds=timeout_seconds,
insecure=tls.get("insecure"),
ca_bundle=tls.get("ca_bundle"),
force_refresh=force_refresh,
force_mint=force_mint,
)
def resolve_nous_runtime_credentials(
*,
min_key_ttl_seconds: int = DEFAULT_AGENT_KEY_MIN_TTL_SECONDS,
@@ -2180,34 +2360,36 @@ def _codex_device_code_login() -> Dict[str, Any]:
}
def _login_nous(args, pconfig: ProviderConfig) -> None:
"""Nous Portal device authorization flow."""
def _nous_device_code_login(
*,
portal_base_url: Optional[str] = None,
inference_base_url: Optional[str] = None,
client_id: Optional[str] = None,
scope: Optional[str] = None,
open_browser: bool = True,
timeout_seconds: float = 15.0,
insecure: bool = False,
ca_bundle: Optional[str] = None,
min_key_ttl_seconds: int = 5 * 60,
) -> Dict[str, Any]:
"""Run the Nous device-code flow and return full OAuth state without persisting."""
pconfig = PROVIDER_REGISTRY["nous"]
portal_base_url = (
getattr(args, "portal_url", None)
portal_base_url
or os.getenv("HERMES_PORTAL_BASE_URL")
or os.getenv("NOUS_PORTAL_BASE_URL")
or pconfig.portal_base_url
).rstrip("/")
requested_inference_url = (
getattr(args, "inference_url", None)
inference_base_url
or os.getenv("NOUS_INFERENCE_BASE_URL")
or pconfig.inference_base_url
).rstrip("/")
client_id = getattr(args, "client_id", None) or pconfig.client_id
scope = getattr(args, "scope", None) or pconfig.scope
open_browser = not getattr(args, "no_browser", False)
timeout_seconds = getattr(args, "timeout", None) or 15.0
client_id = client_id or pconfig.client_id
scope = scope or pconfig.scope
timeout = httpx.Timeout(timeout_seconds)
insecure = bool(getattr(args, "insecure", False))
ca_bundle = (
getattr(args, "ca_bundle", None)
or os.getenv("HERMES_CA_BUNDLE")
or os.getenv("SSL_CERT_FILE")
)
verify: bool | str = False if insecure else (ca_bundle if ca_bundle else True)
# Skip browser open in SSH sessions
if _is_remote_session():
open_browser = False
@@ -2218,74 +2400,109 @@ def _login_nous(args, pconfig: ProviderConfig) -> None:
elif ca_bundle:
print(f"TLS verification: custom CA bundle ({ca_bundle})")
try:
with httpx.Client(timeout=timeout, headers={"Accept": "application/json"}, verify=verify) as client:
device_data = _request_device_code(
client=client, portal_base_url=portal_base_url,
client_id=client_id, scope=scope,
)
verification_url = str(device_data["verification_uri_complete"])
user_code = str(device_data["user_code"])
expires_in = int(device_data["expires_in"])
interval = int(device_data["interval"])
print()
print("To continue:")
print(f" 1. Open: {verification_url}")
print(f" 2. If prompted, enter code: {user_code}")
if open_browser:
opened = webbrowser.open(verification_url)
if opened:
print(" (Opened browser for verification)")
else:
print(" Could not open browser automatically — use the URL above.")
effective_interval = max(1, min(interval, DEVICE_AUTH_POLL_INTERVAL_CAP_SECONDS))
print(f"Waiting for approval (polling every {effective_interval}s)...")
token_data = _poll_for_token(
client=client, portal_base_url=portal_base_url,
client_id=client_id, device_code=str(device_data["device_code"]),
expires_in=expires_in, poll_interval=interval,
)
# Process token response
now = datetime.now(timezone.utc)
token_expires_in = _coerce_ttl_seconds(token_data.get("expires_in", 0))
expires_at = now.timestamp() + token_expires_in
inference_base_url = (
_optional_base_url(token_data.get("inference_base_url"))
or requested_inference_url
with httpx.Client(timeout=timeout, headers={"Accept": "application/json"}, verify=verify) as client:
device_data = _request_device_code(
client=client,
portal_base_url=portal_base_url,
client_id=client_id,
scope=scope,
)
if inference_base_url != requested_inference_url:
print(f"Using portal-provided inference URL: {inference_base_url}")
auth_state = {
"portal_base_url": portal_base_url,
"inference_base_url": inference_base_url,
"client_id": client_id,
"scope": token_data.get("scope") or scope,
"token_type": token_data.get("token_type", "Bearer"),
"access_token": token_data["access_token"],
"refresh_token": token_data.get("refresh_token"),
"obtained_at": now.isoformat(),
"expires_at": datetime.fromtimestamp(expires_at, tz=timezone.utc).isoformat(),
"expires_in": token_expires_in,
"tls": {
"insecure": verify is False,
"ca_bundle": verify if isinstance(verify, str) else None,
},
"agent_key": None,
"agent_key_id": None,
"agent_key_expires_at": None,
"agent_key_expires_in": None,
"agent_key_reused": None,
"agent_key_obtained_at": None,
}
verification_url = str(device_data["verification_uri_complete"])
user_code = str(device_data["user_code"])
expires_in = int(device_data["expires_in"])
interval = int(device_data["interval"])
print()
print("To continue:")
print(f" 1. Open: {verification_url}")
print(f" 2. If prompted, enter code: {user_code}")
if open_browser:
opened = webbrowser.open(verification_url)
if opened:
print(" (Opened browser for verification)")
else:
print(" Could not open browser automatically — use the URL above.")
effective_interval = max(1, min(interval, DEVICE_AUTH_POLL_INTERVAL_CAP_SECONDS))
print(f"Waiting for approval (polling every {effective_interval}s)...")
token_data = _poll_for_token(
client=client,
portal_base_url=portal_base_url,
client_id=client_id,
device_code=str(device_data["device_code"]),
expires_in=expires_in,
poll_interval=interval,
)
now = datetime.now(timezone.utc)
token_expires_in = _coerce_ttl_seconds(token_data.get("expires_in", 0))
expires_at = now.timestamp() + token_expires_in
resolved_inference_url = (
_optional_base_url(token_data.get("inference_base_url"))
or requested_inference_url
)
if resolved_inference_url != requested_inference_url:
print(f"Using portal-provided inference URL: {resolved_inference_url}")
auth_state = {
"portal_base_url": portal_base_url,
"inference_base_url": resolved_inference_url,
"client_id": client_id,
"scope": token_data.get("scope") or scope,
"token_type": token_data.get("token_type", "Bearer"),
"access_token": token_data["access_token"],
"refresh_token": token_data.get("refresh_token"),
"obtained_at": now.isoformat(),
"expires_at": datetime.fromtimestamp(expires_at, tz=timezone.utc).isoformat(),
"expires_in": token_expires_in,
"tls": {
"insecure": verify is False,
"ca_bundle": verify if isinstance(verify, str) else None,
},
"agent_key": None,
"agent_key_id": None,
"agent_key_expires_at": None,
"agent_key_expires_in": None,
"agent_key_reused": None,
"agent_key_obtained_at": None,
}
return refresh_nous_oauth_from_state(
auth_state,
min_key_ttl_seconds=min_key_ttl_seconds,
timeout_seconds=timeout_seconds,
force_refresh=False,
force_mint=True,
)
def _login_nous(args, pconfig: ProviderConfig) -> None:
"""Nous Portal device authorization flow."""
timeout_seconds = getattr(args, "timeout", None) or 15.0
insecure = bool(getattr(args, "insecure", False))
ca_bundle = (
getattr(args, "ca_bundle", None)
or os.getenv("HERMES_CA_BUNDLE")
or os.getenv("SSL_CERT_FILE")
)
try:
auth_state = _nous_device_code_login(
portal_base_url=getattr(args, "portal_url", None) or pconfig.portal_base_url,
inference_base_url=getattr(args, "inference_url", None) or pconfig.inference_base_url,
client_id=getattr(args, "client_id", None) or pconfig.client_id,
scope=getattr(args, "scope", None) or pconfig.scope,
open_browser=not getattr(args, "no_browser", False),
timeout_seconds=timeout_seconds,
insecure=insecure,
ca_bundle=ca_bundle,
min_key_ttl_seconds=5 * 60,
)
inference_base_url = auth_state["inference_base_url"]
verify: bool | str = False if insecure else (ca_bundle if ca_bundle else True)
# Save auth state
with _auth_store_lock():
auth_store = _load_auth_store()
_save_provider_state(auth_store, "nous", auth_state)
@@ -2297,18 +2514,14 @@ def _login_nous(args, pconfig: ProviderConfig) -> None:
print(f" Auth state: {saved_to}")
print(f" Config updated: {config_path} (model.provider=nous)")
# Mint an initial agent key and list available models
try:
runtime_creds = resolve_nous_runtime_credentials(
min_key_ttl_seconds=5 * 60,
timeout_seconds=timeout_seconds,
insecure=insecure, ca_bundle=ca_bundle,
)
runtime_key = runtime_creds.get("api_key")
runtime_base_url = runtime_creds.get("base_url") or inference_base_url
runtime_key = auth_state.get("agent_key") or auth_state.get("access_token")
if not isinstance(runtime_key, str) or not runtime_key:
raise AuthError("No runtime API key available to fetch models",
provider="nous", code="invalid_token")
raise AuthError(
"No runtime API key available to fetch models",
provider="nous",
code="invalid_token",
)
# Use curated model list (same as OpenRouter defaults) instead
# of the full /models dump which returns hundreds of models.

470
hermes_cli/auth_commands.py Normal file
View File

@@ -0,0 +1,470 @@
"""Credential-pool auth subcommands."""
from __future__ import annotations
from getpass import getpass
import math
import time
from types import SimpleNamespace
import uuid
from agent.credential_pool import (
AUTH_TYPE_API_KEY,
AUTH_TYPE_OAUTH,
CUSTOM_POOL_PREFIX,
SOURCE_MANUAL,
STATUS_EXHAUSTED,
STRATEGY_FILL_FIRST,
STRATEGY_ROUND_ROBIN,
STRATEGY_RANDOM,
STRATEGY_LEAST_USED,
SUPPORTED_POOL_STRATEGIES,
PooledCredential,
_normalize_custom_pool_name,
get_pool_strategy,
label_from_token,
list_custom_pool_providers,
load_pool,
_exhausted_ttl,
)
import hermes_cli.auth as auth_mod
from hermes_cli.auth import PROVIDER_REGISTRY
from hermes_constants import OPENROUTER_BASE_URL
# Providers that support OAuth login in addition to API keys.
_OAUTH_CAPABLE_PROVIDERS = {"anthropic", "nous", "openai-codex"}
def _get_custom_provider_names() -> list:
"""Return list of (display_name, pool_key) tuples for custom_providers in config."""
try:
from hermes_cli.config import load_config
config = load_config()
except Exception:
return []
custom_providers = config.get("custom_providers")
if not isinstance(custom_providers, list):
return []
result = []
for entry in custom_providers:
if not isinstance(entry, dict):
continue
name = entry.get("name")
if not isinstance(name, str) or not name.strip():
continue
pool_key = f"{CUSTOM_POOL_PREFIX}{_normalize_custom_pool_name(name)}"
result.append((name.strip(), pool_key))
return result
def _resolve_custom_provider_input(raw: str) -> str | None:
"""If raw input matches a custom_providers entry name (case-insensitive), return its pool key."""
normalized = (raw or "").strip().lower().replace(" ", "-")
if not normalized:
return None
# Direct match on 'custom:name' format
if normalized.startswith(CUSTOM_POOL_PREFIX):
return normalized
for display_name, pool_key in _get_custom_provider_names():
if _normalize_custom_pool_name(display_name) == normalized:
return pool_key
return None
def _normalize_provider(provider: str) -> str:
normalized = (provider or "").strip().lower()
if normalized in {"or", "open-router"}:
return "openrouter"
# Check if it matches a custom provider name
custom_key = _resolve_custom_provider_input(normalized)
if custom_key:
return custom_key
return normalized
def _provider_base_url(provider: str) -> str:
if provider == "openrouter":
return OPENROUTER_BASE_URL
if provider.startswith(CUSTOM_POOL_PREFIX):
from agent.credential_pool import _get_custom_provider_config
cp_config = _get_custom_provider_config(provider)
if cp_config:
return str(cp_config.get("base_url") or "").strip()
return ""
pconfig = PROVIDER_REGISTRY.get(provider)
return pconfig.inference_base_url if pconfig else ""
def _oauth_default_label(provider: str, count: int) -> str:
return f"{provider}-oauth-{count}"
def _api_key_default_label(count: int) -> str:
return f"api-key-{count}"
def _display_source(source: str) -> str:
return source.split(":", 1)[1] if source.startswith("manual:") else source
def _format_exhausted_status(entry) -> str:
if entry.last_status != STATUS_EXHAUSTED:
return ""
code = f" ({entry.last_error_code})" if entry.last_error_code else ""
if not entry.last_status_at:
return f" exhausted{code}"
remaining = max(0, int(math.ceil((entry.last_status_at + _exhausted_ttl(entry.last_error_code)) - time.time())))
if remaining <= 0:
return f" exhausted{code} (ready to retry)"
minutes, seconds = divmod(remaining, 60)
hours, minutes = divmod(minutes, 60)
if hours:
wait = f"{hours}h {minutes}m"
elif minutes:
wait = f"{minutes}m {seconds}s"
else:
wait = f"{seconds}s"
return f" exhausted{code} ({wait} left)"
def auth_add_command(args) -> None:
provider = _normalize_provider(getattr(args, "provider", ""))
if provider not in PROVIDER_REGISTRY and provider != "openrouter" and not provider.startswith(CUSTOM_POOL_PREFIX):
raise SystemExit(f"Unknown provider: {provider}")
requested_type = str(getattr(args, "auth_type", "") or "").strip().lower()
if requested_type in {AUTH_TYPE_API_KEY, "api-key"}:
requested_type = AUTH_TYPE_API_KEY
if not requested_type:
if provider.startswith(CUSTOM_POOL_PREFIX):
requested_type = AUTH_TYPE_API_KEY
else:
requested_type = AUTH_TYPE_OAUTH if provider in {"anthropic", "nous", "openai-codex"} else AUTH_TYPE_API_KEY
pool = load_pool(provider)
if requested_type == AUTH_TYPE_API_KEY:
token = (getattr(args, "api_key", None) or "").strip()
if not token:
token = getpass("Paste your API key: ").strip()
if not token:
raise SystemExit("No API key provided.")
default_label = _api_key_default_label(len(pool.entries()) + 1)
label = (getattr(args, "label", None) or "").strip()
if not label:
label = input(f"Label (optional, default: {default_label}): ").strip() or default_label
entry = PooledCredential(
provider=provider,
id=uuid.uuid4().hex[:6],
label=label,
auth_type=AUTH_TYPE_API_KEY,
priority=0,
source=SOURCE_MANUAL,
access_token=token,
base_url=_provider_base_url(provider),
)
pool.add_entry(entry)
print(f'Added {provider} credential #{len(pool.entries())}: "{label}"')
return
if provider == "anthropic":
from agent import anthropic_adapter as anthropic_mod
creds = anthropic_mod.run_hermes_oauth_login_pure()
if not creds:
raise SystemExit("Anthropic OAuth login did not return credentials.")
label = (getattr(args, "label", None) or "").strip() or label_from_token(
creds["access_token"],
_oauth_default_label(provider, len(pool.entries()) + 1),
)
entry = PooledCredential(
provider=provider,
id=uuid.uuid4().hex[:6],
label=label,
auth_type=AUTH_TYPE_OAUTH,
priority=0,
source=f"{SOURCE_MANUAL}:hermes_pkce",
access_token=creds["access_token"],
refresh_token=creds.get("refresh_token"),
expires_at_ms=creds.get("expires_at_ms"),
base_url=_provider_base_url(provider),
)
pool.add_entry(entry)
print(f'Added {provider} OAuth credential #{len(pool.entries())}: "{entry.label}"')
return
if provider == "nous":
creds = auth_mod._nous_device_code_login(
portal_base_url=getattr(args, "portal_url", None),
inference_base_url=getattr(args, "inference_url", None),
client_id=getattr(args, "client_id", None),
scope=getattr(args, "scope", None),
open_browser=not getattr(args, "no_browser", False),
timeout_seconds=getattr(args, "timeout", None) or 15.0,
insecure=bool(getattr(args, "insecure", False)),
ca_bundle=getattr(args, "ca_bundle", None),
min_key_ttl_seconds=max(60, int(getattr(args, "min_key_ttl_seconds", 5 * 60))),
)
label = (getattr(args, "label", None) or "").strip() or label_from_token(
creds.get("access_token", ""),
_oauth_default_label(provider, len(pool.entries()) + 1),
)
entry = PooledCredential.from_dict(provider, {
**creds,
"label": label,
"auth_type": AUTH_TYPE_OAUTH,
"source": f"{SOURCE_MANUAL}:device_code",
"base_url": creds.get("inference_base_url"),
})
pool.add_entry(entry)
print(f'Added {provider} OAuth credential #{len(pool.entries())}: "{entry.label}"')
return
if provider == "openai-codex":
creds = auth_mod._codex_device_code_login()
label = (getattr(args, "label", None) or "").strip() or label_from_token(
creds["tokens"]["access_token"],
_oauth_default_label(provider, len(pool.entries()) + 1),
)
entry = PooledCredential(
provider=provider,
id=uuid.uuid4().hex[:6],
label=label,
auth_type=AUTH_TYPE_OAUTH,
priority=0,
source=f"{SOURCE_MANUAL}:device_code",
access_token=creds["tokens"]["access_token"],
refresh_token=creds["tokens"].get("refresh_token"),
base_url=creds.get("base_url"),
last_refresh=creds.get("last_refresh"),
)
pool.add_entry(entry)
print(f'Added {provider} OAuth credential #{len(pool.entries())}: "{entry.label}"')
return
raise SystemExit(f"`hermes auth add {provider}` is not implemented for auth type {requested_type} yet.")
def auth_list_command(args) -> None:
provider_filter = _normalize_provider(getattr(args, "provider", "") or "")
if provider_filter:
providers = [provider_filter]
else:
providers = sorted({
*PROVIDER_REGISTRY.keys(),
"openrouter",
*list_custom_pool_providers(),
})
for provider in providers:
pool = load_pool(provider)
entries = pool.entries()
if not entries:
continue
current = pool.peek()
print(f"{provider} ({len(entries)} credentials):")
for idx, entry in enumerate(entries, start=1):
marker = " "
if current is not None and entry.id == current.id:
marker = ""
status = _format_exhausted_status(entry)
source = _display_source(entry.source)
print(f" #{idx} {entry.label:<20} {entry.auth_type:<7} {source}{status} {marker}".rstrip())
print()
def auth_remove_command(args) -> None:
provider = _normalize_provider(getattr(args, "provider", ""))
index = int(getattr(args, "index"))
pool = load_pool(provider)
removed = pool.remove_index(index)
if removed is None:
raise SystemExit(f"No credential #{index} for provider {provider}.")
print(f"Removed {provider} credential #{index} ({removed.label})")
def auth_reset_command(args) -> None:
provider = _normalize_provider(getattr(args, "provider", ""))
pool = load_pool(provider)
count = pool.reset_statuses()
print(f"Reset status on {count} {provider} credentials")
def _interactive_auth() -> None:
"""Interactive credential pool management when `hermes auth` is called bare."""
# Show current pool status first
print("Credential Pool Status")
print("=" * 50)
auth_list_command(SimpleNamespace(provider=None))
print()
# Main menu
choices = [
"Add a credential",
"Remove a credential",
"Reset cooldowns for a provider",
"Set rotation strategy for a provider",
"Exit",
]
print("What would you like to do?")
for i, choice in enumerate(choices, 1):
print(f" {i}. {choice}")
try:
raw = input("\nChoice: ").strip()
except (EOFError, KeyboardInterrupt):
return
if not raw or raw == str(len(choices)):
return
if raw == "1":
_interactive_add()
elif raw == "2":
_interactive_remove()
elif raw == "3":
_interactive_reset()
elif raw == "4":
_interactive_strategy()
def _pick_provider(prompt: str = "Provider") -> str:
"""Prompt for a provider name with auto-complete hints."""
known = sorted(set(list(PROVIDER_REGISTRY.keys()) + ["openrouter"]))
custom_names = _get_custom_provider_names()
if custom_names:
custom_display = [name for name, _key in custom_names]
print(f"\nKnown providers: {', '.join(known)}")
print(f"Custom endpoints: {', '.join(custom_display)}")
else:
print(f"\nKnown providers: {', '.join(known)}")
try:
raw = input(f"{prompt}: ").strip()
except (EOFError, KeyboardInterrupt):
raise SystemExit()
return _normalize_provider(raw)
def _interactive_add() -> None:
provider = _pick_provider("Provider to add credential for")
if provider not in PROVIDER_REGISTRY and provider != "openrouter" and not provider.startswith(CUSTOM_POOL_PREFIX):
raise SystemExit(f"Unknown provider: {provider}")
# For OAuth-capable providers, ask which type
if provider in _OAUTH_CAPABLE_PROVIDERS:
print(f"\n{provider} supports both API keys and OAuth login.")
print(" 1. API key (paste a key from the provider dashboard)")
print(" 2. OAuth login (authenticate via browser)")
try:
type_choice = input("Type [1/2]: ").strip()
except (EOFError, KeyboardInterrupt):
return
if type_choice == "2":
auth_type = "oauth"
else:
auth_type = "api_key"
else:
auth_type = "api_key"
auth_add_command(SimpleNamespace(
provider=provider, auth_type=auth_type, label=None, api_key=None,
portal_url=None, inference_url=None, client_id=None, scope=None,
no_browser=False, timeout=None, insecure=False, ca_bundle=None,
))
def _interactive_remove() -> None:
provider = _pick_provider("Provider to remove credential from")
pool = load_pool(provider)
if not pool.has_credentials():
print(f"No credentials for {provider}.")
return
# Show entries with indices
for i, e in enumerate(pool.entries(), 1):
exhausted = _format_exhausted_status(e)
print(f" #{i} {e.label:25s} {e.auth_type:10s} {e.source}{exhausted}")
try:
raw = input("Remove # (or blank to cancel): ").strip()
except (EOFError, KeyboardInterrupt):
return
if not raw:
return
try:
index = int(raw)
except ValueError:
print("Invalid number.")
return
auth_remove_command(SimpleNamespace(provider=provider, index=index))
def _interactive_reset() -> None:
provider = _pick_provider("Provider to reset cooldowns for")
auth_reset_command(SimpleNamespace(provider=provider))
def _interactive_strategy() -> None:
provider = _pick_provider("Provider to set strategy for")
current = get_pool_strategy(provider)
strategies = [STRATEGY_FILL_FIRST, STRATEGY_ROUND_ROBIN, STRATEGY_LEAST_USED, STRATEGY_RANDOM]
print(f"\nCurrent strategy for {provider}: {current}")
print()
descriptions = {
STRATEGY_FILL_FIRST: "Use first key until exhausted, then next",
STRATEGY_ROUND_ROBIN: "Cycle through keys evenly",
STRATEGY_LEAST_USED: "Always pick the least-used key",
STRATEGY_RANDOM: "Random selection",
}
for i, s in enumerate(strategies, 1):
marker = "" if s == current else ""
print(f" {i}. {s:15s}{descriptions.get(s, '')}{marker}")
try:
raw = input("\nStrategy [1-4]: ").strip()
except (EOFError, KeyboardInterrupt):
return
if not raw:
return
try:
idx = int(raw) - 1
strategy = strategies[idx]
except (ValueError, IndexError):
print("Invalid choice.")
return
from hermes_cli.config import load_config, save_config
cfg = load_config()
pool_strategies = cfg.get("credential_pool_strategies") or {}
if not isinstance(pool_strategies, dict):
pool_strategies = {}
pool_strategies[provider] = strategy
cfg["credential_pool_strategies"] = pool_strategies
save_config(cfg)
print(f"Set {provider} strategy to: {strategy}")
def auth_command(args) -> None:
action = getattr(args, "auth_action", "")
if action == "add":
auth_add_command(args)
return
if action == "list":
auth_list_command(args)
return
if action == "remove":
auth_remove_command(args)
return
if action == "reset":
auth_reset_command(args)
return
# No subcommand — launch interactive mode
_interactive_auth()

View File

@@ -198,6 +198,7 @@ def ensure_hermes_home():
DEFAULT_CONFIG = {
"model": "anthropic/claude-opus-4.6",
"fallback_providers": [],
"credential_pool_strategies": {},
"toolsets": ["hermes-cli"],
"agent": {
"max_turns": 90,
@@ -502,7 +503,7 @@ DEFAULT_CONFIG = {
},
# Config schema version - bump this when adding new required fields
"_config_version": 10,
"_config_version": 11,
}
# =============================================================================

View File

@@ -2413,6 +2413,12 @@ def cmd_logout(args):
logout_command(args)
def cmd_auth(args):
"""Manage pooled credentials."""
from hermes_cli.auth_commands import auth_command
auth_command(args)
def cmd_status(args):
"""Show status of all components."""
from hermes_cli.status import show_status
@@ -3318,7 +3324,7 @@ def _coalesce_session_name_args(argv: list) -> list:
or a known top-level subcommand.
"""
_SUBCOMMANDS = {
"chat", "model", "gateway", "setup", "whatsapp", "login", "logout",
"chat", "model", "gateway", "setup", "whatsapp", "login", "logout", "auth",
"status", "cron", "doctor", "config", "pairing", "skills", "tools",
"mcp", "sessions", "insights", "version", "update", "uninstall",
"profile",
@@ -3607,6 +3613,10 @@ Examples:
hermes --resume <session_id> Resume a specific session by ID
hermes setup Run setup wizard
hermes logout Clear stored authentication
hermes auth add <provider> Add a pooled credential
hermes auth list List pooled credentials
hermes auth remove <p> <n> Remove pooled credential by index
hermes auth reset <provider> Clear exhaustion status for a provider
hermes model Select default model
hermes config View configuration
hermes config edit Edit config in $EDITOR
@@ -3925,6 +3935,33 @@ For more help on a command:
)
logout_parser.set_defaults(func=cmd_logout)
auth_parser = subparsers.add_parser(
"auth",
help="Manage pooled provider credentials",
)
auth_subparsers = auth_parser.add_subparsers(dest="auth_action")
auth_add = auth_subparsers.add_parser("add", help="Add a pooled credential")
auth_add.add_argument("provider", help="Provider id (for example: anthropic, openai-codex, openrouter)")
auth_add.add_argument("--type", dest="auth_type", choices=["oauth", "api-key", "api_key"], help="Credential type to add")
auth_add.add_argument("--label", help="Optional display label")
auth_add.add_argument("--api-key", help="API key value (otherwise prompted securely)")
auth_add.add_argument("--portal-url", help="Nous portal base URL")
auth_add.add_argument("--inference-url", help="Nous inference base URL")
auth_add.add_argument("--client-id", help="OAuth client id")
auth_add.add_argument("--scope", help="OAuth scope override")
auth_add.add_argument("--no-browser", action="store_true", help="Do not auto-open a browser for OAuth login")
auth_add.add_argument("--timeout", type=float, help="OAuth/network timeout in seconds")
auth_add.add_argument("--insecure", action="store_true", help="Disable TLS verification for OAuth login")
auth_add.add_argument("--ca-bundle", help="Custom CA bundle for OAuth login")
auth_list = auth_subparsers.add_parser("list", help="List pooled credentials")
auth_list.add_argument("provider", nargs="?", help="Optional provider filter")
auth_remove = auth_subparsers.add_parser("remove", help="Remove a pooled credential by index")
auth_remove.add_argument("provider", help="Provider id")
auth_remove.add_argument("index", type=int, help="1-based credential index")
auth_reset = auth_subparsers.add_parser("reset", help="Clear exhaustion status for all credentials for a provider")
auth_reset.add_argument("provider", help="Provider id")
auth_parser.set_defaults(func=cmd_auth)
# =========================================================================
# status command
# =========================================================================

View File

@@ -6,8 +6,10 @@ import os
from typing import Any, Dict, Optional
from hermes_cli import auth as auth_mod
from agent.credential_pool import CredentialPool, PooledCredential, get_custom_provider_pool_key, load_pool
from hermes_cli.auth import (
AuthError,
DEFAULT_CODEX_BASE_URL,
PROVIDER_REGISTRY,
format_auth_error,
resolve_provider,
@@ -109,6 +111,50 @@ def _parse_api_mode(raw: Any) -> Optional[str]:
return None
def _resolve_runtime_from_pool_entry(
*,
provider: str,
entry: PooledCredential,
requested_provider: str,
model_cfg: Optional[Dict[str, Any]] = None,
pool: Optional[CredentialPool] = None,
) -> Dict[str, Any]:
model_cfg = model_cfg or _get_model_config()
base_url = (getattr(entry, "runtime_base_url", None) or getattr(entry, "base_url", None) or "").rstrip("/")
api_key = getattr(entry, "runtime_api_key", None) or getattr(entry, "access_token", "")
api_mode = "chat_completions"
if provider == "openai-codex":
api_mode = "codex_responses"
base_url = base_url or DEFAULT_CODEX_BASE_URL
elif provider == "anthropic":
api_mode = "anthropic_messages"
cfg_provider = str(model_cfg.get("provider") or "").strip().lower()
cfg_base_url = ""
if cfg_provider == "anthropic":
cfg_base_url = str(model_cfg.get("base_url") or "").strip().rstrip("/")
base_url = cfg_base_url or base_url or "https://api.anthropic.com"
elif provider == "nous":
api_mode = "chat_completions"
elif provider == "copilot":
api_mode = _copilot_runtime_api_mode(model_cfg, getattr(entry, "runtime_api_key", ""))
else:
configured_mode = _parse_api_mode(model_cfg.get("api_mode"))
if configured_mode:
api_mode = configured_mode
elif base_url.rstrip("/").endswith("/anthropic"):
api_mode = "anthropic_messages"
return {
"provider": provider,
"api_mode": api_mode,
"base_url": base_url,
"api_key": api_key,
"source": getattr(entry, "source", "pool"),
"credential_pool": pool,
"requested_provider": requested_provider,
}
def resolve_requested_provider(requested: Optional[str] = None) -> str:
"""Resolve provider request from explicit arg, config, then env."""
if requested and requested.strip():
@@ -128,6 +174,37 @@ def resolve_requested_provider(requested: Optional[str] = None) -> str:
return "auto"
def _try_resolve_from_custom_pool(
base_url: str,
provider_label: str,
api_mode_override: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
"""Check if a credential pool exists for a custom endpoint and return a runtime dict if so."""
pool_key = get_custom_provider_pool_key(base_url)
if not pool_key:
return None
try:
pool = load_pool(pool_key)
if not pool.has_credentials():
return None
entry = pool.select()
if entry is None:
return None
pool_api_key = getattr(entry, "runtime_api_key", None) or getattr(entry, "access_token", "")
if not pool_api_key:
return None
return {
"provider": provider_label,
"api_mode": api_mode_override or _detect_api_mode_for_url(base_url) or "chat_completions",
"base_url": base_url,
"api_key": pool_api_key,
"source": f"pool:{pool_key}",
"credential_pool": pool,
}
except Exception:
return None
def _get_named_custom_provider(requested_provider: str) -> Optional[Dict[str, Any]]:
requested_norm = _normalize_custom_provider_name(requested_provider or "")
if not requested_norm or requested_norm == "custom":
@@ -192,6 +269,11 @@ def _resolve_named_custom_runtime(
if not base_url:
return None
# Check if a credential pool exists for this custom endpoint
pool_result = _try_resolve_from_custom_pool(base_url, "custom", custom_provider.get("api_mode"))
if pool_result:
return pool_result
api_key_candidates = [
(explicit_api_key or "").strip(),
str(custom_provider.get("api_key", "") or "").strip(),
@@ -281,6 +363,15 @@ def _resolve_openrouter_runtime(
# Also provide a placeholder API key for local servers that don't require
# authentication — the OpenAI SDK requires a non-empty api_key string.
effective_provider = "custom" if requested_norm == "custom" else "openrouter"
# For custom endpoints, check if a credential pool exists
if effective_provider == "custom" and base_url:
pool_result = _try_resolve_from_custom_pool(
base_url, effective_provider, _parse_api_mode(model_cfg.get("api_mode")),
)
if pool_result:
return pool_result
if effective_provider == "custom" and not api_key and not _is_openrouter_url:
api_key = "no-key-required"
@@ -295,6 +386,134 @@ def _resolve_openrouter_runtime(
}
def _resolve_explicit_runtime(
*,
provider: str,
requested_provider: str,
model_cfg: Dict[str, Any],
explicit_api_key: Optional[str] = None,
explicit_base_url: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
explicit_api_key = str(explicit_api_key or "").strip()
explicit_base_url = str(explicit_base_url or "").strip().rstrip("/")
if not explicit_api_key and not explicit_base_url:
return None
if provider == "anthropic":
cfg_provider = str(model_cfg.get("provider") or "").strip().lower()
cfg_base_url = ""
if cfg_provider == "anthropic":
cfg_base_url = str(model_cfg.get("base_url") or "").strip().rstrip("/")
base_url = explicit_base_url or cfg_base_url or "https://api.anthropic.com"
api_key = explicit_api_key
if not api_key:
from agent.anthropic_adapter import resolve_anthropic_token
api_key = resolve_anthropic_token()
if not api_key:
raise AuthError(
"No Anthropic credentials found. Set ANTHROPIC_TOKEN or ANTHROPIC_API_KEY, "
"run 'claude setup-token', or authenticate with 'claude /login'."
)
return {
"provider": "anthropic",
"api_mode": "anthropic_messages",
"base_url": base_url,
"api_key": api_key,
"source": "explicit",
"requested_provider": requested_provider,
}
if provider == "openai-codex":
base_url = explicit_base_url or DEFAULT_CODEX_BASE_URL
api_key = explicit_api_key
last_refresh = None
if not api_key:
creds = resolve_codex_runtime_credentials()
api_key = creds.get("api_key", "")
last_refresh = creds.get("last_refresh")
if not explicit_base_url:
base_url = creds.get("base_url", "").rstrip("/") or base_url
return {
"provider": "openai-codex",
"api_mode": "codex_responses",
"base_url": base_url,
"api_key": api_key,
"source": "explicit",
"last_refresh": last_refresh,
"requested_provider": requested_provider,
}
if provider == "nous":
state = auth_mod.get_provider_auth_state("nous") or {}
base_url = (
explicit_base_url
or str(state.get("inference_base_url") or auth_mod.DEFAULT_NOUS_INFERENCE_URL).strip().rstrip("/")
)
api_key = explicit_api_key or str(state.get("agent_key") or state.get("access_token") or "").strip()
expires_at = state.get("agent_key_expires_at") or state.get("expires_at")
if not api_key:
creds = resolve_nous_runtime_credentials(
min_key_ttl_seconds=max(60, int(os.getenv("HERMES_NOUS_MIN_KEY_TTL_SECONDS", "1800"))),
timeout_seconds=float(os.getenv("HERMES_NOUS_TIMEOUT_SECONDS", "15")),
)
api_key = creds.get("api_key", "")
expires_at = creds.get("expires_at")
if not explicit_base_url:
base_url = creds.get("base_url", "").rstrip("/") or base_url
return {
"provider": "nous",
"api_mode": "chat_completions",
"base_url": base_url,
"api_key": api_key,
"source": "explicit",
"expires_at": expires_at,
"requested_provider": requested_provider,
}
pconfig = PROVIDER_REGISTRY.get(provider)
if pconfig and pconfig.auth_type == "api_key":
env_url = ""
if pconfig.base_url_env_var:
env_url = os.getenv(pconfig.base_url_env_var, "").strip().rstrip("/")
base_url = explicit_base_url
if not base_url:
if provider == "kimi-coding":
creds = resolve_api_key_provider_credentials(provider)
base_url = creds.get("base_url", "").rstrip("/")
else:
base_url = env_url or pconfig.inference_base_url
api_key = explicit_api_key
if not api_key:
creds = resolve_api_key_provider_credentials(provider)
api_key = creds.get("api_key", "")
if not base_url:
base_url = creds.get("base_url", "").rstrip("/")
api_mode = "chat_completions"
if provider == "copilot":
api_mode = _copilot_runtime_api_mode(model_cfg, api_key)
else:
configured_mode = _parse_api_mode(model_cfg.get("api_mode"))
if configured_mode:
api_mode = configured_mode
elif base_url.rstrip("/").endswith("/anthropic"):
api_mode = "anthropic_messages"
return {
"provider": provider,
"api_mode": api_mode,
"base_url": base_url.rstrip("/"),
"api_key": api_key,
"source": "explicit",
"requested_provider": requested_provider,
}
return None
def resolve_runtime_provider(
*,
requested: Optional[str] = None,
@@ -318,6 +537,57 @@ def resolve_runtime_provider(
explicit_api_key=explicit_api_key,
explicit_base_url=explicit_base_url,
)
model_cfg = _get_model_config()
explicit_runtime = _resolve_explicit_runtime(
provider=provider,
requested_provider=requested_provider,
model_cfg=model_cfg,
explicit_api_key=explicit_api_key,
explicit_base_url=explicit_base_url,
)
if explicit_runtime:
return explicit_runtime
should_use_pool = provider != "openrouter"
if provider == "openrouter":
cfg_provider = str(model_cfg.get("provider") or "").strip().lower()
cfg_base_url = str(model_cfg.get("base_url") or "").strip()
env_openai_base_url = os.getenv("OPENAI_BASE_URL", "").strip()
env_openrouter_base_url = os.getenv("OPENROUTER_BASE_URL", "").strip()
has_custom_endpoint = bool(
explicit_base_url
or env_openai_base_url
or env_openrouter_base_url
)
if cfg_base_url and cfg_provider in {"auto", "custom"}:
has_custom_endpoint = True
has_runtime_override = bool(explicit_api_key or explicit_base_url)
should_use_pool = (
requested_provider in {"openrouter", "auto"}
and not has_custom_endpoint
and not has_runtime_override
)
try:
pool = load_pool(provider) if should_use_pool else None
except Exception:
pool = None
if pool and pool.has_credentials():
entry = pool.select()
pool_api_key = ""
if entry is not None:
pool_api_key = (
getattr(entry, "runtime_api_key", None)
or getattr(entry, "access_token", "")
)
if entry is not None and pool_api_key:
return _resolve_runtime_from_pool_entry(
provider=provider,
entry=entry,
requested_provider=requested_provider,
model_cfg=model_cfg,
pool=pool,
)
if provider == "nous":
creds = resolve_nous_runtime_credentials(
@@ -371,7 +641,6 @@ def resolve_runtime_provider(
# Allow base URL override from config.yaml model.base_url, but only
# when the configured provider is anthropic — otherwise a non-Anthropic
# base_url (e.g. Codex endpoint) would leak into Anthropic requests.
model_cfg = _get_model_config()
cfg_provider = str(model_cfg.get("provider") or "").strip().lower()
cfg_base_url = ""
if cfg_provider == "anthropic":
@@ -390,7 +659,6 @@ def resolve_runtime_provider(
pconfig = PROVIDER_REGISTRY.get(provider)
if pconfig and pconfig.auth_type == "api_key":
creds = resolve_api_key_provider_credentials(provider)
model_cfg = _get_model_config()
base_url = creds.get("base_url", "").rstrip("/")
api_mode = "chat_completions"
if provider == "copilot":

View File

@@ -54,6 +54,32 @@ def _set_default_model(config: Dict[str, Any], model_name: str) -> None:
config["model"] = model_cfg
def _get_credential_pool_strategies(config: Dict[str, Any]) -> Dict[str, str]:
strategies = config.get("credential_pool_strategies")
return dict(strategies) if isinstance(strategies, dict) else {}
def _set_credential_pool_strategy(config: Dict[str, Any], provider: str, strategy: str) -> None:
if not provider:
return
strategies = _get_credential_pool_strategies(config)
strategies[provider] = strategy
config["credential_pool_strategies"] = strategies
def _supports_same_provider_pool_setup(provider: str) -> bool:
if not provider or provider == "custom":
return False
if provider == "openrouter":
return True
from hermes_cli.auth import PROVIDER_REGISTRY
pconfig = PROVIDER_REGISTRY.get(provider)
if not pconfig:
return False
return pconfig.auth_type in {"api_key", "oauth_device_code"}
# Default model lists per provider — used as fallback when the live
# /models endpoint can't be reached.
_DEFAULT_PROVIDER_MODELS = {
@@ -849,6 +875,85 @@ def setup_model_provider(config: dict):
selected_provider = _m.get("provider")
# ── Same-provider fallback & rotation setup ──
if _supports_same_provider_pool_setup(selected_provider):
try:
from types import SimpleNamespace
from agent.credential_pool import load_pool
from hermes_cli.auth_commands import auth_add_command
pool = load_pool(selected_provider)
entries = pool.entries()
entry_count = len(entries)
manual_count = sum(1 for entry in entries if str(getattr(entry, "source", "")).startswith("manual"))
auto_count = entry_count - manual_count
print()
print_header("Same-Provider Fallback & Rotation")
print_info(
"Hermes can keep multiple credentials for one provider and rotate between"
)
print_info(
"them when a credential is exhausted or rate-limited. This preserves"
)
print_info(
"your primary provider while reducing interruptions from quota issues."
)
print()
if auto_count > 0:
print_info(
f"Current pooled credentials for {selected_provider}: {entry_count} "
f"({manual_count} manual, {auto_count} auto-detected from env/shared auth)"
)
else:
print_info(f"Current pooled credentials for {selected_provider}: {entry_count}")
while prompt_yes_no("Add another credential for same-provider fallback?", False):
auth_add_command(
SimpleNamespace(
provider=selected_provider,
auth_type="",
label=None,
api_key=None,
portal_url=None,
inference_url=None,
client_id=None,
scope=None,
no_browser=False,
timeout=15.0,
insecure=False,
ca_bundle=None,
min_key_ttl_seconds=5 * 60,
)
)
pool = load_pool(selected_provider)
entry_count = len(pool.entries())
print_info(f"Provider pool now has {entry_count} credential(s).")
if entry_count > 1:
strategy_labels = [
"Fill-first / sticky — keep using the first healthy credential until it is exhausted",
"Round robin — rotate to the next healthy credential after each selection",
"Random — pick a random healthy credential each time",
]
current_strategy = _get_credential_pool_strategies(config).get(selected_provider, "fill_first")
default_strategy_idx = {
"fill_first": 0,
"round_robin": 1,
"random": 2,
}.get(current_strategy, 0)
strategy_idx = prompt_choice(
"Select same-provider rotation strategy:",
strategy_labels,
default_strategy_idx,
)
strategy_value = ["fill_first", "round_robin", "random"][strategy_idx]
_set_credential_pool_strategy(config, selected_provider, strategy_value)
print_success(f"Saved {selected_provider} rotation strategy: {strategy_value}")
else:
_set_credential_pool_strategy(config, selected_provider, "fill_first")
except Exception as exc:
logger.debug("Could not configure same-provider fallback in setup: %s", exc)
# ── Vision & Image Analysis Setup ──
# Keep setup aligned with the actual runtime resolver the vision tools use.
try:

View File

@@ -505,6 +505,7 @@ class AIAgent:
honcho_config=None,
iteration_budget: "IterationBudget" = None,
fallback_model: Dict[str, Any] = None,
credential_pool=None,
checkpoints_enabled: bool = False,
checkpoint_max_snapshots: int = 50,
pass_session_id: bool = False,
@@ -575,6 +576,7 @@ class AIAgent:
self.skip_context_files = skip_context_files
self.pass_session_id = pass_session_id
self.persist_session = persist_session
self._credential_pool = credential_pool
self.log_prefix_chars = log_prefix_chars
self.log_prefix = f"{log_prefix} " if log_prefix else ""
# Store effective base URL for feature detection (prompt caching, reasoning, etc.)
@@ -3775,6 +3777,93 @@ class AIAgent:
self._is_anthropic_oauth = _is_oauth_token(new_token)
return True
def _apply_client_headers_for_base_url(self, base_url: str) -> None:
from agent.auxiliary_client import _OR_HEADERS
normalized = (base_url or "").lower()
if "openrouter" in normalized:
self._client_kwargs["default_headers"] = dict(_OR_HEADERS)
elif "api.githubcopilot.com" in normalized:
from hermes_cli.models import copilot_default_headers
self._client_kwargs["default_headers"] = copilot_default_headers()
elif "api.kimi.com" in normalized:
self._client_kwargs["default_headers"] = {"User-Agent": "KimiCLI/1.3"}
else:
self._client_kwargs.pop("default_headers", None)
def _swap_credential(self, entry) -> None:
runtime_key = getattr(entry, "runtime_api_key", None) or getattr(entry, "access_token", "")
runtime_base = getattr(entry, "runtime_base_url", None) or getattr(entry, "base_url", None) or self.base_url
if self.api_mode == "anthropic_messages":
from agent.anthropic_adapter import build_anthropic_client, _is_oauth_token
try:
self._anthropic_client.close()
except Exception:
pass
self._anthropic_api_key = runtime_key
self._anthropic_base_url = runtime_base
self._anthropic_client = build_anthropic_client(runtime_key, runtime_base)
self._is_anthropic_oauth = _is_oauth_token(runtime_key) if self.provider == "anthropic" else False
self.api_key = runtime_key
self.base_url = runtime_base
return
self.api_key = runtime_key
self.base_url = runtime_base.rstrip("/") if isinstance(runtime_base, str) else runtime_base
self._client_kwargs["api_key"] = self.api_key
self._client_kwargs["base_url"] = self.base_url
self._apply_client_headers_for_base_url(self.base_url)
self._replace_primary_openai_client(reason="credential_rotation")
def _recover_with_credential_pool(
self,
*,
status_code: Optional[int],
has_retried_429: bool,
) -> tuple[bool, bool]:
"""Attempt credential recovery via pool rotation.
Returns (recovered, has_retried_429).
On 429: first occurrence retries same credential (sets flag True).
second consecutive 429 rotates to next credential (resets flag).
On 402: immediately rotates (billing exhaustion won't resolve with retry).
On 401: attempts token refresh before rotating.
"""
pool = self._credential_pool
if pool is None or status_code is None:
return False, has_retried_429
if status_code == 402:
next_entry = pool.mark_exhausted_and_rotate(status_code=402)
if next_entry is not None:
logger.info(f"Credential 402 (billing) — rotated to pool entry {getattr(next_entry, 'id', '?')}")
self._swap_credential(next_entry)
return True, False
return False, has_retried_429
if status_code == 429:
if not has_retried_429:
return False, True
next_entry = pool.mark_exhausted_and_rotate(status_code=429)
if next_entry is not None:
logger.info(f"Credential 429 (rate limit) — rotated to pool entry {getattr(next_entry, 'id', '?')}")
self._swap_credential(next_entry)
return True, False
return False, True
if status_code == 401:
refreshed = pool.try_refresh_current()
if refreshed is not None:
logger.info(f"Credential 401 — refreshed pool entry {getattr(refreshed, 'id', '?')}")
self._swap_credential(refreshed)
return True, has_retried_429
return False, has_retried_429
def _anthropic_messages_create(self, api_kwargs: dict):
if self.api_mode == "anthropic_messages":
self._try_refresh_anthropic_client_credentials()
@@ -6460,6 +6549,7 @@ class AIAgent:
codex_auth_retry_attempted = False
anthropic_auth_retry_attempted = False
nous_auth_retry_attempted = False
has_retried_429 = False
restart_with_compressed_messages = False
restart_with_length_continuation = False
@@ -6895,6 +6985,7 @@ class AIAgent:
if not self.quiet_mode:
self._vprint(f"{self.log_prefix} 💾 Cache: {cached:,}/{prompt:,} tokens ({hit_pct:.0f}% hit, {written:,} written)")
has_retried_429 = False # Reset on success
break # Success, exit retry loop
except InterruptedError:
@@ -6937,6 +7028,12 @@ class AIAgent:
# prompt or prefill. Fall through to normal error path.
status_code = getattr(api_error, "status_code", None)
recovered_with_pool, has_retried_429 = self._recover_with_credential_pool(
status_code=status_code,
has_retried_429=has_retried_429,
)
if recovered_with_pool:
continue
if (
self.api_mode == "codex_responses"
and self.provider == "openai-codex"

View File

@@ -198,7 +198,8 @@ class TestAnthropicOAuthFlag:
def test_api_key_no_oauth_flag(self, monkeypatch):
"""Regular API keys (sk-ant-api-*) should create client with is_oauth=False."""
with patch("agent.anthropic_adapter.resolve_anthropic_token", return_value="sk-ant-api03-testkey1234"), \
patch("agent.anthropic_adapter.build_anthropic_client") as mock_build:
patch("agent.anthropic_adapter.build_anthropic_client") as mock_build, \
patch("agent.auxiliary_client._select_pool_entry", return_value=(False, None)):
mock_build.return_value = MagicMock()
from agent.auxiliary_client import _try_anthropic, AnthropicAuxiliaryClient
client, model = _try_anthropic()
@@ -207,6 +208,31 @@ class TestAnthropicOAuthFlag:
adapter = client.chat.completions
assert adapter._is_oauth is False
def test_pool_entry_takes_priority_over_legacy_resolution(self):
class _Entry:
access_token = "sk-ant-oat01-pooled"
base_url = "https://api.anthropic.com"
class _Pool:
def has_credentials(self):
return True
def select(self):
return _Entry()
with (
patch("agent.auxiliary_client.load_pool", return_value=_Pool()),
patch("agent.anthropic_adapter.resolve_anthropic_token", side_effect=AssertionError("legacy path should not run")),
patch("agent.anthropic_adapter.build_anthropic_client", return_value=MagicMock()) as mock_build,
):
from agent.auxiliary_client import _try_anthropic
client, model = _try_anthropic()
assert client is not None
assert model == "claude-haiku-4-5-20251001"
assert mock_build.call_args.args[0] == "sk-ant-oat01-pooled"
class TestExpiredCodexFallback:
"""Test that expired Codex tokens don't block the auto chain."""
@@ -392,7 +418,8 @@ class TestExplicitProviderRouting:
def test_explicit_anthropic_api_key(self, monkeypatch):
"""provider='anthropic' + regular API key should work with is_oauth=False."""
with patch("agent.anthropic_adapter.resolve_anthropic_token", return_value="sk-ant-api-regular-key"), \
patch("agent.anthropic_adapter.build_anthropic_client") as mock_build:
patch("agent.anthropic_adapter.build_anthropic_client") as mock_build, \
patch("agent.auxiliary_client._select_pool_entry", return_value=(False, None)):
mock_build.return_value = MagicMock()
client, model = resolve_provider_client("anthropic")
assert client is not None
@@ -542,6 +569,32 @@ class TestGetTextAuxiliaryClient:
from agent.auxiliary_client import CodexAuxiliaryClient
assert isinstance(client, CodexAuxiliaryClient)
def test_codex_pool_entry_takes_priority_over_auth_store(self):
class _Entry:
access_token = "pooled-codex-token"
base_url = "https://chatgpt.com/backend-api/codex"
class _Pool:
def has_credentials(self):
return True
def select(self):
return _Entry()
with (
patch("agent.auxiliary_client.load_pool", return_value=_Pool()),
patch("agent.auxiliary_client.OpenAI"),
patch("hermes_cli.auth._read_codex_tokens", side_effect=AssertionError("legacy codex store should not run")),
):
from agent.auxiliary_client import _try_codex
client, model = _try_codex()
from agent.auxiliary_client import CodexAuxiliaryClient
assert isinstance(client, CodexAuxiliaryClient)
assert model == "gpt-5.2-codex"
def test_returns_none_when_nothing_available(self, monkeypatch):
monkeypatch.delenv("OPENAI_BASE_URL", raising=False)
monkeypatch.delenv("OPENAI_API_KEY", raising=False)
@@ -590,6 +643,35 @@ class TestVisionClientFallback:
assert client.__class__.__name__ == "AnthropicAuxiliaryClient"
assert model == "claude-haiku-4-5-20251001"
class TestAuxiliaryPoolAwareness:
def test_try_nous_uses_pool_entry(self):
class _Entry:
access_token = "pooled-access-token"
agent_key = "pooled-agent-key"
inference_base_url = "https://inference.pool.example/v1"
class _Pool:
def has_credentials(self):
return True
def select(self):
return _Entry()
with (
patch("agent.auxiliary_client.load_pool", return_value=_Pool()),
patch("agent.auxiliary_client.OpenAI") as mock_openai,
):
from agent.auxiliary_client import _try_nous
client, model = _try_nous()
assert client is not None
assert model == "gemini-3-flash"
call_kwargs = mock_openai.call_args.kwargs
assert call_kwargs["api_key"] == "pooled-agent-key"
assert call_kwargs["base_url"] == "https://inference.pool.example/v1"
def test_resolve_provider_client_copilot_uses_runtime_credentials(self, monkeypatch):
monkeypatch.delenv("GITHUB_TOKEN", raising=False)
monkeypatch.delenv("GH_TOKEN", raising=False)

View File

@@ -113,6 +113,205 @@ def test_setup_keep_current_config_provider_uses_provider_specific_model_menu(
assert reloaded["model"]["provider"] == "zai"
def test_setup_same_provider_rotation_strategy_saved_for_multi_credential_pool(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
_clear_provider_env(monkeypatch)
save_env_value("OPENROUTER_API_KEY", "or-key")
# Pre-write config so the pool step sees provider="openrouter"
_write_model_config("openrouter", "", "anthropic/claude-opus-4.6")
config = load_config()
class _Entry:
def __init__(self, label):
self.label = label
class _Pool:
def entries(self):
return [_Entry("primary"), _Entry("secondary")]
def fake_select():
pass # no-op — config already has provider set
def fake_prompt_choice(question, choices, default=0):
if "rotation strategy" in question:
return 1 # round robin
tts_idx = _maybe_keep_current_tts(question, choices)
if tts_idx is not None:
return tts_idx
return default
def fake_prompt_yes_no(question, default=True):
return False
# Patch directly on the module objects to ensure local imports pick them up.
import hermes_cli.main as _main_mod
import hermes_cli.setup as _setup_mod
import agent.credential_pool as _pool_mod
import agent.auxiliary_client as _aux_mod
monkeypatch.setattr(_main_mod, "select_provider_and_model", fake_select)
# NOTE: _stub_tts overwrites prompt_choice, so set our mock AFTER it.
_stub_tts(monkeypatch)
monkeypatch.setattr(_setup_mod, "prompt_choice", fake_prompt_choice)
monkeypatch.setattr(_setup_mod, "prompt_yes_no", fake_prompt_yes_no)
monkeypatch.setattr(_setup_mod, "prompt", lambda *args, **kwargs: "")
monkeypatch.setattr(_pool_mod, "load_pool", lambda provider: _Pool())
monkeypatch.setattr(_aux_mod, "get_available_vision_backends", lambda: [])
setup_model_provider(config)
# The pool has 2 entries, so the strategy prompt should fire
strategy = config.get("credential_pool_strategies", {}).get("openrouter")
assert strategy == "round_robin", f"Expected round_robin but got {strategy}"
def test_setup_same_provider_fallback_can_add_another_credential(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
_clear_provider_env(monkeypatch)
save_env_value("OPENROUTER_API_KEY", "or-key")
# Pre-write config so the pool step sees provider="openrouter"
_write_model_config("openrouter", "", "anthropic/claude-opus-4.6")
config = load_config()
pool_sizes = iter([1, 2])
add_calls = []
class _Entry:
def __init__(self, label):
self.label = label
class _Pool:
def __init__(self, size):
self._size = size
def entries(self):
return [_Entry(f"cred-{idx}") for idx in range(self._size)]
def fake_load_pool(provider):
return _Pool(next(pool_sizes))
def fake_auth_add_command(args):
add_calls.append(args.provider)
def fake_select():
pass # no-op — config already has provider set
def fake_prompt_choice(question, choices, default=0):
if question == "Select same-provider rotation strategy:":
return 0
tts_idx = _maybe_keep_current_tts(question, choices)
if tts_idx is not None:
return tts_idx
return default
yes_no_answers = iter([True, False])
def fake_prompt_yes_no(question, default=True):
if question == "Add another credential for same-provider fallback?":
return next(yes_no_answers)
return False
monkeypatch.setattr("hermes_cli.main.select_provider_and_model", fake_select)
_stub_tts(monkeypatch)
monkeypatch.setattr("hermes_cli.setup.prompt_choice", fake_prompt_choice)
monkeypatch.setattr("hermes_cli.setup.prompt_yes_no", fake_prompt_yes_no)
monkeypatch.setattr("hermes_cli.setup.prompt", lambda *args, **kwargs: "")
monkeypatch.setattr("agent.credential_pool.load_pool", fake_load_pool)
monkeypatch.setattr("hermes_cli.auth_commands.auth_add_command", fake_auth_add_command)
monkeypatch.setattr("agent.auxiliary_client.get_available_vision_backends", lambda: [])
setup_model_provider(config)
assert add_calls == ["openrouter"]
assert config.get("credential_pool_strategies", {}).get("openrouter") == "fill_first"
def test_setup_pool_step_shows_manual_vs_auto_detected_counts(tmp_path, monkeypatch, capsys):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
_clear_provider_env(monkeypatch)
save_env_value("OPENROUTER_API_KEY", "or-key")
# Pre-write config so the pool step sees provider="openrouter"
_write_model_config("openrouter", "", "anthropic/claude-opus-4.6")
config = load_config()
class _Entry:
def __init__(self, label, source):
self.label = label
self.source = source
class _Pool:
def entries(self):
return [
_Entry("primary", "manual"),
_Entry("secondary", "manual"),
_Entry("OPENROUTER_API_KEY", "env:OPENROUTER_API_KEY"),
]
def fake_select():
pass # no-op — config already has provider set
def fake_prompt_choice(question, choices, default=0):
if "rotation strategy" in question:
return 0
tts_idx = _maybe_keep_current_tts(question, choices)
if tts_idx is not None:
return tts_idx
return default
monkeypatch.setattr("hermes_cli.main.select_provider_and_model", fake_select)
_stub_tts(monkeypatch)
monkeypatch.setattr("hermes_cli.setup.prompt_choice", fake_prompt_choice)
monkeypatch.setattr("hermes_cli.setup.prompt_yes_no", lambda *args, **kwargs: False)
monkeypatch.setattr("hermes_cli.setup.prompt", lambda *args, **kwargs: "")
monkeypatch.setattr("agent.credential_pool.load_pool", lambda provider: _Pool())
monkeypatch.setattr("agent.auxiliary_client.get_available_vision_backends", lambda: [])
setup_model_provider(config)
out = capsys.readouterr().out
assert "Current pooled credentials for openrouter: 3 (2 manual, 1 auto-detected from env/shared auth)" in out
def test_setup_copilot_acp_skips_same_provider_pool_step(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
_clear_provider_env(monkeypatch)
config = load_config()
def fake_prompt_choice(question, choices, default=0):
if question == "Select your inference provider:":
return 15 # GitHub Copilot ACP
if question == "Select default model:":
return 0
if question == "Configure vision:":
return len(choices) - 1
tts_idx = _maybe_keep_current_tts(question, choices)
if tts_idx is not None:
return tts_idx
raise AssertionError(f"Unexpected prompt_choice call: {question}")
def fake_prompt_yes_no(question, default=True):
if question == "Add another credential for same-provider fallback?":
raise AssertionError("same-provider pool prompt should not appear for copilot-acp")
return False
monkeypatch.setattr("hermes_cli.setup.prompt_choice", fake_prompt_choice)
monkeypatch.setattr("hermes_cli.setup.prompt_yes_no", fake_prompt_yes_no)
monkeypatch.setattr("hermes_cli.setup.prompt", lambda *args, **kwargs: "")
monkeypatch.setattr("hermes_cli.auth.get_active_provider", lambda: None)
monkeypatch.setattr("hermes_cli.auth.detect_external_credentials", lambda: [])
monkeypatch.setattr("agent.auxiliary_client.get_available_vision_backends", lambda: [])
setup_model_provider(config)
assert config.get("credential_pool_strategies", {}) == {}
def test_setup_copilot_uses_gh_auth_and_saves_provider(tmp_path, monkeypatch):
"""Copilot provider saves correctly through delegation."""
monkeypatch.setenv("HERMES_HOME", str(tmp_path))

391
tests/test_auth_commands.py Normal file
View File

@@ -0,0 +1,391 @@
"""Tests for auth subcommands backed by the credential pool."""
from __future__ import annotations
import base64
import json
import pytest
def _write_auth_store(tmp_path, payload: dict) -> None:
hermes_home = tmp_path / "hermes"
hermes_home.mkdir(parents=True, exist_ok=True)
(hermes_home / "auth.json").write_text(json.dumps(payload, indent=2))
def _jwt_with_email(email: str) -> str:
header = base64.urlsafe_b64encode(b'{"alg":"RS256","typ":"JWT"}').rstrip(b"=").decode()
payload = base64.urlsafe_b64encode(
json.dumps({"email": email}).encode()
).rstrip(b"=").decode()
return f"{header}.{payload}.signature"
@pytest.fixture(autouse=True)
def _clear_provider_env(monkeypatch):
for key in (
"OPENROUTER_API_KEY",
"OPENAI_API_KEY",
"ANTHROPIC_API_KEY",
"ANTHROPIC_TOKEN",
"CLAUDE_CODE_OAUTH_TOKEN",
):
monkeypatch.delenv(key, raising=False)
def test_auth_add_api_key_persists_manual_entry(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)
monkeypatch.delenv("OPENAI_API_KEY", raising=False)
_write_auth_store(tmp_path, {"version": 1, "providers": {}})
from hermes_cli.auth_commands import auth_add_command
class _Args:
provider = "openrouter"
auth_type = "api-key"
api_key = "sk-or-manual"
label = "personal"
auth_add_command(_Args())
payload = json.loads((tmp_path / "hermes" / "auth.json").read_text())
entries = payload["credential_pool"]["openrouter"]
entry = next(item for item in entries if item["source"] == "manual")
assert entry["label"] == "personal"
assert entry["auth_type"] == "api_key"
assert entry["source"] == "manual"
assert entry["access_token"] == "sk-or-manual"
def test_auth_add_anthropic_oauth_persists_pool_entry(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
monkeypatch.delenv("ANTHROPIC_API_KEY", raising=False)
monkeypatch.delenv("ANTHROPIC_TOKEN", raising=False)
monkeypatch.delenv("CLAUDE_CODE_OAUTH_TOKEN", raising=False)
_write_auth_store(tmp_path, {"version": 1, "providers": {}})
token = _jwt_with_email("claude@example.com")
monkeypatch.setattr(
"agent.anthropic_adapter.run_hermes_oauth_login_pure",
lambda: {
"access_token": token,
"refresh_token": "refresh-token",
"expires_at_ms": 1711234567000,
},
)
from hermes_cli.auth_commands import auth_add_command
class _Args:
provider = "anthropic"
auth_type = "oauth"
api_key = None
label = None
auth_add_command(_Args())
payload = json.loads((tmp_path / "hermes" / "auth.json").read_text())
entries = payload["credential_pool"]["anthropic"]
entry = next(item for item in entries if item["source"] == "manual:hermes_pkce")
assert entry["label"] == "claude@example.com"
assert entry["source"] == "manual:hermes_pkce"
assert entry["refresh_token"] == "refresh-token"
assert entry["expires_at_ms"] == 1711234567000
def test_auth_add_nous_oauth_persists_pool_entry(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
_write_auth_store(tmp_path, {"version": 1, "providers": {}})
token = _jwt_with_email("nous@example.com")
monkeypatch.setattr(
"hermes_cli.auth._nous_device_code_login",
lambda **kwargs: {
"portal_base_url": "https://portal.example.com",
"inference_base_url": "https://inference.example.com/v1",
"client_id": "hermes-cli",
"scope": "inference:mint_agent_key",
"token_type": "Bearer",
"access_token": token,
"refresh_token": "refresh-token",
"obtained_at": "2026-03-23T10:00:00+00:00",
"expires_at": "2026-03-23T11:00:00+00:00",
"expires_in": 3600,
"agent_key": "ak-test",
"agent_key_id": "ak-id",
"agent_key_expires_at": "2026-03-23T10:30:00+00:00",
"agent_key_expires_in": 1800,
"agent_key_reused": False,
"agent_key_obtained_at": "2026-03-23T10:00:10+00:00",
"tls": {"insecure": False, "ca_bundle": None},
},
)
from hermes_cli.auth_commands import auth_add_command
class _Args:
provider = "nous"
auth_type = "oauth"
api_key = None
label = None
portal_url = None
inference_url = None
client_id = None
scope = None
no_browser = False
timeout = None
insecure = False
ca_bundle = None
auth_add_command(_Args())
payload = json.loads((tmp_path / "hermes" / "auth.json").read_text())
entries = payload["credential_pool"]["nous"]
entry = next(item for item in entries if item["source"] == "manual:device_code")
assert entry["label"] == "nous@example.com"
assert entry["source"] == "manual:device_code"
assert entry["agent_key"] == "ak-test"
assert entry["portal_base_url"] == "https://portal.example.com"
def test_auth_add_codex_oauth_persists_pool_entry(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
_write_auth_store(tmp_path, {"version": 1, "providers": {}})
token = _jwt_with_email("codex@example.com")
monkeypatch.setattr(
"hermes_cli.auth._codex_device_code_login",
lambda: {
"tokens": {
"access_token": token,
"refresh_token": "refresh-token",
},
"base_url": "https://chatgpt.com/backend-api/codex",
"last_refresh": "2026-03-23T10:00:00Z",
},
)
from hermes_cli.auth_commands import auth_add_command
class _Args:
provider = "openai-codex"
auth_type = "oauth"
api_key = None
label = None
auth_add_command(_Args())
payload = json.loads((tmp_path / "hermes" / "auth.json").read_text())
entries = payload["credential_pool"]["openai-codex"]
entry = next(item for item in entries if item["source"] == "manual:device_code")
assert entry["label"] == "codex@example.com"
assert entry["source"] == "manual:device_code"
assert entry["refresh_token"] == "refresh-token"
assert entry["base_url"] == "https://chatgpt.com/backend-api/codex"
def test_auth_remove_reindexes_priorities(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
# Prevent pool auto-seeding from host env vars and file-backed sources
monkeypatch.delenv("ANTHROPIC_API_KEY", raising=False)
monkeypatch.delenv("ANTHROPIC_TOKEN", raising=False)
monkeypatch.delenv("CLAUDE_CODE_OAUTH_TOKEN", raising=False)
monkeypatch.setattr(
"agent.credential_pool._seed_from_singletons",
lambda provider, entries: (False, set()),
)
_write_auth_store(
tmp_path,
{
"version": 1,
"credential_pool": {
"anthropic": [
{
"id": "cred-1",
"label": "primary",
"auth_type": "api_key",
"priority": 0,
"source": "manual",
"access_token": "sk-ant-api-primary",
},
{
"id": "cred-2",
"label": "secondary",
"auth_type": "api_key",
"priority": 1,
"source": "manual",
"access_token": "sk-ant-api-secondary",
},
]
},
},
)
from hermes_cli.auth_commands import auth_remove_command
class _Args:
provider = "anthropic"
index = 1
auth_remove_command(_Args())
payload = json.loads((tmp_path / "hermes" / "auth.json").read_text())
entries = payload["credential_pool"]["anthropic"]
assert len(entries) == 1
assert entries[0]["label"] == "secondary"
assert entries[0]["priority"] == 0
def test_auth_reset_clears_provider_statuses(tmp_path, monkeypatch, capsys):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
_write_auth_store(
tmp_path,
{
"version": 1,
"credential_pool": {
"anthropic": [
{
"id": "cred-1",
"label": "primary",
"auth_type": "api_key",
"priority": 0,
"source": "manual",
"access_token": "sk-ant-api-primary",
"last_status": "exhausted",
"last_status_at": 1711230000.0,
"last_error_code": 402,
}
]
},
},
)
from hermes_cli.auth_commands import auth_reset_command
class _Args:
provider = "anthropic"
auth_reset_command(_Args())
out = capsys.readouterr().out
assert "Reset status" in out
payload = json.loads((tmp_path / "hermes" / "auth.json").read_text())
entry = payload["credential_pool"]["anthropic"][0]
assert entry["last_status"] is None
assert entry["last_status_at"] is None
assert entry["last_error_code"] is None
def test_clear_provider_auth_removes_provider_pool_entries(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
_write_auth_store(
tmp_path,
{
"version": 1,
"active_provider": "anthropic",
"providers": {
"anthropic": {"access_token": "legacy-token"},
},
"credential_pool": {
"anthropic": [
{
"id": "cred-1",
"label": "primary",
"auth_type": "oauth",
"priority": 0,
"source": "manual:hermes_pkce",
"access_token": "pool-token",
}
],
"openrouter": [
{
"id": "cred-2",
"label": "other-provider",
"auth_type": "api_key",
"priority": 0,
"source": "manual",
"access_token": "sk-or-test",
}
],
},
},
)
from hermes_cli.auth import clear_provider_auth
assert clear_provider_auth("anthropic") is True
payload = json.loads((tmp_path / "hermes" / "auth.json").read_text())
assert payload["active_provider"] is None
assert "anthropic" not in payload.get("providers", {})
assert "anthropic" not in payload.get("credential_pool", {})
assert "openrouter" in payload.get("credential_pool", {})
def test_auth_list_does_not_call_mutating_select(monkeypatch, capsys):
from hermes_cli.auth_commands import auth_list_command
class _Entry:
id = "cred-1"
label = "primary"
auth_type="***"
source = "manual"
last_status = None
last_error_code = None
last_status_at = None
class _Pool:
def entries(self):
return [_Entry()]
def peek(self):
return _Entry()
def select(self):
raise AssertionError("auth_list_command should not call select()")
monkeypatch.setattr(
"hermes_cli.auth_commands.load_pool",
lambda provider: _Pool() if provider == "openrouter" else type("_EmptyPool", (), {"entries": lambda self: []})(),
)
class _Args:
provider = "openrouter"
auth_list_command(_Args())
out = capsys.readouterr().out
assert "openrouter (1 credentials):" in out
assert "primary" in out
def test_auth_list_shows_exhausted_cooldown(monkeypatch, capsys):
from hermes_cli.auth_commands import auth_list_command
class _Entry:
id = "cred-1"
label = "primary"
auth_type = "api_key"
source = "manual"
last_status = "exhausted"
last_error_code = 429
last_status_at = 1000.0
class _Pool:
def entries(self):
return [_Entry()]
def peek(self):
return None
monkeypatch.setattr("hermes_cli.auth_commands.load_pool", lambda provider: _Pool())
monkeypatch.setattr("hermes_cli.auth_commands.time.time", lambda: 1030.0)
class _Args:
provider = "openrouter"
auth_list_command(_Args())
out = capsys.readouterr().out
assert "exhausted (429)" in out
assert "59m 30s left" in out

View File

@@ -0,0 +1,949 @@
"""Tests for multi-credential runtime pooling and rotation."""
from __future__ import annotations
import json
import time
import pytest
def _write_auth_store(tmp_path, payload: dict) -> None:
hermes_home = tmp_path / "hermes"
hermes_home.mkdir(parents=True, exist_ok=True)
(hermes_home / "auth.json").write_text(json.dumps(payload, indent=2))
def test_fill_first_selection_skips_recently_exhausted_entry(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
_write_auth_store(
tmp_path,
{
"version": 1,
"credential_pool": {
"anthropic": [
{
"id": "cred-1",
"label": "primary",
"auth_type": "api_key",
"priority": 0,
"source": "manual",
"access_token": "***",
"last_status": "exhausted",
"last_status_at": time.time(),
"last_error_code": 402,
},
{
"id": "cred-2",
"label": "secondary",
"auth_type": "api_key",
"priority": 1,
"source": "manual",
"access_token": "***",
"last_status": "ok",
"last_status_at": None,
"last_error_code": None,
},
]
},
},
)
from agent.credential_pool import load_pool
pool = load_pool("anthropic")
entry = pool.select()
assert entry is not None
assert entry.id == "cred-2"
assert pool.current().id == "cred-2"
def test_select_clears_expired_exhaustion(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
_write_auth_store(
tmp_path,
{
"version": 1,
"credential_pool": {
"anthropic": [
{
"id": "cred-1",
"label": "old",
"auth_type": "api_key",
"priority": 0,
"source": "manual",
"access_token": "***",
"last_status": "exhausted",
"last_status_at": time.time() - 90000,
"last_error_code": 402,
}
]
},
},
)
from agent.credential_pool import load_pool
pool = load_pool("anthropic")
entry = pool.select()
assert entry is not None
assert entry.last_status == "ok"
def test_round_robin_strategy_rotates_priorities(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
_write_auth_store(
tmp_path,
{
"version": 1,
"credential_pool": {
"openrouter": [
{
"id": "cred-1",
"label": "primary",
"auth_type": "api_key",
"priority": 0,
"source": "manual",
"access_token": "***",
},
{
"id": "cred-2",
"label": "secondary",
"auth_type": "api_key",
"priority": 1,
"source": "manual",
"access_token": "***",
},
]
},
},
)
config_path = tmp_path / "hermes" / "config.yaml"
config_path.write_text("credential_pool_strategies:\n openrouter: round_robin\n")
from agent.credential_pool import load_pool
pool = load_pool("openrouter")
first = pool.select()
assert first is not None
assert first.id == "cred-1"
reloaded = load_pool("openrouter")
second = reloaded.select()
assert second is not None
assert second.id == "cred-2"
def test_random_strategy_uses_random_choice(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)
_write_auth_store(
tmp_path,
{
"version": 1,
"credential_pool": {
"openrouter": [
{
"id": "cred-1",
"label": "primary",
"auth_type": "api_key",
"priority": 0,
"source": "manual",
"access_token": "***",
},
{
"id": "cred-2",
"label": "secondary",
"auth_type": "api_key",
"priority": 1,
"source": "manual",
"access_token": "***",
},
]
},
},
)
config_path = tmp_path / "hermes" / "config.yaml"
config_path.write_text("credential_pool_strategies:\n openrouter: random\n")
monkeypatch.setattr("agent.credential_pool.random.choice", lambda entries: entries[-1])
from agent.credential_pool import load_pool
pool = load_pool("openrouter")
selected = pool.select()
assert selected is not None
assert selected.id == "cred-2"
def test_exhausted_entry_resets_after_ttl(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
_write_auth_store(
tmp_path,
{
"version": 1,
"credential_pool": {
"openrouter": [
{
"id": "cred-1",
"label": "primary",
"auth_type": "api_key",
"priority": 0,
"source": "manual",
"access_token": "sk-or-primary",
"base_url": "https://openrouter.ai/api/v1",
"last_status": "exhausted",
"last_status_at": time.time() - 90000,
"last_error_code": 429,
}
]
},
},
)
from agent.credential_pool import load_pool
pool = load_pool("openrouter")
entry = pool.select()
assert entry is not None
assert entry.id == "cred-1"
assert entry.last_status == "ok"
def test_mark_exhausted_and_rotate_persists_status(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
_write_auth_store(
tmp_path,
{
"version": 1,
"credential_pool": {
"anthropic": [
{
"id": "cred-1",
"label": "primary",
"auth_type": "api_key",
"priority": 0,
"source": "manual",
"access_token": "sk-ant-api-primary",
},
{
"id": "cred-2",
"label": "secondary",
"auth_type": "api_key",
"priority": 1,
"source": "manual",
"access_token": "sk-ant-api-secondary",
},
]
},
},
)
from agent.credential_pool import load_pool
pool = load_pool("anthropic")
assert pool.select().id == "cred-1"
next_entry = pool.mark_exhausted_and_rotate(status_code=402)
assert next_entry is not None
assert next_entry.id == "cred-2"
auth_payload = json.loads((tmp_path / "hermes" / "auth.json").read_text())
persisted = auth_payload["credential_pool"]["anthropic"][0]
assert persisted["last_status"] == "exhausted"
assert persisted["last_error_code"] == 402
def test_try_refresh_current_updates_only_current_entry(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
_write_auth_store(
tmp_path,
{
"version": 1,
"credential_pool": {
"openai-codex": [
{
"id": "cred-1",
"label": "primary",
"auth_type": "oauth",
"priority": 0,
"source": "device_code",
"access_token": "access-old",
"refresh_token": "refresh-old",
"base_url": "https://chatgpt.com/backend-api/codex",
},
{
"id": "cred-2",
"label": "secondary",
"auth_type": "oauth",
"priority": 1,
"source": "device_code",
"access_token": "access-other",
"refresh_token": "refresh-other",
"base_url": "https://chatgpt.com/backend-api/codex",
},
]
},
},
)
from agent.credential_pool import load_pool
monkeypatch.setattr(
"hermes_cli.auth.refresh_codex_oauth_pure",
lambda access_token, refresh_token, timeout_seconds=20.0: {
"access_token": "access-new",
"refresh_token": "refresh-new",
},
)
pool = load_pool("openai-codex")
current = pool.select()
assert current.id == "cred-1"
refreshed = pool.try_refresh_current()
assert refreshed is not None
assert refreshed.access_token == "access-new"
auth_payload = json.loads((tmp_path / "hermes" / "auth.json").read_text())
primary, secondary = auth_payload["credential_pool"]["openai-codex"]
assert primary["access_token"] == "access-new"
assert primary["refresh_token"] == "refresh-new"
assert secondary["access_token"] == "access-other"
assert secondary["refresh_token"] == "refresh-other"
def test_load_pool_seeds_env_api_key(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
monkeypatch.setenv("OPENROUTER_API_KEY", "sk-or-seeded")
_write_auth_store(tmp_path, {"version": 1, "providers": {}})
from agent.credential_pool import load_pool
pool = load_pool("openrouter")
entry = pool.select()
assert entry is not None
assert entry.source == "env:OPENROUTER_API_KEY"
assert entry.access_token == "sk-or-seeded"
def test_load_pool_removes_stale_seeded_env_entry(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)
_write_auth_store(
tmp_path,
{
"version": 1,
"credential_pool": {
"openrouter": [
{
"id": "seeded-env",
"label": "OPENROUTER_API_KEY",
"auth_type": "api_key",
"priority": 0,
"source": "env:OPENROUTER_API_KEY",
"access_token": "stale-token",
"base_url": "https://openrouter.ai/api/v1",
}
]
},
},
)
from agent.credential_pool import load_pool
pool = load_pool("openrouter")
assert pool.entries() == []
auth_payload = json.loads((tmp_path / "hermes" / "auth.json").read_text())
assert auth_payload["credential_pool"]["openrouter"] == []
def test_load_pool_migrates_nous_provider_state(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
_write_auth_store(
tmp_path,
{
"version": 1,
"active_provider": "nous",
"providers": {
"nous": {
"portal_base_url": "https://portal.example.com",
"inference_base_url": "https://inference.example.com/v1",
"client_id": "hermes-cli",
"token_type": "Bearer",
"scope": "inference:mint_agent_key",
"access_token": "access-token",
"refresh_token": "refresh-token",
"expires_at": "2026-03-24T12:00:00+00:00",
"agent_key": "agent-key",
"agent_key_expires_at": "2026-03-24T13:30:00+00:00",
}
},
},
)
from agent.credential_pool import load_pool
pool = load_pool("nous")
entry = pool.select()
assert entry is not None
assert entry.source == "device_code"
assert entry.portal_base_url == "https://portal.example.com"
assert entry.agent_key == "agent-key"
def test_load_pool_removes_stale_file_backed_singleton_entry(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
monkeypatch.delenv("ANTHROPIC_API_KEY", raising=False)
monkeypatch.delenv("ANTHROPIC_TOKEN", raising=False)
monkeypatch.delenv("CLAUDE_CODE_OAUTH_TOKEN", raising=False)
_write_auth_store(
tmp_path,
{
"version": 1,
"credential_pool": {
"anthropic": [
{
"id": "seeded-file",
"label": "claude-code",
"auth_type": "oauth",
"priority": 0,
"source": "claude_code",
"access_token": "stale-access-token",
"refresh_token": "stale-refresh-token",
"expires_at_ms": int(time.time() * 1000) + 60_000,
}
]
},
},
)
monkeypatch.setattr(
"agent.anthropic_adapter.read_hermes_oauth_credentials",
lambda: None,
)
monkeypatch.setattr(
"agent.anthropic_adapter.read_claude_code_credentials",
lambda: None,
)
from agent.credential_pool import load_pool
pool = load_pool("anthropic")
assert pool.entries() == []
auth_payload = json.loads((tmp_path / "hermes" / "auth.json").read_text())
assert auth_payload["credential_pool"]["anthropic"] == []
def test_load_pool_migrates_nous_provider_state_preserves_tls(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
_write_auth_store(
tmp_path,
{
"version": 1,
"active_provider": "nous",
"providers": {
"nous": {
"portal_base_url": "https://portal.example.com",
"inference_base_url": "https://inference.example.com/v1",
"client_id": "hermes-cli",
"token_type": "Bearer",
"scope": "inference:mint_agent_key",
"access_token": "access-token",
"refresh_token": "refresh-token",
"expires_at": "2026-03-24T12:00:00+00:00",
"agent_key": "agent-key",
"agent_key_expires_at": "2026-03-24T13:30:00+00:00",
"tls": {
"insecure": True,
"ca_bundle": "/tmp/nous-ca.pem",
},
}
},
},
)
from agent.credential_pool import load_pool
pool = load_pool("nous")
entry = pool.select()
assert entry is not None
assert entry.tls == {
"insecure": True,
"ca_bundle": "/tmp/nous-ca.pem",
}
auth_payload = json.loads((tmp_path / "hermes" / "auth.json").read_text())
assert auth_payload["credential_pool"]["nous"][0]["tls"] == {
"insecure": True,
"ca_bundle": "/tmp/nous-ca.pem",
}
def test_singleton_seed_does_not_clobber_manual_oauth_entry(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
monkeypatch.delenv("ANTHROPIC_API_KEY", raising=False)
monkeypatch.delenv("ANTHROPIC_TOKEN", raising=False)
monkeypatch.delenv("CLAUDE_CODE_OAUTH_TOKEN", raising=False)
_write_auth_store(
tmp_path,
{
"version": 1,
"credential_pool": {
"anthropic": [
{
"id": "manual-1",
"label": "manual-pkce",
"auth_type": "oauth",
"priority": 0,
"source": "manual:hermes_pkce",
"access_token": "manual-token",
"refresh_token": "manual-refresh",
"expires_at_ms": 1711234567000,
}
]
},
},
)
monkeypatch.setattr(
"agent.anthropic_adapter.read_hermes_oauth_credentials",
lambda: {
"accessToken": "seeded-token",
"refreshToken": "seeded-refresh",
"expiresAt": 1711234999000,
},
)
monkeypatch.setattr(
"agent.anthropic_adapter.read_claude_code_credentials",
lambda: None,
)
from agent.credential_pool import load_pool
pool = load_pool("anthropic")
entries = pool.entries()
assert len(entries) == 2
assert {entry.source for entry in entries} == {"manual:hermes_pkce", "hermes_pkce"}
def test_load_pool_prefers_anthropic_env_token_over_file_backed_oauth(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
monkeypatch.delenv("ANTHROPIC_API_KEY", raising=False)
monkeypatch.setenv("ANTHROPIC_TOKEN", "env-override-token")
monkeypatch.delenv("CLAUDE_CODE_OAUTH_TOKEN", raising=False)
_write_auth_store(tmp_path, {"version": 1, "providers": {}})
monkeypatch.setattr(
"agent.anthropic_adapter.read_hermes_oauth_credentials",
lambda: {
"accessToken": "file-backed-token",
"refreshToken": "refresh-token",
"expiresAt": int(time.time() * 1000) + 3_600_000,
},
)
monkeypatch.setattr(
"agent.anthropic_adapter.read_claude_code_credentials",
lambda: None,
)
from agent.credential_pool import load_pool
pool = load_pool("anthropic")
entry = pool.select()
assert entry is not None
assert entry.source == "env:ANTHROPIC_TOKEN"
assert entry.access_token == "env-override-token"
def test_least_used_strategy_selects_lowest_count(tmp_path, monkeypatch):
"""least_used strategy should select the credential with the lowest request_count."""
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
monkeypatch.setattr(
"agent.credential_pool.get_pool_strategy",
lambda _provider: "least_used",
)
monkeypatch.setattr(
"agent.credential_pool._seed_from_singletons",
lambda provider, entries: (False, set()),
)
monkeypatch.setattr(
"agent.credential_pool._seed_from_env",
lambda provider, entries: (False, set()),
)
_write_auth_store(
tmp_path,
{
"version": 1,
"credential_pool": {
"openrouter": [
{
"id": "key-a",
"label": "heavy",
"auth_type": "api_key",
"priority": 0,
"source": "manual",
"access_token": "sk-or-heavy",
"request_count": 100,
},
{
"id": "key-b",
"label": "light",
"auth_type": "api_key",
"priority": 1,
"source": "manual",
"access_token": "sk-or-light",
"request_count": 10,
},
{
"id": "key-c",
"label": "medium",
"auth_type": "api_key",
"priority": 2,
"source": "manual",
"access_token": "sk-or-medium",
"request_count": 50,
},
]
},
},
)
from agent.credential_pool import load_pool
pool = load_pool("openrouter")
entry = pool.select()
assert entry is not None
assert entry.id == "key-b"
assert entry.access_token == "sk-or-light"
def test_mark_used_increments_request_count(tmp_path, monkeypatch):
"""mark_used should increment the request_count of the current entry."""
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
monkeypatch.setattr(
"agent.credential_pool.get_pool_strategy",
lambda _provider: "fill_first",
)
monkeypatch.setattr(
"agent.credential_pool._seed_from_singletons",
lambda provider, entries: (False, set()),
)
monkeypatch.setattr(
"agent.credential_pool._seed_from_env",
lambda provider, entries: (False, set()),
)
_write_auth_store(
tmp_path,
{
"version": 1,
"credential_pool": {
"openrouter": [
{
"id": "key-a",
"label": "test",
"auth_type": "api_key",
"priority": 0,
"source": "manual",
"access_token": "sk-or-test",
"request_count": 5,
},
]
},
},
)
from agent.credential_pool import load_pool
pool = load_pool("openrouter")
entry = pool.select()
assert entry is not None
assert entry.request_count == 5
pool.mark_used()
updated = pool.current()
assert updated is not None
assert updated.request_count == 6
def test_thread_safety_concurrent_select(tmp_path, monkeypatch):
"""Concurrent select() calls should not corrupt pool state."""
import threading as _threading
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
monkeypatch.setattr(
"agent.credential_pool.get_pool_strategy",
lambda _provider: "round_robin",
)
monkeypatch.setattr(
"agent.credential_pool._seed_from_singletons",
lambda provider, entries: (False, set()),
)
monkeypatch.setattr(
"agent.credential_pool._seed_from_env",
lambda provider, entries: (False, set()),
)
_write_auth_store(
tmp_path,
{
"version": 1,
"credential_pool": {
"openrouter": [
{
"id": f"key-{i}",
"label": f"key-{i}",
"auth_type": "api_key",
"priority": i,
"source": "manual",
"access_token": f"sk-or-{i}",
}
for i in range(5)
]
},
},
)
from agent.credential_pool import load_pool
pool = load_pool("openrouter")
results = []
errors = []
def worker():
try:
for _ in range(20):
entry = pool.select()
if entry:
results.append(entry.id)
pool.mark_used(entry.id)
except Exception as exc:
errors.append(exc)
threads = [_threading.Thread(target=worker) for _ in range(4)]
for t in threads:
t.start()
for t in threads:
t.join()
assert not errors, f"Thread errors: {errors}"
assert len(results) == 80 # 4 threads * 20 selects
def test_custom_endpoint_pool_keyed_by_name(tmp_path, monkeypatch):
"""Verify load_pool('custom:together.ai') works and returns entries from auth.json."""
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
# Disable seeding so we only test stored entries
monkeypatch.setattr(
"agent.credential_pool._seed_custom_pool",
lambda pool_key, entries: (False, set()),
)
_write_auth_store(
tmp_path,
{
"version": 1,
"credential_pool": {
"custom:together.ai": [
{
"id": "cred-1",
"label": "together-key",
"auth_type": "api_key",
"priority": 0,
"source": "manual",
"access_token": "sk-together-xxx",
"base_url": "https://api.together.ai/v1",
},
{
"id": "cred-2",
"label": "together-key-2",
"auth_type": "api_key",
"priority": 1,
"source": "manual",
"access_token": "sk-together-yyy",
"base_url": "https://api.together.ai/v1",
},
]
},
},
)
from agent.credential_pool import load_pool
pool = load_pool("custom:together.ai")
assert pool.has_credentials()
entries = pool.entries()
assert len(entries) == 2
assert entries[0].access_token == "sk-together-xxx"
assert entries[1].access_token == "sk-together-yyy"
# Select should return the first entry (fill_first default)
entry = pool.select()
assert entry is not None
assert entry.id == "cred-1"
def test_custom_endpoint_pool_seeds_from_config(tmp_path, monkeypatch):
"""Verify seeding from custom_providers api_key in config.yaml."""
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
_write_auth_store(tmp_path, {"version": 1})
# Write config.yaml with a custom_providers entry
config_path = tmp_path / "hermes" / "config.yaml"
import yaml
config_path.write_text(yaml.dump({
"custom_providers": [
{
"name": "Together.ai",
"base_url": "https://api.together.ai/v1",
"api_key": "sk-config-seeded",
}
]
}))
from agent.credential_pool import load_pool
pool = load_pool("custom:together.ai")
assert pool.has_credentials()
entries = pool.entries()
assert len(entries) == 1
assert entries[0].access_token == "sk-config-seeded"
assert entries[0].source == "config:Together.ai"
def test_custom_endpoint_pool_seeds_from_model_config(tmp_path, monkeypatch):
"""Verify seeding from model.api_key when model.provider=='custom' and base_url matches."""
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
_write_auth_store(tmp_path, {"version": 1})
import yaml
config_path = tmp_path / "hermes" / "config.yaml"
config_path.write_text(yaml.dump({
"custom_providers": [
{
"name": "Together.ai",
"base_url": "https://api.together.ai/v1",
}
],
"model": {
"provider": "custom",
"base_url": "https://api.together.ai/v1",
"api_key": "sk-model-key",
},
}))
from agent.credential_pool import load_pool
pool = load_pool("custom:together.ai")
assert pool.has_credentials()
entries = pool.entries()
# Should have the model_config entry
model_entries = [e for e in entries if e.source == "model_config"]
assert len(model_entries) == 1
assert model_entries[0].access_token == "sk-model-key"
def test_custom_pool_does_not_break_existing_providers(tmp_path, monkeypatch):
"""Existing registry providers work exactly as before with custom pool support."""
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
monkeypatch.setenv("OPENROUTER_API_KEY", "sk-or-test")
_write_auth_store(tmp_path, {"version": 1, "providers": {}})
from agent.credential_pool import load_pool
pool = load_pool("openrouter")
entry = pool.select()
assert entry is not None
assert entry.source == "env:OPENROUTER_API_KEY"
assert entry.access_token == "sk-or-test"
def test_get_custom_provider_pool_key(tmp_path, monkeypatch):
"""get_custom_provider_pool_key maps base_url to custom:<name> pool key."""
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
(tmp_path / "hermes").mkdir(parents=True, exist_ok=True)
import yaml
config_path = tmp_path / "hermes" / "config.yaml"
config_path.write_text(yaml.dump({
"custom_providers": [
{
"name": "Together.ai",
"base_url": "https://api.together.ai/v1",
"api_key": "sk-xxx",
},
{
"name": "My Local Server",
"base_url": "http://localhost:8080/v1",
},
]
}))
from agent.credential_pool import get_custom_provider_pool_key
assert get_custom_provider_pool_key("https://api.together.ai/v1") == "custom:together.ai"
assert get_custom_provider_pool_key("https://api.together.ai/v1/") == "custom:together.ai"
assert get_custom_provider_pool_key("http://localhost:8080/v1") == "custom:my-local-server"
assert get_custom_provider_pool_key("https://unknown.example.com/v1") is None
assert get_custom_provider_pool_key("") is None
def test_list_custom_pool_providers(tmp_path, monkeypatch):
"""list_custom_pool_providers returns custom: pool keys from auth.json."""
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
_write_auth_store(
tmp_path,
{
"version": 1,
"credential_pool": {
"anthropic": [
{
"id": "a1",
"label": "test",
"auth_type": "api_key",
"priority": 0,
"source": "manual",
"access_token": "sk-ant-xxx",
}
],
"custom:together.ai": [
{
"id": "c1",
"label": "together",
"auth_type": "api_key",
"priority": 0,
"source": "manual",
"access_token": "sk-tog-xxx",
}
],
"custom:fireworks": [
{
"id": "c2",
"label": "fireworks",
"auth_type": "api_key",
"priority": 0,
"source": "manual",
"access_token": "sk-fw-xxx",
}
],
"custom:empty": [],
},
},
)
from agent.credential_pool import list_custom_pool_providers
result = list_custom_pool_providers()
assert result == ["custom:fireworks", "custom:together.ai"]
# "custom:empty" not included because it's empty

View File

@@ -1771,6 +1771,62 @@ class TestNousCredentialRefresh:
assert isinstance(agent.client, _RebuiltClient)
class TestCredentialPoolRecovery:
def test_recover_with_pool_rotates_on_402(self, agent):
current = SimpleNamespace(label="primary")
next_entry = SimpleNamespace(label="secondary")
class _Pool:
def current(self):
return current
def mark_exhausted_and_rotate(self, *, status_code):
assert status_code == 402
return next_entry
agent._credential_pool = _Pool()
agent._swap_credential = MagicMock()
recovered, retry_same = agent._recover_with_credential_pool(
status_code=402,
has_retried_429=False,
)
assert recovered is True
assert retry_same is False
agent._swap_credential.assert_called_once_with(next_entry)
def test_recover_with_pool_retries_first_429_then_rotates(self, agent):
next_entry = SimpleNamespace(label="secondary")
class _Pool:
def current(self):
return SimpleNamespace(label="primary")
def mark_exhausted_and_rotate(self, *, status_code):
assert status_code == 429
return next_entry
agent._credential_pool = _Pool()
agent._swap_credential = MagicMock()
recovered, retry_same = agent._recover_with_credential_pool(
status_code=429,
has_retried_429=False,
)
assert recovered is False
assert retry_same is True
agent._swap_credential.assert_not_called()
recovered, retry_same = agent._recover_with_credential_pool(
status_code=429,
has_retried_429=True,
)
assert recovered is True
assert retry_same is False
agent._swap_credential.assert_called_once_with(next_entry)
class TestMaxTokensParam:
"""Verify _max_tokens_param returns the correct key for each provider."""

View File

@@ -1,6 +1,123 @@
from hermes_cli import runtime_provider as rp
def test_resolve_runtime_provider_uses_credential_pool(monkeypatch):
class _Entry:
access_token = "pool-token"
source = "manual"
base_url = "https://chatgpt.com/backend-api/codex"
class _Pool:
def has_credentials(self):
return True
def select(self):
return _Entry()
monkeypatch.setattr(rp, "resolve_provider", lambda *a, **k: "openai-codex")
monkeypatch.setattr(rp, "load_pool", lambda provider: _Pool())
resolved = rp.resolve_runtime_provider(requested="openai-codex")
assert resolved["provider"] == "openai-codex"
assert resolved["api_key"] == "pool-token"
assert resolved["credential_pool"] is not None
assert resolved["source"] == "manual"
def test_resolve_runtime_provider_anthropic_pool_respects_config_base_url(monkeypatch):
class _Entry:
access_token = "pool-token"
source = "manual"
base_url = "https://api.anthropic.com"
class _Pool:
def has_credentials(self):
return True
def select(self):
return _Entry()
monkeypatch.setattr(rp, "resolve_provider", lambda *a, **k: "anthropic")
monkeypatch.setattr(
rp,
"_get_model_config",
lambda: {
"provider": "anthropic",
"base_url": "https://proxy.example.com/anthropic",
},
)
monkeypatch.setattr(rp, "load_pool", lambda provider: _Pool())
resolved = rp.resolve_runtime_provider(requested="anthropic")
assert resolved["provider"] == "anthropic"
assert resolved["api_mode"] == "anthropic_messages"
assert resolved["api_key"] == "pool-token"
assert resolved["base_url"] == "https://proxy.example.com/anthropic"
def test_resolve_runtime_provider_anthropic_explicit_override_skips_pool(monkeypatch):
def _unexpected_pool(provider):
raise AssertionError(f"load_pool should not be called for {provider}")
def _unexpected_anthropic_token():
raise AssertionError("resolve_anthropic_token should not be called")
monkeypatch.setattr(rp, "resolve_provider", lambda *a, **k: "anthropic")
monkeypatch.setattr(
rp,
"_get_model_config",
lambda: {
"provider": "anthropic",
"base_url": "https://config.example.com/anthropic",
},
)
monkeypatch.setattr(rp, "load_pool", _unexpected_pool)
monkeypatch.setattr(
"agent.anthropic_adapter.resolve_anthropic_token",
_unexpected_anthropic_token,
)
resolved = rp.resolve_runtime_provider(
requested="anthropic",
explicit_api_key="anthropic-explicit-token",
explicit_base_url="https://proxy.example.com/anthropic/",
)
assert resolved["provider"] == "anthropic"
assert resolved["api_mode"] == "anthropic_messages"
assert resolved["api_key"] == "anthropic-explicit-token"
assert resolved["base_url"] == "https://proxy.example.com/anthropic"
assert resolved["source"] == "explicit"
assert resolved.get("credential_pool") is None
def test_resolve_runtime_provider_falls_back_when_pool_empty(monkeypatch):
class _Pool:
def has_credentials(self):
return False
monkeypatch.setattr(rp, "resolve_provider", lambda *a, **k: "openai-codex")
monkeypatch.setattr(rp, "load_pool", lambda provider: _Pool())
monkeypatch.setattr(
rp,
"resolve_codex_runtime_credentials",
lambda: {
"provider": "openai-codex",
"base_url": "https://chatgpt.com/backend-api/codex",
"api_key": "codex-token",
"source": "hermes-auth-store",
"last_refresh": "2026-02-26T00:00:00Z",
},
)
resolved = rp.resolve_runtime_provider(requested="openai-codex")
assert resolved["api_key"] == "codex-token"
assert resolved.get("credential_pool") is None
def test_resolve_runtime_provider_codex(monkeypatch):
monkeypatch.setattr(rp, "resolve_provider", lambda *a, **k: "openai-codex")
monkeypatch.setattr(
@@ -40,6 +157,36 @@ def test_resolve_runtime_provider_ai_gateway(monkeypatch):
assert resolved["requested_provider"] == "ai-gateway"
def test_resolve_runtime_provider_ai_gateway_explicit_override_skips_pool(monkeypatch):
def _unexpected_pool(provider):
raise AssertionError(f"load_pool should not be called for {provider}")
def _unexpected_provider_resolution(provider):
raise AssertionError(f"resolve_api_key_provider_credentials should not be called for {provider}")
monkeypatch.setattr(rp, "resolve_provider", lambda *a, **k: "ai-gateway")
monkeypatch.setattr(rp, "_get_model_config", lambda: {})
monkeypatch.setattr(rp, "load_pool", _unexpected_pool)
monkeypatch.setattr(
rp,
"resolve_api_key_provider_credentials",
_unexpected_provider_resolution,
)
resolved = rp.resolve_runtime_provider(
requested="ai-gateway",
explicit_api_key="ai-gateway-explicit-token",
explicit_base_url="https://proxy.example.com/v1/",
)
assert resolved["provider"] == "ai-gateway"
assert resolved["api_mode"] == "chat_completions"
assert resolved["api_key"] == "ai-gateway-explicit-token"
assert resolved["base_url"] == "https://proxy.example.com/v1"
assert resolved["source"] == "explicit"
assert resolved.get("credential_pool") is None
def test_resolve_runtime_provider_openrouter_explicit(monkeypatch):
monkeypatch.setattr(rp, "resolve_provider", lambda *a, **k: "openrouter")
monkeypatch.setattr(rp, "_get_model_config", lambda: {})
@@ -61,6 +208,69 @@ def test_resolve_runtime_provider_openrouter_explicit(monkeypatch):
assert resolved["source"] == "explicit"
def test_resolve_runtime_provider_auto_uses_openrouter_pool(monkeypatch):
class _Entry:
access_token = "pool-key"
source = "manual"
base_url = "https://openrouter.ai/api/v1"
class _Pool:
def has_credentials(self):
return True
def select(self):
return _Entry()
monkeypatch.setattr(rp, "resolve_provider", lambda *a, **k: "openrouter")
monkeypatch.setattr(rp, "_get_model_config", lambda: {})
monkeypatch.setattr(rp, "load_pool", lambda provider: _Pool())
monkeypatch.delenv("OPENAI_BASE_URL", raising=False)
monkeypatch.delenv("OPENROUTER_BASE_URL", raising=False)
monkeypatch.delenv("OPENAI_API_KEY", raising=False)
monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)
resolved = rp.resolve_runtime_provider(requested="auto")
assert resolved["provider"] == "openrouter"
assert resolved["api_key"] == "pool-key"
assert resolved["base_url"] == "https://openrouter.ai/api/v1"
assert resolved["source"] == "manual"
assert resolved.get("credential_pool") is not None
def test_resolve_runtime_provider_openrouter_explicit_api_key_skips_pool(monkeypatch):
class _Entry:
access_token = "pool-key"
source = "manual"
base_url = "https://openrouter.ai/api/v1"
class _Pool:
def has_credentials(self):
return True
def select(self):
return _Entry()
monkeypatch.setattr(rp, "resolve_provider", lambda *a, **k: "openrouter")
monkeypatch.setattr(rp, "_get_model_config", lambda: {})
monkeypatch.setattr(rp, "load_pool", lambda provider: _Pool())
monkeypatch.delenv("OPENAI_BASE_URL", raising=False)
monkeypatch.delenv("OPENROUTER_BASE_URL", raising=False)
monkeypatch.delenv("OPENAI_API_KEY", raising=False)
monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)
resolved = rp.resolve_runtime_provider(
requested="openrouter",
explicit_api_key="explicit-key",
)
assert resolved["provider"] == "openrouter"
assert resolved["api_key"] == "explicit-key"
assert resolved["base_url"] == rp.OPENROUTER_BASE_URL
assert resolved["source"] == "explicit"
assert resolved.get("credential_pool") is None
def test_resolve_runtime_provider_openrouter_ignores_codex_config_base_url(monkeypatch):
monkeypatch.setattr(rp, "resolve_provider", lambda *a, **k: "openrouter")
monkeypatch.setattr(
@@ -136,16 +346,19 @@ def test_openai_key_used_when_no_openrouter_key(monkeypatch):
def test_custom_endpoint_prefers_openai_key(monkeypatch):
"""Custom endpoint should use OPENAI_API_KEY, not OPENROUTER_API_KEY.
"""Custom endpoint should use config api_key over OPENROUTER_API_KEY.
Regression test for #560: when base_url is a non-OpenRouter endpoint,
OPENROUTER_API_KEY was being sent as the auth header instead of OPENAI_API_KEY.
Updated for #4165: config.yaml is now the source of truth for endpoint URLs,
OPENAI_BASE_URL env var is no longer consulted.
"""
monkeypatch.setattr(rp, "resolve_provider", lambda *a, **k: "openrouter")
monkeypatch.setattr(rp, "_get_model_config", lambda: {})
monkeypatch.setenv("OPENAI_BASE_URL", "https://api.z.ai/api/coding/paas/v4")
monkeypatch.setattr(rp, "_get_model_config", lambda: {
"provider": "custom",
"base_url": "https://api.z.ai/api/coding/paas/v4",
"api_key": "zai-key",
})
monkeypatch.delenv("OPENAI_BASE_URL", raising=False)
monkeypatch.delenv("OPENROUTER_BASE_URL", raising=False)
monkeypatch.setenv("OPENAI_API_KEY", "zai-key")
monkeypatch.setenv("OPENROUTER_API_KEY", "openrouter-key")
resolved = rp.resolve_runtime_provider(requested="custom")
@@ -221,19 +434,22 @@ def test_custom_endpoint_uses_config_api_field_when_no_api_key(monkeypatch):
assert resolved["api_key"] == "config-api-field"
def test_custom_endpoint_auto_provider_prefers_openai_key(monkeypatch):
"""Auto provider with non-OpenRouter base_url should prefer OPENAI_API_KEY.
def test_custom_endpoint_explicit_custom_prefers_config_key(monkeypatch):
"""Explicit 'custom' provider with config base_url+api_key should use them.
Same as #560 but via 'hermes model' flow which sets provider to 'auto'.
Updated for #4165: config.yaml is the source of truth, not OPENAI_BASE_URL.
"""
monkeypatch.setattr(rp, "resolve_provider", lambda *a, **k: "openrouter")
monkeypatch.setattr(rp, "_get_model_config", lambda: {})
monkeypatch.setenv("OPENAI_BASE_URL", "https://my-vllm-server.example.com/v1")
monkeypatch.setattr(rp, "_get_model_config", lambda: {
"provider": "custom",
"base_url": "https://my-vllm-server.example.com/v1",
"api_key": "sk-vllm-key",
})
monkeypatch.delenv("OPENAI_BASE_URL", raising=False)
monkeypatch.delenv("OPENROUTER_BASE_URL", raising=False)
monkeypatch.setenv("OPENAI_API_KEY", "sk-vllm-key")
monkeypatch.setenv("OPENROUTER_API_KEY", "sk-or-...leak")
resolved = rp.resolve_runtime_provider(requested="auto")
resolved = rp.resolve_runtime_provider(requested="custom")
assert resolved["base_url"] == "https://my-vllm-server.example.com/v1"
assert resolved["api_key"] == "sk-vllm-key"
@@ -359,6 +575,36 @@ def test_explicit_openrouter_skips_openai_base_url(monkeypatch):
assert resolved["api_key"] == "or-test-key"
def test_explicit_openrouter_honors_openrouter_base_url_over_pool(monkeypatch):
class _Entry:
access_token = "pool-key"
source = "manual"
base_url = "https://openrouter.ai/api/v1"
class _Pool:
def has_credentials(self):
return True
def select(self):
return _Entry()
monkeypatch.setattr(rp, "resolve_provider", lambda *a, **k: "openrouter")
monkeypatch.setattr(rp, "_get_model_config", lambda: {})
monkeypatch.setattr(rp, "load_pool", lambda provider: _Pool())
monkeypatch.setenv("OPENROUTER_BASE_URL", "https://mirror.example.com/v1")
monkeypatch.setenv("OPENROUTER_API_KEY", "mirror-key")
monkeypatch.delenv("OPENAI_BASE_URL", raising=False)
monkeypatch.delenv("OPENAI_API_KEY", raising=False)
resolved = rp.resolve_runtime_provider(requested="openrouter")
assert resolved["provider"] == "openrouter"
assert resolved["base_url"] == "https://mirror.example.com/v1"
assert resolved["api_key"] == "mirror-key"
assert resolved["source"] == "env/config"
assert resolved.get("credential_pool") is None
def test_resolve_requested_provider_precedence(monkeypatch):
monkeypatch.setenv("HERMES_INFERENCE_PROVIDER", "nous")
monkeypatch.setattr(rp, "_get_model_config", lambda: {"provider": "openai-codex"})

View File

@@ -593,7 +593,14 @@ class TestDelegationCredentialResolution(unittest.TestCase):
"model": "qwen2.5-coder",
"base_url": "http://localhost:1234/v1",
}
with patch.dict(os.environ, {"OPENROUTER_API_KEY": "env-openrouter-key"}, clear=False):
with patch.dict(
os.environ,
{
"OPENROUTER_API_KEY": "env-openrouter-key",
"OPENAI_API_KEY": "",
},
clear=False,
):
with self.assertRaises(ValueError) as ctx:
_resolve_delegation_credentials(cfg, parent)
self.assertIn("OPENAI_API_KEY", str(ctx.exception))

View File

@@ -18,6 +18,11 @@ import pytest
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def _clear_openai_env(monkeypatch):
monkeypatch.delenv("OPENAI_API_KEY", raising=False)
class TestGetProvider:
"""_get_provider() picks the right backend based on config + availability."""

View File

@@ -38,6 +38,7 @@ hermes [global-options] <command> [subcommand/options]
| `hermes setup` | Interactive setup wizard for all or part of the configuration. |
| `hermes whatsapp` | Configure and pair the WhatsApp bridge. |
| `hermes login` / `logout` | Authenticate with OAuth-backed providers. |
| `hermes auth` | Manage credential pools — add, list, remove, reset, set strategy. |
| `hermes status` | Show agent, auth, and platform status. |
| `hermes cron` | Inspect and tick the cron scheduler. |
| `hermes webhook` | Manage dynamic webhook subscriptions for event-driven activation. |
@@ -192,6 +193,22 @@ Useful options for `login`:
- `--ca-bundle <pem>`
- `--insecure`
## `hermes auth`
Manage credential pools for same-provider key rotation. See [Credential Pools](/docs/user-guide/features/credential-pools) for full documentation.
```bash
hermes auth # Interactive wizard
hermes auth list # Show all pools
hermes auth list openrouter # Show specific provider
hermes auth add openrouter --api-key sk-or-v1-xxx # Add API key
hermes auth add anthropic --type oauth # Add OAuth credential
hermes auth remove openrouter 2 # Remove by index
hermes auth reset openrouter # Clear cooldowns
```
Subcommands: `add`, `list`, `remove`, `reset`. When called with no subcommand, launches the interactive management wizard.
## `hermes status`
```bash

View File

@@ -478,6 +478,18 @@ If auto-compression is disabled, the warning tells you context may be truncated
Context pressure is automatic — no configuration needed. It fires purely as a user-facing notification and does not modify the message stream or inject anything into the model's context.
## Credential Pool Strategies
When you have multiple API keys or OAuth tokens for the same provider, configure the rotation strategy:
```yaml
credential_pool_strategies:
openrouter: round_robin # cycle through keys evenly
anthropic: least_used # always pick the least-used key
```
Options: `fill_first` (default), `round_robin`, `least_used`, `random`. See [Credential Pools](/docs/user-guide/features/credential-pools) for full documentation.
## Auxiliary Models
Hermes uses lightweight "auxiliary" models for side tasks like image analysis, web page summarization, and browser screenshot analysis. By default, these use **Gemini Flash** via auto-detection — you don't need to configure anything.

View File

@@ -0,0 +1,230 @@
---
title: Credential Pools
description: Pool multiple API keys or OAuth tokens per provider for automatic rotation and rate limit recovery.
sidebar_label: Credential Pools
sidebar_position: 9
---
# Credential Pools
Credential pools let you register multiple API keys or OAuth tokens for the same provider. When one key hits a rate limit or billing quota, Hermes automatically rotates to the next healthy key — keeping your session alive without switching providers.
This is different from [fallback providers](./fallback-providers.md), which switch to a *different* provider entirely. Credential pools are same-provider rotation; fallback providers are cross-provider failover. Pools are tried first — if all pool keys are exhausted, *then* the fallback provider activates.
## How It Works
```
Your request
→ Pick key from pool (round_robin / least_used / fill_first / random)
→ Send to provider
→ 429 rate limit?
→ Retry same key once (transient blip)
→ Second 429 → rotate to next pool key
→ All keys exhausted → fallback_model (different provider)
→ 402 billing error?
→ Immediately rotate to next pool key (24h cooldown)
→ 401 auth expired?
→ Try refreshing the token (OAuth)
→ Refresh failed → rotate to next pool key
→ Success → continue normally
```
## Quick Start
If you already have an API key set in `.env`, Hermes auto-discovers it as a 1-key pool. To benefit from pooling, add more keys:
```bash
# Add a second OpenRouter key
hermes auth add openrouter --api-key sk-or-v1-your-second-key
# Add a second Anthropic key
hermes auth add anthropic --type api-key --api-key sk-ant-api03-your-second-key
# Add an Anthropic OAuth credential (Claude Code subscription)
hermes auth add anthropic --type oauth
# Opens browser for OAuth login
```
Check your pools:
```bash
hermes auth list
```
Output:
```
openrouter (2 credentials):
#1 OPENROUTER_API_KEY api_key env:OPENROUTER_API_KEY ←
#2 backup-key api_key manual
anthropic (3 credentials):
#1 hermes_pkce oauth hermes_pkce ←
#2 claude_code oauth claude_code
#3 ANTHROPIC_API_KEY api_key env:ANTHROPIC_API_KEY
```
The `←` marks the currently selected credential.
## Interactive Management
Run `hermes auth` with no subcommand for an interactive wizard:
```bash
hermes auth
```
This shows your full pool status and offers a menu:
```
What would you like to do?
1. Add a credential
2. Remove a credential
3. Reset cooldowns for a provider
4. Set rotation strategy for a provider
5. Exit
```
For providers that support both API keys and OAuth (Anthropic, Nous, Codex), the add flow asks which type:
```
anthropic supports both API keys and OAuth login.
1. API key (paste a key from the provider dashboard)
2. OAuth login (authenticate via browser)
Type [1/2]:
```
## CLI Commands
| Command | Description |
|---------|-------------|
| `hermes auth` | Interactive pool management wizard |
| `hermes auth list` | Show all pools and credentials |
| `hermes auth list <provider>` | Show a specific provider's pool |
| `hermes auth add <provider>` | Add a credential (prompts for type and key) |
| `hermes auth add <provider> --type api-key --api-key <key>` | Add an API key non-interactively |
| `hermes auth add <provider> --type oauth` | Add an OAuth credential via browser login |
| `hermes auth remove <provider> <index>` | Remove credential by 1-based index |
| `hermes auth reset <provider>` | Clear all cooldowns/exhaustion status |
## Rotation Strategies
Configure via `hermes auth` → "Set rotation strategy" or in `config.yaml`:
```yaml
credential_pool_strategies:
openrouter: round_robin
anthropic: least_used
```
| Strategy | Behavior |
|----------|----------|
| `fill_first` (default) | Use the first healthy key until it's exhausted, then move to the next |
| `round_robin` | Cycle through keys evenly, rotating after each selection |
| `least_used` | Always pick the key with the lowest request count |
| `random` | Random selection among healthy keys |
## Error Recovery
The pool handles different errors differently:
| Error | Behavior | Cooldown |
|-------|----------|----------|
| **429 Rate Limit** | Retry same key once (transient). Second consecutive 429 rotates to next key | 1 hour |
| **402 Billing/Quota** | Immediately rotate to next key | 24 hours |
| **401 Auth Expired** | Try refreshing the OAuth token first. Rotate only if refresh fails | — |
| **All keys exhausted** | Fall through to `fallback_model` if configured | — |
The `has_retried_429` flag resets on every successful API call, so a single transient 429 doesn't trigger rotation.
## Custom Endpoint Pools
Custom OpenAI-compatible endpoints (Together.ai, RunPod, local servers) get their own pools, keyed by the endpoint name from `custom_providers` in config.yaml.
When you set up a custom endpoint via `hermes model`, it auto-generates a name like "Together.ai" or "Local (localhost:8080)". This name becomes the pool key.
```bash
# After setting up a custom endpoint via hermes model:
hermes auth list
# Shows:
# Together.ai (1 credential):
# #1 config key api_key config:Together.ai ←
# Add a second key for the same endpoint:
hermes auth add Together.ai --api-key sk-together-second-key
```
Custom endpoint pools are stored in `auth.json` under `credential_pool` with a `custom:` prefix:
```json
{
"credential_pool": {
"openrouter": [...],
"custom:together.ai": [...]
}
}
```
## Auto-Discovery
Hermes automatically discovers credentials from multiple sources and seeds the pool on startup:
| Source | Example | Auto-seeded? |
|--------|---------|-------------|
| Environment variables | `OPENROUTER_API_KEY`, `ANTHROPIC_API_KEY` | Yes |
| OAuth tokens (auth.json) | Codex device code, Nous device code | Yes |
| Claude Code credentials | `~/.claude/.credentials.json` | Yes (Anthropic) |
| Hermes PKCE OAuth | `~/.hermes/auth.json` | Yes (Anthropic) |
| Custom endpoint config | `model.api_key` in config.yaml | Yes (custom endpoints) |
| Manual entries | Added via `hermes auth add` | Persisted in auth.json |
Auto-seeded entries are updated on each pool load — if you remove an env var, its pool entry is automatically pruned. Manual entries (added via `hermes auth add`) are never auto-pruned.
## Thread Safety
The credential pool uses a threading lock for all state mutations (`select()`, `mark_exhausted_and_rotate()`, `try_refresh_current()`, `mark_used()`). This ensures safe concurrent access when the gateway handles multiple chat sessions simultaneously.
## Architecture
For the full data flow diagram, see [`docs/credential-pool-flow.excalidraw`](https://excalidraw.com/#json=2Ycqhqpi6f12E_3ITyiwh,c7u9jSt5BwrmiVzHGbm87g) in the repository.
The credential pool integrates at the provider resolution layer:
1. **`agent/credential_pool.py`** — Pool manager: storage, selection, rotation, cooldowns
2. **`hermes_cli/auth_commands.py`** — CLI commands and interactive wizard
3. **`hermes_cli/runtime_provider.py`** — Pool-aware credential resolution
4. **`run_agent.py`** — Error recovery: 429/402/401 → pool rotation → fallback
## Storage
Pool state is stored in `~/.hermes/auth.json` under the `credential_pool` key:
```json
{
"version": 1,
"credential_pool": {
"openrouter": [
{
"id": "abc123",
"label": "OPENROUTER_API_KEY",
"auth_type": "api_key",
"priority": 0,
"source": "env:OPENROUTER_API_KEY",
"access_token": "sk-or-v1-...",
"last_status": "ok",
"request_count": 142
}
]
},
"credential_pool_strategies": {
"openrouter": "round_robin"
}
}
```
Strategies are stored in `config.yaml` (not `auth.json`):
```yaml
credential_pool_strategies:
openrouter: round_robin
anthropic: least_used
```

View File

@@ -7,12 +7,13 @@ sidebar_position: 8
# Fallback Providers
Hermes Agent has two separate fallback systems that keep your sessions running when providers hit issues:
Hermes Agent has three layers of resilience that keep your sessions running when providers hit issues:
1. **Primary model fallback**automatically switches to a backup provider:model when your main model fails
2. **Auxiliary task fallback**independent provider resolution for side tasks like vision, compression, and web extraction
1. **[Credential pools](./credential-pools.md)** — rotate across multiple API keys for the *same* provider (tried first)
2. **Primary model fallback**automatically switches to a *different* provider:model when your main model fails
3. **Auxiliary task fallback** — independent provider resolution for side tasks like vision, compression, and web extraction
Both are optional and work independently.
Credential pools handle same-provider rotation (e.g., multiple OpenRouter keys). This page covers cross-provider fallback. Both are optional and work independently.
## Primary Model Fallback