- Integration with Nous Portal via subscription, using device auth flow, standard refresh/access token flow, and short-lived API key rotation

- Optional request dumping for debugging
This commit is contained in:
Robin Fernandes
2026-02-19 23:57:14 +11:00
parent 2c7deb41f6
commit 2d0b9edbf6
5 changed files with 1138 additions and 3 deletions

113
cli.py
View File

@@ -80,6 +80,7 @@ def load_cli_config() -> Dict[str, Any]:
"model": {
"default": "anthropic/claude-opus-4.6",
"base_url": "https://openrouter.ai/api/v1",
"provider": "auto",
},
"terminal": {
"env_type": "local",
@@ -605,6 +606,7 @@ class HermesCLI:
self,
model: str = None,
toolsets: List[str] = None,
provider: str = None,
api_key: str = None,
base_url: str = None,
max_turns: int = 60,
@@ -617,6 +619,7 @@ class HermesCLI:
Args:
model: Model to use (default: from env or claude-sonnet)
toolsets: List of toolsets to enable (default: all)
provider: Inference provider ("auto", "openrouter", "nous")
api_key: API key (default: from environment)
base_url: API base URL (default: OpenRouter)
max_turns: Maximum tool-calling iterations (default: 60)
@@ -637,6 +640,20 @@ class HermesCLI:
# API key: custom endpoint (OPENAI_API_KEY) takes precedence over OpenRouter
self.api_key = api_key or os.getenv("OPENAI_API_KEY") or os.getenv("OPENROUTER_API_KEY")
self.requested_provider = (
provider
or os.getenv("HERMES_INFERENCE_PROVIDER")
or CLI_CONFIG["model"].get("provider")
or "auto"
)
self.provider = self._resolve_inference_provider(
self.requested_provider,
explicit_api_key=api_key,
explicit_base_url=base_url,
)
self._nous_key_expires_at: Optional[str] = None
self._nous_key_source: Optional[str] = None
# Max turns priority: CLI arg > env var > config file (agent.max_turns or root max_turns) > default
if max_turns != 60: # CLI arg was explicitly set
self.max_turns = max_turns
@@ -676,6 +693,85 @@ class HermesCLI:
# History file for persistent input recall across sessions
self._history_file = Path.home() / ".hermes_history"
def _resolve_inference_provider(
self,
requested_provider: str,
*,
explicit_api_key: Optional[str],
explicit_base_url: Optional[str],
) -> str:
normalized = (requested_provider or "auto").strip().lower()
if normalized not in {"auto", "openrouter", "nous"}:
self.console.print(
f"[yellow]Unknown provider '{requested_provider}', falling back to auto.[/]"
)
normalized = "auto"
if normalized != "auto":
return normalized
# Explicit one-off CLI creds should always win.
if explicit_api_key or explicit_base_url:
return "openrouter"
try:
from hermes_cli.login import get_nous_portal_auth_state
nous_state = get_nous_portal_auth_state() or {}
except Exception:
nous_state = {}
if nous_state.get("access_token") or nous_state.get("refresh_token"):
return "nous"
if os.getenv("OPENAI_API_KEY") or os.getenv("OPENROUTER_API_KEY"):
return "openrouter"
return "openrouter"
def _ensure_nous_runtime_credentials(self) -> bool:
if self.provider != "nous":
return True
from hermes_cli.login import (
format_nous_auth_error,
resolve_nous_runtime_credentials,
)
try:
credentials = resolve_nous_runtime_credentials(
min_key_ttl_seconds=max(
60, int(os.getenv("HERMES_NOUS_MIN_KEY_TTL_SECONDS", "1800"))
),
timeout_seconds=float(os.getenv("HERMES_NOUS_TIMEOUT_SECONDS", "15")),
)
except Exception as exc:
# Keep UX-friendly error mapping in one place (re-auth/subscription hints).
message = format_nous_auth_error(exc) if "format_nous_auth_error" in locals() else str(exc)
self.console.print(f"[bold red]{message}[/]")
return False
api_key = credentials.get("api_key")
base_url = credentials.get("base_url")
if not isinstance(api_key, str) or not api_key:
self.console.print("[bold red]Nous credential resolver returned an empty API key.[/]")
return False
if not isinstance(base_url, str) or not base_url:
self.console.print("[bold red]Nous credential resolver returned an empty base URL.[/]")
return False
credentials_changed = api_key != self.api_key or base_url != self.base_url
self.api_key = api_key
self.base_url = base_url
self._nous_key_expires_at = credentials.get("expires_at")
self._nous_key_source = credentials.get("source")
# AIAgent/OpenAI client holds auth at initialization time, so rebuild if key rotated.
if credentials_changed and self.agent is not None:
self.agent = None
return True
def _init_agent(self) -> bool:
"""
@@ -686,6 +782,9 @@ class HermesCLI:
"""
if self.agent is not None:
return True
if self.provider == "nous" and not self._ensure_nous_runtime_credentials():
return False
try:
self.agent = AIAgent(
@@ -777,11 +876,17 @@ class HermesCLI:
toolsets_info = ""
if self.enabled_toolsets and "all" not in self.enabled_toolsets:
toolsets_info = f" [dim #B8860B]·[/] [#CD7F32]toolsets: {', '.join(self.enabled_toolsets)}[/]"
provider_info = f" [dim #B8860B]·[/] [dim]provider: {self.provider}[/]"
if self.provider == "nous" and self._nous_key_source:
provider_info = (
f"{provider_info} [dim #B8860B]·[/] [dim]key: {self._nous_key_source}[/]"
)
self.console.print(
f" {api_indicator} [#FFBF00]{model_short}[/] "
f"[dim #B8860B]·[/] [bold cyan]{tool_count} tools[/]"
f"{toolsets_info}"
f"{toolsets_info}{provider_info}"
)
def show_help(self):
@@ -1389,6 +1494,9 @@ class HermesCLI:
Returns:
The agent's response, or None on error
"""
if self.provider == "nous" and not self._ensure_nous_runtime_credentials():
return None
# Initialize agent if needed
if not self._init_agent():
return None
@@ -1665,6 +1773,7 @@ def main(
q: str = None,
toolsets: str = None,
model: str = None,
provider: str = None,
api_key: str = None,
base_url: str = None,
max_turns: int = 60,
@@ -1684,6 +1793,7 @@ def main(
q: Shorthand for --query
toolsets: Comma-separated list of toolsets to enable (e.g., "web,terminal")
model: Model to use (default: anthropic/claude-opus-4-20250514)
provider: Inference provider ("auto", "openrouter", "nous")
api_key: API key for authentication
base_url: Base URL for the API
max_turns: Maximum tool-calling iterations (default: 60)
@@ -1754,6 +1864,7 @@ def main(
cli = HermesCLI(
model=model,
toolsets=toolsets_list,
provider=provider,
api_key=api_key,
base_url=base_url,
max_turns=max_turns,

816
hermes_cli/login.py Normal file
View File

@@ -0,0 +1,816 @@
"""
Device authorization login flow and runtime auth helpers for Hermes CLI.
"""
from __future__ import annotations
import json
import os
import time
import webbrowser
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional, List
import httpx
import yaml
from hermes_cli.config import get_hermes_home
try:
import fcntl # POSIX file locking (macOS/Linux)
except Exception: # pragma: no cover - Windows fallback
fcntl = None
DEFAULT_PORTAL_BASE_URL = "https://portal.nousresearch.com"
DEFAULT_INFERENCE_BASE_URL = "https://inference-api.nousresearch.com/v1"
DEFAULT_CLIENT_ID = "hermes-cli"
DEFAULT_SCOPE = "inference:mint_agent_key"
DEFAULT_AGENT_KEY_MIN_TTL_SECONDS = 30 * 60
ACCESS_TOKEN_REFRESH_SKEW_SECONDS = 120
DEVICE_AUTH_POLL_INTERVAL_CAP_SECONDS = 1
AUTH_STORE_VERSION = 1
NOUS_PORTAL_AUTH_KEY = "nous_portal"
AUTH_LOCK_TIMEOUT_SECONDS = 15.0
class NousAuthError(RuntimeError):
"""Structured auth error for CLI UX mapping."""
def __init__(
self,
message: str,
*,
code: Optional[str] = None,
relogin_required: bool = False,
) -> None:
super().__init__(message)
self.code = code
self.relogin_required = relogin_required
def format_nous_auth_error(error: Exception) -> str:
"""Map low-level auth failures to concise user-facing guidance."""
if not isinstance(error, NousAuthError):
return str(error)
if error.relogin_required:
return f"{error} Run `hermes login` to re-authenticate."
if error.code == "subscription_required":
return (
"No active paid subscription found on Nous Portal. "
"Please purchase/activate a subscription, then retry."
)
if error.code == "insufficient_credits":
return (
"Subscription credits are exhausted. "
"Top up/renew credits in Nous Portal, then retry."
)
if error.code == "temporarily_unavailable":
return f"{error} Please retry in a few seconds."
return str(error)
def _resolve_portal_base_url(explicit_url: Optional[str]) -> str:
base_url = (
explicit_url
or os.getenv("HERMES_PORTAL_BASE_URL")
or os.getenv("NOUS_PORTAL_BASE_URL")
or DEFAULT_PORTAL_BASE_URL
)
return base_url.rstrip("/")
def _resolve_inference_base_url(explicit_url: Optional[str] = None) -> str:
base_url = explicit_url or os.getenv("NOUS_INFERENCE_BASE_URL") or DEFAULT_INFERENCE_BASE_URL
return base_url.rstrip("/")
def _optional_base_url(value: Any) -> Optional[str]:
if not isinstance(value, str):
return None
cleaned = value.strip().rstrip("/")
return cleaned if cleaned else None
def _auth_file_path() -> Path:
return get_hermes_home() / "config" / "auth.json"
def _auth_lock_path() -> Path:
return _auth_file_path().with_suffix(".lock")
@contextmanager
def _auth_store_lock(timeout_seconds: float = AUTH_LOCK_TIMEOUT_SECONDS):
"""Cross-process lock for auth.json reads+writes and mint/refresh operations."""
lock_path = _auth_lock_path()
lock_path.parent.mkdir(parents=True, exist_ok=True)
with lock_path.open("a+") as lock_file:
if fcntl is None:
yield
return
deadline = time.time() + max(1.0, timeout_seconds)
while True:
try:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
break
except BlockingIOError:
if time.time() >= deadline:
raise TimeoutError("Timed out waiting for auth store lock")
time.sleep(0.05)
try:
yield
finally:
fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
def _load_auth_store(auth_file: Path) -> Dict[str, Any]:
if not auth_file.exists():
return {
"version": AUTH_STORE_VERSION,
"systems": {},
}
try:
raw = json.loads(auth_file.read_text())
except Exception:
return {
"version": AUTH_STORE_VERSION,
"systems": {},
}
if isinstance(raw, dict) and isinstance(raw.get("systems"), dict):
return raw
return {
"version": AUTH_STORE_VERSION,
"systems": {},
}
def _save_auth_store(auth_store: Dict[str, Any]) -> Path:
auth_file = _auth_file_path()
auth_file.parent.mkdir(parents=True, exist_ok=True)
auth_store["version"] = AUTH_STORE_VERSION
auth_store["updated_at"] = datetime.now(timezone.utc).isoformat()
auth_file.write_text(json.dumps(auth_store, indent=2) + "\n")
return auth_file
def _load_nous_state(auth_store: Dict[str, Any]) -> Optional[Dict[str, Any]]:
systems = auth_store.get("systems")
if not isinstance(systems, dict):
return None
state = systems.get(NOUS_PORTAL_AUTH_KEY)
return state if isinstance(state, dict) else None
def _save_nous_state(auth_store: Dict[str, Any], payload: Dict[str, Any]) -> None:
systems = auth_store.setdefault("systems", {})
if not isinstance(systems, dict):
auth_store["systems"] = {}
systems = auth_store["systems"]
systems[NOUS_PORTAL_AUTH_KEY] = payload
def get_nous_portal_auth_state() -> Optional[Dict[str, Any]]:
"""Return persisted Nous auth state if present."""
auth_store = _load_auth_store(_auth_file_path())
state = _load_nous_state(auth_store)
if not state:
return None
return dict(state)
def _save_auth_state(payload: Dict[str, Any]) -> Path:
with _auth_store_lock():
auth_store = _load_auth_store(_auth_file_path())
_save_nous_state(auth_store, payload)
return _save_auth_store(auth_store)
def _update_cli_model_config_for_nous(inference_base_url: str) -> Path:
"""
Set CLI defaults to use Nous provider after successful portal login.
Preserves existing model.default when config currently stores model as a string.
"""
config_path = get_hermes_home() / "config.yaml"
config_path.parent.mkdir(parents=True, exist_ok=True)
config: Dict[str, Any] = {}
if config_path.exists():
try:
loaded = yaml.safe_load(config_path.read_text()) or {}
if isinstance(loaded, dict):
config = loaded
except Exception:
config = {}
current_model = config.get("model")
if isinstance(current_model, dict):
model_cfg = dict(current_model)
elif isinstance(current_model, str) and current_model.strip():
model_cfg = {"default": current_model.strip()}
else:
model_cfg = {}
model_cfg["provider"] = "nous"
model_cfg["base_url"] = inference_base_url.rstrip("/")
config["model"] = model_cfg
config_path.write_text(yaml.safe_dump(config, sort_keys=False))
return config_path
def _parse_iso_timestamp(value: Any) -> Optional[float]:
if not isinstance(value, str) or not value:
return None
text = value.strip()
if not text:
return None
if text.endswith("Z"):
text = text[:-1] + "+00:00"
try:
parsed = datetime.fromisoformat(text)
except Exception:
return None
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=timezone.utc)
return parsed.timestamp()
def _is_expiring(expires_at_iso: Any, skew_seconds: int) -> bool:
expires_epoch = _parse_iso_timestamp(expires_at_iso)
if expires_epoch is None:
return True
return expires_epoch <= (time.time() + skew_seconds)
def _coerce_ttl_seconds(expires_in: Any) -> int:
try:
ttl = int(expires_in)
except Exception:
ttl = 0
return max(0, ttl)
def _resolve_verify(
*,
insecure: Optional[bool],
ca_bundle: Optional[str],
auth_state: Optional[Dict[str, Any]],
) -> bool | str:
tls_state = auth_state.get("tls") if isinstance(auth_state, dict) else {}
tls_state = tls_state if isinstance(tls_state, dict) else {}
effective_insecure = (
bool(insecure)
if insecure is not None
else bool(tls_state.get("insecure", False))
)
effective_ca = (
ca_bundle
or tls_state.get("ca_bundle")
or os.getenv("HERMES_CA_BUNDLE")
or os.getenv("SSL_CERT_FILE")
)
if effective_insecure:
return False
if effective_ca:
return str(effective_ca)
return True
def _request_device_code(
client: httpx.Client,
portal_base_url: str,
client_id: str,
scope: Optional[str],
) -> Dict[str, Any]:
response = client.post(
f"{portal_base_url}/api/oauth/device/code",
data={
"client_id": client_id,
**({"scope": scope} if scope else {}),
},
)
response.raise_for_status()
data = response.json()
required_fields = [
"device_code",
"user_code",
"verification_uri",
"verification_uri_complete",
"expires_in",
"interval",
]
missing = [field for field in required_fields if field not in data]
if missing:
raise ValueError(f"Device code response missing fields: {', '.join(missing)}")
return data
def _poll_for_token(
client: httpx.Client,
portal_base_url: str,
client_id: str,
device_code: str,
expires_in: int,
poll_interval: int,
) -> Dict[str, Any]:
deadline = time.time() + max(1, expires_in)
# Cap the client polling cadence to keep post-approval latency low.
# If the server needs slower polling it can respond with slow_down.
current_interval = max(1, min(poll_interval, DEVICE_AUTH_POLL_INTERVAL_CAP_SECONDS))
while time.time() < deadline:
response = client.post(
f"{portal_base_url}/api/oauth/token",
data={
"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
"client_id": client_id,
"device_code": device_code,
},
)
if response.status_code == 200:
payload = response.json()
if "access_token" not in payload:
raise ValueError("Token response did not include access_token")
return payload
try:
error_payload = response.json()
except Exception:
response.raise_for_status()
raise RuntimeError("Token endpoint returned a non-JSON error response")
error_code = error_payload.get("error", "")
if error_code == "authorization_pending":
time.sleep(current_interval)
continue
if error_code == "slow_down":
current_interval = min(current_interval + 1, 30)
time.sleep(current_interval)
continue
description = error_payload.get("error_description") or "Unknown authentication error"
raise RuntimeError(f"{error_code}: {description}")
raise TimeoutError("Timed out waiting for device authorization")
def _refresh_access_token(
*,
client: httpx.Client,
portal_base_url: str,
client_id: str,
refresh_token: str,
) -> Dict[str, Any]:
response = client.post(
f"{portal_base_url}/api/oauth/token",
data={
"grant_type": "refresh_token",
"client_id": client_id,
"refresh_token": refresh_token,
},
)
if response.status_code == 200:
payload = response.json()
if "access_token" not in payload:
raise NousAuthError("Refresh response missing access_token", code="invalid_token", relogin_required=True)
return payload
try:
error_payload = response.json()
except Exception as exc:
raise NousAuthError("Refresh token exchange failed", relogin_required=True) from exc
code = str(error_payload.get("error", "invalid_grant"))
description = str(error_payload.get("error_description") or "Refresh token exchange failed")
relogin = code in {"invalid_grant", "invalid_token"}
raise NousAuthError(description, code=code, relogin_required=relogin)
def _mint_agent_key(
*,
client: httpx.Client,
portal_base_url: str,
access_token: str,
min_ttl_seconds: int,
) -> Dict[str, Any]:
response = client.post(
f"{portal_base_url}/api/oauth/agent-key",
headers={"Authorization": f"Bearer {access_token}"},
json={"min_ttl_seconds": max(60, int(min_ttl_seconds))},
)
if response.status_code == 200:
payload = response.json()
if "api_key" not in payload:
raise NousAuthError("Mint response missing api_key", code="server_error")
return payload
try:
error_payload = response.json()
except Exception as exc:
raise NousAuthError("Agent key mint request failed", code="server_error") from exc
code = str(error_payload.get("error", "server_error"))
description = str(error_payload.get("error_description") or "Agent key mint request failed")
relogin = code in {"invalid_token", "invalid_grant"}
raise NousAuthError(description, code=code, relogin_required=relogin)
def _fetch_available_models(
*,
client: httpx.Client,
inference_base_url: str,
api_key: str,
) -> List[str]:
response = client.get(
f"{inference_base_url.rstrip('/')}/models",
headers={"Authorization": f"Bearer {api_key}"},
)
if response.status_code != 200:
description = f"/models request failed with status {response.status_code}"
try:
error_payload = response.json()
description = str(error_payload.get("error_description") or error_payload.get("error") or description)
except Exception:
pass
raise NousAuthError(description, code="models_fetch_failed")
payload = response.json()
data = payload.get("data")
if not isinstance(data, list):
return []
model_ids: List[str] = []
for item in data:
if not isinstance(item, dict):
continue
model_id = item.get("id")
if isinstance(model_id, str) and model_id.strip():
model_ids.append(model_id.strip())
# Keep stable order from API while removing duplicates.
return list(dict.fromkeys(model_ids))
def _agent_key_is_usable(state: Dict[str, Any], min_ttl_seconds: int) -> bool:
key = state.get("agent_key")
if not isinstance(key, str) or not key.strip():
return False
return not _is_expiring(state.get("agent_key_expires_at"), min_ttl_seconds)
def resolve_nous_runtime_credentials(
*,
min_key_ttl_seconds: int = DEFAULT_AGENT_KEY_MIN_TTL_SECONDS,
timeout_seconds: float = 15.0,
insecure: Optional[bool] = None,
ca_bundle: Optional[str] = None,
force_mint: bool = False,
) -> Dict[str, Any]:
"""
Resolve Nous inference credentials for runtime use.
Ensures:
- access_token is valid (refreshes if needed)
- short-lived inference key is present and has minimum TTL (mints/reuses)
- concurrent processes coordinate through auth store lock
"""
min_key_ttl_seconds = max(60, int(min_key_ttl_seconds))
with _auth_store_lock():
auth_file = _auth_file_path()
auth_store = _load_auth_store(auth_file)
state = _load_nous_state(auth_store)
if not state:
raise NousAuthError("Hermes is not logged into Nous Portal.", relogin_required=True)
portal_base_url = _resolve_portal_base_url(state.get("portal_base_url"))
inference_base_url = _resolve_inference_base_url(state.get("inference_base_url"))
client_id = str(state.get("client_id") or DEFAULT_CLIENT_ID)
verify = _resolve_verify(insecure=insecure, ca_bundle=ca_bundle, auth_state=state)
timeout = httpx.Timeout(timeout_seconds if timeout_seconds else 15.0)
with httpx.Client(timeout=timeout, headers={"Accept": "application/json"}, verify=verify) as client:
access_token = state.get("access_token")
refresh_token = state.get("refresh_token")
if not isinstance(access_token, str) or not access_token:
raise NousAuthError("No access token found for Nous Portal login.", relogin_required=True)
if _is_expiring(state.get("expires_at"), ACCESS_TOKEN_REFRESH_SKEW_SECONDS):
if not isinstance(refresh_token, str) or not refresh_token:
raise NousAuthError("Session expired and no refresh token is available.", relogin_required=True)
refreshed = _refresh_access_token(
client=client,
portal_base_url=portal_base_url,
client_id=client_id,
refresh_token=refresh_token,
)
now = datetime.now(timezone.utc)
access_ttl = _coerce_ttl_seconds(refreshed.get("expires_in"))
state["access_token"] = refreshed["access_token"]
state["refresh_token"] = refreshed.get("refresh_token") or refresh_token
state["token_type"] = refreshed.get("token_type") or state.get("token_type") or "Bearer"
state["scope"] = refreshed.get("scope") or state.get("scope")
refreshed_inference_url = _optional_base_url(refreshed.get("inference_base_url"))
if refreshed_inference_url:
inference_base_url = refreshed_inference_url
state["obtained_at"] = now.isoformat()
state["expires_in"] = access_ttl
state["expires_at"] = datetime.fromtimestamp(now.timestamp() + access_ttl, tz=timezone.utc).isoformat()
access_token = state["access_token"]
used_cached_key = False
mint_payload: Optional[Dict[str, Any]] = None
if not force_mint and _agent_key_is_usable(state, min_key_ttl_seconds):
used_cached_key = True
else:
try:
mint_payload = _mint_agent_key(
client=client,
portal_base_url=portal_base_url,
access_token=access_token,
min_ttl_seconds=min_key_ttl_seconds,
)
except NousAuthError as exc:
# One retry path: token may be stale on server side despite local expiry check.
if exc.code in {"invalid_token", "invalid_grant"} and isinstance(refresh_token, str) and refresh_token:
refreshed = _refresh_access_token(
client=client,
portal_base_url=portal_base_url,
client_id=client_id,
refresh_token=refresh_token,
)
now = datetime.now(timezone.utc)
access_ttl = _coerce_ttl_seconds(refreshed.get("expires_in"))
state["access_token"] = refreshed["access_token"]
state["refresh_token"] = refreshed.get("refresh_token") or refresh_token
state["token_type"] = refreshed.get("token_type") or state.get("token_type") or "Bearer"
state["scope"] = refreshed.get("scope") or state.get("scope")
refreshed_inference_url = _optional_base_url(
refreshed.get("inference_base_url")
)
if refreshed_inference_url:
inference_base_url = refreshed_inference_url
state["obtained_at"] = now.isoformat()
state["expires_in"] = access_ttl
state["expires_at"] = datetime.fromtimestamp(now.timestamp() + access_ttl, tz=timezone.utc).isoformat()
access_token = state["access_token"]
mint_payload = _mint_agent_key(
client=client,
portal_base_url=portal_base_url,
access_token=access_token,
min_ttl_seconds=min_key_ttl_seconds,
)
else:
raise
if mint_payload is not None:
now = datetime.now(timezone.utc)
state["agent_key"] = mint_payload.get("api_key")
state["agent_key_id"] = mint_payload.get("key_id")
state["agent_key_expires_at"] = mint_payload.get("expires_at")
state["agent_key_expires_in"] = mint_payload.get("expires_in")
state["agent_key_reused"] = bool(mint_payload.get("reused", False))
state["agent_key_obtained_at"] = now.isoformat()
minted_inference_url = _optional_base_url(mint_payload.get("inference_base_url"))
if minted_inference_url:
inference_base_url = minted_inference_url
# Persist TLS and routing metadata for future non-interactive refresh/mint calls
state["portal_base_url"] = portal_base_url
state["inference_base_url"] = inference_base_url
state["client_id"] = client_id
state["tls"] = {
"insecure": verify is False,
"ca_bundle": verify if isinstance(verify, str) else None,
}
_save_nous_state(auth_store, state)
_save_auth_store(auth_store)
api_key = state.get("agent_key")
if not isinstance(api_key, str) or not api_key:
raise NousAuthError("Failed to resolve a Nous inference API key", code="server_error")
expires_at = state.get("agent_key_expires_at")
expires_epoch = _parse_iso_timestamp(expires_at)
expires_in = (
max(0, int(expires_epoch - time.time()))
if expires_epoch is not None
else _coerce_ttl_seconds(state.get("agent_key_expires_in"))
)
return {
"provider": "nous",
"base_url": inference_base_url,
"api_key": api_key,
"key_id": state.get("agent_key_id"),
"expires_at": expires_at,
"expires_in": expires_in,
"source": "cache" if used_cached_key else "portal",
}
def get_nous_auth_status() -> Dict[str, Any]:
"""Small status snapshot for `hermes status` output."""
state = get_nous_portal_auth_state()
if not state:
return {
"logged_in": False,
"portal_base_url": None,
"access_expires_at": None,
"agent_key_expires_at": None,
"has_refresh_token": False,
}
return {
"logged_in": bool(state.get("access_token")),
"portal_base_url": state.get("portal_base_url"),
"inference_base_url": state.get("inference_base_url"),
"access_expires_at": state.get("expires_at"),
"agent_key_expires_at": state.get("agent_key_expires_at"),
"has_refresh_token": bool(state.get("refresh_token")),
}
def login_command(args) -> None:
portal_base_url = _resolve_portal_base_url(getattr(args, "portal_url", None))
requested_inference_base_url = _resolve_inference_base_url(getattr(args, "inference_url", None))
client_id = getattr(args, "client_id", None) or DEFAULT_CLIENT_ID
scope = getattr(args, "scope", None) or DEFAULT_SCOPE
open_browser = not getattr(args, "no_browser", False)
timeout_seconds = getattr(args, "timeout", None)
timeout = httpx.Timeout(timeout_seconds if timeout_seconds else 15.0)
insecure = bool(getattr(args, "insecure", False))
ca_bundle = getattr(args, "ca_bundle", None) or os.getenv("HERMES_CA_BUNDLE") or os.getenv("SSL_CERT_FILE")
verify: bool | str = False if insecure else (ca_bundle if ca_bundle else True)
print("Starting Hermes login via device authorization flow...")
print(f"Portal: {portal_base_url}")
if insecure:
print("TLS verification: disabled (--insecure)")
elif ca_bundle:
print(f"TLS verification: custom CA bundle ({ca_bundle})")
try:
with httpx.Client(timeout=timeout, headers={"Accept": "application/json"}, verify=verify) as client:
device_data = _request_device_code(
client=client,
portal_base_url=portal_base_url,
client_id=client_id,
scope=scope,
)
verification_uri_complete = str(device_data["verification_uri_complete"])
user_code = str(device_data["user_code"])
expires_in = int(device_data["expires_in"])
interval = int(device_data["interval"])
print()
print("To continue:")
print(f"1. Open: {verification_uri_complete}")
print(f"2. If prompted, enter code: {user_code}")
if open_browser:
opened = webbrowser.open(verification_uri_complete)
if opened:
print("Opened browser for verification.")
else:
print("Could not automatically open browser; use the URL above.")
effective_poll_interval = max(
1,
min(interval, DEVICE_AUTH_POLL_INTERVAL_CAP_SECONDS),
)
print(f"Waiting for approval (polling every {effective_poll_interval}s)...")
token_data = _poll_for_token(
client=client,
portal_base_url=portal_base_url,
client_id=client_id,
device_code=str(device_data["device_code"]),
expires_in=expires_in,
poll_interval=interval,
)
now = datetime.now(timezone.utc)
token_expires_in = _coerce_ttl_seconds(token_data.get("expires_in", 0))
expires_at = now.timestamp() + token_expires_in
inference_base_url = (
_optional_base_url(token_data.get("inference_base_url"))
or requested_inference_base_url
)
if inference_base_url != requested_inference_base_url:
print(f"Using portal-provided inference URL: {inference_base_url}")
auth_state = {
"portal_base_url": portal_base_url,
"inference_base_url": inference_base_url,
"client_id": client_id,
"scope": token_data.get("scope") or scope,
"token_type": token_data.get("token_type", "Bearer"),
"access_token": token_data["access_token"],
"refresh_token": token_data.get("refresh_token"),
"obtained_at": now.isoformat(),
"expires_at": datetime.fromtimestamp(expires_at, tz=timezone.utc).isoformat(),
"expires_in": token_expires_in,
"tls": {
"insecure": verify is False,
"ca_bundle": verify if isinstance(verify, str) else None,
},
# Clear cached key material from older sessions on fresh login
"agent_key": None,
"agent_key_id": None,
"agent_key_expires_at": None,
"agent_key_expires_in": None,
"agent_key_reused": None,
"agent_key_obtained_at": None,
}
saved_to = _save_auth_state(auth_state)
config_path = _update_cli_model_config_for_nous(inference_base_url)
print("Login successful.")
print(f"Saved auth state to: {saved_to} (systems.{NOUS_PORTAL_AUTH_KEY})")
print(
"Updated CLI config to prefer Nous provider: "
f"{config_path} (model.provider=nous, model.base_url={inference_base_url})"
)
try:
runtime_creds = resolve_nous_runtime_credentials(
min_key_ttl_seconds=5 * 60,
timeout_seconds=timeout_seconds if timeout_seconds else 15.0,
insecure=insecure,
ca_bundle=ca_bundle,
)
runtime_key = runtime_creds.get("api_key")
runtime_base_url = runtime_creds.get("base_url") or inference_base_url
if not isinstance(runtime_key, str) or not runtime_key:
raise NousAuthError("No runtime API key available to fetch models", code="invalid_token")
with httpx.Client(timeout=timeout, headers={"Accept": "application/json"}, verify=verify) as model_client:
model_ids = _fetch_available_models(
client=model_client,
inference_base_url=runtime_base_url,
api_key=runtime_key,
)
print()
if model_ids:
print(f"Available models ({len(model_ids)}):")
for model_id in model_ids:
print(f" - {model_id}")
else:
print("No models were returned by the inference API.")
except Exception as exc:
message = format_nous_auth_error(exc) if isinstance(exc, NousAuthError) else str(exc)
print()
print(
"Login succeeded, but could not fetch available models. "
f"Reason: {message}"
)
except KeyboardInterrupt:
print("Login cancelled.")
raise SystemExit(130)
except Exception as exc:
print(f"Login failed: {exc}")
raise SystemExit(1)

View File

@@ -12,6 +12,7 @@ Usage:
hermes gateway install # Install gateway service
hermes gateway uninstall # Uninstall gateway service
hermes setup # Interactive setup wizard
hermes login # Authenticate with Nous Portal
hermes status # Show status of all components
hermes cron # Manage cron jobs
hermes cron list # List cron jobs
@@ -48,6 +49,7 @@ def cmd_chat(args):
# Build kwargs from args
kwargs = {
"model": args.model,
"provider": getattr(args, "provider", None),
"toolsets": args.toolsets,
"verbose": args.verbose,
"query": args.query,
@@ -69,6 +71,11 @@ def cmd_setup(args):
from hermes_cli.setup import run_setup_wizard
run_setup_wizard(args)
def cmd_login(args):
"""Authenticate Hermes CLI with Nous Portal."""
from hermes_cli.login import login_command
login_command(args)
def cmd_status(args):
"""Show status of all components."""
@@ -250,6 +257,7 @@ Examples:
hermes Start interactive chat
hermes chat -q "Hello" Single query mode
hermes setup Run setup wizard
hermes login Authenticate with Nous Portal
hermes config View configuration
hermes config edit Edit config in $EDITOR
hermes config set model gpt-4 Set a config value
@@ -286,6 +294,12 @@ For more help on a command:
"-m", "--model",
help="Model to use (e.g., anthropic/claude-sonnet-4)"
)
chat_parser.add_argument(
"--provider",
choices=["auto", "openrouter", "nous"],
default=None,
help="Inference provider (default: auto)"
)
chat_parser.add_argument(
"-t", "--toolsets",
help="Comma-separated toolsets to enable"
@@ -352,7 +366,55 @@ For more help on a command:
help="Reset configuration to defaults"
)
setup_parser.set_defaults(func=cmd_setup)
# =========================================================================
# login command
# =========================================================================
login_parser = subparsers.add_parser(
"login",
help="Authenticate with Nous Portal",
description="Run OAuth device authorization flow for Hermes CLI"
)
login_parser.add_argument(
"--portal-url",
help="Portal base URL (default: HERMES_PORTAL_BASE_URL or NOUS_PORTAL_BASE_URL env var, else production portal)"
)
login_parser.add_argument(
"--inference-url",
help="Inference API base URL (default: NOUS_INFERENCE_BASE_URL env var, else production inference API)"
)
login_parser.add_argument(
"--client-id",
default="hermes-cli",
help="OAuth client id to use (default: hermes-cli)"
)
login_parser.add_argument(
"--scope",
default="inference:mint_agent_key",
help="OAuth scope to request (default: inference:mint_agent_key)"
)
login_parser.add_argument(
"--no-browser",
action="store_true",
help="Do not attempt to open the browser automatically"
)
login_parser.add_argument(
"--timeout",
type=float,
default=15.0,
help="HTTP request timeout in seconds (default: 15)"
)
login_parser.add_argument(
"--ca-bundle",
help="Path to CA bundle PEM file for TLS verification (default: HERMES_CA_BUNDLE or SSL_CERT_FILE)"
)
login_parser.add_argument(
"--insecure",
action="store_true",
help="Disable TLS verification (testing only)"
)
login_parser.set_defaults(func=cmd_login)
# =========================================================================
# status command
# =========================================================================
@@ -528,6 +590,7 @@ For more help on a command:
# No command = run chat
args.query = None
args.model = None
args.provider = None
args.toolsets = None
args.verbose = False
cmd_chat(args)

View File

@@ -7,6 +7,7 @@ Shows the status of all Hermes Agent components.
import os
import sys
import subprocess
from datetime import datetime, timezone
from pathlib import Path
PROJECT_ROOT = Path(__file__).parent.parent.resolve()
@@ -40,6 +41,28 @@ def redact_key(key: str) -> str:
return key[:4] + "..." + key[-4:]
def format_iso_timestamp(value: str) -> str:
"""Format ISO timestamps for status output."""
if not value:
return "(unknown)"
text = value.strip()
if not text:
return "(unknown)"
if text.endswith("Z"):
text = text[:-1] + "+00:00"
try:
parsed = datetime.fromisoformat(text)
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=timezone.utc)
except Exception:
return value
return parsed.astimezone().strftime("%Y-%m-%d %H:%M:%S %Z")
def show_status(args):
"""Show status of all Hermes Agent components."""
show_all = getattr(args, 'all', False)
@@ -84,6 +107,28 @@ def show_status(args):
has_key = bool(value)
display = redact_key(value) if not show_all else value
print(f" {name:<12} {check_mark(has_key)} {display}")
try:
from hermes_cli.login import get_nous_auth_status
nous_status = get_nous_auth_status()
except Exception:
nous_status = {}
nous_logged_in = bool(nous_status.get("logged_in"))
print(
f" {'Nous Portal':<12} {check_mark(nous_logged_in)} "
f"{'logged in' if nous_logged_in else 'not logged in'}"
)
if nous_logged_in:
portal_base_url = nous_status.get("portal_base_url") or "(unknown)"
access_exp = format_iso_timestamp(nous_status.get("access_expires_at"))
key_exp = format_iso_timestamp(nous_status.get("agent_key_expires_at"))
refresh_label = "yes" if nous_status.get("has_refresh_token") else "no"
print(f" Portal URL: {portal_base_url}")
print(f" Access exp: {access_exp}")
print(f" Key exp: {key_exp}")
print(f" Refresh: {refresh_label}")
# =========================================================================
# Terminal Configuration

View File

@@ -1858,6 +1858,95 @@ class AIAgent:
# Silent fail - don't interrupt the agent for debug logging
if self.verbose_logging:
logging.warning(f"Failed to log API payload: {e}")
def _mask_api_key_for_logs(self, key: Optional[str]) -> Optional[str]:
if not key:
return None
if len(key) <= 12:
return "***"
return f"{key[:8]}...{key[-4:]}"
def _dump_api_request_debug(
self,
api_kwargs: Dict[str, Any],
*,
reason: str,
error: Optional[Exception] = None,
) -> Optional[Path]:
"""
Dump a debug-friendly HTTP request record for chat.completions.create().
The body is captured from `api_kwargs` exactly as passed to the OpenAI SDK
(excluding transport-only keys like timeout). This is intended for debugging
provider-side 4xx failures where retries are not useful.
"""
try:
body = copy.deepcopy(api_kwargs)
body.pop("timeout", None)
body = {k: v for k, v in body.items() if v is not None}
api_key = None
try:
api_key = getattr(self.client, "api_key", None)
except Exception:
api_key = None
dump_payload: Dict[str, Any] = {
"timestamp": datetime.now().isoformat(),
"session_id": self.session_id,
"reason": reason,
"request": {
"method": "POST",
"url": f"{self.base_url.rstrip('/')}/chat/completions",
"headers": {
"Authorization": f"Bearer {self._mask_api_key_for_logs(api_key)}",
"Content-Type": "application/json",
},
"body": body,
},
}
if error is not None:
error_info: Dict[str, Any] = {
"type": type(error).__name__,
"message": str(error),
}
for attr_name in ("status_code", "request_id", "code", "param", "type"):
attr_value = getattr(error, attr_name, None)
if attr_value is not None:
error_info[attr_name] = attr_value
# OpenAI exceptions often include structured body/response data.
body_attr = getattr(error, "body", None)
if body_attr is not None:
error_info["body"] = body_attr
response_obj = getattr(error, "response", None)
if response_obj is not None:
try:
error_info["response_status"] = getattr(response_obj, "status_code", None)
error_info["response_text"] = response_obj.text
except Exception:
pass
dump_payload["error"] = error_info
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
dump_file = self.logs_dir / f"request_dump_{self.session_id}_{timestamp}.json"
dump_file.write_text(json.dumps(dump_payload, ensure_ascii=False, indent=2, default=str), encoding="utf-8")
print(f"{self.log_prefix}🧾 Request debug dump written to: {dump_file}")
# Optional inline dump for immediate terminal inspection.
if os.getenv("HERMES_DUMP_REQUEST_STDOUT", "").strip().lower() in {"1", "true", "yes", "on"}:
print(json.dumps(dump_payload, ensure_ascii=False, indent=2, default=str))
return dump_file
except Exception as dump_error:
if self.verbose_logging:
logging.warning(f"Failed to dump API request debug payload: {dump_error}")
return None
def _save_session_log(self, messages: List[Dict[str, Any]] = None):
"""
@@ -2163,6 +2252,10 @@ class AIAgent:
if extra_body:
api_kwargs["extra_body"] = extra_body
# Optional: dump every outgoing request payload before send.
if os.getenv("HERMES_DUMP_REQUESTS", "").strip().lower() in {"1", "true", "yes", "on"}:
self._dump_api_request_debug(api_kwargs, reason="preflight")
response = self.client.chat.completions.create(**api_kwargs)
@@ -2362,7 +2455,9 @@ class AIAgent:
# Check for non-retryable client errors (4xx HTTP status codes).
# These indicate a problem with the request itself (bad model ID,
# invalid API key, forbidden, etc.) and will never succeed on retry.
is_client_error = any(phrase in error_msg for phrase in [
status_code = getattr(api_error, "status_code", None)
is_client_status_error = isinstance(status_code, int) and 400 <= status_code < 500
is_client_error = is_client_status_error or any(phrase in error_msg for phrase in [
'error code: 400', 'error code: 401', 'error code: 403',
'error code: 404', 'error code: 422',
'is not a valid model', 'invalid model', 'model not found',
@@ -2371,6 +2466,11 @@ class AIAgent:
])
if is_client_error:
self._dump_api_request_debug(
api_kwargs,
reason="non_retryable_client_error",
error=api_error,
)
print(f"{self.log_prefix}❌ Non-retryable client error detected. Aborting immediately.")
print(f"{self.log_prefix} 💡 This type of error won't be fixed by retrying.")
logging.error(f"{self.log_prefix}Non-retryable client error: {api_error}")