Compare commits

..

21 Commits

Author SHA1 Message Date
teknium1
8cd4a96686 fix(browser): race condition in session creation can orphan cloud sessions
Two concurrent threads (e.g. parallel subagents) could both pass the
'task_id in _active_sessions' check, both create cloud sessions via
network calls, and then one would overwrite the other — leaking the
first cloud session.

Add double-check after the lock is re-acquired: if another thread
already created a session while we were doing the network call, use
the existing one instead of orphaning it.
2026-03-17 04:09:16 -07:00
Teknium
4433b83378 feat(web): add Parallel as alternative web search/extract backend (#1696)
* feat(web): add Parallel as alternative web search/extract backend

Adds Parallel (parallel.ai) as a drop-in alternative to Firecrawl for
web_search and web_extract tools using the official parallel-web SDK.

- Backend selection via WEB_SEARCH_BACKEND env var (auto/parallel/firecrawl)
- Auto mode prefers Firecrawl when both keys present; Parallel when sole backend
- web_crawl remains Firecrawl-only with clear error when unavailable
- Lazy SDK imports, interrupt support, singleton clients
- 16 new unit tests for backend selection and client config

Co-authored-by: s-jag <s-jag@users.noreply.github.com>

* fix: add PARALLEL_API_KEY to config registry and fix web_crawl policy tests

Follow-up for Parallel backend integration:
- Add PARALLEL_API_KEY to OPTIONAL_ENV_VARS (hermes doctor, env blocklist)
- Add to set_config_value api_keys list (hermes config set)
- Add to doctor keys display
- Fix 2 web_crawl policy tests that didn't set FIRECRAWL_API_KEY
  (needed now that web_crawl has a Firecrawl availability guard)

* refactor: explicit backend selection via hermes tools, not auto-detect

Replace the auto-detect backend selection with explicit user choice:
- hermes tools saves WEB_SEARCH_BACKEND to .env when user picks a provider
- _get_backend() reads the explicit choice first
- Fallback only for manual/legacy config (uses whichever key is present)
- _is_provider_active() shows [active] for the selected web backend
- Updated tests, docs, and .env.example to remove 'auto' mode language

* refactor: use config.yaml for web backend, not env var

Match the TTS/browser pattern — web.backend is stored in config.yaml
(set by hermes tools), not as a WEB_SEARCH_BACKEND env var.

- _load_web_config() reads web: section from config.yaml
- _get_backend() reads web.backend from config, falls back to key detection
- _configure_provider() saves to config dict (saved to config.yaml)
- _is_provider_active() reads from config dict
- Removed WEB_SEARCH_BACKEND from .env.example, set_config_value, docs
- Updated all tests to mock _load_web_config instead of env vars

---------

Co-authored-by: s-jag <s-jag@users.noreply.github.com>
2026-03-17 04:02:02 -07:00
Teknium
6405d389aa test: align Hermes setup and full-suite expectations (#1710)
Salvaged from PR #1708 by @kartikkabadi. Cherry-picked with authorship preserved.

Fixes pre-existing test failures from setup TTS prompt flow changes and environment-sensitive assumptions.

Co-authored-by: Kartik <user2@RentKars-MacBook-Air.local>
2026-03-17 04:01:37 -07:00
Teknium
b16186a32a feat(telegram): auto-detect HTML tags and use parse_mode=HTML in send_message (#1709)
* feat: interactive MCP tool configuration in hermes tools

Add the ability to selectively enable/disable individual MCP server
tools through the interactive 'hermes tools' TUI.

Changes:
- tools/mcp_tool.py: Add probe_mcp_server_tools() — lightweight function
  that temporarily connects to configured MCP servers, discovers their
  tools (names + descriptions), and disconnects. No registry side effects.

- hermes_cli/tools_config.py: Add 'Configure MCP tools' option to the
  interactive menu. When selected:
  1. Probes all enabled MCP servers for their available tools
  2. Shows a per-server curses checklist with tool descriptions
  3. Pre-selects tools based on existing include/exclude config
  4. Writes changes back as tools.exclude entries in config.yaml
  5. Reports which servers failed to connect

The existing CLI commands (hermes tools enable/disable server:tool)
continue to work unchanged. This adds the interactive TUI counterpart
so users can browse and toggle MCP tools visually.

Tests: 22 new tests covering probe function edge cases and interactive
flow (pre-selection, exclude/include modes, description truncation,
multi-server handling, error paths).

* feat(telegram): auto-detect HTML tags and use parse_mode=HTML in send_message

When _send_telegram detects HTML tags in the message body, it now sends
with parse_mode='HTML' instead of converting to MarkdownV2. This allows
cron jobs and agents to send rich HTML-formatted Telegram messages with
bold, italic, code blocks, etc. that render correctly.

Detection uses the same regex from PR #1568 by @ashaney:
  re.search(r'<[a-zA-Z/][^>]*>', message)

Plain-text and markdown messages continue through the existing
MarkdownV2 pipeline. The HTML fallback path also catches HTML parse
errors and falls back to plain text, matching the existing MarkdownV2
error handling.

Inspired by: github.com/ashaney — PR #1568
2026-03-17 03:56:06 -07:00
Teknium
abdb4660d4 Merge pull request #1705 from NousResearch/fix/dingtalk-requirements-check
fix(dingtalk): requirements check passes with only one credential set
2026-03-17 03:53:51 -07:00
Teknium
ed3bcae8bd Merge pull request #1704 from NousResearch/fix/hermes-state-thread-locks
fix(state): add missing thread locks to 4 SessionDB methods
2026-03-17 03:53:48 -07:00
Teknium
75c5136e5a Merge pull request #1703 from NousResearch/fix/anthropic-adapter-merge-content-loss
fix(anthropic): consecutive assistant message merge drops content on mixed types
2026-03-17 03:53:45 -07:00
Teknium
1781c05adb Merge pull request #1701 from NousResearch/fix/gateway-yaml-pii-redaction
fix(gateway): PII redaction config never read — missing yaml import
2026-03-17 03:53:43 -07:00
Teknium
d87655afff fix(gateway): persist watcher metadata in checkpoint for crash recovery (#1706)
Salvaged from PR #1573 by @eren-karakus0. Cherry-picked with authorship preserved.

Fixes #1143 — background process notifications resume after gateway restart.

Co-authored-by: Muhammet Eren Karakuş <erenkar950@gmail.com>
2026-03-17 03:52:15 -07:00
teknium1
a9da944a5d fix(dingtalk): requirements check passes with only one credential set
check_dingtalk_requirements() used 'and' to check for missing env vars:
  if not CLIENT_ID and not CLIENT_SECRET: return False

This only returns False when BOTH are missing. If only one is set
(e.g. CLIENT_ID without CLIENT_SECRET), the check passes and
connect() fails later with a cryptic error.

Fix: Change 'and' to 'or' so it returns False when EITHER is missing.
2026-03-17 03:50:45 -07:00
teknium1
efa778a0ef fix(state): add missing thread locks to 4 SessionDB methods
search_sessions(), clear_messages(), delete_session(), and
prune_sessions() all accessed self._conn without acquiring self._lock.
Every other method in the class uses the lock. In multi-threaded
contexts (gateway serving concurrent platform messages), these
unprotected methods can cause sqlite3.ProgrammingError from concurrent
cursor operations on the same connection.
2026-03-17 03:50:06 -07:00
teknium1
8b411b234d fix(anthropic): merge consecutive assistant messages with mixed content types
When two consecutive assistant messages had mixed content types (one
string, one list), the merge logic just replaced the earlier message
entirely with the later one (fixed[-1] = m), silently dropping the
earlier message's content.

Apply the same normalization pattern used in the tool_use merge path
(lines 952-956): convert both to list format before concatenating.
This preserves all content from both messages.
2026-03-17 03:48:55 -07:00
Teknium
ce7418e274 feat: interactive MCP tool configuration in hermes tools (#1694)
Add the ability to selectively enable/disable individual MCP server
tools through the interactive 'hermes tools' TUI.

Changes:
- tools/mcp_tool.py: Add probe_mcp_server_tools() — lightweight function
  that temporarily connects to configured MCP servers, discovers their
  tools (names + descriptions), and disconnects. No registry side effects.

- hermes_cli/tools_config.py: Add 'Configure MCP tools' option to the
  interactive menu. When selected:
  1. Probes all enabled MCP servers for their available tools
  2. Shows a per-server curses checklist with tool descriptions
  3. Pre-selects tools based on existing include/exclude config
  4. Writes changes back as tools.exclude entries in config.yaml
  5. Reports which servers failed to connect

The existing CLI commands (hermes tools enable/disable server:tool)
continue to work unchanged. This adds the interactive TUI counterpart
so users can browse and toggle MCP tools visually.

Tests: 22 new tests covering probe function edge cases and interactive
flow (pre-selection, exclude/include modes, description truncation,
multi-server handling, error paths).
2026-03-17 03:48:44 -07:00
teknium1
7c9beb5829 fix(gateway): add missing yaml import for PII redaction config read
The privacy.redact_pii config reader on line 1546 used bare 'yaml'
which is not in scope — yaml is imported as '_yaml' at module level
(line 93) and as '_y' in other methods. The NameError was silently
caught by the try/except, so PII redaction never activated even when
configured.

Add a local 'import yaml as _pii_yaml' consistent with the pattern
used elsewhere in the file.
2026-03-17 03:48:15 -07:00
Teknium
56e0c90445 Merge pull request #1700 from NousResearch/fix/redacting-formatter-import
fix(core): RedactingFormatter NameError when verbose_logging=True
2026-03-17 03:46:49 -07:00
Teknium
490d37bb80 Merge pull request #1699 from NousResearch/fix/nous-model-fetch-kwargs
fix(cli): fetch_nous_models called with positional args — always TypeError
2026-03-17 03:46:43 -07:00
Teknium
ea238721f0 Merge pull request #1697 from NousResearch/fix/gateway-skill-command-nameref
fix(gateway): NameError on skill slash commands — wrong variable reference
2026-03-17 03:46:08 -07:00
Teknium
d417ba2a48 feat: add route-aware pricing estimates (#1695)
Salvaged from PR #1563 by @kshitijk4poor. Cherry-picked with authorship preserved.

- Route-aware pricing architecture replacing static MODEL_PRICING + heuristics
- Canonical usage normalization (Anthropic/OpenAI/Codex API shapes)
- Cache-aware billing (separate cache_read/cache_write rates)
- Cost status tracking (estimated/included/unknown/actual)
- OpenRouter live pricing via models API
- Schema migration v4→v5 with billing metadata columns
- Removed speculative forward-looking entries
- Removed cost display from CLI status bar
- Threaded OpenRouter metadata pre-warm

Co-authored-by: kshitij <82637225+kshitijk4poor@users.noreply.github.com>
2026-03-17 03:44:44 -07:00
teknium1
c713d01e72 fix(core): move RedactingFormatter import before conditional block
RedactingFormatter was imported inside 'if not has_errors_log_handler:'
(line 461) but also used unconditionally in the verbose_logging block
(line 479). When the error log handler already exists (e.g. second
AIAgent in the same process) AND verbose_logging=True, the import was
skipped and line 479 raised NameError.

Fix: Move the import one level up so it's always available regardless
of whether the error log handler already exists.
2026-03-17 03:43:21 -07:00
teknium1
f95c6a221b fix(cli): use keyword args for fetch_nous_models (always TypeError)
fetch_nous_models() uses keyword-only parameters (the * separator in
its signature), but models.py called it with positional args and in
the wrong order (api_key first, base_url second). This always raised
TypeError, silently caught by except Exception: pass.

Result: Nous provider model list was completely broken — /model
autocomplete and provider_model_ids('nous') always fell back to the
static model catalog instead of fetching live models.
2026-03-17 03:42:46 -07:00
Teknium
d9b9987ad3 docs: comprehensive documentation update for recent features
New documentation:
- DingTalk messaging platform setup guide (dingtalk.md)

Updated existing docs:
- quickstart.md: add Alibaba Cloud, Kilo Code, Vercel AI Gateway to provider table
- configuration.md: add Alibaba Cloud provider, website blocklist config,
  light/dark theme mode, smart approvals (ask/smart/off)
- environment-variables.md: add Mattermost, Matrix, DingTalk, Browser Use,
  DashScope env vars
- browser.md: add Browser Use cloud provider, /browser connect CDP mode,
  multi-provider architecture, fix limitation section contradiction
- slash-commands.md: add /tools enable/disable/list, /browser connect/disconnect/status
- messaging/index.md: add DingTalk, Mattermost, Matrix to architecture diagram,
  platform toolset table, security allowlists, and Next Steps links
- security.md: add website access policy (blocklist) documentation
- sidebars.ts: add Mattermost, Matrix, DingTalk to Messaging Gateway sidebar
2026-03-17 03:42:02 -07:00
55 changed files with 3724 additions and 670 deletions

View File

@@ -65,10 +65,15 @@ OPENCODE_GO_API_KEY=
# TOOL API KEYS
# =============================================================================
# Parallel API Key - AI-native web search and extract
# Get at: https://parallel.ai
PARALLEL_API_KEY=
# Firecrawl API Key - Web search, extract, and crawl
# Get at: https://firecrawl.dev/
FIRECRAWL_API_KEY=
# FAL.ai API Key - Image generation
# Get at: https://fal.ai/
FAL_KEY=

View File

@@ -44,7 +44,7 @@ hermes-agent/
│ ├── terminal_tool.py # Terminal orchestration
│ ├── process_registry.py # Background process management
│ ├── file_tools.py # File read/write/search/patch
│ ├── web_tools.py # Firecrawl search/extract
│ ├── web_tools.py # Web search/extract (Parallel + Firecrawl)
│ ├── browser_tool.py # Browserbase browser automation
│ ├── code_execution_tool.py # execute_code sandbox
│ ├── delegate_tool.py # Subagent delegation

View File

@@ -147,7 +147,7 @@ hermes-agent/
│ ├── approval.py # Dangerous command detection + per-session approval
│ ├── terminal_tool.py # Terminal orchestration (sudo, env lifecycle, backends)
│ ├── file_operations.py # read_file, write_file, search, patch, etc.
│ ├── web_tools.py # web_search, web_extract (Firecrawl + Gemini summarization)
│ ├── web_tools.py # web_search, web_extract (Parallel/Firecrawl + Gemini summarization)
│ ├── vision_tools.py # Image analysis via multimodal models
│ ├── delegate_tool.py # Subagent spawning and parallel task execution
│ ├── code_execution_tool.py # Sandboxed Python with RPC tool access

View File

@@ -963,8 +963,12 @@ def convert_messages_to_anthropic(
elif isinstance(prev_blocks, str) and isinstance(curr_blocks, str):
fixed[-1]["content"] = prev_blocks + "\n" + curr_blocks
else:
# Keep the later message
fixed[-1] = m
# Mixed types — normalize both to list and merge
if isinstance(prev_blocks, str):
prev_blocks = [{"type": "text", "text": prev_blocks}]
if isinstance(curr_blocks, str):
curr_blocks = [{"type": "text", "text": curr_blocks}]
fixed[-1]["content"] = prev_blocks + curr_blocks
else:
fixed.append(m)
result = fixed

View File

@@ -22,14 +22,21 @@ from collections import Counter, defaultdict
from datetime import datetime
from typing import Any, Dict, List
from agent.usage_pricing import DEFAULT_PRICING, estimate_cost_usd, format_duration_compact, get_pricing, has_known_pricing
from agent.usage_pricing import (
CanonicalUsage,
DEFAULT_PRICING,
estimate_usage_cost,
format_duration_compact,
get_pricing,
has_known_pricing,
)
_DEFAULT_PRICING = DEFAULT_PRICING
def _has_known_pricing(model_name: str) -> bool:
def _has_known_pricing(model_name: str, provider: str = None, base_url: str = None) -> bool:
"""Check if a model has known pricing (vs unknown/custom endpoint)."""
return has_known_pricing(model_name)
return has_known_pricing(model_name, provider=provider, base_url=base_url)
def _get_pricing(model_name: str) -> Dict[str, float]:
@@ -41,9 +48,43 @@ def _get_pricing(model_name: str) -> Dict[str, float]:
return get_pricing(model_name)
def _estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
"""Estimate the USD cost for a given model and token counts."""
return estimate_cost_usd(model, input_tokens, output_tokens)
def _estimate_cost(
session_or_model: Dict[str, Any] | str,
input_tokens: int = 0,
output_tokens: int = 0,
*,
cache_read_tokens: int = 0,
cache_write_tokens: int = 0,
provider: str = None,
base_url: str = None,
) -> tuple[float, str]:
"""Estimate the USD cost for a session row or a model/token tuple."""
if isinstance(session_or_model, dict):
session = session_or_model
model = session.get("model") or ""
usage = CanonicalUsage(
input_tokens=session.get("input_tokens") or 0,
output_tokens=session.get("output_tokens") or 0,
cache_read_tokens=session.get("cache_read_tokens") or 0,
cache_write_tokens=session.get("cache_write_tokens") or 0,
)
provider = session.get("billing_provider")
base_url = session.get("billing_base_url")
else:
model = session_or_model or ""
usage = CanonicalUsage(
input_tokens=input_tokens,
output_tokens=output_tokens,
cache_read_tokens=cache_read_tokens,
cache_write_tokens=cache_write_tokens,
)
result = estimate_usage_cost(
model,
usage,
provider=provider,
base_url=base_url,
)
return float(result.amount_usd or 0.0), result.status
def _format_duration(seconds: float) -> str:
@@ -135,7 +176,10 @@ class InsightsEngine:
# Columns we actually need (skip system_prompt, model_config blobs)
_SESSION_COLS = ("id, source, model, started_at, ended_at, "
"message_count, tool_call_count, input_tokens, output_tokens")
"message_count, tool_call_count, input_tokens, output_tokens, "
"cache_read_tokens, cache_write_tokens, billing_provider, "
"billing_base_url, billing_mode, estimated_cost_usd, "
"actual_cost_usd, cost_status, cost_source")
def _get_sessions(self, cutoff: float, source: str = None) -> List[Dict]:
"""Fetch sessions within the time window."""
@@ -287,21 +331,30 @@ class InsightsEngine:
"""Compute high-level overview statistics."""
total_input = sum(s.get("input_tokens") or 0 for s in sessions)
total_output = sum(s.get("output_tokens") or 0 for s in sessions)
total_tokens = total_input + total_output
total_cache_read = sum(s.get("cache_read_tokens") or 0 for s in sessions)
total_cache_write = sum(s.get("cache_write_tokens") or 0 for s in sessions)
total_tokens = total_input + total_output + total_cache_read + total_cache_write
total_tool_calls = sum(s.get("tool_call_count") or 0 for s in sessions)
total_messages = sum(s.get("message_count") or 0 for s in sessions)
# Cost estimation (weighted by model)
total_cost = 0.0
actual_cost = 0.0
models_with_pricing = set()
models_without_pricing = set()
unknown_cost_sessions = 0
included_cost_sessions = 0
for s in sessions:
model = s.get("model") or ""
inp = s.get("input_tokens") or 0
out = s.get("output_tokens") or 0
total_cost += _estimate_cost(model, inp, out)
estimated, status = _estimate_cost(s)
total_cost += estimated
actual_cost += s.get("actual_cost_usd") or 0.0
display = model.split("/")[-1] if "/" in model else (model or "unknown")
if _has_known_pricing(model):
if status == "included":
included_cost_sessions += 1
elif status == "unknown":
unknown_cost_sessions += 1
if _has_known_pricing(model, s.get("billing_provider"), s.get("billing_base_url")):
models_with_pricing.add(display)
else:
models_without_pricing.add(display)
@@ -328,8 +381,11 @@ class InsightsEngine:
"total_tool_calls": total_tool_calls,
"total_input_tokens": total_input,
"total_output_tokens": total_output,
"total_cache_read_tokens": total_cache_read,
"total_cache_write_tokens": total_cache_write,
"total_tokens": total_tokens,
"estimated_cost": total_cost,
"actual_cost": actual_cost,
"total_hours": total_hours,
"avg_session_duration": avg_duration,
"avg_messages_per_session": total_messages / len(sessions) if sessions else 0,
@@ -341,12 +397,15 @@ class InsightsEngine:
"date_range_end": date_range_end,
"models_with_pricing": sorted(models_with_pricing),
"models_without_pricing": sorted(models_without_pricing),
"unknown_cost_sessions": unknown_cost_sessions,
"included_cost_sessions": included_cost_sessions,
}
def _compute_model_breakdown(self, sessions: List[Dict]) -> List[Dict]:
"""Break down usage by model."""
model_data = defaultdict(lambda: {
"sessions": 0, "input_tokens": 0, "output_tokens": 0,
"cache_read_tokens": 0, "cache_write_tokens": 0,
"total_tokens": 0, "tool_calls": 0, "cost": 0.0,
})
@@ -358,12 +417,18 @@ class InsightsEngine:
d["sessions"] += 1
inp = s.get("input_tokens") or 0
out = s.get("output_tokens") or 0
cache_read = s.get("cache_read_tokens") or 0
cache_write = s.get("cache_write_tokens") or 0
d["input_tokens"] += inp
d["output_tokens"] += out
d["total_tokens"] += inp + out
d["cache_read_tokens"] += cache_read
d["cache_write_tokens"] += cache_write
d["total_tokens"] += inp + out + cache_read + cache_write
d["tool_calls"] += s.get("tool_call_count") or 0
d["cost"] += _estimate_cost(model, inp, out)
d["has_pricing"] = _has_known_pricing(model)
estimate, status = _estimate_cost(s)
d["cost"] += estimate
d["has_pricing"] = _has_known_pricing(model, s.get("billing_provider"), s.get("billing_base_url"))
d["cost_status"] = status
result = [
{"model": model, **data}
@@ -377,7 +442,8 @@ class InsightsEngine:
"""Break down usage by platform/source."""
platform_data = defaultdict(lambda: {
"sessions": 0, "messages": 0, "input_tokens": 0,
"output_tokens": 0, "total_tokens": 0, "tool_calls": 0,
"output_tokens": 0, "cache_read_tokens": 0,
"cache_write_tokens": 0, "total_tokens": 0, "tool_calls": 0,
})
for s in sessions:
@@ -387,9 +453,13 @@ class InsightsEngine:
d["messages"] += s.get("message_count") or 0
inp = s.get("input_tokens") or 0
out = s.get("output_tokens") or 0
cache_read = s.get("cache_read_tokens") or 0
cache_write = s.get("cache_write_tokens") or 0
d["input_tokens"] += inp
d["output_tokens"] += out
d["total_tokens"] += inp + out
d["cache_read_tokens"] += cache_read
d["cache_write_tokens"] += cache_write
d["total_tokens"] += inp + out + cache_read + cache_write
d["tool_calls"] += s.get("tool_call_count") or 0
result = [

View File

@@ -1,101 +1,593 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
from decimal import Decimal
from typing import Dict
from typing import Any, Dict, Literal, Optional
MODEL_PRICING = {
"gpt-4o": {"input": 2.50, "output": 10.00},
"gpt-4o-mini": {"input": 0.15, "output": 0.60},
"gpt-4.1": {"input": 2.00, "output": 8.00},
"gpt-4.1-mini": {"input": 0.40, "output": 1.60},
"gpt-4.1-nano": {"input": 0.10, "output": 0.40},
"gpt-4.5-preview": {"input": 75.00, "output": 150.00},
"gpt-5": {"input": 10.00, "output": 30.00},
"gpt-5.4": {"input": 10.00, "output": 30.00},
"o3": {"input": 10.00, "output": 40.00},
"o3-mini": {"input": 1.10, "output": 4.40},
"o4-mini": {"input": 1.10, "output": 4.40},
"claude-opus-4-20250514": {"input": 15.00, "output": 75.00},
"claude-sonnet-4-20250514": {"input": 3.00, "output": 15.00},
"claude-3-5-sonnet-20241022": {"input": 3.00, "output": 15.00},
"claude-3-5-haiku-20241022": {"input": 0.80, "output": 4.00},
"claude-3-opus-20240229": {"input": 15.00, "output": 75.00},
"claude-3-haiku-20240307": {"input": 0.25, "output": 1.25},
"deepseek-chat": {"input": 0.14, "output": 0.28},
"deepseek-reasoner": {"input": 0.55, "output": 2.19},
"gemini-2.5-pro": {"input": 1.25, "output": 10.00},
"gemini-2.5-flash": {"input": 0.15, "output": 0.60},
"gemini-2.0-flash": {"input": 0.10, "output": 0.40},
"llama-4-maverick": {"input": 0.50, "output": 0.70},
"llama-4-scout": {"input": 0.20, "output": 0.30},
"glm-5": {"input": 0.0, "output": 0.0},
"glm-4.7": {"input": 0.0, "output": 0.0},
"glm-4.5": {"input": 0.0, "output": 0.0},
"glm-4.5-flash": {"input": 0.0, "output": 0.0},
"kimi-k2.5": {"input": 0.0, "output": 0.0},
"kimi-k2-thinking": {"input": 0.0, "output": 0.0},
"kimi-k2-turbo-preview": {"input": 0.0, "output": 0.0},
"kimi-k2-0905-preview": {"input": 0.0, "output": 0.0},
"MiniMax-M2.5": {"input": 0.0, "output": 0.0},
"MiniMax-M2.5-highspeed": {"input": 0.0, "output": 0.0},
"MiniMax-M2.1": {"input": 0.0, "output": 0.0},
}
from agent.model_metadata import fetch_model_metadata
DEFAULT_PRICING = {"input": 0.0, "output": 0.0}
_ZERO = Decimal("0")
_ONE_MILLION = Decimal("1000000")
def get_pricing(model_name: str) -> Dict[str, float]:
if not model_name:
return DEFAULT_PRICING
bare = model_name.split("/")[-1].lower()
if bare in MODEL_PRICING:
return MODEL_PRICING[bare]
best_match = None
best_len = 0
for key, price in MODEL_PRICING.items():
if bare.startswith(key) and len(key) > best_len:
best_match = price
best_len = len(key)
if best_match:
return best_match
if "opus" in bare:
return {"input": 15.00, "output": 75.00}
if "sonnet" in bare:
return {"input": 3.00, "output": 15.00}
if "haiku" in bare:
return {"input": 0.80, "output": 4.00}
if "gpt-4o-mini" in bare:
return {"input": 0.15, "output": 0.60}
if "gpt-4o" in bare:
return {"input": 2.50, "output": 10.00}
if "gpt-5" in bare:
return {"input": 10.00, "output": 30.00}
if "deepseek" in bare:
return {"input": 0.14, "output": 0.28}
if "gemini" in bare:
return {"input": 0.15, "output": 0.60}
return DEFAULT_PRICING
CostStatus = Literal["actual", "estimated", "included", "unknown"]
CostSource = Literal[
"provider_cost_api",
"provider_generation_api",
"provider_models_api",
"official_docs_snapshot",
"user_override",
"custom_contract",
"none",
]
def has_known_pricing(model_name: str) -> bool:
pricing = get_pricing(model_name)
return pricing is not DEFAULT_PRICING and any(
float(value) > 0 for value in pricing.values()
@dataclass(frozen=True)
class CanonicalUsage:
input_tokens: int = 0
output_tokens: int = 0
cache_read_tokens: int = 0
cache_write_tokens: int = 0
reasoning_tokens: int = 0
request_count: int = 1
raw_usage: Optional[dict[str, Any]] = None
@property
def prompt_tokens(self) -> int:
return self.input_tokens + self.cache_read_tokens + self.cache_write_tokens
@property
def total_tokens(self) -> int:
return self.prompt_tokens + self.output_tokens
@dataclass(frozen=True)
class BillingRoute:
provider: str
model: str
base_url: str = ""
billing_mode: str = "unknown"
@dataclass(frozen=True)
class PricingEntry:
input_cost_per_million: Optional[Decimal] = None
output_cost_per_million: Optional[Decimal] = None
cache_read_cost_per_million: Optional[Decimal] = None
cache_write_cost_per_million: Optional[Decimal] = None
request_cost: Optional[Decimal] = None
source: CostSource = "none"
source_url: Optional[str] = None
pricing_version: Optional[str] = None
fetched_at: Optional[datetime] = None
@dataclass(frozen=True)
class CostResult:
amount_usd: Optional[Decimal]
status: CostStatus
source: CostSource
label: str
fetched_at: Optional[datetime] = None
pricing_version: Optional[str] = None
notes: tuple[str, ...] = ()
_UTC_NOW = lambda: datetime.now(timezone.utc)
# Official docs snapshot entries. Models whose published pricing and cache
# semantics are stable enough to encode exactly.
_OFFICIAL_DOCS_PRICING: Dict[tuple[str, str], PricingEntry] = {
(
"anthropic",
"claude-opus-4-20250514",
): PricingEntry(
input_cost_per_million=Decimal("15.00"),
output_cost_per_million=Decimal("75.00"),
cache_read_cost_per_million=Decimal("1.50"),
cache_write_cost_per_million=Decimal("18.75"),
source="official_docs_snapshot",
source_url="https://docs.anthropic.com/en/docs/build-with-claude/prompt-caching",
pricing_version="anthropic-prompt-caching-2026-03-16",
),
(
"anthropic",
"claude-sonnet-4-20250514",
): PricingEntry(
input_cost_per_million=Decimal("3.00"),
output_cost_per_million=Decimal("15.00"),
cache_read_cost_per_million=Decimal("0.30"),
cache_write_cost_per_million=Decimal("3.75"),
source="official_docs_snapshot",
source_url="https://docs.anthropic.com/en/docs/build-with-claude/prompt-caching",
pricing_version="anthropic-prompt-caching-2026-03-16",
),
# OpenAI
(
"openai",
"gpt-4o",
): PricingEntry(
input_cost_per_million=Decimal("2.50"),
output_cost_per_million=Decimal("10.00"),
cache_read_cost_per_million=Decimal("1.25"),
source="official_docs_snapshot",
source_url="https://openai.com/api/pricing/",
pricing_version="openai-pricing-2026-03-16",
),
(
"openai",
"gpt-4o-mini",
): PricingEntry(
input_cost_per_million=Decimal("0.15"),
output_cost_per_million=Decimal("0.60"),
cache_read_cost_per_million=Decimal("0.075"),
source="official_docs_snapshot",
source_url="https://openai.com/api/pricing/",
pricing_version="openai-pricing-2026-03-16",
),
(
"openai",
"gpt-4.1",
): PricingEntry(
input_cost_per_million=Decimal("2.00"),
output_cost_per_million=Decimal("8.00"),
cache_read_cost_per_million=Decimal("0.50"),
source="official_docs_snapshot",
source_url="https://openai.com/api/pricing/",
pricing_version="openai-pricing-2026-03-16",
),
(
"openai",
"gpt-4.1-mini",
): PricingEntry(
input_cost_per_million=Decimal("0.40"),
output_cost_per_million=Decimal("1.60"),
cache_read_cost_per_million=Decimal("0.10"),
source="official_docs_snapshot",
source_url="https://openai.com/api/pricing/",
pricing_version="openai-pricing-2026-03-16",
),
(
"openai",
"gpt-4.1-nano",
): PricingEntry(
input_cost_per_million=Decimal("0.10"),
output_cost_per_million=Decimal("0.40"),
cache_read_cost_per_million=Decimal("0.025"),
source="official_docs_snapshot",
source_url="https://openai.com/api/pricing/",
pricing_version="openai-pricing-2026-03-16",
),
(
"openai",
"o3",
): PricingEntry(
input_cost_per_million=Decimal("10.00"),
output_cost_per_million=Decimal("40.00"),
cache_read_cost_per_million=Decimal("2.50"),
source="official_docs_snapshot",
source_url="https://openai.com/api/pricing/",
pricing_version="openai-pricing-2026-03-16",
),
(
"openai",
"o3-mini",
): PricingEntry(
input_cost_per_million=Decimal("1.10"),
output_cost_per_million=Decimal("4.40"),
cache_read_cost_per_million=Decimal("0.55"),
source="official_docs_snapshot",
source_url="https://openai.com/api/pricing/",
pricing_version="openai-pricing-2026-03-16",
),
# Anthropic older models (pre-4.6 generation)
(
"anthropic",
"claude-3-5-sonnet-20241022",
): PricingEntry(
input_cost_per_million=Decimal("3.00"),
output_cost_per_million=Decimal("15.00"),
cache_read_cost_per_million=Decimal("0.30"),
cache_write_cost_per_million=Decimal("3.75"),
source="official_docs_snapshot",
source_url="https://docs.anthropic.com/en/docs/build-with-claude/prompt-caching",
pricing_version="anthropic-pricing-2026-03-16",
),
(
"anthropic",
"claude-3-5-haiku-20241022",
): PricingEntry(
input_cost_per_million=Decimal("0.80"),
output_cost_per_million=Decimal("4.00"),
cache_read_cost_per_million=Decimal("0.08"),
cache_write_cost_per_million=Decimal("1.00"),
source="official_docs_snapshot",
source_url="https://docs.anthropic.com/en/docs/build-with-claude/prompt-caching",
pricing_version="anthropic-pricing-2026-03-16",
),
(
"anthropic",
"claude-3-opus-20240229",
): PricingEntry(
input_cost_per_million=Decimal("15.00"),
output_cost_per_million=Decimal("75.00"),
cache_read_cost_per_million=Decimal("1.50"),
cache_write_cost_per_million=Decimal("18.75"),
source="official_docs_snapshot",
source_url="https://docs.anthropic.com/en/docs/build-with-claude/prompt-caching",
pricing_version="anthropic-pricing-2026-03-16",
),
(
"anthropic",
"claude-3-haiku-20240307",
): PricingEntry(
input_cost_per_million=Decimal("0.25"),
output_cost_per_million=Decimal("1.25"),
cache_read_cost_per_million=Decimal("0.03"),
cache_write_cost_per_million=Decimal("0.30"),
source="official_docs_snapshot",
source_url="https://docs.anthropic.com/en/docs/build-with-claude/prompt-caching",
pricing_version="anthropic-pricing-2026-03-16",
),
# DeepSeek
(
"deepseek",
"deepseek-chat",
): PricingEntry(
input_cost_per_million=Decimal("0.14"),
output_cost_per_million=Decimal("0.28"),
source="official_docs_snapshot",
source_url="https://api-docs.deepseek.com/quick_start/pricing",
pricing_version="deepseek-pricing-2026-03-16",
),
(
"deepseek",
"deepseek-reasoner",
): PricingEntry(
input_cost_per_million=Decimal("0.55"),
output_cost_per_million=Decimal("2.19"),
source="official_docs_snapshot",
source_url="https://api-docs.deepseek.com/quick_start/pricing",
pricing_version="deepseek-pricing-2026-03-16",
),
# Google Gemini
(
"google",
"gemini-2.5-pro",
): PricingEntry(
input_cost_per_million=Decimal("1.25"),
output_cost_per_million=Decimal("10.00"),
source="official_docs_snapshot",
source_url="https://ai.google.dev/pricing",
pricing_version="google-pricing-2026-03-16",
),
(
"google",
"gemini-2.5-flash",
): PricingEntry(
input_cost_per_million=Decimal("0.15"),
output_cost_per_million=Decimal("0.60"),
source="official_docs_snapshot",
source_url="https://ai.google.dev/pricing",
pricing_version="google-pricing-2026-03-16",
),
(
"google",
"gemini-2.0-flash",
): PricingEntry(
input_cost_per_million=Decimal("0.10"),
output_cost_per_million=Decimal("0.40"),
source="official_docs_snapshot",
source_url="https://ai.google.dev/pricing",
pricing_version="google-pricing-2026-03-16",
),
}
def _to_decimal(value: Any) -> Optional[Decimal]:
if value is None:
return None
try:
return Decimal(str(value))
except Exception:
return None
def _to_int(value: Any) -> int:
try:
return int(value or 0)
except Exception:
return 0
def resolve_billing_route(
model_name: str,
provider: Optional[str] = None,
base_url: Optional[str] = None,
) -> BillingRoute:
provider_name = (provider or "").strip().lower()
base = (base_url or "").strip().lower()
model = (model_name or "").strip()
if not provider_name and "/" in model:
inferred_provider, bare_model = model.split("/", 1)
if inferred_provider in {"anthropic", "openai", "google"}:
provider_name = inferred_provider
model = bare_model
if provider_name == "openai-codex":
return BillingRoute(provider="openai-codex", model=model, base_url=base_url or "", billing_mode="subscription_included")
if provider_name == "openrouter" or "openrouter.ai" in base:
return BillingRoute(provider="openrouter", model=model, base_url=base_url or "", billing_mode="official_models_api")
if provider_name == "anthropic":
return BillingRoute(provider="anthropic", model=model.split("/")[-1], base_url=base_url or "", billing_mode="official_docs_snapshot")
if provider_name == "openai":
return BillingRoute(provider="openai", model=model.split("/")[-1], base_url=base_url or "", billing_mode="official_docs_snapshot")
if provider_name in {"custom", "local"} or (base and "localhost" in base):
return BillingRoute(provider=provider_name or "custom", model=model, base_url=base_url or "", billing_mode="unknown")
return BillingRoute(provider=provider_name or "unknown", model=model.split("/")[-1] if model else "", base_url=base_url or "", billing_mode="unknown")
def _lookup_official_docs_pricing(route: BillingRoute) -> Optional[PricingEntry]:
return _OFFICIAL_DOCS_PRICING.get((route.provider, route.model.lower()))
def _openrouter_pricing_entry(route: BillingRoute) -> Optional[PricingEntry]:
metadata = fetch_model_metadata()
model_id = route.model
if model_id not in metadata:
return None
pricing = metadata[model_id].get("pricing") or {}
prompt = _to_decimal(pricing.get("prompt"))
completion = _to_decimal(pricing.get("completion"))
request = _to_decimal(pricing.get("request"))
cache_read = _to_decimal(
pricing.get("cache_read")
or pricing.get("cached_prompt")
or pricing.get("input_cache_read")
)
cache_write = _to_decimal(
pricing.get("cache_write")
or pricing.get("cache_creation")
or pricing.get("input_cache_write")
)
if prompt is None and completion is None and request is None:
return None
def _per_token_to_per_million(value: Optional[Decimal]) -> Optional[Decimal]:
if value is None:
return None
return value * _ONE_MILLION
return PricingEntry(
input_cost_per_million=_per_token_to_per_million(prompt),
output_cost_per_million=_per_token_to_per_million(completion),
cache_read_cost_per_million=_per_token_to_per_million(cache_read),
cache_write_cost_per_million=_per_token_to_per_million(cache_write),
request_cost=request,
source="provider_models_api",
source_url="https://openrouter.ai/docs/api/api-reference/models/get-models",
pricing_version="openrouter-models-api",
fetched_at=_UTC_NOW(),
)
def estimate_cost_usd(model: str, input_tokens: int, output_tokens: int) -> float:
pricing = get_pricing(model)
total = (
Decimal(input_tokens) * Decimal(str(pricing["input"]))
+ Decimal(output_tokens) * Decimal(str(pricing["output"]))
) / Decimal("1000000")
return float(total)
def get_pricing_entry(
model_name: str,
provider: Optional[str] = None,
base_url: Optional[str] = None,
) -> Optional[PricingEntry]:
route = resolve_billing_route(model_name, provider=provider, base_url=base_url)
if route.billing_mode == "subscription_included":
return PricingEntry(
input_cost_per_million=_ZERO,
output_cost_per_million=_ZERO,
cache_read_cost_per_million=_ZERO,
cache_write_cost_per_million=_ZERO,
source="none",
pricing_version="included-route",
)
if route.provider == "openrouter":
return _openrouter_pricing_entry(route)
return _lookup_official_docs_pricing(route)
def normalize_usage(
response_usage: Any,
*,
provider: Optional[str] = None,
api_mode: Optional[str] = None,
) -> CanonicalUsage:
"""Normalize raw API response usage into canonical token buckets.
Handles three API shapes:
- Anthropic: input_tokens/output_tokens/cache_read_input_tokens/cache_creation_input_tokens
- Codex Responses: input_tokens includes cache tokens; input_tokens_details.cached_tokens separates them
- OpenAI Chat Completions: prompt_tokens includes cache tokens; prompt_tokens_details.cached_tokens separates them
In both Codex and OpenAI modes, input_tokens is derived by subtracting cache
tokens from the total — the API contract is that input/prompt totals include
cached tokens and the details object breaks them out.
"""
if not response_usage:
return CanonicalUsage()
provider_name = (provider or "").strip().lower()
mode = (api_mode or "").strip().lower()
if mode == "anthropic_messages" or provider_name == "anthropic":
input_tokens = _to_int(getattr(response_usage, "input_tokens", 0))
output_tokens = _to_int(getattr(response_usage, "output_tokens", 0))
cache_read_tokens = _to_int(getattr(response_usage, "cache_read_input_tokens", 0))
cache_write_tokens = _to_int(getattr(response_usage, "cache_creation_input_tokens", 0))
elif mode == "codex_responses":
input_total = _to_int(getattr(response_usage, "input_tokens", 0))
output_tokens = _to_int(getattr(response_usage, "output_tokens", 0))
details = getattr(response_usage, "input_tokens_details", None)
cache_read_tokens = _to_int(getattr(details, "cached_tokens", 0) if details else 0)
cache_write_tokens = _to_int(
getattr(details, "cache_creation_tokens", 0) if details else 0
)
input_tokens = max(0, input_total - cache_read_tokens - cache_write_tokens)
else:
prompt_total = _to_int(getattr(response_usage, "prompt_tokens", 0))
output_tokens = _to_int(getattr(response_usage, "completion_tokens", 0))
details = getattr(response_usage, "prompt_tokens_details", None)
cache_read_tokens = _to_int(getattr(details, "cached_tokens", 0) if details else 0)
cache_write_tokens = _to_int(
getattr(details, "cache_write_tokens", 0) if details else 0
)
input_tokens = max(0, prompt_total - cache_read_tokens - cache_write_tokens)
reasoning_tokens = 0
output_details = getattr(response_usage, "output_tokens_details", None)
if output_details:
reasoning_tokens = _to_int(getattr(output_details, "reasoning_tokens", 0))
return CanonicalUsage(
input_tokens=input_tokens,
output_tokens=output_tokens,
cache_read_tokens=cache_read_tokens,
cache_write_tokens=cache_write_tokens,
reasoning_tokens=reasoning_tokens,
)
def estimate_usage_cost(
model_name: str,
usage: CanonicalUsage,
*,
provider: Optional[str] = None,
base_url: Optional[str] = None,
) -> CostResult:
route = resolve_billing_route(model_name, provider=provider, base_url=base_url)
if route.billing_mode == "subscription_included":
return CostResult(
amount_usd=_ZERO,
status="included",
source="none",
label="included",
pricing_version="included-route",
)
entry = get_pricing_entry(model_name, provider=provider, base_url=base_url)
if not entry:
return CostResult(amount_usd=None, status="unknown", source="none", label="n/a")
notes: list[str] = []
amount = _ZERO
if usage.input_tokens and entry.input_cost_per_million is None:
return CostResult(amount_usd=None, status="unknown", source=entry.source, label="n/a")
if usage.output_tokens and entry.output_cost_per_million is None:
return CostResult(amount_usd=None, status="unknown", source=entry.source, label="n/a")
if usage.cache_read_tokens:
if entry.cache_read_cost_per_million is None:
return CostResult(
amount_usd=None,
status="unknown",
source=entry.source,
label="n/a",
notes=("cache-read pricing unavailable for route",),
)
if usage.cache_write_tokens:
if entry.cache_write_cost_per_million is None:
return CostResult(
amount_usd=None,
status="unknown",
source=entry.source,
label="n/a",
notes=("cache-write pricing unavailable for route",),
)
if entry.input_cost_per_million is not None:
amount += Decimal(usage.input_tokens) * entry.input_cost_per_million / _ONE_MILLION
if entry.output_cost_per_million is not None:
amount += Decimal(usage.output_tokens) * entry.output_cost_per_million / _ONE_MILLION
if entry.cache_read_cost_per_million is not None:
amount += Decimal(usage.cache_read_tokens) * entry.cache_read_cost_per_million / _ONE_MILLION
if entry.cache_write_cost_per_million is not None:
amount += Decimal(usage.cache_write_tokens) * entry.cache_write_cost_per_million / _ONE_MILLION
if entry.request_cost is not None and usage.request_count:
amount += Decimal(usage.request_count) * entry.request_cost
status: CostStatus = "estimated"
label = f"~${amount:.2f}"
if entry.source == "none" and amount == _ZERO:
status = "included"
label = "included"
if route.provider == "openrouter":
notes.append("OpenRouter cost is estimated from the models API until reconciled.")
return CostResult(
amount_usd=amount,
status=status,
source=entry.source,
label=label,
fetched_at=entry.fetched_at,
pricing_version=entry.pricing_version,
notes=tuple(notes),
)
def has_known_pricing(
model_name: str,
provider: Optional[str] = None,
base_url: Optional[str] = None,
) -> bool:
"""Check whether we have pricing data for this model+route.
Uses direct lookup instead of routing through the full estimation
pipeline — avoids creating dummy usage objects just to check status.
"""
route = resolve_billing_route(model_name, provider=provider, base_url=base_url)
if route.billing_mode == "subscription_included":
return True
entry = get_pricing_entry(model_name, provider=provider, base_url=base_url)
return entry is not None
def get_pricing(
model_name: str,
provider: Optional[str] = None,
base_url: Optional[str] = None,
) -> Dict[str, float]:
"""Backward-compatible thin wrapper for legacy callers.
Returns only non-cache input/output fields when a pricing entry exists.
Unknown routes return zeroes.
"""
entry = get_pricing_entry(model_name, provider=provider, base_url=base_url)
if not entry:
return {"input": 0.0, "output": 0.0}
return {
"input": float(entry.input_cost_per_million or _ZERO),
"output": float(entry.output_cost_per_million or _ZERO),
}
def estimate_cost_usd(
model: str,
input_tokens: int,
output_tokens: int,
*,
provider: Optional[str] = None,
base_url: Optional[str] = None,
) -> float:
"""Backward-compatible helper for legacy callers.
This uses non-cached input/output only. New code should call
`estimate_usage_cost()` with canonical usage buckets.
"""
result = estimate_usage_cost(
model,
CanonicalUsage(input_tokens=input_tokens, output_tokens=output_tokens),
provider=provider,
base_url=base_url,
)
return float(result.amount_usd or _ZERO)
def format_duration_compact(seconds: float) -> str:

95
cli.py
View File

@@ -58,7 +58,12 @@ except (ImportError, AttributeError):
import threading
import queue
from agent.usage_pricing import estimate_cost_usd, format_duration_compact, format_token_count_compact, has_known_pricing
from agent.usage_pricing import (
CanonicalUsage,
estimate_usage_cost,
format_duration_compact,
format_token_count_compact,
)
from hermes_cli.banner import _format_context_length
_COMMAND_SPINNER_FRAMES = ("", "", "", "", "", "", "", "", "", "")
@@ -212,7 +217,7 @@ def load_cli_config() -> Dict[str, Any]:
"resume_display": "full",
"show_reasoning": False,
"streaming": False,
"show_cost": False,
"skin": "default",
"theme_mode": "auto",
},
@@ -1034,8 +1039,7 @@ class HermesCLI:
self.bell_on_complete = CLI_CONFIG["display"].get("bell_on_complete", False)
# show_reasoning: display model thinking/reasoning before the response
self.show_reasoning = CLI_CONFIG["display"].get("show_reasoning", False)
# show_cost: display $ cost in the status bar (off by default)
self.show_cost = CLI_CONFIG["display"].get("show_cost", False)
self.verbose = verbose if verbose is not None else (self.tool_progress_mode == "verbose")
# streaming: stream tokens to the terminal as they arrive (display.streaming in config.yaml)
@@ -1260,12 +1264,14 @@ class HermesCLI:
"context_tokens": 0,
"context_length": None,
"context_percent": None,
"session_input_tokens": 0,
"session_output_tokens": 0,
"session_cache_read_tokens": 0,
"session_cache_write_tokens": 0,
"session_prompt_tokens": 0,
"session_completion_tokens": 0,
"session_total_tokens": 0,
"session_api_calls": 0,
"session_cost": 0.0,
"pricing_known": has_known_pricing(model_name),
"compressions": 0,
}
@@ -1273,15 +1279,14 @@ class HermesCLI:
if not agent:
return snapshot
snapshot["session_input_tokens"] = getattr(agent, "session_input_tokens", 0) or 0
snapshot["session_output_tokens"] = getattr(agent, "session_output_tokens", 0) or 0
snapshot["session_cache_read_tokens"] = getattr(agent, "session_cache_read_tokens", 0) or 0
snapshot["session_cache_write_tokens"] = getattr(agent, "session_cache_write_tokens", 0) or 0
snapshot["session_prompt_tokens"] = getattr(agent, "session_prompt_tokens", 0) or 0
snapshot["session_completion_tokens"] = getattr(agent, "session_completion_tokens", 0) or 0
snapshot["session_total_tokens"] = getattr(agent, "session_total_tokens", 0) or 0
snapshot["session_api_calls"] = getattr(agent, "session_api_calls", 0) or 0
snapshot["session_cost"] = estimate_cost_usd(
model_name,
snapshot["session_prompt_tokens"],
snapshot["session_completion_tokens"],
)
compressor = getattr(agent, "context_compressor", None)
if compressor:
@@ -1302,19 +1307,11 @@ class HermesCLI:
percent = snapshot["context_percent"]
percent_label = f"{percent}%" if percent is not None else "--"
duration_label = snapshot["duration"]
show_cost = getattr(self, "show_cost", False)
if show_cost:
cost_label = f"${snapshot['session_cost']:.2f}" if snapshot["pricing_known"] else "cost n/a"
else:
cost_label = None
if width < 52:
return f"{snapshot['model_short']} · {duration_label}"
if width < 76:
parts = [f"{snapshot['model_short']}", percent_label]
if cost_label:
parts.append(cost_label)
parts.append(duration_label)
return " · ".join(parts)
@@ -1326,8 +1323,6 @@ class HermesCLI:
context_label = "ctx --"
parts = [f"{snapshot['model_short']}", context_label, percent_label]
if cost_label:
parts.append(cost_label)
parts.append(duration_label)
return "".join(parts)
except Exception:
@@ -1338,12 +1333,6 @@ class HermesCLI:
snapshot = self._get_status_bar_snapshot()
width = shutil.get_terminal_size((80, 24)).columns
duration_label = snapshot["duration"]
show_cost = getattr(self, "show_cost", False)
if show_cost:
cost_label = f"${snapshot['session_cost']:.2f}" if snapshot["pricing_known"] else "cost n/a"
else:
cost_label = None
if width < 52:
return [
@@ -1363,11 +1352,6 @@ class HermesCLI:
("class:status-bar-dim", " · "),
(self._status_bar_context_style(percent), percent_label),
]
if cost_label:
frags.extend([
("class:status-bar-dim", " · "),
("class:status-bar-dim", cost_label),
])
frags.extend([
("class:status-bar-dim", " · "),
("class:status-bar-dim", duration_label),
@@ -1393,11 +1377,6 @@ class HermesCLI:
("class:status-bar-dim", " "),
(bar_style, percent_label),
]
if cost_label:
frags.extend([
("class:status-bar-dim", ""),
("class:status-bar-dim", cost_label),
])
frags.extend([
("class:status-bar-dim", ""),
("class:status-bar-dim", duration_label),
@@ -4250,6 +4229,10 @@ class HermesCLI:
return
agent = self.agent
input_tokens = getattr(agent, "session_input_tokens", 0) or 0
output_tokens = getattr(agent, "session_output_tokens", 0) or 0
cache_read_tokens = getattr(agent, "session_cache_read_tokens", 0) or 0
cache_write_tokens = getattr(agent, "session_cache_write_tokens", 0) or 0
prompt = agent.session_prompt_tokens
completion = agent.session_completion_tokens
total = agent.session_total_tokens
@@ -4267,33 +4250,45 @@ class HermesCLI:
compressions = compressor.compression_count
msg_count = len(self.conversation_history)
cost = estimate_cost_usd(agent.model, prompt, completion)
prompt_cost = estimate_cost_usd(agent.model, prompt, 0)
completion_cost = estimate_cost_usd(agent.model, 0, completion)
pricing_known = has_known_pricing(agent.model)
cost_result = estimate_usage_cost(
agent.model,
CanonicalUsage(
input_tokens=input_tokens,
output_tokens=output_tokens,
cache_read_tokens=cache_read_tokens,
cache_write_tokens=cache_write_tokens,
),
provider=getattr(agent, "provider", None),
base_url=getattr(agent, "base_url", None),
)
elapsed = format_duration_compact((datetime.now() - self.session_start).total_seconds())
print(f" 📊 Session Token Usage")
print(f" {'' * 40}")
print(f" Model: {agent.model}")
print(f" Prompt tokens (input): {prompt:>10,}")
print(f" Completion tokens (output): {completion:>9,}")
print(f" Input tokens: {input_tokens:>10,}")
print(f" Cache read tokens: {cache_read_tokens:>10,}")
print(f" Cache write tokens: {cache_write_tokens:>10,}")
print(f" Output tokens: {output_tokens:>10,}")
print(f" Prompt tokens (total): {prompt:>10,}")
print(f" Completion tokens: {completion:>10,}")
print(f" Total tokens: {total:>10,}")
print(f" API calls: {calls:>10,}")
print(f" Session duration: {elapsed:>10}")
if pricing_known:
print(f" Input cost: ${prompt_cost:>10.4f}")
print(f" Output cost: ${completion_cost:>10.4f}")
print(f" Total cost: ${cost:>10.4f}")
print(f" Cost status: {cost_result.status:>10}")
print(f" Cost source: {cost_result.source:>10}")
if cost_result.amount_usd is not None:
prefix = "~" if cost_result.status == "estimated" else ""
print(f" Total cost: {prefix}${float(cost_result.amount_usd):>10.4f}")
elif cost_result.status == "included":
print(f" Total cost: {'included':>10}")
else:
print(f" Input cost: {'n/a':>10}")
print(f" Output cost: {'n/a':>10}")
print(f" Total cost: {'n/a':>10}")
print(f" {'' * 40}")
print(f" Current context: {last_prompt:,} / {ctx_len:,} ({pct:.0f}%)")
print(f" Messages: {msg_count}")
print(f" Compressions: {compressions}")
if not pricing_known:
if cost_result.status == "unknown":
print(f" Note: Pricing unknown for {agent.model}")
if self.verbose:

View File

@@ -0,0 +1,608 @@
# Pricing Accuracy Architecture
Date: 2026-03-16
## Goal
Hermes should only show dollar costs when they are backed by an official source for the user's actual billing path.
This design replaces the current static, heuristic pricing flow in:
- `run_agent.py`
- `agent/usage_pricing.py`
- `agent/insights.py`
- `cli.py`
with a provider-aware pricing system that:
- handles cache billing correctly
- distinguishes `actual` vs `estimated` vs `included` vs `unknown`
- reconciles post-hoc costs when providers expose authoritative billing data
- supports direct providers, OpenRouter, subscriptions, enterprise pricing, and custom endpoints
## Problems In The Current Design
Current Hermes behavior has four structural issues:
1. It stores only `prompt_tokens` and `completion_tokens`, which is insufficient for providers that bill cache reads and cache writes separately.
2. It uses a static model price table and fuzzy heuristics, which can drift from current official pricing.
3. It assumes public API list pricing matches the user's real billing path.
4. It has no distinction between live estimates and reconciled billed cost.
## Design Principles
1. Normalize usage before pricing.
2. Never fold cached tokens into plain input cost.
3. Track certainty explicitly.
4. Treat the billing path as part of the model identity.
5. Prefer official machine-readable sources over scraped docs.
6. Use post-hoc provider cost APIs when available.
7. Show `n/a` rather than inventing precision.
## High-Level Architecture
The new system has four layers:
1. `usage_normalization`
Converts raw provider usage into a canonical usage record.
2. `pricing_source_resolution`
Determines the billing path, source of truth, and applicable pricing source.
3. `cost_estimation_and_reconciliation`
Produces an immediate estimate when possible, then replaces or annotates it with actual billed cost later.
4. `presentation`
`/usage`, `/insights`, and the status bar display cost with certainty metadata.
## Canonical Usage Record
Add a canonical usage model that every provider path maps into before any pricing math happens.
Suggested structure:
```python
@dataclass
class CanonicalUsage:
provider: str
billing_provider: str
model: str
billing_route: str
input_tokens: int = 0
output_tokens: int = 0
cache_read_tokens: int = 0
cache_write_tokens: int = 0
reasoning_tokens: int = 0
request_count: int = 1
raw_usage: dict[str, Any] | None = None
raw_usage_fields: dict[str, str] | None = None
computed_fields: set[str] | None = None
provider_request_id: str | None = None
provider_generation_id: str | None = None
provider_response_id: str | None = None
```
Rules:
- `input_tokens` means non-cached input only.
- `cache_read_tokens` and `cache_write_tokens` are never merged into `input_tokens`.
- `output_tokens` excludes cache metrics.
- `reasoning_tokens` is telemetry unless a provider officially bills it separately.
This is the same normalization pattern used by `opencode`, extended with provenance and reconciliation ids.
## Provider Normalization Rules
### OpenAI Direct
Source usage fields:
- `prompt_tokens`
- `completion_tokens`
- `prompt_tokens_details.cached_tokens`
Normalization:
- `cache_read_tokens = cached_tokens`
- `input_tokens = prompt_tokens - cached_tokens`
- `cache_write_tokens = 0` unless OpenAI exposes it in the relevant route
- `output_tokens = completion_tokens`
### Anthropic Direct
Source usage fields:
- `input_tokens`
- `output_tokens`
- `cache_read_input_tokens`
- `cache_creation_input_tokens`
Normalization:
- `input_tokens = input_tokens`
- `output_tokens = output_tokens`
- `cache_read_tokens = cache_read_input_tokens`
- `cache_write_tokens = cache_creation_input_tokens`
### OpenRouter
Estimate-time usage normalization should use the response usage payload with the same rules as the underlying provider when possible.
Reconciliation-time records should also store:
- OpenRouter generation id
- native token fields when available
- `total_cost`
- `cache_discount`
- `upstream_inference_cost`
- `is_byok`
### Gemini / Vertex
Use official Gemini or Vertex usage fields where available.
If cached content tokens are exposed:
- map them to `cache_read_tokens`
If a route exposes no cache creation metric:
- store `cache_write_tokens = 0`
- preserve the raw usage payload for later extension
### DeepSeek And Other Direct Providers
Normalize only the fields that are officially exposed.
If a provider does not expose cache buckets:
- do not infer them unless the provider explicitly documents how to derive them
### Subscription / Included-Cost Routes
These still use the canonical usage model.
Tokens are tracked normally. Cost depends on billing mode, not on whether usage exists.
## Billing Route Model
Hermes must stop keying pricing solely by `model`.
Introduce a billing route descriptor:
```python
@dataclass
class BillingRoute:
provider: str
base_url: str | None
model: str
billing_mode: str
organization_hint: str | None = None
```
`billing_mode` values:
- `official_cost_api`
- `official_generation_api`
- `official_models_api`
- `official_docs_snapshot`
- `subscription_included`
- `user_override`
- `custom_contract`
- `unknown`
Examples:
- OpenAI direct API with Costs API access: `official_cost_api`
- Anthropic direct API with Usage & Cost API access: `official_cost_api`
- OpenRouter request before reconciliation: `official_models_api`
- OpenRouter request after generation lookup: `official_generation_api`
- GitHub Copilot style subscription route: `subscription_included`
- local OpenAI-compatible server: `unknown`
- enterprise contract with configured rates: `custom_contract`
## Cost Status Model
Every displayed cost should have:
```python
@dataclass
class CostResult:
amount_usd: Decimal | None
status: Literal["actual", "estimated", "included", "unknown"]
source: Literal[
"provider_cost_api",
"provider_generation_api",
"provider_models_api",
"official_docs_snapshot",
"user_override",
"custom_contract",
"none",
]
label: str
fetched_at: datetime | None
pricing_version: str | None
notes: list[str]
```
Presentation rules:
- `actual`: show dollar amount as final
- `estimated`: show dollar amount with estimate labeling
- `included`: show `included` or `$0.00 (included)` depending on UX choice
- `unknown`: show `n/a`
## Official Source Hierarchy
Resolve cost using this order:
1. Request-level or account-level official billed cost
2. Official machine-readable model pricing
3. Official docs snapshot
4. User override or custom contract
5. Unknown
The system must never skip to a lower level if a higher-confidence source exists for the current billing route.
## Provider-Specific Truth Rules
### OpenAI Direct
Preferred truth:
1. Costs API for reconciled spend
2. Official pricing page for live estimate
### Anthropic Direct
Preferred truth:
1. Usage & Cost API for reconciled spend
2. Official pricing docs for live estimate
### OpenRouter
Preferred truth:
1. `GET /api/v1/generation` for reconciled `total_cost`
2. `GET /api/v1/models` pricing for live estimate
Do not use underlying provider public pricing as the source of truth for OpenRouter billing.
### Gemini / Vertex
Preferred truth:
1. official billing export or billing API for reconciled spend when available for the route
2. official pricing docs for estimate
### DeepSeek
Preferred truth:
1. official machine-readable cost source if available in the future
2. official pricing docs snapshot today
### Subscription-Included Routes
Preferred truth:
1. explicit route config marking the model as included in subscription
These should display `included`, not an API list-price estimate.
### Custom Endpoint / Local Model
Preferred truth:
1. user override
2. custom contract config
3. unknown
These should default to `unknown`.
## Pricing Catalog
Replace the current `MODEL_PRICING` dict with a richer pricing catalog.
Suggested record:
```python
@dataclass
class PricingEntry:
provider: str
route_pattern: str
model_pattern: str
input_cost_per_million: Decimal | None = None
output_cost_per_million: Decimal | None = None
cache_read_cost_per_million: Decimal | None = None
cache_write_cost_per_million: Decimal | None = None
request_cost: Decimal | None = None
image_cost: Decimal | None = None
source: str = "official_docs_snapshot"
source_url: str | None = None
fetched_at: datetime | None = None
pricing_version: str | None = None
```
The catalog should be route-aware:
- `openai:gpt-5`
- `anthropic:claude-opus-4-6`
- `openrouter:anthropic/claude-opus-4.6`
- `copilot:gpt-4o`
This avoids conflating direct-provider billing with aggregator billing.
## Pricing Sync Architecture
Introduce a pricing sync subsystem instead of manually maintaining a single hardcoded table.
Suggested modules:
- `agent/pricing/catalog.py`
- `agent/pricing/sources.py`
- `agent/pricing/sync.py`
- `agent/pricing/reconcile.py`
- `agent/pricing/types.py`
### Sync Sources
- OpenRouter models API
- official provider docs snapshots where no API exists
- user overrides from config
### Sync Output
Cache pricing entries locally with:
- source URL
- fetch timestamp
- version/hash
- confidence/source type
### Sync Frequency
- startup warm cache
- background refresh every 6 to 24 hours depending on source
- manual `hermes pricing sync`
## Reconciliation Architecture
Live requests may produce only an estimate initially. Hermes should reconcile them later when a provider exposes actual billed cost.
Suggested flow:
1. Agent call completes.
2. Hermes stores canonical usage plus reconciliation ids.
3. Hermes computes an immediate estimate if a pricing source exists.
4. A reconciliation worker fetches actual cost when supported.
5. Session and message records are updated with `actual` cost.
This can run:
- inline for cheap lookups
- asynchronously for delayed provider accounting
## Persistence Changes
Session storage should stop storing only aggregate prompt/completion totals.
Add fields for both usage and cost certainty:
- `input_tokens`
- `output_tokens`
- `cache_read_tokens`
- `cache_write_tokens`
- `reasoning_tokens`
- `estimated_cost_usd`
- `actual_cost_usd`
- `cost_status`
- `cost_source`
- `pricing_version`
- `billing_provider`
- `billing_mode`
If schema expansion is too large for one PR, add a new pricing events table:
```text
session_cost_events
id
session_id
request_id
provider
model
billing_mode
input_tokens
output_tokens
cache_read_tokens
cache_write_tokens
estimated_cost_usd
actual_cost_usd
cost_status
cost_source
pricing_version
created_at
updated_at
```
## Hermes Touchpoints
### `run_agent.py`
Current responsibility:
- parse raw provider usage
- update session token counters
New responsibility:
- build `CanonicalUsage`
- update canonical counters
- store reconciliation ids
- emit usage event to pricing subsystem
### `agent/usage_pricing.py`
Current responsibility:
- static lookup table
- direct cost arithmetic
New responsibility:
- move or replace with pricing catalog facade
- no fuzzy model-family heuristics
- no direct pricing without billing-route context
### `cli.py`
Current responsibility:
- compute session cost directly from prompt/completion totals
New responsibility:
- display `CostResult`
- show status badges:
- `actual`
- `estimated`
- `included`
- `n/a`
### `agent/insights.py`
Current responsibility:
- recompute historical estimates from static pricing
New responsibility:
- aggregate stored pricing events
- prefer actual cost over estimate
- surface estimates only when reconciliation is unavailable
## UX Rules
### Status Bar
Show one of:
- `$1.42`
- `~$1.42`
- `included`
- `cost n/a`
Where:
- `$1.42` means `actual`
- `~$1.42` means `estimated`
- `included` means subscription-backed or explicitly zero-cost route
- `cost n/a` means unknown
### `/usage`
Show:
- token buckets
- estimated cost
- actual cost if available
- cost status
- pricing source
### `/insights`
Aggregate:
- actual cost totals
- estimated-only totals
- unknown-cost sessions count
- included-cost sessions count
## Config And Overrides
Add user-configurable pricing overrides in config:
```yaml
pricing:
mode: hybrid
sync_on_startup: true
sync_interval_hours: 12
overrides:
- provider: openrouter
model: anthropic/claude-opus-4.6
billing_mode: custom_contract
input_cost_per_million: 4.25
output_cost_per_million: 22.0
cache_read_cost_per_million: 0.5
cache_write_cost_per_million: 6.0
included_routes:
- provider: copilot
model: "*"
- provider: codex-subscription
model: "*"
```
Overrides must win over catalog defaults for the matching billing route.
## Rollout Plan
### Phase 1
- add canonical usage model
- split cache token buckets in `run_agent.py`
- stop pricing cache-inflated prompt totals
- preserve current UI with improved backend math
### Phase 2
- add route-aware pricing catalog
- integrate OpenRouter models API sync
- add `estimated` vs `included` vs `unknown`
### Phase 3
- add reconciliation for OpenRouter generation cost
- add actual cost persistence
- update `/insights` to prefer actual cost
### Phase 4
- add direct OpenAI and Anthropic reconciliation paths
- add user overrides and contract pricing
- add pricing sync CLI command
## Testing Strategy
Add tests for:
- OpenAI cached token subtraction
- Anthropic cache read/write separation
- OpenRouter estimated vs actual reconciliation
- subscription-backed models showing `included`
- custom endpoints showing `n/a`
- override precedence
- stale catalog fallback behavior
Current tests that assume heuristic pricing should be replaced with route-aware expectations.
## Non-Goals
- exact enterprise billing reconstruction without an official source or user override
- backfilling perfect historical cost for old sessions that lack cache bucket data
- scraping arbitrary provider web pages at request time
## Recommendation
Do not expand the existing `MODEL_PRICING` dict.
That path cannot satisfy the product requirement. Hermes should instead migrate to:
- canonical usage normalization
- route-aware pricing sources
- estimate-then-reconcile cost lifecycle
- explicit certainty states in the UI
This is the minimum architecture that makes the statement "Hermes pricing is backed by official sources where possible, and otherwise clearly labeled" defensible.

View File

@@ -60,7 +60,7 @@ def check_dingtalk_requirements() -> bool:
"""Check if DingTalk dependencies are available and configured."""
if not DINGTALK_STREAM_AVAILABLE or not HTTPX_AVAILABLE:
return False
if not os.getenv("DINGTALK_CLIENT_ID") and not os.getenv("DINGTALK_CLIENT_SECRET"):
if not os.getenv("DINGTALK_CLIENT_ID") or not os.getenv("DINGTALK_CLIENT_SECRET"):
return False
return True

View File

@@ -984,6 +984,16 @@ class GatewayRunner:
):
self._schedule_update_notification_watch()
# Drain any recovered process watchers (from crash recovery checkpoint)
try:
from tools.process_registry import process_registry
while process_registry.pending_watchers:
watcher = process_registry.pending_watchers.pop(0)
asyncio.create_task(self._run_process_watcher(watcher))
logger.info("Resumed watcher for recovered process %s", watcher.get("session_id"))
except Exception as e:
logger.error("Recovered watcher setup error: %s", e)
# Start background session expiry watcher for proactive memory flushing
asyncio.create_task(self._session_expiry_watcher())
@@ -1542,8 +1552,9 @@ class GatewayRunner:
# Read privacy.redact_pii from config (re-read per message)
_redact_pii = False
try:
import yaml as _pii_yaml
with open(_config_path, encoding="utf-8") as _pf:
_pcfg = yaml.safe_load(_pf) or {}
_pcfg = _pii_yaml.safe_load(_pf) or {}
_redact_pii = bool((_pcfg.get("privacy") or {}).get("redact_pii", False))
except Exception:
pass
@@ -2089,8 +2100,15 @@ class GatewayRunner:
session_entry.session_key,
input_tokens=agent_result.get("input_tokens", 0),
output_tokens=agent_result.get("output_tokens", 0),
cache_read_tokens=agent_result.get("cache_read_tokens", 0),
cache_write_tokens=agent_result.get("cache_write_tokens", 0),
last_prompt_tokens=agent_result.get("last_prompt_tokens", 0),
model=agent_result.get("model"),
estimated_cost_usd=agent_result.get("estimated_cost_usd"),
cost_status=agent_result.get("cost_status"),
cost_source=agent_result.get("cost_source"),
provider=agent_result.get("provider"),
base_url=agent_result.get("base_url"),
)
# Auto voice reply: send TTS audio before the text response

View File

@@ -343,7 +343,11 @@ class SessionEntry:
# Token tracking
input_tokens: int = 0
output_tokens: int = 0
cache_read_tokens: int = 0
cache_write_tokens: int = 0
total_tokens: int = 0
estimated_cost_usd: float = 0.0
cost_status: str = "unknown"
# Last API-reported prompt tokens (for accurate compression pre-check)
last_prompt_tokens: int = 0
@@ -363,8 +367,12 @@ class SessionEntry:
"chat_type": self.chat_type,
"input_tokens": self.input_tokens,
"output_tokens": self.output_tokens,
"cache_read_tokens": self.cache_read_tokens,
"cache_write_tokens": self.cache_write_tokens,
"total_tokens": self.total_tokens,
"last_prompt_tokens": self.last_prompt_tokens,
"estimated_cost_usd": self.estimated_cost_usd,
"cost_status": self.cost_status,
}
if self.origin:
result["origin"] = self.origin.to_dict()
@@ -394,8 +402,12 @@ class SessionEntry:
chat_type=data.get("chat_type", "dm"),
input_tokens=data.get("input_tokens", 0),
output_tokens=data.get("output_tokens", 0),
cache_read_tokens=data.get("cache_read_tokens", 0),
cache_write_tokens=data.get("cache_write_tokens", 0),
total_tokens=data.get("total_tokens", 0),
last_prompt_tokens=data.get("last_prompt_tokens", 0),
estimated_cost_usd=data.get("estimated_cost_usd", 0.0),
cost_status=data.get("cost_status", "unknown"),
)
@@ -696,8 +708,15 @@ class SessionStore:
session_key: str,
input_tokens: int = 0,
output_tokens: int = 0,
cache_read_tokens: int = 0,
cache_write_tokens: int = 0,
last_prompt_tokens: int = None,
model: str = None,
estimated_cost_usd: Optional[float] = None,
cost_status: Optional[str] = None,
cost_source: Optional[str] = None,
provider: Optional[str] = None,
base_url: Optional[str] = None,
) -> None:
"""Update a session's metadata after an interaction."""
self._ensure_loaded()
@@ -707,15 +726,35 @@ class SessionStore:
entry.updated_at = datetime.now()
entry.input_tokens += input_tokens
entry.output_tokens += output_tokens
entry.cache_read_tokens += cache_read_tokens
entry.cache_write_tokens += cache_write_tokens
if last_prompt_tokens is not None:
entry.last_prompt_tokens = last_prompt_tokens
entry.total_tokens = entry.input_tokens + entry.output_tokens
if estimated_cost_usd is not None:
entry.estimated_cost_usd += estimated_cost_usd
if cost_status:
entry.cost_status = cost_status
entry.total_tokens = (
entry.input_tokens
+ entry.output_tokens
+ entry.cache_read_tokens
+ entry.cache_write_tokens
)
self._save()
if self._db:
try:
self._db.update_token_counts(
entry.session_id, input_tokens, output_tokens,
entry.session_id,
input_tokens=input_tokens,
output_tokens=output_tokens,
cache_read_tokens=cache_read_tokens,
cache_write_tokens=cache_write_tokens,
estimated_cost_usd=estimated_cost_usd,
cost_status=cost_status,
cost_source=cost_source,
billing_provider=provider,
billing_base_url=base_url,
model=model,
)
except Exception as e:

View File

@@ -550,6 +550,14 @@ OPTIONAL_ENV_VARS = {
},
# ── Tool API keys ──
"PARALLEL_API_KEY": {
"description": "Parallel API key for AI-native web search and extract",
"prompt": "Parallel API key",
"url": "https://parallel.ai/",
"tools": ["web_search", "web_extract"],
"password": True,
"category": "tool",
},
"FIRECRAWL_API_KEY": {
"description": "Firecrawl API key for web search and scraping",
"prompt": "Firecrawl API key",
@@ -1506,6 +1514,7 @@ def show_config():
keys = [
("OPENROUTER_API_KEY", "OpenRouter"),
("VOICE_TOOLS_OPENAI_KEY", "OpenAI (STT/TTS)"),
("PARALLEL_API_KEY", "Parallel"),
("FIRECRAWL_API_KEY", "Firecrawl"),
("BROWSERBASE_API_KEY", "Browserbase"),
("BROWSER_USE_API_KEY", "Browser Use"),
@@ -1655,7 +1664,7 @@ def set_config_value(key: str, value: str):
# Check if it's an API key (goes to .env)
api_keys = [
'OPENROUTER_API_KEY', 'OPENAI_API_KEY', 'ANTHROPIC_API_KEY', 'VOICE_TOOLS_OPENAI_KEY',
'FIRECRAWL_API_KEY', 'FIRECRAWL_API_URL', 'BROWSERBASE_API_KEY', 'BROWSERBASE_PROJECT_ID', 'BROWSER_USE_API_KEY',
'PARALLEL_API_KEY', 'FIRECRAWL_API_KEY', 'FIRECRAWL_API_URL', 'BROWSERBASE_API_KEY', 'BROWSERBASE_PROJECT_ID', 'BROWSER_USE_API_KEY',
'FAL_KEY', 'TELEGRAM_BOT_TOKEN', 'DISCORD_BOT_TOKEN',
'TERMINAL_SSH_HOST', 'TERMINAL_SSH_USER', 'TERMINAL_SSH_KEY',
'SUDO_PASSWORD', 'SLACK_BOT_TOKEN', 'SLACK_APP_TOKEN',

View File

@@ -473,7 +473,7 @@ def provider_model_ids(provider: Optional[str]) -> list[str]:
from hermes_cli.auth import fetch_nous_models, resolve_nous_runtime_credentials
creds = resolve_nous_runtime_credentials()
if creds:
live = fetch_nous_models(creds.get("api_key", ""), creds.get("base_url", ""))
live = fetch_nous_models(api_key=creds.get("api_key", ""), inference_base_url=creds.get("base_url", ""))
if live:
return live
except Exception:

View File

@@ -444,11 +444,11 @@ def _print_setup_summary(config: dict, hermes_home):
else:
tool_status.append(("Mixture of Agents", False, "OPENROUTER_API_KEY"))
# Firecrawl (web tools)
if get_env_value("FIRECRAWL_API_KEY") or get_env_value("FIRECRAWL_API_URL"):
# Web tools (Parallel or Firecrawl)
if get_env_value("PARALLEL_API_KEY") or get_env_value("FIRECRAWL_API_KEY") or get_env_value("FIRECRAWL_API_URL"):
tool_status.append(("Web Search & Extract", True, None))
else:
tool_status.append(("Web Search & Extract", False, "FIRECRAWL_API_KEY"))
tool_status.append(("Web Search & Extract", False, "PARALLEL_API_KEY or FIRECRAWL_API_KEY"))
# Browser tools (local Chromium or Browserbase cloud)
import shutil

View File

@@ -151,19 +151,29 @@ TOOL_CATEGORIES = {
"web": {
"name": "Web Search & Extract",
"setup_title": "Select Search Provider",
"setup_note": "A free DuckDuckGo search skill is also included — skip this if you don't need Firecrawl.",
"setup_note": "A free DuckDuckGo search skill is also included — skip this if you don't need a premium provider.",
"icon": "🔍",
"providers": [
{
"name": "Firecrawl Cloud",
"tag": "Recommended - hosted service",
"tag": "Hosted service - search, extract, and crawl",
"web_backend": "firecrawl",
"env_vars": [
{"key": "FIRECRAWL_API_KEY", "prompt": "Firecrawl API key", "url": "https://firecrawl.dev"},
],
},
{
"name": "Parallel",
"tag": "AI-native search and extract",
"web_backend": "parallel",
"env_vars": [
{"key": "PARALLEL_API_KEY", "prompt": "Parallel API key", "url": "https://parallel.ai"},
],
},
{
"name": "Firecrawl Self-Hosted",
"tag": "Free - run your own instance",
"web_backend": "firecrawl",
"env_vars": [
{"key": "FIRECRAWL_API_URL", "prompt": "Your Firecrawl instance URL (e.g., http://localhost:3002)"},
],
@@ -618,6 +628,9 @@ def _is_provider_active(provider: dict, config: dict) -> bool:
if "browser_provider" in provider:
current = config.get("browser", {}).get("cloud_provider")
return provider["browser_provider"] == current
if provider.get("web_backend"):
current = config.get("web", {}).get("backend")
return current == provider["web_backend"]
return False
@@ -650,6 +663,11 @@ def _configure_provider(provider: dict, config: dict):
else:
config.get("browser", {}).pop("cloud_provider", None)
# Set web search backend in config if applicable
if provider.get("web_backend"):
config.setdefault("web", {})["backend"] = provider["web_backend"]
_print_success(f" Web backend set to: {provider['web_backend']}")
if not env_vars:
_print_success(f" {provider['name']} - no configuration needed!")
return
@@ -985,12 +1003,19 @@ def tools_command(args=None, first_install: bool = False, config: dict = None):
if len(platform_keys) > 1:
platform_choices.append("Configure all platforms (global)")
platform_choices.append("Reconfigure an existing tool's provider or API key")
# Show MCP option if any MCP servers are configured
_has_mcp = bool(config.get("mcp_servers"))
if _has_mcp:
platform_choices.append("Configure MCP server tools")
platform_choices.append("Done")
# Index offsets for the extra options after per-platform entries
_global_idx = len(platform_keys) if len(platform_keys) > 1 else -1
_reconfig_idx = len(platform_keys) + (1 if len(platform_keys) > 1 else 0)
_done_idx = _reconfig_idx + 1
_mcp_idx = (_reconfig_idx + 1) if _has_mcp else -1
_done_idx = _reconfig_idx + (2 if _has_mcp else 1)
while True:
idx = _prompt_choice("Select an option:", platform_choices, default=0)
@@ -1005,6 +1030,12 @@ def tools_command(args=None, first_install: bool = False, config: dict = None):
print()
continue
# "Configure MCP tools" selected
if idx == _mcp_idx:
_configure_mcp_tools_interactive(config)
print()
continue
# "Configure all platforms (global)" selected
if idx == _global_idx:
# Use the union of all platforms' current tools as the starting state
@@ -1091,6 +1122,137 @@ def tools_command(args=None, first_install: bool = False, config: dict = None):
print()
# ─── MCP Tools Interactive Configuration ─────────────────────────────────────
def _configure_mcp_tools_interactive(config: dict):
"""Probe MCP servers for available tools and let user toggle them on/off.
Connects to each configured MCP server, discovers tools, then shows
a per-server curses checklist. Writes changes back as ``tools.exclude``
entries in config.yaml.
"""
from hermes_cli.curses_ui import curses_checklist
mcp_servers = config.get("mcp_servers") or {}
if not mcp_servers:
_print_info("No MCP servers configured.")
return
# Count enabled servers
enabled_names = [
k for k, v in mcp_servers.items()
if v.get("enabled", True) not in (False, "false", "0", "no", "off")
]
if not enabled_names:
_print_info("All MCP servers are disabled.")
return
print()
print(color(" Discovering tools from MCP servers...", Colors.YELLOW))
print(color(f" Connecting to {len(enabled_names)} server(s): {', '.join(enabled_names)}", Colors.DIM))
try:
from tools.mcp_tool import probe_mcp_server_tools
server_tools = probe_mcp_server_tools()
except Exception as exc:
_print_error(f"Failed to probe MCP servers: {exc}")
return
if not server_tools:
_print_warning("Could not discover tools from any MCP server.")
_print_info("Check that server commands/URLs are correct and dependencies are installed.")
return
# Report discovery results
failed = [n for n in enabled_names if n not in server_tools]
if failed:
for name in failed:
_print_warning(f" Could not connect to '{name}'")
total_tools = sum(len(tools) for tools in server_tools.values())
print(color(f" Found {total_tools} tool(s) across {len(server_tools)} server(s)", Colors.GREEN))
print()
any_changes = False
for server_name, tools in server_tools.items():
if not tools:
_print_info(f" {server_name}: no tools found")
continue
srv_cfg = mcp_servers.get(server_name, {})
tools_cfg = srv_cfg.get("tools") or {}
include_list = tools_cfg.get("include") or []
exclude_list = tools_cfg.get("exclude") or []
# Build checklist labels
labels = []
for tool_name, description in tools:
desc_short = description[:70] + "..." if len(description) > 70 else description
if desc_short:
labels.append(f"{tool_name} ({desc_short})")
else:
labels.append(tool_name)
# Determine which tools are currently enabled
pre_selected: Set[int] = set()
tool_names = [t[0] for t in tools]
for i, tool_name in enumerate(tool_names):
if include_list:
# Include mode: only included tools are selected
if tool_name in include_list:
pre_selected.add(i)
elif exclude_list:
# Exclude mode: everything except excluded
if tool_name not in exclude_list:
pre_selected.add(i)
else:
# No filter: all enabled
pre_selected.add(i)
chosen = curses_checklist(
f"MCP Server: {server_name} ({len(tools)} tools)",
labels,
pre_selected,
cancel_returns=pre_selected,
)
if chosen == pre_selected:
_print_info(f" {server_name}: no changes")
continue
# Compute new exclude list based on unchecked tools
new_exclude = [tool_names[i] for i in range(len(tool_names)) if i not in chosen]
# Update config
srv_cfg = mcp_servers.setdefault(server_name, {})
tools_cfg = srv_cfg.setdefault("tools", {})
if new_exclude:
tools_cfg["exclude"] = new_exclude
# Remove include if present — we're switching to exclude mode
tools_cfg.pop("include", None)
else:
# All tools enabled — clear filters
tools_cfg.pop("exclude", None)
tools_cfg.pop("include", None)
enabled_count = len(chosen)
disabled_count = len(tools) - enabled_count
_print_success(
f" {server_name}: {enabled_count} enabled, {disabled_count} disabled"
)
any_changes = True
if any_changes:
save_config(config)
print()
print(color(" ✓ MCP tool configuration saved", Colors.GREEN))
else:
print(color(" No changes to MCP tools", Colors.DIM))
# ─── Non-interactive disable/enable ──────────────────────────────────────────

View File

@@ -26,7 +26,7 @@ from typing import Dict, Any, List, Optional
DEFAULT_DB_PATH = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes")) / "state.db"
SCHEMA_VERSION = 4
SCHEMA_VERSION = 5
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_version (
@@ -48,6 +48,17 @@ CREATE TABLE IF NOT EXISTS sessions (
tool_call_count INTEGER DEFAULT 0,
input_tokens INTEGER DEFAULT 0,
output_tokens INTEGER DEFAULT 0,
cache_read_tokens INTEGER DEFAULT 0,
cache_write_tokens INTEGER DEFAULT 0,
reasoning_tokens INTEGER DEFAULT 0,
billing_provider TEXT,
billing_base_url TEXT,
billing_mode TEXT,
estimated_cost_usd REAL,
actual_cost_usd REAL,
cost_status TEXT,
cost_source TEXT,
pricing_version TEXT,
title TEXT,
FOREIGN KEY (parent_session_id) REFERENCES sessions(id)
);
@@ -154,6 +165,26 @@ class SessionDB:
except sqlite3.OperationalError:
pass # Index already exists
cursor.execute("UPDATE schema_version SET version = 4")
if current_version < 5:
new_columns = [
("cache_read_tokens", "INTEGER DEFAULT 0"),
("cache_write_tokens", "INTEGER DEFAULT 0"),
("reasoning_tokens", "INTEGER DEFAULT 0"),
("billing_provider", "TEXT"),
("billing_base_url", "TEXT"),
("billing_mode", "TEXT"),
("estimated_cost_usd", "REAL"),
("actual_cost_usd", "REAL"),
("cost_status", "TEXT"),
("cost_source", "TEXT"),
("pricing_version", "TEXT"),
]
for name, column_type in new_columns:
try:
cursor.execute(f"ALTER TABLE sessions ADD COLUMN {name} {column_type}")
except sqlite3.OperationalError:
pass
cursor.execute("UPDATE schema_version SET version = 5")
# Unique title index — always ensure it exists (safe to run after migrations
# since the title column is guaranteed to exist at this point)
@@ -233,8 +264,22 @@ class SessionDB:
self._conn.commit()
def update_token_counts(
self, session_id: str, input_tokens: int = 0, output_tokens: int = 0,
self,
session_id: str,
input_tokens: int = 0,
output_tokens: int = 0,
model: str = None,
cache_read_tokens: int = 0,
cache_write_tokens: int = 0,
reasoning_tokens: int = 0,
estimated_cost_usd: Optional[float] = None,
actual_cost_usd: Optional[float] = None,
cost_status: Optional[str] = None,
cost_source: Optional[str] = None,
pricing_version: Optional[str] = None,
billing_provider: Optional[str] = None,
billing_base_url: Optional[str] = None,
billing_mode: Optional[str] = None,
) -> None:
"""Increment token counters and backfill model if not already set."""
with self._lock:
@@ -242,9 +287,40 @@ class SessionDB:
"""UPDATE sessions SET
input_tokens = input_tokens + ?,
output_tokens = output_tokens + ?,
cache_read_tokens = cache_read_tokens + ?,
cache_write_tokens = cache_write_tokens + ?,
reasoning_tokens = reasoning_tokens + ?,
estimated_cost_usd = COALESCE(estimated_cost_usd, 0) + COALESCE(?, 0),
actual_cost_usd = CASE
WHEN ? IS NULL THEN actual_cost_usd
ELSE COALESCE(actual_cost_usd, 0) + ?
END,
cost_status = COALESCE(?, cost_status),
cost_source = COALESCE(?, cost_source),
pricing_version = COALESCE(?, pricing_version),
billing_provider = COALESCE(billing_provider, ?),
billing_base_url = COALESCE(billing_base_url, ?),
billing_mode = COALESCE(billing_mode, ?),
model = COALESCE(model, ?)
WHERE id = ?""",
(input_tokens, output_tokens, model, session_id),
(
input_tokens,
output_tokens,
cache_read_tokens,
cache_write_tokens,
reasoning_tokens,
estimated_cost_usd,
actual_cost_usd,
actual_cost_usd,
cost_status,
cost_source,
pricing_version,
billing_provider,
billing_base_url,
billing_mode,
model,
session_id,
),
)
self._conn.commit()
@@ -733,17 +809,18 @@ class SessionDB:
offset: int = 0,
) -> List[Dict[str, Any]]:
"""List sessions, optionally filtered by source."""
if source:
cursor = self._conn.execute(
"SELECT * FROM sessions WHERE source = ? ORDER BY started_at DESC LIMIT ? OFFSET ?",
(source, limit, offset),
)
else:
cursor = self._conn.execute(
"SELECT * FROM sessions ORDER BY started_at DESC LIMIT ? OFFSET ?",
(limit, offset),
)
return [dict(row) for row in cursor.fetchall()]
with self._lock:
if source:
cursor = self._conn.execute(
"SELECT * FROM sessions WHERE source = ? ORDER BY started_at DESC LIMIT ? OFFSET ?",
(source, limit, offset),
)
else:
cursor = self._conn.execute(
"SELECT * FROM sessions ORDER BY started_at DESC LIMIT ? OFFSET ?",
(limit, offset),
)
return [dict(row) for row in cursor.fetchall()]
# =========================================================================
# Utility
@@ -795,26 +872,28 @@ class SessionDB:
def clear_messages(self, session_id: str) -> None:
"""Delete all messages for a session and reset its counters."""
self._conn.execute(
"DELETE FROM messages WHERE session_id = ?", (session_id,)
)
self._conn.execute(
"UPDATE sessions SET message_count = 0, tool_call_count = 0 WHERE id = ?",
(session_id,),
)
self._conn.commit()
with self._lock:
self._conn.execute(
"DELETE FROM messages WHERE session_id = ?", (session_id,)
)
self._conn.execute(
"UPDATE sessions SET message_count = 0, tool_call_count = 0 WHERE id = ?",
(session_id,),
)
self._conn.commit()
def delete_session(self, session_id: str) -> bool:
"""Delete a session and all its messages. Returns True if found."""
cursor = self._conn.execute(
"SELECT COUNT(*) FROM sessions WHERE id = ?", (session_id,)
)
if cursor.fetchone()[0] == 0:
return False
self._conn.execute("DELETE FROM messages WHERE session_id = ?", (session_id,))
self._conn.execute("DELETE FROM sessions WHERE id = ?", (session_id,))
self._conn.commit()
return True
with self._lock:
cursor = self._conn.execute(
"SELECT COUNT(*) FROM sessions WHERE id = ?", (session_id,)
)
if cursor.fetchone()[0] == 0:
return False
self._conn.execute("DELETE FROM messages WHERE session_id = ?", (session_id,))
self._conn.execute("DELETE FROM sessions WHERE id = ?", (session_id,))
self._conn.commit()
return True
def prune_sessions(self, older_than_days: int = 90, source: str = None) -> int:
"""
@@ -824,22 +903,23 @@ class SessionDB:
import time as _time
cutoff = _time.time() - (older_than_days * 86400)
if source:
cursor = self._conn.execute(
"""SELECT id FROM sessions
WHERE started_at < ? AND ended_at IS NOT NULL AND source = ?""",
(cutoff, source),
)
else:
cursor = self._conn.execute(
"SELECT id FROM sessions WHERE started_at < ? AND ended_at IS NOT NULL",
(cutoff,),
)
session_ids = [row["id"] for row in cursor.fetchall()]
with self._lock:
if source:
cursor = self._conn.execute(
"""SELECT id FROM sessions
WHERE started_at < ? AND ended_at IS NOT NULL AND source = ?""",
(cutoff, source),
)
else:
cursor = self._conn.execute(
"SELECT id FROM sessions WHERE started_at < ? AND ended_at IS NOT NULL",
(cutoff,),
)
session_ids = [row["id"] for row in cursor.fetchall()]
for sid in session_ids:
self._conn.execute("DELETE FROM messages WHERE session_id = ?", (sid,))
self._conn.execute("DELETE FROM sessions WHERE id = ?", (sid,))
for sid in session_ids:
self._conn.execute("DELETE FROM messages WHERE session_id = ?", (sid,))
self._conn.execute("DELETE FROM sessions WHERE id = ?", (sid,))
self._conn.commit()
self._conn.commit()
return len(session_ids)

View File

@@ -27,6 +27,7 @@ dependencies = [
"prompt_toolkit",
# Tools
"firecrawl-py",
"parallel-web>=0.4.2",
"fal-client",
# Text-to-speech (Edge TTS is free, no API key needed)
"edge-tts",

View File

@@ -18,6 +18,7 @@ PyJWT[crypto]
# Web tools
firecrawl-py
parallel-web>=0.4.2
# Image generation
fal-client

View File

@@ -86,6 +86,7 @@ from agent.model_metadata import (
from agent.context_compressor import ContextCompressor
from agent.prompt_caching import apply_anthropic_cache_control
from agent.prompt_builder import build_skills_system_prompt, build_context_files_prompt
from agent.usage_pricing import estimate_usage_cost, normalize_usage
from agent.display import (
KawaiiSpinner, build_tool_preview as _build_tool_preview,
get_cute_tool_message as _get_cute_tool_message_impl,
@@ -391,6 +392,15 @@ class AIAgent:
else:
self.api_mode = "chat_completions"
# Pre-warm OpenRouter model metadata cache in a background thread.
# fetch_model_metadata() is cached for 1 hour; this avoids a blocking
# HTTP request on the first API response when pricing is estimated.
if self.provider == "openrouter" or "openrouter" in self.base_url.lower():
threading.Thread(
target=lambda: fetch_model_metadata(),
daemon=True,
).start()
self.tool_progress_callback = tool_progress_callback
self.thinking_callback = thinking_callback
self.reasoning_callback = reasoning_callback
@@ -457,8 +467,8 @@ class AIAgent:
and Path(getattr(handler, "baseFilename", "")).resolve() == resolved_error_log_path
for handler in root_logger.handlers
)
from agent.redact import RedactingFormatter
if not has_errors_log_handler:
from agent.redact import RedactingFormatter
error_log_dir.mkdir(parents=True, exist_ok=True)
error_file_handler = RotatingFileHandler(
error_log_path, maxBytes=2 * 1024 * 1024, backupCount=2,
@@ -850,6 +860,14 @@ class AIAgent:
self.session_completion_tokens = 0
self.session_total_tokens = 0
self.session_api_calls = 0
self.session_input_tokens = 0
self.session_output_tokens = 0
self.session_cache_read_tokens = 0
self.session_cache_write_tokens = 0
self.session_reasoning_tokens = 0
self.session_estimated_cost_usd = 0.0
self.session_cost_status = "unknown"
self.session_cost_source = "none"
if not self.quiet_mode:
if compression_enabled:
@@ -5272,26 +5290,14 @@ class AIAgent:
# Track actual token usage from response for context management
if hasattr(response, 'usage') and response.usage:
if self.api_mode in ("codex_responses", "anthropic_messages"):
prompt_tokens = getattr(response.usage, 'input_tokens', 0) or 0
if self.api_mode == "anthropic_messages":
# Anthropic splits input into cache_read + cache_creation
# + non-cached input_tokens. Without adding the cached
# portions, the context bar shows only the tiny non-cached
# portion (e.g. 3 tokens) instead of the real total (~18K).
# Other providers (OpenAI/Codex) already include cached
# tokens in their input_tokens/prompt_tokens field.
prompt_tokens += getattr(response.usage, 'cache_read_input_tokens', 0) or 0
prompt_tokens += getattr(response.usage, 'cache_creation_input_tokens', 0) or 0
completion_tokens = getattr(response.usage, 'output_tokens', 0) or 0
total_tokens = (
getattr(response.usage, 'total_tokens', None)
or (prompt_tokens + completion_tokens)
)
else:
prompt_tokens = getattr(response.usage, 'prompt_tokens', 0) or 0
completion_tokens = getattr(response.usage, 'completion_tokens', 0) or 0
total_tokens = getattr(response.usage, 'total_tokens', 0) or 0
canonical_usage = normalize_usage(
response.usage,
provider=self.provider,
api_mode=self.api_mode,
)
prompt_tokens = canonical_usage.prompt_tokens
completion_tokens = canonical_usage.output_tokens
total_tokens = canonical_usage.total_tokens
usage_dict = {
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
@@ -5310,6 +5316,22 @@ class AIAgent:
self.session_completion_tokens += completion_tokens
self.session_total_tokens += total_tokens
self.session_api_calls += 1
self.session_input_tokens += canonical_usage.input_tokens
self.session_output_tokens += canonical_usage.output_tokens
self.session_cache_read_tokens += canonical_usage.cache_read_tokens
self.session_cache_write_tokens += canonical_usage.cache_write_tokens
self.session_reasoning_tokens += canonical_usage.reasoning_tokens
cost_result = estimate_usage_cost(
self.model,
canonical_usage,
provider=self.provider,
base_url=self.base_url,
)
if cost_result.amount_usd is not None:
self.session_estimated_cost_usd += float(cost_result.amount_usd)
self.session_cost_status = cost_result.status
self.session_cost_source = cost_result.source
# Persist token counts to session DB for /insights.
# Gateway sessions persist via session_store.update_session()
@@ -5320,8 +5342,19 @@ class AIAgent:
try:
self._session_db.update_token_counts(
self.session_id,
input_tokens=prompt_tokens,
output_tokens=completion_tokens,
input_tokens=canonical_usage.input_tokens,
output_tokens=canonical_usage.output_tokens,
cache_read_tokens=canonical_usage.cache_read_tokens,
cache_write_tokens=canonical_usage.cache_write_tokens,
reasoning_tokens=canonical_usage.reasoning_tokens,
estimated_cost_usd=float(cost_result.amount_usd)
if cost_result.amount_usd is not None else None,
cost_status=cost_result.status,
cost_source=cost_result.source,
billing_provider=self.provider,
billing_base_url=self.base_url,
billing_mode="subscription_included"
if cost_result.status == "included" else None,
model=self.model,
)
except Exception:
@@ -6242,6 +6275,21 @@ class AIAgent:
"partial": False, # True only when stopped due to invalid tool calls
"interrupted": interrupted,
"response_previewed": getattr(self, "_response_was_previewed", False),
"model": self.model,
"provider": self.provider,
"base_url": self.base_url,
"input_tokens": self.session_input_tokens,
"output_tokens": self.session_output_tokens,
"cache_read_tokens": self.session_cache_read_tokens,
"cache_write_tokens": self.session_cache_write_tokens,
"reasoning_tokens": self.session_reasoning_tokens,
"prompt_tokens": self.session_prompt_tokens,
"completion_tokens": self.session_completion_tokens,
"total_tokens": self.session_total_tokens,
"last_prompt_tokens": getattr(self.context_compressor, "last_prompt_tokens", 0) or 0,
"estimated_cost_usd": self.session_estimated_cost_usd,
"cost_status": self.session_cost_status,
"cost_source": self.session_cost_source,
}
self._response_was_previewed = False

View File

@@ -112,9 +112,14 @@ class TestDefaultContextLengths:
def test_gpt4_models_128k(self):
for key, value in DEFAULT_CONTEXT_LENGTHS.items():
if "gpt-4" in key:
if "gpt-4" in key and "gpt-4.1" not in key:
assert value == 128000, f"{key} should be 128000"
def test_gpt41_models_1m(self):
for key, value in DEFAULT_CONTEXT_LENGTHS.items():
if "gpt-4.1" in key:
assert value == 1047576, f"{key} should be 1047576"
def test_gemini_models_1m(self):
for key, value in DEFAULT_CONTEXT_LENGTHS.items():
if "gemini" in key:

View File

@@ -0,0 +1,101 @@
from types import SimpleNamespace
from agent.usage_pricing import (
CanonicalUsage,
estimate_usage_cost,
get_pricing_entry,
normalize_usage,
)
def test_normalize_usage_anthropic_keeps_cache_buckets_separate():
usage = SimpleNamespace(
input_tokens=1000,
output_tokens=500,
cache_read_input_tokens=2000,
cache_creation_input_tokens=400,
)
normalized = normalize_usage(usage, provider="anthropic", api_mode="anthropic_messages")
assert normalized.input_tokens == 1000
assert normalized.output_tokens == 500
assert normalized.cache_read_tokens == 2000
assert normalized.cache_write_tokens == 400
assert normalized.prompt_tokens == 3400
def test_normalize_usage_openai_subtracts_cached_prompt_tokens():
usage = SimpleNamespace(
prompt_tokens=3000,
completion_tokens=700,
prompt_tokens_details=SimpleNamespace(cached_tokens=1800),
)
normalized = normalize_usage(usage, provider="openai", api_mode="chat_completions")
assert normalized.input_tokens == 1200
assert normalized.cache_read_tokens == 1800
assert normalized.output_tokens == 700
def test_openrouter_models_api_pricing_is_converted_from_per_token_to_per_million(monkeypatch):
monkeypatch.setattr(
"agent.usage_pricing.fetch_model_metadata",
lambda: {
"anthropic/claude-opus-4.6": {
"pricing": {
"prompt": "0.000005",
"completion": "0.000025",
"input_cache_read": "0.0000005",
"input_cache_write": "0.00000625",
}
}
},
)
entry = get_pricing_entry(
"anthropic/claude-opus-4.6",
provider="openrouter",
base_url="https://openrouter.ai/api/v1",
)
assert float(entry.input_cost_per_million) == 5.0
assert float(entry.output_cost_per_million) == 25.0
assert float(entry.cache_read_cost_per_million) == 0.5
assert float(entry.cache_write_cost_per_million) == 6.25
def test_estimate_usage_cost_marks_subscription_routes_included():
result = estimate_usage_cost(
"gpt-5.3-codex",
CanonicalUsage(input_tokens=1000, output_tokens=500),
provider="openai-codex",
base_url="https://chatgpt.com/backend-api/codex",
)
assert result.status == "included"
assert float(result.amount_usd) == 0.0
def test_estimate_usage_cost_refuses_cache_pricing_without_official_cache_rate(monkeypatch):
monkeypatch.setattr(
"agent.usage_pricing.fetch_model_metadata",
lambda: {
"google/gemini-2.5-pro": {
"pricing": {
"prompt": "0.00000125",
"completion": "0.00001",
}
}
},
)
result = estimate_usage_cost(
"google/gemini-2.5-pro",
CanonicalUsage(input_tokens=1000, output_tokens=500, cache_read_tokens=100),
provider="openrouter",
base_url="https://openrouter.ai/api/v1",
)
assert result.status == "unknown"

View File

@@ -50,13 +50,16 @@ def _build_runner(monkeypatch, tmp_path, mode: str) -> GatewayRunner:
return runner
def _watcher_dict(session_id="proc_test"):
return {
def _watcher_dict(session_id="proc_test", thread_id=""):
d = {
"session_id": session_id,
"check_interval": 0,
"platform": "telegram",
"chat_id": "123",
}
if thread_id:
d["thread_id"] = thread_id
return d
# ---------------------------------------------------------------------------
@@ -196,3 +199,47 @@ async def test_run_process_watcher_respects_notification_mode(
if expected_fragment is not None:
sent_message = adapter.send.await_args.args[1]
assert expected_fragment in sent_message
@pytest.mark.asyncio
async def test_thread_id_passed_to_send(monkeypatch, tmp_path):
"""thread_id from watcher dict is forwarded as metadata to adapter.send()."""
import tools.process_registry as pr_module
sessions = [SimpleNamespace(output_buffer="done\n", exited=True, exit_code=0)]
monkeypatch.setattr(pr_module, "process_registry", _FakeRegistry(sessions))
async def _instant_sleep(*_a, **_kw):
pass
monkeypatch.setattr(asyncio, "sleep", _instant_sleep)
runner = _build_runner(monkeypatch, tmp_path, "all")
adapter = runner.adapters[Platform.TELEGRAM]
await runner._run_process_watcher(_watcher_dict(thread_id="42"))
assert adapter.send.await_count == 1
_, kwargs = adapter.send.call_args
assert kwargs["metadata"] == {"thread_id": "42"}
@pytest.mark.asyncio
async def test_no_thread_id_sends_no_metadata(monkeypatch, tmp_path):
"""When thread_id is empty, metadata should be None (general topic)."""
import tools.process_registry as pr_module
sessions = [SimpleNamespace(output_buffer="done\n", exited=True, exit_code=0)]
monkeypatch.setattr(pr_module, "process_registry", _FakeRegistry(sessions))
async def _instant_sleep(*_a, **_kw):
pass
monkeypatch.setattr(asyncio, "sleep", _instant_sleep)
runner = _build_runner(monkeypatch, tmp_path, "all")
adapter = runner.adapters[Platform.TELEGRAM]
await runner._run_process_watcher(_watcher_dict())
assert adapter.send.await_count == 1
_, kwargs = adapter.send.call_args
assert kwargs["metadata"] is None

View File

@@ -703,5 +703,15 @@ class TestLastPromptTokens:
store.update_session("k1", model="openai/gpt-5.4")
store._db.update_token_counts.assert_called_once_with(
"s1", 0, 0, model="openai/gpt-5.4"
"s1",
input_tokens=0,
output_tokens=0,
cache_read_tokens=0,
cache_write_tokens=0,
estimated_cost_usd=None,
cost_status=None,
cost_source=None,
billing_provider=None,
billing_base_url=None,
model="openai/gpt-5.4",
)

View File

@@ -128,6 +128,13 @@ async def test_handle_message_persists_agent_token_counts(monkeypatch):
session_entry.session_key,
input_tokens=120,
output_tokens=45,
cache_read_tokens=0,
cache_write_tokens=0,
last_prompt_tokens=80,
model="openai/test-model",
estimated_cost_usd=None,
cost_status=None,
cost_source=None,
provider=None,
base_url=None,
)

View File

@@ -0,0 +1,291 @@
"""Tests for MCP tools interactive configuration in hermes_cli.tools_config."""
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
from hermes_cli.tools_config import _configure_mcp_tools_interactive
# Patch targets: imports happen inside the function body, so patch at source
_PROBE = "tools.mcp_tool.probe_mcp_server_tools"
_CHECKLIST = "hermes_cli.curses_ui.curses_checklist"
_SAVE = "hermes_cli.tools_config.save_config"
def test_no_mcp_servers_prints_info(capsys):
"""Returns immediately when no MCP servers are configured."""
config = {}
_configure_mcp_tools_interactive(config)
captured = capsys.readouterr()
assert "No MCP servers configured" in captured.out
def test_all_servers_disabled_prints_info(capsys):
"""Returns immediately when all configured servers have enabled=false."""
config = {
"mcp_servers": {
"github": {"command": "npx", "enabled": False},
"slack": {"command": "npx", "enabled": "false"},
}
}
_configure_mcp_tools_interactive(config)
captured = capsys.readouterr()
assert "disabled" in captured.out
def test_probe_failure_shows_warning(capsys):
"""Shows warning when probe returns no tools."""
config = {"mcp_servers": {"github": {"command": "npx"}}}
with patch(_PROBE, return_value={}):
_configure_mcp_tools_interactive(config)
captured = capsys.readouterr()
assert "Could not discover" in captured.out
def test_probe_exception_shows_error(capsys):
"""Shows error when probe raises an exception."""
config = {"mcp_servers": {"github": {"command": "npx"}}}
with patch(_PROBE, side_effect=RuntimeError("MCP not installed")):
_configure_mcp_tools_interactive(config)
captured = capsys.readouterr()
assert "Failed to probe" in captured.out
def test_no_changes_when_checklist_cancelled(capsys):
"""No config changes when user cancels (ESC) the checklist."""
config = {
"mcp_servers": {
"github": {"command": "npx", "args": ["-y", "server-github"]},
}
}
tools = [("create_issue", "Create an issue"), ("search_repos", "Search repos")]
with patch(_PROBE, return_value={"github": tools}), \
patch(_CHECKLIST, return_value={0, 1}), \
patch(_SAVE) as mock_save:
_configure_mcp_tools_interactive(config)
mock_save.assert_not_called()
captured = capsys.readouterr()
assert "no changes" in captured.out.lower()
def test_disabling_tool_writes_exclude_list(capsys):
"""Unchecking a tool adds it to the exclude list."""
config = {
"mcp_servers": {
"github": {"command": "npx"},
}
}
tools = [
("create_issue", "Create an issue"),
("delete_repo", "Delete a repo"),
("search_repos", "Search repos"),
]
# User unchecks delete_repo (index 1)
with patch(_PROBE, return_value={"github": tools}), \
patch(_CHECKLIST, return_value={0, 2}), \
patch(_SAVE) as mock_save:
_configure_mcp_tools_interactive(config)
mock_save.assert_called_once()
tools_cfg = config["mcp_servers"]["github"]["tools"]
assert tools_cfg["exclude"] == ["delete_repo"]
assert "include" not in tools_cfg
def test_enabling_all_clears_filters(capsys):
"""Checking all tools clears both include and exclude lists."""
config = {
"mcp_servers": {
"github": {
"command": "npx",
"tools": {"exclude": ["delete_repo"], "include": ["create_issue"]},
},
}
}
tools = [("create_issue", "Create"), ("delete_repo", "Delete")]
# User checks all tools — pre_selected would be {0} (include mode),
# so returning {0, 1} is a change
with patch(_PROBE, return_value={"github": tools}), \
patch(_CHECKLIST, return_value={0, 1}), \
patch(_SAVE) as mock_save:
_configure_mcp_tools_interactive(config)
mock_save.assert_called_once()
tools_cfg = config["mcp_servers"]["github"]["tools"]
assert "exclude" not in tools_cfg
assert "include" not in tools_cfg
def test_pre_selection_respects_existing_exclude(capsys):
"""Tools in exclude list start unchecked."""
config = {
"mcp_servers": {
"github": {
"command": "npx",
"tools": {"exclude": ["delete_repo"]},
},
}
}
tools = [("create_issue", "Create"), ("delete_repo", "Delete"), ("search", "Search")]
captured_pre_selected = {}
def fake_checklist(title, labels, pre_selected, **kwargs):
captured_pre_selected["value"] = set(pre_selected)
return pre_selected # No changes
with patch(_PROBE, return_value={"github": tools}), \
patch(_CHECKLIST, side_effect=fake_checklist), \
patch(_SAVE):
_configure_mcp_tools_interactive(config)
# create_issue (0) and search (2) should be pre-selected, delete_repo (1) should not
assert captured_pre_selected["value"] == {0, 2}
def test_pre_selection_respects_existing_include(capsys):
"""Only tools in include list start checked."""
config = {
"mcp_servers": {
"github": {
"command": "npx",
"tools": {"include": ["search"]},
},
}
}
tools = [("create_issue", "Create"), ("delete_repo", "Delete"), ("search", "Search")]
captured_pre_selected = {}
def fake_checklist(title, labels, pre_selected, **kwargs):
captured_pre_selected["value"] = set(pre_selected)
return pre_selected # No changes
with patch(_PROBE, return_value={"github": tools}), \
patch(_CHECKLIST, side_effect=fake_checklist), \
patch(_SAVE):
_configure_mcp_tools_interactive(config)
# Only search (2) should be pre-selected
assert captured_pre_selected["value"] == {2}
def test_multiple_servers_each_get_checklist(capsys):
"""Each server gets its own checklist."""
config = {
"mcp_servers": {
"github": {"command": "npx"},
"slack": {"url": "https://mcp.example.com"},
}
}
checklist_calls = []
def fake_checklist(title, labels, pre_selected, **kwargs):
checklist_calls.append(title)
return pre_selected # No changes
with patch(
_PROBE,
return_value={
"github": [("create_issue", "Create")],
"slack": [("send_message", "Send")],
},
), patch(_CHECKLIST, side_effect=fake_checklist), \
patch(_SAVE):
_configure_mcp_tools_interactive(config)
assert len(checklist_calls) == 2
assert any("github" in t for t in checklist_calls)
assert any("slack" in t for t in checklist_calls)
def test_failed_server_shows_warning(capsys):
"""Servers that fail to connect show warnings."""
config = {
"mcp_servers": {
"github": {"command": "npx"},
"broken": {"command": "nonexistent"},
}
}
# Only github succeeds
with patch(
_PROBE, return_value={"github": [("create_issue", "Create")]},
), patch(_CHECKLIST, return_value={0}), \
patch(_SAVE):
_configure_mcp_tools_interactive(config)
captured = capsys.readouterr()
assert "broken" in captured.out
def test_description_truncation_in_labels():
"""Long descriptions are truncated in checklist labels."""
config = {
"mcp_servers": {
"github": {"command": "npx"},
}
}
long_desc = "A" * 100
captured_labels = {}
def fake_checklist(title, labels, pre_selected, **kwargs):
captured_labels["value"] = labels
return pre_selected
with patch(
_PROBE, return_value={"github": [("my_tool", long_desc)]},
), patch(_CHECKLIST, side_effect=fake_checklist), \
patch(_SAVE):
_configure_mcp_tools_interactive(config)
label = captured_labels["value"][0]
assert "..." in label
assert len(label) < len(long_desc) + 30 # truncated + tool name + parens
def test_switching_from_include_to_exclude(capsys):
"""When user modifies selection, include list is replaced by exclude list."""
config = {
"mcp_servers": {
"github": {
"command": "npx",
"tools": {"include": ["create_issue"]},
},
}
}
tools = [("create_issue", "Create"), ("search", "Search"), ("delete", "Delete")]
# User selects create_issue and search (deselects delete)
# pre_selected would be {0} (only create_issue from include), so {0, 1} is a change
with patch(_PROBE, return_value={"github": tools}), \
patch(_CHECKLIST, return_value={0, 1}), \
patch(_SAVE):
_configure_mcp_tools_interactive(config)
tools_cfg = config["mcp_servers"]["github"]["tools"]
assert tools_cfg["exclude"] == ["delete"]
assert "include" not in tools_cfg
def test_empty_tools_server_skipped(capsys):
"""Server with no tools shows info message and skips checklist."""
config = {
"mcp_servers": {
"empty": {"command": "npx"},
}
}
checklist_calls = []
def fake_checklist(title, labels, pre_selected, **kwargs):
checklist_calls.append(title)
return pre_selected
with patch(_PROBE, return_value={"empty": []}), \
patch(_CHECKLIST, side_effect=fake_checklist), \
patch(_SAVE):
_configure_mcp_tools_interactive(config)
assert len(checklist_calls) == 0
captured = capsys.readouterr()
assert "no tools found" in captured.out

View File

@@ -5,6 +5,13 @@ from hermes_cli.config import load_config, save_config
from hermes_cli.setup import setup_model_provider
def _maybe_keep_current_tts(question, choices):
if question != "Select TTS provider:":
return None
assert choices[-1].startswith("Keep current (")
return len(choices) - 1
def _clear_provider_env(monkeypatch):
for key in (
"NOUS_API_KEY",
@@ -25,16 +32,22 @@ def test_nous_oauth_setup_keeps_current_model_when_syncing_disk_provider(
config = load_config()
# Provider selection always comes first. Depending on available vision
# backends, setup may either skip the optional vision step or prompt for
# it before the default-model choice. Provide enough selections for both
# paths while still ending on "keep current model".
prompt_choices = iter([0, 2, 2])
monkeypatch.setattr(
"hermes_cli.setup.prompt_choice",
lambda *args, **kwargs: next(prompt_choices),
)
def fake_prompt_choice(question, choices, default=0):
if question == "Select your inference provider:":
return 0
if question == "Configure vision:":
return len(choices) - 1
if question == "Select default model:":
assert choices[-1] == "Keep current (anthropic/claude-opus-4.6)"
return len(choices) - 1
tts_idx = _maybe_keep_current_tts(question, choices)
if tts_idx is not None:
return tts_idx
raise AssertionError(f"Unexpected prompt_choice call: {question}")
monkeypatch.setattr("hermes_cli.setup.prompt_choice", fake_prompt_choice)
monkeypatch.setattr("hermes_cli.setup.prompt", lambda *args, **kwargs: "")
monkeypatch.setattr("hermes_cli.auth.detect_external_credentials", lambda: [])
def _fake_login_nous(*args, **kwargs):
auth_path = tmp_path / "auth.json"
@@ -74,20 +87,29 @@ def test_custom_setup_clears_active_oauth_provider(tmp_path, monkeypatch):
config = load_config()
monkeypatch.setattr("hermes_cli.setup.prompt_choice", lambda *args, **kwargs: 3)
def fake_prompt_choice(question, choices, default=0):
if question == "Select your inference provider:":
return 3
tts_idx = _maybe_keep_current_tts(question, choices)
if tts_idx is not None:
return tts_idx
raise AssertionError(f"Unexpected prompt_choice call: {question}")
monkeypatch.setattr("hermes_cli.setup.prompt_choice", fake_prompt_choice)
prompt_values = iter(
[
"https://custom.example/v1",
"custom-api-key",
"custom/model",
"",
]
)
monkeypatch.setattr(
"hermes_cli.setup.prompt",
lambda *args, **kwargs: next(prompt_values),
)
monkeypatch.setattr("hermes_cli.setup.prompt_yes_no", lambda *args, **kwargs: False)
monkeypatch.setattr("hermes_cli.auth.detect_external_credentials", lambda: [])
setup_model_provider(config)
save_config(config)
@@ -109,11 +131,17 @@ def test_codex_setup_uses_runtime_access_token_for_live_model_list(tmp_path, mon
config = load_config()
prompt_choices = iter([1, 0])
monkeypatch.setattr(
"hermes_cli.setup.prompt_choice",
lambda *args, **kwargs: next(prompt_choices),
)
def fake_prompt_choice(question, choices, default=0):
if question == "Select your inference provider:":
return 1
if question == "Select default model:":
return 0
tts_idx = _maybe_keep_current_tts(question, choices)
if tts_idx is not None:
return tts_idx
raise AssertionError(f"Unexpected prompt_choice call: {question}")
monkeypatch.setattr("hermes_cli.setup.prompt_choice", fake_prompt_choice)
monkeypatch.setattr("hermes_cli.setup.prompt", lambda *args, **kwargs: "")
monkeypatch.setattr("hermes_cli.auth.detect_external_credentials", lambda: [])
monkeypatch.setattr("hermes_cli.auth._login_openai_codex", lambda *args, **kwargs: None)

View File

@@ -6,6 +6,13 @@ from hermes_cli.config import load_config, save_config, save_env_value
from hermes_cli.setup import _print_setup_summary, setup_model_provider
def _maybe_keep_current_tts(question, choices):
if question != "Select TTS provider:":
return None
assert choices[-1].startswith("Keep current (")
return len(choices) - 1
def _read_env(home):
env_path = home / ".env"
data = {}
@@ -50,13 +57,13 @@ def test_setup_keep_current_custom_from_config_does_not_fall_through(tmp_path, m
}
save_config(config)
calls = {"count": 0}
def fake_prompt_choice(question, choices, default=0):
calls["count"] += 1
if calls["count"] == 1:
if question == "Select your inference provider:":
assert choices[-1] == "Keep current (Custom: https://example.invalid/v1)"
return len(choices) - 1
tts_idx = _maybe_keep_current_tts(question, choices)
if tts_idx is not None:
return tts_idx
raise AssertionError("Model menu should not appear for keep-current custom")
monkeypatch.setattr("hermes_cli.setup.prompt_choice", fake_prompt_choice)
@@ -72,7 +79,6 @@ def test_setup_keep_current_custom_from_config_does_not_fall_through(tmp_path, m
assert reloaded["model"]["provider"] == "custom"
assert reloaded["model"]["default"] == "custom/model"
assert reloaded["model"]["base_url"] == "https://example.invalid/v1"
assert calls["count"] == 1
def test_setup_custom_endpoint_saves_working_v1_base_url(tmp_path, monkeypatch):
@@ -86,6 +92,9 @@ def test_setup_custom_endpoint_saves_working_v1_base_url(tmp_path, monkeypatch):
return 3 # Custom endpoint
if question == "Configure vision:":
return len(choices) - 1 # Skip
tts_idx = _maybe_keep_current_tts(question, choices)
if tts_idx is not None:
return tts_idx
raise AssertionError(f"Unexpected prompt_choice call: {question}")
def fake_prompt(message, current=None, **kwargs):
@@ -140,22 +149,23 @@ def test_setup_keep_current_config_provider_uses_provider_specific_model_menu(tm
save_config(config)
captured = {"provider_choices": None, "model_choices": None}
calls = {"count": 0}
def fake_prompt_choice(question, choices, default=0):
calls["count"] += 1
if calls["count"] == 1:
if question == "Select your inference provider:":
captured["provider_choices"] = list(choices)
assert choices[-1] == "Keep current (Anthropic)"
return len(choices) - 1
if calls["count"] == 2:
if question == "Configure vision:":
assert question == "Configure vision:"
assert choices[-1] == "Skip for now"
return len(choices) - 1
if calls["count"] == 3:
if question == "Select default model:":
captured["model_choices"] = list(choices)
return len(choices) - 1 # keep current model
raise AssertionError("Unexpected extra prompt_choice call")
tts_idx = _maybe_keep_current_tts(question, choices)
if tts_idx is not None:
return tts_idx
raise AssertionError(f"Unexpected prompt_choice call: {question}")
monkeypatch.setattr("hermes_cli.setup.prompt_choice", fake_prompt_choice)
monkeypatch.setattr("hermes_cli.setup.prompt", lambda *args, **kwargs: "")
@@ -172,7 +182,6 @@ def test_setup_keep_current_config_provider_uses_provider_specific_model_menu(tm
assert captured["model_choices"] is not None
assert captured["model_choices"][0] == "claude-opus-4-6"
assert "anthropic/claude-opus-4.6 (recommended)" not in captured["model_choices"]
assert calls["count"] == 3
def test_setup_keep_current_anthropic_can_configure_openai_vision_default(tmp_path, monkeypatch):
@@ -186,14 +195,24 @@ def test_setup_keep_current_anthropic_can_configure_openai_vision_default(tmp_pa
}
save_config(config)
picks = iter([
10, # keep current provider (shifted +1 by kilocode insertion)
1, # configure vision with OpenAI
5, # use default gpt-4o-mini vision model
4, # keep current Anthropic model
])
def fake_prompt_choice(question, choices, default=0):
if question == "Select your inference provider:":
assert choices[-1] == "Keep current (Anthropic)"
return len(choices) - 1
if question == "Configure vision:":
return 1
if question == "Select vision model:":
assert choices[-1] == "Use default (gpt-4o-mini)"
return len(choices) - 1
if question == "Select default model:":
assert choices[-1] == "Keep current (claude-opus-4-6)"
return len(choices) - 1
tts_idx = _maybe_keep_current_tts(question, choices)
if tts_idx is not None:
return tts_idx
raise AssertionError(f"Unexpected prompt_choice call: {question}")
monkeypatch.setattr("hermes_cli.setup.prompt_choice", lambda *args, **kwargs: next(picks))
monkeypatch.setattr("hermes_cli.setup.prompt_choice", fake_prompt_choice)
monkeypatch.setattr(
"hermes_cli.setup.prompt",
lambda message, *args, **kwargs: "sk-openai" if "OpenAI API key" in message else "",
@@ -229,8 +248,17 @@ def test_setup_switch_custom_to_codex_clears_custom_endpoint_and_updates_config(
}
save_config(config)
picks = iter([1, 0])
monkeypatch.setattr("hermes_cli.setup.prompt_choice", lambda *args, **kwargs: next(picks))
def fake_prompt_choice(question, choices, default=0):
if question == "Select your inference provider:":
return 1
if question == "Select default model:":
return 0
tts_idx = _maybe_keep_current_tts(question, choices)
if tts_idx is not None:
return tts_idx
raise AssertionError(f"Unexpected prompt_choice call: {question}")
monkeypatch.setattr("hermes_cli.setup.prompt_choice", fake_prompt_choice)
monkeypatch.setattr("hermes_cli.setup.prompt", lambda *args, **kwargs: "")
monkeypatch.setattr("hermes_cli.setup.prompt_yes_no", lambda *args, **kwargs: False)
monkeypatch.setattr("hermes_cli.auth.get_active_provider", lambda: None)

View File

@@ -63,11 +63,13 @@ class TestFromEnv:
class TestFromGlobalConfig:
def test_missing_config_falls_back_to_env(self, tmp_path):
config = HonchoClientConfig.from_global_config(
config_path=tmp_path / "nonexistent.json"
)
with patch.dict(os.environ, {}, clear=True):
config = HonchoClientConfig.from_global_config(
config_path=tmp_path / "nonexistent.json"
)
# Should fall back to from_env
assert config.enabled is True or config.api_key is None # depends on env
assert config.enabled is False
assert config.api_key is None
def test_reads_full_config(self, tmp_path):
config_file = tmp_path / "config.json"

View File

@@ -3,7 +3,7 @@
Comprehensive Test Suite for Web Tools Module
This script tests all web tools functionality to ensure they work correctly.
Run this after any updates to the web_tools.py module or Firecrawl library.
Run this after any updates to the web_tools.py module or backend libraries.
Usage:
python test_web_tools.py # Run all tests
@@ -11,7 +11,7 @@ Usage:
python test_web_tools.py --verbose # Show detailed output
Requirements:
- FIRECRAWL_API_KEY environment variable must be set
- PARALLEL_API_KEY or FIRECRAWL_API_KEY environment variable must be set
- An auxiliary LLM provider (OPENROUTER_API_KEY or Nous Portal auth) (optional, for LLM tests)
"""
@@ -28,12 +28,14 @@ from typing import List
# Import the web tools to test (updated path after moving tools/)
from tools.web_tools import (
web_search_tool,
web_extract_tool,
web_search_tool,
web_extract_tool,
web_crawl_tool,
check_firecrawl_api_key,
check_web_api_key,
check_auxiliary_model,
get_debug_session_info
get_debug_session_info,
_get_backend,
)
@@ -121,12 +123,13 @@ class WebToolsTester:
"""Test environment setup and API keys"""
print_section("Environment Check")
# Check Firecrawl API key
if not check_firecrawl_api_key():
self.log_result("Firecrawl API Key", "failed", "FIRECRAWL_API_KEY not set")
# Check web backend API key (Parallel or Firecrawl)
if not check_web_api_key():
self.log_result("Web Backend API Key", "failed", "PARALLEL_API_KEY or FIRECRAWL_API_KEY not set")
return False
else:
self.log_result("Firecrawl API Key", "passed", "Found")
backend = _get_backend()
self.log_result("Web Backend API Key", "passed", f"Using {backend} backend")
# Check auxiliary LLM provider (optional)
if not check_auxiliary_model():
@@ -578,7 +581,9 @@ class WebToolsTester:
},
"results": self.test_results,
"environment": {
"web_backend": _get_backend() if check_web_api_key() else None,
"firecrawl_api_key": check_firecrawl_api_key(),
"parallel_api_key": bool(os.getenv("PARALLEL_API_KEY")),
"auxiliary_model": check_auxiliary_model(),
"debug_mode": get_debug_session_info()["enabled"]
}

View File

@@ -98,11 +98,14 @@ class TestProviderRegistry:
# =============================================================================
PROVIDER_ENV_VARS = (
"OPENROUTER_API_KEY", "OPENAI_API_KEY", "ANTHROPIC_API_KEY",
"OPENROUTER_API_KEY", "OPENAI_API_KEY", "ANTHROPIC_API_KEY", "ANTHROPIC_TOKEN",
"CLAUDE_CODE_OAUTH_TOKEN",
"GLM_API_KEY", "ZAI_API_KEY", "Z_AI_API_KEY",
"KIMI_API_KEY", "KIMI_BASE_URL", "MINIMAX_API_KEY", "MINIMAX_CN_API_KEY",
"AI_GATEWAY_API_KEY", "AI_GATEWAY_BASE_URL",
"KILOCODE_API_KEY", "KILOCODE_BASE_URL",
"DASHSCOPE_API_KEY", "OPENCODE_ZEN_API_KEY", "OPENCODE_GO_API_KEY",
"NOUS_API_KEY",
"OPENAI_BASE_URL",
)
@@ -111,6 +114,7 @@ PROVIDER_ENV_VARS = (
def _clear_provider_env(monkeypatch):
for key in PROVIDER_ENV_VARS:
monkeypatch.delenv(key, raising=False)
monkeypatch.setattr("hermes_cli.auth._load_auth_store", lambda: {})
class TestResolveProvider:

View File

@@ -16,6 +16,10 @@ def _make_cli(model: str = "anthropic/claude-sonnet-4-20250514"):
def _attach_agent(
cli_obj,
*,
input_tokens: int | None = None,
output_tokens: int | None = None,
cache_read_tokens: int = 0,
cache_write_tokens: int = 0,
prompt_tokens: int,
completion_tokens: int,
total_tokens: int,
@@ -26,6 +30,12 @@ def _attach_agent(
):
cli_obj.agent = SimpleNamespace(
model=cli_obj.model,
provider="anthropic" if cli_obj.model.startswith("anthropic/") else None,
base_url="",
session_input_tokens=input_tokens if input_tokens is not None else prompt_tokens,
session_output_tokens=output_tokens if output_tokens is not None else completion_tokens,
session_cache_read_tokens=cache_read_tokens,
session_cache_write_tokens=cache_write_tokens,
session_prompt_tokens=prompt_tokens,
session_completion_tokens=completion_tokens,
session_total_tokens=total_tokens,
@@ -68,20 +78,19 @@ class TestCLIStatusBar:
assert "$0.06" not in text # cost hidden by default
assert "15m" in text
def test_build_status_bar_text_shows_cost_when_enabled(self):
def test_build_status_bar_text_no_cost_in_status_bar(self):
cli_obj = _attach_agent(
_make_cli(),
prompt_tokens=10000,
completion_tokens=2400,
total_tokens=12400,
completion_tokens=5000,
total_tokens=15000,
api_calls=7,
context_tokens=12400,
context_tokens=50000,
context_length=200_000,
)
cli_obj.show_cost = True
text = cli_obj._build_status_bar_text(width=120)
assert "$" in text # cost is shown when enabled
assert "$" not in text # cost is never shown in status bar
def test_build_status_bar_text_collapses_for_narrow_terminal(self):
cli_obj = _attach_agent(
@@ -128,8 +137,8 @@ class TestCLIUsageReport:
output = capsys.readouterr().out
assert "Model:" in output
assert "Input cost:" in output
assert "Output cost:" in output
assert "Cost status:" in output
assert "Cost source:" in output
assert "Total cost:" in output
assert "$" in output
assert "0.064" in output

View File

@@ -657,7 +657,7 @@ class TestSchemaInit:
def test_schema_version(self, db):
cursor = db._conn.execute("SELECT version FROM schema_version")
version = cursor.fetchone()[0]
assert version == 4
assert version == 5
def test_title_column_exists(self, db):
"""Verify the title column was created in the sessions table."""
@@ -713,12 +713,12 @@ class TestSchemaInit:
conn.commit()
conn.close()
# Open with SessionDB — should migrate to v4
# Open with SessionDB — should migrate to v5
migrated_db = SessionDB(db_path=db_path)
# Verify migration
cursor = migrated_db._conn.execute("SELECT version FROM schema_version")
assert cursor.fetchone()[0] == 4
assert cursor.fetchone()[0] == 5
# Verify title column exists and is NULL for existing sessions
session = migrated_db.get_session("existing")

View File

@@ -123,28 +123,16 @@ def populated_db(db):
# =========================================================================
class TestPricing:
def test_exact_match(self):
pricing = _get_pricing("gpt-4o")
assert pricing["input"] == 2.50
assert pricing["output"] == 10.00
def test_provider_prefix_stripped(self):
pricing = _get_pricing("anthropic/claude-sonnet-4-20250514")
assert pricing["input"] == 3.00
assert pricing["output"] == 15.00
def test_prefix_match(self):
pricing = _get_pricing("claude-3-5-sonnet-20241022")
assert pricing["input"] == 3.00
def test_keyword_heuristic_opus(self):
def test_unknown_models_do_not_use_heuristics(self):
pricing = _get_pricing("some-new-opus-model")
assert pricing["input"] == 15.00
assert pricing["output"] == 75.00
def test_keyword_heuristic_haiku(self):
assert pricing == _DEFAULT_PRICING
pricing = _get_pricing("anthropic/claude-haiku-future")
assert pricing["input"] == 0.80
assert pricing == _DEFAULT_PRICING
def test_unknown_model_returns_zero_cost(self):
"""Unknown/custom models should NOT have fabricated costs."""
@@ -168,40 +156,12 @@ class TestPricing:
pricing = _get_pricing("")
assert pricing == _DEFAULT_PRICING
def test_deepseek_heuristic(self):
pricing = _get_pricing("deepseek-v3")
assert pricing["input"] == 0.14
def test_gemini_heuristic(self):
pricing = _get_pricing("gemini-3.0-ultra")
assert pricing["input"] == 0.15
def test_dated_model_gpt4o_mini(self):
"""gpt-4o-mini-2024-07-18 should match gpt-4o-mini, NOT gpt-4o."""
pricing = _get_pricing("gpt-4o-mini-2024-07-18")
assert pricing["input"] == 0.15 # gpt-4o-mini price, not gpt-4o's 2.50
def test_dated_model_o3_mini(self):
"""o3-mini-2025-01-31 should match o3-mini, NOT o3."""
pricing = _get_pricing("o3-mini-2025-01-31")
assert pricing["input"] == 1.10 # o3-mini price, not o3's 10.00
def test_dated_model_gpt41_mini(self):
"""gpt-4.1-mini-2025-04-14 should match gpt-4.1-mini, NOT gpt-4.1."""
pricing = _get_pricing("gpt-4.1-mini-2025-04-14")
assert pricing["input"] == 0.40 # gpt-4.1-mini, not gpt-4.1's 2.00
def test_dated_model_gpt41_nano(self):
"""gpt-4.1-nano-2025-04-14 should match gpt-4.1-nano, NOT gpt-4.1."""
pricing = _get_pricing("gpt-4.1-nano-2025-04-14")
assert pricing["input"] == 0.10 # gpt-4.1-nano, not gpt-4.1's 2.00
class TestHasKnownPricing:
def test_known_commercial_model(self):
assert _has_known_pricing("gpt-4o") is True
assert _has_known_pricing("gpt-4o", provider="openai") is True
assert _has_known_pricing("anthropic/claude-sonnet-4-20250514") is True
assert _has_known_pricing("deepseek-chat") is True
assert _has_known_pricing("gpt-4.1", provider="openai") is True
def test_unknown_custom_model(self):
assert _has_known_pricing("FP16_Hermes_4.5") is False
@@ -210,26 +170,39 @@ class TestHasKnownPricing:
assert _has_known_pricing("") is False
assert _has_known_pricing(None) is False
def test_heuristic_matched_models(self):
"""Models matched by keyword heuristics should be considered known."""
assert _has_known_pricing("some-opus-model") is True
assert _has_known_pricing("future-sonnet-v2") is True
def test_heuristic_matched_models_are_not_considered_known(self):
assert _has_known_pricing("some-opus-model") is False
assert _has_known_pricing("future-sonnet-v2") is False
class TestEstimateCost:
def test_basic_cost(self):
# gpt-4o: 2.50/M input, 10.00/M output
cost = _estimate_cost("gpt-4o", 1_000_000, 1_000_000)
assert cost == pytest.approx(12.50, abs=0.01)
cost, status = _estimate_cost(
"anthropic/claude-sonnet-4-20250514",
1_000_000,
1_000_000,
provider="anthropic",
)
assert status == "estimated"
assert cost == pytest.approx(18.0, abs=0.01)
def test_zero_tokens(self):
cost = _estimate_cost("gpt-4o", 0, 0)
cost, status = _estimate_cost("gpt-4o", 0, 0, provider="openai")
assert status == "estimated"
assert cost == 0.0
def test_small_usage(self):
cost = _estimate_cost("gpt-4o", 1000, 500)
# 1000 * 2.50/1M + 500 * 10.00/1M = 0.0025 + 0.005 = 0.0075
assert cost == pytest.approx(0.0075, abs=0.0001)
def test_cache_aware_usage(self):
cost, status = _estimate_cost(
"anthropic/claude-sonnet-4-20250514",
1000,
500,
cache_read_tokens=2000,
cache_write_tokens=400,
provider="anthropic",
)
assert status == "estimated"
expected = (1000 * 3.0 + 500 * 15.0 + 2000 * 0.30 + 400 * 3.75) / 1_000_000
assert cost == pytest.approx(expected, abs=0.0001)
# =========================================================================
@@ -660,8 +633,13 @@ class TestEdgeCases:
def test_mixed_commercial_and_custom_models(self, db):
"""Mix of commercial and custom models: only commercial ones get costs."""
db.create_session(session_id="s1", source="cli", model="gpt-4o")
db.update_token_counts("s1", input_tokens=10000, output_tokens=5000)
db.create_session(session_id="s1", source="cli", model="anthropic/claude-sonnet-4-20250514")
db.update_token_counts(
"s1",
input_tokens=10000,
output_tokens=5000,
billing_provider="anthropic",
)
db.create_session(session_id="s2", source="cli", model="my-local-llama")
db.update_token_counts("s2", input_tokens=10000, output_tokens=5000)
db._conn.commit()
@@ -672,13 +650,13 @@ class TestEdgeCases:
# Cost should only come from gpt-4o, not from the custom model
overview = report["overview"]
assert overview["estimated_cost"] > 0
assert "gpt-4o" in overview["models_with_pricing"] # list now, not set
assert "claude-sonnet-4-20250514" in overview["models_with_pricing"] # list now, not set
assert "my-local-llama" in overview["models_without_pricing"]
# Verify individual model entries
gpt = next(m for m in report["models"] if m["model"] == "gpt-4o")
assert gpt["has_pricing"] is True
assert gpt["cost"] > 0
claude = next(m for m in report["models"] if m["model"] == "claude-sonnet-4-20250514")
assert claude["has_pricing"] is True
assert claude["cost"] > 0
llama = next(m for m in report["models"] if m["model"] == "my-local-llama")
assert llama["has_pricing"] is False

View File

@@ -0,0 +1,210 @@
"""Tests for probe_mcp_server_tools() in tools.mcp_tool."""
import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
@pytest.fixture(autouse=True)
def _reset_mcp_state():
"""Ensure clean MCP module state before/after each test."""
import tools.mcp_tool as mcp
old_loop = mcp._mcp_loop
old_thread = mcp._mcp_thread
old_servers = dict(mcp._servers)
yield
mcp._servers.clear()
mcp._servers.update(old_servers)
mcp._mcp_loop = old_loop
mcp._mcp_thread = old_thread
class TestProbeMcpServerTools:
"""Tests for the lightweight probe_mcp_server_tools function."""
def test_returns_empty_when_mcp_not_available(self):
with patch("tools.mcp_tool._MCP_AVAILABLE", False):
from tools.mcp_tool import probe_mcp_server_tools
result = probe_mcp_server_tools()
assert result == {}
def test_returns_empty_when_no_config(self):
with patch("tools.mcp_tool._load_mcp_config", return_value={}):
from tools.mcp_tool import probe_mcp_server_tools
result = probe_mcp_server_tools()
assert result == {}
def test_returns_empty_when_all_servers_disabled(self):
config = {
"github": {"command": "npx", "enabled": False},
"slack": {"command": "npx", "enabled": "off"},
}
with patch("tools.mcp_tool._load_mcp_config", return_value=config):
from tools.mcp_tool import probe_mcp_server_tools
result = probe_mcp_server_tools()
assert result == {}
def test_returns_tools_from_successful_server(self):
"""Successfully probed server returns its tools list."""
config = {
"github": {"command": "npx", "connect_timeout": 5},
}
mock_tool_1 = SimpleNamespace(name="create_issue", description="Create a new issue")
mock_tool_2 = SimpleNamespace(name="search_repos", description="Search repositories")
mock_server = MagicMock()
mock_server._tools = [mock_tool_1, mock_tool_2]
mock_server.shutdown = AsyncMock()
async def fake_connect(name, cfg):
return mock_server
with patch("tools.mcp_tool._load_mcp_config", return_value=config), \
patch("tools.mcp_tool._connect_server", side_effect=fake_connect), \
patch("tools.mcp_tool._ensure_mcp_loop"), \
patch("tools.mcp_tool._run_on_mcp_loop") as mock_run, \
patch("tools.mcp_tool._stop_mcp_loop"):
# Simulate running the async probe
def run_coro(coro, timeout=120):
loop = asyncio.new_event_loop()
try:
return loop.run_until_complete(coro)
finally:
loop.close()
mock_run.side_effect = run_coro
from tools.mcp_tool import probe_mcp_server_tools
result = probe_mcp_server_tools()
assert "github" in result
assert len(result["github"]) == 2
assert result["github"][0] == ("create_issue", "Create a new issue")
assert result["github"][1] == ("search_repos", "Search repositories")
mock_server.shutdown.assert_awaited_once()
def test_failed_server_omitted_from_results(self):
"""Servers that fail to connect are silently skipped."""
config = {
"github": {"command": "npx", "connect_timeout": 5},
"broken": {"command": "nonexistent", "connect_timeout": 5},
}
mock_tool = SimpleNamespace(name="create_issue", description="Create")
mock_server = MagicMock()
mock_server._tools = [mock_tool]
mock_server.shutdown = AsyncMock()
async def fake_connect(name, cfg):
if name == "broken":
raise ConnectionError("Server not found")
return mock_server
with patch("tools.mcp_tool._load_mcp_config", return_value=config), \
patch("tools.mcp_tool._connect_server", side_effect=fake_connect), \
patch("tools.mcp_tool._ensure_mcp_loop"), \
patch("tools.mcp_tool._run_on_mcp_loop") as mock_run, \
patch("tools.mcp_tool._stop_mcp_loop"):
def run_coro(coro, timeout=120):
loop = asyncio.new_event_loop()
try:
return loop.run_until_complete(coro)
finally:
loop.close()
mock_run.side_effect = run_coro
from tools.mcp_tool import probe_mcp_server_tools
result = probe_mcp_server_tools()
assert "github" in result
assert "broken" not in result
def test_handles_tool_without_description(self):
"""Tools without descriptions get empty string."""
config = {"github": {"command": "npx", "connect_timeout": 5}}
mock_tool = SimpleNamespace(name="my_tool") # no description attribute
mock_server = MagicMock()
mock_server._tools = [mock_tool]
mock_server.shutdown = AsyncMock()
async def fake_connect(name, cfg):
return mock_server
with patch("tools.mcp_tool._load_mcp_config", return_value=config), \
patch("tools.mcp_tool._connect_server", side_effect=fake_connect), \
patch("tools.mcp_tool._ensure_mcp_loop"), \
patch("tools.mcp_tool._run_on_mcp_loop") as mock_run, \
patch("tools.mcp_tool._stop_mcp_loop"):
def run_coro(coro, timeout=120):
loop = asyncio.new_event_loop()
try:
return loop.run_until_complete(coro)
finally:
loop.close()
mock_run.side_effect = run_coro
from tools.mcp_tool import probe_mcp_server_tools
result = probe_mcp_server_tools()
assert result["github"][0] == ("my_tool", "")
def test_cleanup_called_even_on_failure(self):
"""_stop_mcp_loop is called even when probe fails."""
config = {"github": {"command": "npx", "connect_timeout": 5}}
with patch("tools.mcp_tool._load_mcp_config", return_value=config), \
patch("tools.mcp_tool._ensure_mcp_loop"), \
patch("tools.mcp_tool._run_on_mcp_loop", side_effect=RuntimeError("boom")), \
patch("tools.mcp_tool._stop_mcp_loop") as mock_stop:
from tools.mcp_tool import probe_mcp_server_tools
result = probe_mcp_server_tools()
assert result == {}
mock_stop.assert_called_once()
def test_skips_disabled_servers(self):
"""Disabled servers are not probed."""
config = {
"github": {"command": "npx", "connect_timeout": 5},
"disabled_one": {"command": "npx", "enabled": False},
}
mock_tool = SimpleNamespace(name="create_issue", description="Create")
mock_server = MagicMock()
mock_server._tools = [mock_tool]
mock_server.shutdown = AsyncMock()
connect_calls = []
async def fake_connect(name, cfg):
connect_calls.append(name)
return mock_server
with patch("tools.mcp_tool._load_mcp_config", return_value=config), \
patch("tools.mcp_tool._connect_server", side_effect=fake_connect), \
patch("tools.mcp_tool._ensure_mcp_loop"), \
patch("tools.mcp_tool._run_on_mcp_loop") as mock_run, \
patch("tools.mcp_tool._stop_mcp_loop"):
def run_coro(coro, timeout=120):
loop = asyncio.new_event_loop()
try:
return loop.run_until_complete(coro)
finally:
loop.close()
mock_run.side_effect = run_coro
from tools.mcp_tool import probe_mcp_server_tools
result = probe_mcp_server_tools()
assert "github" in result
assert "disabled_one" not in result
assert "disabled_one" not in connect_calls

View File

@@ -2596,17 +2596,19 @@ class TestMCPSelectiveToolLoading:
async def run():
with patch("tools.mcp_tool._connect_server", side_effect=fake_connect), \
patch.dict("tools.mcp_tool._servers", {}, clear=True), \
patch("tools.registry.registry", mock_registry), \
patch("toolsets.create_custom_toolset"):
return await _discover_and_register_server(
registered = await _discover_and_register_server(
"ink_existing",
{"url": "https://mcp.example.com", "tools": {"include": ["create_service"]}},
)
return registered, _existing_tool_names()
try:
registered = asyncio.run(run())
registered, existing = asyncio.run(run())
assert registered == ["mcp_ink_existing_create_service"]
assert _existing_tool_names() == ["mcp_ink_existing_create_service"]
assert existing == ["mcp_ink_existing_create_service"]
finally:
_servers.pop("ink_existing", None)

View File

@@ -294,6 +294,61 @@ class TestCheckpoint:
recovered = registry.recover_from_checkpoint()
assert recovered == 0
def test_write_checkpoint_includes_watcher_metadata(self, registry, tmp_path):
with patch("tools.process_registry.CHECKPOINT_PATH", tmp_path / "procs.json"):
s = _make_session()
s.watcher_platform = "telegram"
s.watcher_chat_id = "999"
s.watcher_thread_id = "42"
s.watcher_interval = 60
registry._running[s.id] = s
registry._write_checkpoint()
data = json.loads((tmp_path / "procs.json").read_text())
assert len(data) == 1
assert data[0]["watcher_platform"] == "telegram"
assert data[0]["watcher_chat_id"] == "999"
assert data[0]["watcher_thread_id"] == "42"
assert data[0]["watcher_interval"] == 60
def test_recover_enqueues_watchers(self, registry, tmp_path):
checkpoint = tmp_path / "procs.json"
checkpoint.write_text(json.dumps([{
"session_id": "proc_live",
"command": "sleep 999",
"pid": os.getpid(), # current process — guaranteed alive
"task_id": "t1",
"session_key": "sk1",
"watcher_platform": "telegram",
"watcher_chat_id": "123",
"watcher_thread_id": "42",
"watcher_interval": 60,
}]))
with patch("tools.process_registry.CHECKPOINT_PATH", checkpoint):
recovered = registry.recover_from_checkpoint()
assert recovered == 1
assert len(registry.pending_watchers) == 1
w = registry.pending_watchers[0]
assert w["session_id"] == "proc_live"
assert w["platform"] == "telegram"
assert w["chat_id"] == "123"
assert w["thread_id"] == "42"
assert w["check_interval"] == 60
def test_recover_skips_watcher_when_no_interval(self, registry, tmp_path):
checkpoint = tmp_path / "procs.json"
checkpoint.write_text(json.dumps([{
"session_id": "proc_live",
"command": "sleep 999",
"pid": os.getpid(),
"task_id": "t1",
"watcher_interval": 0,
}]))
with patch("tools.process_registry.CHECKPOINT_PATH", checkpoint):
recovered = registry.recover_from_checkpoint()
assert recovered == 1
assert len(registry.pending_watchers) == 0
# =========================================================================
# Kill process

View File

@@ -25,7 +25,7 @@ def _make_config():
def _install_telegram_mock(monkeypatch, bot):
parse_mode = SimpleNamespace(MARKDOWN_V2="MarkdownV2")
parse_mode = SimpleNamespace(MARKDOWN_V2="MarkdownV2", HTML="HTML")
constants_mod = SimpleNamespace(ParseMode=parse_mode)
telegram_mod = SimpleNamespace(Bot=lambda token: bot, constants=constants_mod)
monkeypatch.setitem(sys.modules, "telegram", telegram_mod)
@@ -391,3 +391,97 @@ class TestSendToPlatformChunking:
assert len(sent_calls) >= 3
assert all(call == [] for call in sent_calls[:-1])
assert sent_calls[-1] == media
# ---------------------------------------------------------------------------
# HTML auto-detection in Telegram send
# ---------------------------------------------------------------------------
class TestSendTelegramHtmlDetection:
"""Verify that messages containing HTML tags are sent with parse_mode=HTML
and that plain / markdown messages use MarkdownV2."""
def _make_bot(self):
bot = MagicMock()
bot.send_message = AsyncMock(return_value=SimpleNamespace(message_id=1))
bot.send_photo = AsyncMock()
bot.send_video = AsyncMock()
bot.send_voice = AsyncMock()
bot.send_audio = AsyncMock()
bot.send_document = AsyncMock()
return bot
def test_html_message_uses_html_parse_mode(self, monkeypatch):
bot = self._make_bot()
_install_telegram_mock(monkeypatch, bot)
asyncio.run(
_send_telegram("tok", "123", "<b>Hello</b> world")
)
bot.send_message.assert_awaited_once()
kwargs = bot.send_message.await_args.kwargs
assert kwargs["parse_mode"] == "HTML"
assert kwargs["text"] == "<b>Hello</b> world"
def test_plain_text_uses_markdown_v2(self, monkeypatch):
bot = self._make_bot()
_install_telegram_mock(monkeypatch, bot)
asyncio.run(
_send_telegram("tok", "123", "Just plain text, no tags")
)
bot.send_message.assert_awaited_once()
kwargs = bot.send_message.await_args.kwargs
assert kwargs["parse_mode"] == "MarkdownV2"
def test_html_with_code_and_pre_tags(self, monkeypatch):
bot = self._make_bot()
_install_telegram_mock(monkeypatch, bot)
html = "<pre>code block</pre> and <code>inline</code>"
asyncio.run(_send_telegram("tok", "123", html))
kwargs = bot.send_message.await_args.kwargs
assert kwargs["parse_mode"] == "HTML"
def test_closing_tag_detected(self, monkeypatch):
bot = self._make_bot()
_install_telegram_mock(monkeypatch, bot)
asyncio.run(_send_telegram("tok", "123", "text </div> more"))
kwargs = bot.send_message.await_args.kwargs
assert kwargs["parse_mode"] == "HTML"
def test_angle_brackets_in_math_not_detected(self, monkeypatch):
"""Expressions like 'x < 5' or '3 > 2' should not trigger HTML mode."""
bot = self._make_bot()
_install_telegram_mock(monkeypatch, bot)
asyncio.run(_send_telegram("tok", "123", "if x < 5 then y > 2"))
kwargs = bot.send_message.await_args.kwargs
assert kwargs["parse_mode"] == "MarkdownV2"
def test_html_parse_failure_falls_back_to_plain(self, monkeypatch):
"""If Telegram rejects the HTML, fall back to plain text."""
bot = self._make_bot()
bot.send_message = AsyncMock(
side_effect=[
Exception("Bad Request: can't parse entities: unsupported html tag"),
SimpleNamespace(message_id=2), # plain fallback succeeds
]
)
_install_telegram_mock(monkeypatch, bot)
result = asyncio.run(
_send_telegram("tok", "123", "<invalid>broken html</invalid>")
)
assert result["success"] is True
assert bot.send_message.await_count == 2
second_call = bot.send_message.await_args_list[1].kwargs
assert second_call["parse_mode"] is None

View File

@@ -1,8 +1,11 @@
"""Tests for Firecrawl client configuration and singleton behavior.
"""Tests for web backend client configuration and singleton behavior.
Coverage:
_get_firecrawl_client() — configuration matrix, singleton caching,
constructor failure recovery, return value verification, edge cases.
_get_backend() — backend selection logic with env var combinations.
_get_parallel_client() — Parallel client configuration, singleton caching.
check_web_api_key() — unified availability check.
"""
import os
@@ -117,3 +120,157 @@ class TestFirecrawlClientConfig:
from tools.web_tools import _get_firecrawl_client
with pytest.raises(ValueError):
_get_firecrawl_client()
class TestBackendSelection:
"""Test suite for _get_backend() backend selection logic.
The backend is configured via config.yaml (web.backend), set by
``hermes tools``. Falls back to key-based detection for legacy/manual
setups.
"""
_ENV_KEYS = ("PARALLEL_API_KEY", "FIRECRAWL_API_KEY", "FIRECRAWL_API_URL")
def setup_method(self):
for key in self._ENV_KEYS:
os.environ.pop(key, None)
def teardown_method(self):
for key in self._ENV_KEYS:
os.environ.pop(key, None)
# ── Config-based selection (web.backend in config.yaml) ───────────
def test_config_parallel(self):
"""web.backend=parallel in config → 'parallel' regardless of keys."""
from tools.web_tools import _get_backend
with patch("tools.web_tools._load_web_config", return_value={"backend": "parallel"}):
assert _get_backend() == "parallel"
def test_config_firecrawl(self):
"""web.backend=firecrawl in config → 'firecrawl' even if Parallel key set."""
from tools.web_tools import _get_backend
with patch("tools.web_tools._load_web_config", return_value={"backend": "firecrawl"}), \
patch.dict(os.environ, {"PARALLEL_API_KEY": "test-key"}):
assert _get_backend() == "firecrawl"
def test_config_case_insensitive(self):
"""web.backend=Parallel (mixed case) → 'parallel'."""
from tools.web_tools import _get_backend
with patch("tools.web_tools._load_web_config", return_value={"backend": "Parallel"}):
assert _get_backend() == "parallel"
# ── Fallback (no web.backend in config) ───────────────────────────
def test_fallback_parallel_only_key(self):
"""Only PARALLEL_API_KEY set → 'parallel'."""
from tools.web_tools import _get_backend
with patch("tools.web_tools._load_web_config", return_value={}), \
patch.dict(os.environ, {"PARALLEL_API_KEY": "test-key"}):
assert _get_backend() == "parallel"
def test_fallback_both_keys_defaults_to_firecrawl(self):
"""Both keys set, no config → 'firecrawl' (backward compat)."""
from tools.web_tools import _get_backend
with patch("tools.web_tools._load_web_config", return_value={}), \
patch.dict(os.environ, {"PARALLEL_API_KEY": "test-key", "FIRECRAWL_API_KEY": "fc-test"}):
assert _get_backend() == "firecrawl"
def test_fallback_firecrawl_only_key(self):
"""Only FIRECRAWL_API_KEY set → 'firecrawl'."""
from tools.web_tools import _get_backend
with patch("tools.web_tools._load_web_config", return_value={}), \
patch.dict(os.environ, {"FIRECRAWL_API_KEY": "fc-test"}):
assert _get_backend() == "firecrawl"
def test_fallback_no_keys_defaults_to_firecrawl(self):
"""No keys, no config → 'firecrawl' (will fail at client init)."""
from tools.web_tools import _get_backend
with patch("tools.web_tools._load_web_config", return_value={}):
assert _get_backend() == "firecrawl"
def test_invalid_config_falls_through_to_fallback(self):
"""web.backend=invalid → ignored, uses key-based fallback."""
from tools.web_tools import _get_backend
with patch("tools.web_tools._load_web_config", return_value={"backend": "tavily"}), \
patch.dict(os.environ, {"PARALLEL_API_KEY": "test-key"}):
assert _get_backend() == "parallel"
class TestParallelClientConfig:
"""Test suite for Parallel client initialization."""
def setup_method(self):
import tools.web_tools
tools.web_tools._parallel_client = None
os.environ.pop("PARALLEL_API_KEY", None)
def teardown_method(self):
import tools.web_tools
tools.web_tools._parallel_client = None
os.environ.pop("PARALLEL_API_KEY", None)
def test_creates_client_with_key(self):
"""PARALLEL_API_KEY set → creates Parallel client."""
with patch.dict(os.environ, {"PARALLEL_API_KEY": "test-key"}):
from tools.web_tools import _get_parallel_client
from parallel import Parallel
client = _get_parallel_client()
assert client is not None
assert isinstance(client, Parallel)
def test_no_key_raises_with_helpful_message(self):
"""No PARALLEL_API_KEY → ValueError with guidance."""
from tools.web_tools import _get_parallel_client
with pytest.raises(ValueError, match="PARALLEL_API_KEY"):
_get_parallel_client()
def test_singleton_returns_same_instance(self):
"""Second call returns cached client."""
with patch.dict(os.environ, {"PARALLEL_API_KEY": "test-key"}):
from tools.web_tools import _get_parallel_client
client1 = _get_parallel_client()
client2 = _get_parallel_client()
assert client1 is client2
class TestCheckWebApiKey:
"""Test suite for check_web_api_key() unified availability check."""
_ENV_KEYS = ("PARALLEL_API_KEY", "FIRECRAWL_API_KEY", "FIRECRAWL_API_URL")
def setup_method(self):
for key in self._ENV_KEYS:
os.environ.pop(key, None)
def teardown_method(self):
for key in self._ENV_KEYS:
os.environ.pop(key, None)
def test_parallel_key_only(self):
with patch.dict(os.environ, {"PARALLEL_API_KEY": "test-key"}):
from tools.web_tools import check_web_api_key
assert check_web_api_key() is True
def test_firecrawl_key_only(self):
with patch.dict(os.environ, {"FIRECRAWL_API_KEY": "fc-test"}):
from tools.web_tools import check_web_api_key
assert check_web_api_key() is True
def test_firecrawl_url_only(self):
with patch.dict(os.environ, {"FIRECRAWL_API_URL": "http://localhost:3002"}):
from tools.web_tools import check_web_api_key
assert check_web_api_key() is True
def test_no_keys_returns_false(self):
from tools.web_tools import check_web_api_key
assert check_web_api_key() is False
def test_both_keys_returns_true(self):
with patch.dict(os.environ, {
"PARALLEL_API_KEY": "test-key",
"FIRECRAWL_API_KEY": "fc-test",
}):
from tools.web_tools import check_web_api_key
assert check_web_api_key() is True

View File

@@ -426,6 +426,8 @@ async def test_web_extract_blocks_redirected_final_url(monkeypatch):
async def test_web_crawl_short_circuits_blocked_url(monkeypatch):
from tools import web_tools
# web_crawl_tool checks for Firecrawl env before website policy
monkeypatch.setenv("FIRECRAWL_API_KEY", "fake-key")
monkeypatch.setattr(
web_tools,
"check_website_access",
@@ -453,6 +455,9 @@ async def test_web_crawl_short_circuits_blocked_url(monkeypatch):
async def test_web_crawl_blocks_redirected_final_url(monkeypatch):
from tools import web_tools
# web_crawl_tool checks for Firecrawl env before website policy
monkeypatch.setenv("FIRECRAWL_API_KEY", "fake-key")
def fake_check(url):
if url == "https://allowed.test":
return None

View File

@@ -555,6 +555,11 @@ def _get_session_info(task_id: Optional[str] = None) -> Dict[str, str]:
session_info = provider.create_session(task_id)
with _cleanup_lock:
# Double-check: another thread may have created a session while we
# were doing the network call. Use the existing one to avoid leaking
# orphan cloud sessions.
if task_id in _active_sessions:
return _active_sessions[task_id]
_active_sessions[task_id] = session_info
return session_info

View File

@@ -82,6 +82,9 @@ def _build_provider_env_blocklist() -> frozenset:
"FIREWORKS_API_KEY", # Fireworks AI
"XAI_API_KEY", # xAI (Grok)
"HELICONE_API_KEY", # LLM Observability proxy
"PARALLEL_API_KEY",
"FIRECRAWL_API_KEY",
"FIRECRAWL_API_URL",
# Gateway/runtime config not represented in OPTIONAL_ENV_VARS.
"TELEGRAM_HOME_CHANNEL",
"TELEGRAM_HOME_CHANNEL_NAME",

View File

@@ -1624,6 +1624,72 @@ def get_mcp_status() -> List[dict]:
return result
def probe_mcp_server_tools() -> Dict[str, List[tuple]]:
"""Temporarily connect to configured MCP servers and list their tools.
Designed for ``hermes tools`` interactive configuration — connects to each
enabled server, grabs tool names and descriptions, then disconnects.
Does NOT register tools in the Hermes registry.
Returns:
Dict mapping server name to list of (tool_name, description) tuples.
Servers that fail to connect are omitted from the result.
"""
if not _MCP_AVAILABLE:
return {}
servers_config = _load_mcp_config()
if not servers_config:
return {}
enabled = {
k: v for k, v in servers_config.items()
if _parse_boolish(v.get("enabled", True), default=True)
}
if not enabled:
return {}
_ensure_mcp_loop()
result: Dict[str, List[tuple]] = {}
probed_servers: List[MCPServerTask] = []
async def _probe_all():
names = list(enabled.keys())
coros = []
for name, cfg in enabled.items():
ct = cfg.get("connect_timeout", _DEFAULT_CONNECT_TIMEOUT)
coros.append(asyncio.wait_for(_connect_server(name, cfg), timeout=ct))
outcomes = await asyncio.gather(*coros, return_exceptions=True)
for name, outcome in zip(names, outcomes):
if isinstance(outcome, Exception):
logger.debug("Probe: failed to connect to '%s': %s", name, outcome)
continue
probed_servers.append(outcome)
tools = []
for t in outcome._tools:
desc = getattr(t, "description", "") or ""
tools.append((t.name, desc))
result[name] = tools
# Shut down all probed connections
await asyncio.gather(
*(s.shutdown() for s in probed_servers),
return_exceptions=True,
)
try:
_run_on_mcp_loop(_probe_all(), timeout=120)
except Exception as exc:
logger.debug("MCP probe failed: %s", exc)
finally:
_stop_mcp_loop()
return result
def shutdown_mcp_servers():
"""Close all MCP server connections and stop the background loop.

View File

@@ -78,6 +78,11 @@ class ProcessSession:
output_buffer: str = "" # Rolling output (last MAX_OUTPUT_CHARS)
max_output_chars: int = MAX_OUTPUT_CHARS
detached: bool = False # True if recovered from crash (no pipe)
# Watcher/notification metadata (persisted for crash recovery)
watcher_platform: str = ""
watcher_chat_id: str = ""
watcher_thread_id: str = ""
watcher_interval: int = 0 # 0 = no watcher configured
_lock: threading.Lock = field(default_factory=threading.Lock)
_reader_thread: Optional[threading.Thread] = field(default=None, repr=False)
_pty: Any = field(default=None, repr=False) # ptyprocess handle (when use_pty=True)
@@ -709,6 +714,10 @@ class ProcessRegistry:
"started_at": s.started_at,
"task_id": s.task_id,
"session_key": s.session_key,
"watcher_platform": s.watcher_platform,
"watcher_chat_id": s.watcher_chat_id,
"watcher_thread_id": s.watcher_thread_id,
"watcher_interval": s.watcher_interval,
})
# Atomic write to avoid corruption on crash
@@ -755,12 +764,27 @@ class ProcessRegistry:
cwd=entry.get("cwd"),
started_at=entry.get("started_at", time.time()),
detached=True, # Can't read output, but can report status + kill
watcher_platform=entry.get("watcher_platform", ""),
watcher_chat_id=entry.get("watcher_chat_id", ""),
watcher_thread_id=entry.get("watcher_thread_id", ""),
watcher_interval=entry.get("watcher_interval", 0),
)
with self._lock:
self._running[session.id] = session
recovered += 1
logger.info("Recovered detached process: %s (pid=%d)", session.command[:60], pid)
# Re-enqueue watcher so gateway can resume notifications
if session.watcher_interval > 0:
self.pending_watchers.append({
"session_id": session.id,
"check_interval": session.watcher_interval,
"session_key": session.session_key,
"platform": session.watcher_platform,
"chat_id": session.watcher_chat_id,
"thread_id": session.watcher_thread_id,
})
# Clear the checkpoint (will be rewritten as processes finish)
try:
from utils import atomic_json_write

View File

@@ -355,20 +355,31 @@ async def _send_telegram(token, chat_id, message, media_files=None, thread_id=No
"""Send via Telegram Bot API (one-shot, no polling needed).
Applies markdown→MarkdownV2 formatting (same as the gateway adapter)
so that bold, links, and headers render correctly.
so that bold, links, and headers render correctly. If the message
already contains HTML tags, it is sent with ``parse_mode='HTML'``
instead, bypassing MarkdownV2 conversion.
"""
try:
from telegram import Bot
from telegram.constants import ParseMode
# Reuse the gateway adapter's format_message for markdown→MarkdownV2
try:
from gateway.platforms.telegram import TelegramAdapter, _escape_mdv2, _strip_mdv2
_adapter = TelegramAdapter.__new__(TelegramAdapter)
formatted = _adapter.format_message(message)
except Exception:
# Fallback: send as-is if formatting unavailable
# Auto-detect HTML tags — if present, skip MarkdownV2 and send as HTML.
# Inspired by github.com/ashaney — PR #1568.
_has_html = bool(re.search(r'<[a-zA-Z/][^>]*>', message))
if _has_html:
formatted = message
send_parse_mode = ParseMode.HTML
else:
# Reuse the gateway adapter's format_message for markdown→MarkdownV2
try:
from gateway.platforms.telegram import TelegramAdapter, _escape_mdv2, _strip_mdv2
_adapter = TelegramAdapter.__new__(TelegramAdapter)
formatted = _adapter.format_message(message)
except Exception:
# Fallback: send as-is if formatting unavailable
formatted = message
send_parse_mode = ParseMode.MARKDOWN_V2
bot = Bot(token=token)
int_chat_id = int(chat_id)
@@ -384,16 +395,19 @@ async def _send_telegram(token, chat_id, message, media_files=None, thread_id=No
try:
last_msg = await bot.send_message(
chat_id=int_chat_id, text=formatted,
parse_mode=ParseMode.MARKDOWN_V2, **thread_kwargs
parse_mode=send_parse_mode, **thread_kwargs
)
except Exception as md_error:
# MarkdownV2 failed, fall back to plain text
if "parse" in str(md_error).lower() or "markdown" in str(md_error).lower():
logger.warning("MarkdownV2 parse failed in _send_telegram, falling back to plain text: %s", md_error)
try:
from gateway.platforms.telegram import _strip_mdv2
plain = _strip_mdv2(formatted)
except Exception:
# Parse failed, fall back to plain text
if "parse" in str(md_error).lower() or "markdown" in str(md_error).lower() or "html" in str(md_error).lower():
logger.warning("Parse mode %s failed in _send_telegram, falling back to plain text: %s", send_parse_mode, md_error)
if not _has_html:
try:
from gateway.platforms.telegram import _strip_mdv2
plain = _strip_mdv2(formatted)
except Exception:
plain = message
else:
plain = message
last_msg = await bot.send_message(
chat_id=int_chat_id, text=plain,

View File

@@ -1082,13 +1082,23 @@ def terminal_tool(
result_data["check_interval_note"] = (
f"Requested {check_interval}s raised to minimum 30s"
)
watcher_platform = os.getenv("HERMES_SESSION_PLATFORM", "")
watcher_chat_id = os.getenv("HERMES_SESSION_CHAT_ID", "")
watcher_thread_id = os.getenv("HERMES_SESSION_THREAD_ID", "")
# Store on session for checkpoint persistence
proc_session.watcher_platform = watcher_platform
proc_session.watcher_chat_id = watcher_chat_id
proc_session.watcher_thread_id = watcher_thread_id
proc_session.watcher_interval = effective_interval
process_registry.pending_watchers.append({
"session_id": proc_session.id,
"check_interval": effective_interval,
"session_key": session_key,
"platform": os.getenv("HERMES_SESSION_PLATFORM", ""),
"chat_id": os.getenv("HERMES_SESSION_CHAT_ID", ""),
"thread_id": os.getenv("HERMES_SESSION_THREAD_ID", ""),
"platform": watcher_platform,
"chat_id": watcher_chat_id,
"thread_id": watcher_thread_id,
})
return json.dumps(result_data, ensure_ascii=False)

View File

@@ -3,16 +3,16 @@
Standalone Web Tools Module
This module provides generic web tools that work with multiple backend providers.
Currently uses Firecrawl as the backend, and the interface makes it easy to swap
providers without changing the function signatures.
Backend is selected during ``hermes tools`` setup (web.backend in config.yaml).
Available tools:
- web_search_tool: Search the web for information
- web_extract_tool: Extract content from specific web pages
- web_crawl_tool: Crawl websites with specific instructions
- web_crawl_tool: Crawl websites with specific instructions (Firecrawl only)
Backend compatibility:
- Firecrawl: https://docs.firecrawl.dev/introduction
- Firecrawl: https://docs.firecrawl.dev/introduction (search, extract, crawl)
- Parallel: https://docs.parallel.ai (search, extract)
LLM Processing:
- Uses OpenRouter API with Gemini 3 Flash Preview for intelligent content extraction
@@ -53,6 +53,39 @@ from tools.website_policy import check_website_access
logger = logging.getLogger(__name__)
# ─── Backend Selection ────────────────────────────────────────────────────────
def _load_web_config() -> dict:
"""Load the ``web:`` section from ~/.hermes/config.yaml."""
try:
from hermes_cli.config import load_config
return load_config().get("web", {})
except (ImportError, Exception):
return {}
def _get_backend() -> str:
"""Determine which web backend to use.
Reads ``web.backend`` from config.yaml (set by ``hermes tools``).
Falls back to whichever API key is present for users who configured
keys manually without running setup.
"""
configured = _load_web_config().get("backend", "").lower().strip()
if configured in ("parallel", "firecrawl"):
return configured
# Fallback for manual / legacy config — use whichever key is present.
has_firecrawl = bool(os.getenv("FIRECRAWL_API_KEY") or os.getenv("FIRECRAWL_API_URL"))
has_parallel = bool(os.getenv("PARALLEL_API_KEY"))
if has_parallel and not has_firecrawl:
return "parallel"
# Default to firecrawl (backward compat, or when both are set)
return "firecrawl"
# ─── Firecrawl Client ────────────────────────────────────────────────────────
_firecrawl_client = None
def _get_firecrawl_client():
@@ -81,6 +114,47 @@ def _get_firecrawl_client():
_firecrawl_client = Firecrawl(**kwargs)
return _firecrawl_client
# ─── Parallel Client ─────────────────────────────────────────────────────────
_parallel_client = None
_async_parallel_client = None
def _get_parallel_client():
"""Get or create the Parallel sync client (lazy initialization).
Requires PARALLEL_API_KEY environment variable.
"""
from parallel import Parallel
global _parallel_client
if _parallel_client is None:
api_key = os.getenv("PARALLEL_API_KEY")
if not api_key:
raise ValueError(
"PARALLEL_API_KEY environment variable not set. "
"Get your API key at https://parallel.ai"
)
_parallel_client = Parallel(api_key=api_key)
return _parallel_client
def _get_async_parallel_client():
"""Get or create the Parallel async client (lazy initialization).
Requires PARALLEL_API_KEY environment variable.
"""
from parallel import AsyncParallel
global _async_parallel_client
if _async_parallel_client is None:
api_key = os.getenv("PARALLEL_API_KEY")
if not api_key:
raise ValueError(
"PARALLEL_API_KEY environment variable not set. "
"Get your API key at https://parallel.ai"
)
_async_parallel_client = AsyncParallel(api_key=api_key)
return _async_parallel_client
DEFAULT_MIN_LENGTH_FOR_SUMMARIZATION = 5000
# Allow per-task override via env var
@@ -428,13 +502,89 @@ def clean_base64_images(text: str) -> str:
return cleaned_text
# ─── Parallel Search & Extract Helpers ────────────────────────────────────────
def _parallel_search(query: str, limit: int = 5) -> dict:
"""Search using the Parallel SDK and return results as a dict."""
from tools.interrupt import is_interrupted
if is_interrupted():
return {"error": "Interrupted", "success": False}
mode = os.getenv("PARALLEL_SEARCH_MODE", "agentic").lower().strip()
if mode not in ("fast", "one-shot", "agentic"):
mode = "agentic"
logger.info("Parallel search: '%s' (mode=%s, limit=%d)", query, mode, limit)
response = _get_parallel_client().beta.search(
search_queries=[query],
objective=query,
mode=mode,
max_results=min(limit, 20),
)
web_results = []
for i, result in enumerate(response.results or []):
excerpts = result.excerpts or []
web_results.append({
"url": result.url or "",
"title": result.title or "",
"description": " ".join(excerpts) if excerpts else "",
"position": i + 1,
})
return {"success": True, "data": {"web": web_results}}
async def _parallel_extract(urls: List[str]) -> List[Dict[str, Any]]:
"""Extract content from URLs using the Parallel async SDK.
Returns a list of result dicts matching the structure expected by the
LLM post-processing pipeline (url, title, content, metadata).
"""
from tools.interrupt import is_interrupted
if is_interrupted():
return [{"url": u, "error": "Interrupted", "title": ""} for u in urls]
logger.info("Parallel extract: %d URL(s)", len(urls))
response = await _get_async_parallel_client().beta.extract(
urls=urls,
full_content=True,
)
results = []
for result in response.results or []:
content = result.full_content or ""
if not content:
content = "\n\n".join(result.excerpts or [])
url = result.url or ""
title = result.title or ""
results.append({
"url": url,
"title": title,
"content": content,
"raw_content": content,
"metadata": {"sourceURL": url, "title": title},
})
for error in response.errors or []:
results.append({
"url": error.url or "",
"title": "",
"content": "",
"error": error.content or error.error_type or "extraction failed",
"metadata": {"sourceURL": error.url or ""},
})
return results
def web_search_tool(query: str, limit: int = 5) -> str:
"""
Search the web for information using available search API backend.
This function provides a generic interface for web search that can work
with multiple backends. Currently uses Firecrawl.
with multiple backends (Parallel or Firecrawl).
Note: This function returns search result metadata only (URLs, titles, descriptions).
Use web_extract_tool to get full content from specific URLs.
@@ -478,17 +628,28 @@ def web_search_tool(query: str, limit: int = 5) -> str:
if is_interrupted():
return json.dumps({"error": "Interrupted", "success": False})
# Dispatch to the configured backend
backend = _get_backend()
if backend == "parallel":
response_data = _parallel_search(query, limit)
debug_call_data["results_count"] = len(response_data.get("data", {}).get("web", []))
result_json = json.dumps(response_data, indent=2, ensure_ascii=False)
debug_call_data["final_response_size"] = len(result_json)
_debug.log_call("web_search_tool", debug_call_data)
_debug.save()
return result_json
logger.info("Searching the web for: '%s' (limit: %d)", query, limit)
response = _get_firecrawl_client().search(
query=query,
limit=limit
)
# The response is a SearchData object with web, news, and images attributes
# When not scraping, the results are directly in these attributes
web_results = []
# Check if response has web attribute (SearchData object)
if hasattr(response, 'web'):
# Response is a SearchData object with web attribute
@@ -596,123 +757,130 @@ async def web_extract_tool(
try:
logger.info("Extracting content from %d URL(s)", len(urls))
# Determine requested formats for Firecrawl v2
formats: List[str] = []
if format == "markdown":
formats = ["markdown"]
elif format == "html":
formats = ["html"]
# Dispatch to the configured backend
backend = _get_backend()
if backend == "parallel":
results = await _parallel_extract(urls)
else:
# Default: request markdown for LLM-readiness and include html as backup
formats = ["markdown", "html"]
# Always use individual scraping for simplicity and reliability
# Batch scraping adds complexity without much benefit for small numbers of URLs
results: List[Dict[str, Any]] = []
from tools.interrupt import is_interrupted as _is_interrupted
for url in urls:
if _is_interrupted():
results.append({"url": url, "error": "Interrupted", "title": ""})
continue
# ── Firecrawl extraction ──
# Determine requested formats for Firecrawl v2
formats: List[str] = []
if format == "markdown":
formats = ["markdown"]
elif format == "html":
formats = ["html"]
else:
# Default: request markdown for LLM-readiness and include html as backup
formats = ["markdown", "html"]
# Website policy check — block before fetching
blocked = check_website_access(url)
if blocked:
logger.info("Blocked web_extract for %s by rule %s", blocked["host"], blocked["rule"])
results.append({
"url": url, "title": "", "content": "",
"error": blocked["message"],
"blocked_by_policy": {"host": blocked["host"], "rule": blocked["rule"], "source": blocked["source"]},
})
continue
# Always use individual scraping for simplicity and reliability
# Batch scraping adds complexity without much benefit for small numbers of URLs
results: List[Dict[str, Any]] = []
try:
logger.info("Scraping: %s", url)
scrape_result = _get_firecrawl_client().scrape(
url=url,
formats=formats
)
# Process the result - properly handle object serialization
metadata = {}
title = ""
content_markdown = None
content_html = None
# Extract data from the scrape result
if hasattr(scrape_result, 'model_dump'):
# Pydantic model - use model_dump to get dict
result_dict = scrape_result.model_dump()
content_markdown = result_dict.get('markdown')
content_html = result_dict.get('html')
metadata = result_dict.get('metadata', {})
elif hasattr(scrape_result, '__dict__'):
# Regular object with attributes
content_markdown = getattr(scrape_result, 'markdown', None)
content_html = getattr(scrape_result, 'html', None)
# Handle metadata - convert to dict if it's an object
metadata_obj = getattr(scrape_result, 'metadata', {})
if hasattr(metadata_obj, 'model_dump'):
metadata = metadata_obj.model_dump()
elif hasattr(metadata_obj, '__dict__'):
metadata = metadata_obj.__dict__
elif isinstance(metadata_obj, dict):
metadata = metadata_obj
else:
metadata = {}
elif isinstance(scrape_result, dict):
# Already a dictionary
content_markdown = scrape_result.get('markdown')
content_html = scrape_result.get('html')
metadata = scrape_result.get('metadata', {})
# Ensure metadata is a dict (not an object)
if not isinstance(metadata, dict):
if hasattr(metadata, 'model_dump'):
metadata = metadata.model_dump()
elif hasattr(metadata, '__dict__'):
metadata = metadata.__dict__
else:
metadata = {}
# Get title from metadata
title = metadata.get("title", "")
# Re-check final URL after redirect
final_url = metadata.get("sourceURL", url)
final_blocked = check_website_access(final_url)
if final_blocked:
logger.info("Blocked redirected web_extract for %s by rule %s", final_blocked["host"], final_blocked["rule"])
from tools.interrupt import is_interrupted as _is_interrupted
for url in urls:
if _is_interrupted():
results.append({"url": url, "error": "Interrupted", "title": ""})
continue
# Website policy check — block before fetching
blocked = check_website_access(url)
if blocked:
logger.info("Blocked web_extract for %s by rule %s", blocked["host"], blocked["rule"])
results.append({
"url": final_url, "title": title, "content": "", "raw_content": "",
"error": final_blocked["message"],
"blocked_by_policy": {"host": final_blocked["host"], "rule": final_blocked["rule"], "source": final_blocked["source"]},
"url": url, "title": "", "content": "",
"error": blocked["message"],
"blocked_by_policy": {"host": blocked["host"], "rule": blocked["rule"], "source": blocked["source"]},
})
continue
# Choose content based on requested format
chosen_content = content_markdown if (format == "markdown" or (format is None and content_markdown)) else content_html or content_markdown or ""
results.append({
"url": final_url,
"title": title,
"content": chosen_content,
"raw_content": chosen_content,
"metadata": metadata # Now guaranteed to be a dict
})
except Exception as scrape_err:
logger.debug("Scrape failed for %s: %s", url, scrape_err)
results.append({
"url": url,
"title": "",
"content": "",
"raw_content": "",
"error": str(scrape_err)
})
try:
logger.info("Scraping: %s", url)
scrape_result = _get_firecrawl_client().scrape(
url=url,
formats=formats
)
# Process the result - properly handle object serialization
metadata = {}
title = ""
content_markdown = None
content_html = None
# Extract data from the scrape result
if hasattr(scrape_result, 'model_dump'):
# Pydantic model - use model_dump to get dict
result_dict = scrape_result.model_dump()
content_markdown = result_dict.get('markdown')
content_html = result_dict.get('html')
metadata = result_dict.get('metadata', {})
elif hasattr(scrape_result, '__dict__'):
# Regular object with attributes
content_markdown = getattr(scrape_result, 'markdown', None)
content_html = getattr(scrape_result, 'html', None)
# Handle metadata - convert to dict if it's an object
metadata_obj = getattr(scrape_result, 'metadata', {})
if hasattr(metadata_obj, 'model_dump'):
metadata = metadata_obj.model_dump()
elif hasattr(metadata_obj, '__dict__'):
metadata = metadata_obj.__dict__
elif isinstance(metadata_obj, dict):
metadata = metadata_obj
else:
metadata = {}
elif isinstance(scrape_result, dict):
# Already a dictionary
content_markdown = scrape_result.get('markdown')
content_html = scrape_result.get('html')
metadata = scrape_result.get('metadata', {})
# Ensure metadata is a dict (not an object)
if not isinstance(metadata, dict):
if hasattr(metadata, 'model_dump'):
metadata = metadata.model_dump()
elif hasattr(metadata, '__dict__'):
metadata = metadata.__dict__
else:
metadata = {}
# Get title from metadata
title = metadata.get("title", "")
# Re-check final URL after redirect
final_url = metadata.get("sourceURL", url)
final_blocked = check_website_access(final_url)
if final_blocked:
logger.info("Blocked redirected web_extract for %s by rule %s", final_blocked["host"], final_blocked["rule"])
results.append({
"url": final_url, "title": title, "content": "", "raw_content": "",
"error": final_blocked["message"],
"blocked_by_policy": {"host": final_blocked["host"], "rule": final_blocked["rule"], "source": final_blocked["source"]},
})
continue
# Choose content based on requested format
chosen_content = content_markdown if (format == "markdown" or (format is None and content_markdown)) else content_html or content_markdown or ""
results.append({
"url": final_url,
"title": title,
"content": chosen_content,
"raw_content": chosen_content,
"metadata": metadata # Now guaranteed to be a dict
})
except Exception as scrape_err:
logger.debug("Scrape failed for %s: %s", url, scrape_err)
results.append({
"url": url,
"title": "",
"content": "",
"raw_content": "",
"error": str(scrape_err)
})
response = {"results": results}
@@ -887,6 +1055,14 @@ async def web_crawl_tool(
}
try:
# web_crawl requires Firecrawl — Parallel has no crawl API
if not (os.getenv("FIRECRAWL_API_KEY") or os.getenv("FIRECRAWL_API_URL")):
return json.dumps({
"error": "web_crawl requires Firecrawl. Set FIRECRAWL_API_KEY, "
"or use web_search + web_extract instead.",
"success": False,
}, ensure_ascii=False)
# Ensure URL has protocol
if not url.startswith(('http://', 'https://')):
url = f'https://{url}'
@@ -1151,13 +1327,22 @@ async def web_crawl_tool(
def check_firecrawl_api_key() -> bool:
"""
Check if the Firecrawl API key is available in environment variables.
Returns:
bool: True if API key is set, False otherwise
"""
return bool(os.getenv("FIRECRAWL_API_KEY"))
def check_web_api_key() -> bool:
"""Check if any web backend API key is available (Parallel or Firecrawl)."""
return bool(
os.getenv("PARALLEL_API_KEY")
or os.getenv("FIRECRAWL_API_KEY")
or os.getenv("FIRECRAWL_API_URL")
)
def check_auxiliary_model() -> bool:
"""Check if an auxiliary text model is available for LLM content processing."""
try:
@@ -1184,26 +1369,30 @@ if __name__ == "__main__":
print("=" * 40)
# Check if API keys are available
firecrawl_available = check_firecrawl_api_key()
web_available = check_web_api_key()
nous_available = check_auxiliary_model()
if not firecrawl_available:
print("❌ FIRECRAWL_API_KEY environment variable not set")
print("Please set your API key: export FIRECRAWL_API_KEY='your-key-here'")
print("Get API key at: https://firecrawl.dev/")
if web_available:
backend = _get_backend()
print(f"✅ Web backend: {backend}")
if backend == "parallel":
print(" Using Parallel API (https://parallel.ai)")
else:
print(" Using Firecrawl API (https://firecrawl.dev)")
else:
print("✅ Firecrawl API key found")
print("❌ No web search backend configured")
print("Set PARALLEL_API_KEY (https://parallel.ai) or FIRECRAWL_API_KEY (https://firecrawl.dev)")
if not nous_available:
print("❌ No auxiliary model available for LLM content processing")
print("Set OPENROUTER_API_KEY, configure Nous Portal, or set OPENAI_BASE_URL + OPENAI_API_KEY")
print("⚠️ Without an auxiliary model, LLM content processing will be disabled")
else:
print(f"✅ Auxiliary model available: {DEFAULT_SUMMARIZER_MODEL}")
if not firecrawl_available:
if not web_available:
exit(1)
print("🛠️ Web tools ready for use!")
if nous_available:
@@ -1301,8 +1490,8 @@ registry.register(
toolset="web",
schema=WEB_SEARCH_SCHEMA,
handler=lambda args, **kw: web_search_tool(args.get("query", ""), limit=5),
check_fn=check_firecrawl_api_key,
requires_env=["FIRECRAWL_API_KEY"],
check_fn=check_web_api_key,
requires_env=["PARALLEL_API_KEY", "FIRECRAWL_API_KEY"],
emoji="🔍",
)
registry.register(
@@ -1311,8 +1500,8 @@ registry.register(
schema=WEB_EXTRACT_SCHEMA,
handler=lambda args, **kw: web_extract_tool(
args.get("urls", [])[:5] if isinstance(args.get("urls"), list) else [], "markdown"),
check_fn=check_firecrawl_api_key,
requires_env=["FIRECRAWL_API_KEY"],
check_fn=check_web_api_key,
requires_env=["PARALLEL_API_KEY", "FIRECRAWL_API_KEY"],
is_async=True,
emoji="📄",
)

View File

@@ -49,6 +49,9 @@ hermes setup # Or configure everything at once
| **Kimi / Moonshot** | Moonshot-hosted coding and chat models | Set `KIMI_API_KEY` |
| **MiniMax** | International MiniMax endpoint | Set `MINIMAX_API_KEY` |
| **MiniMax China** | China-region MiniMax endpoint | Set `MINIMAX_CN_API_KEY` |
| **Alibaba Cloud** | Qwen models via DashScope | Set `DASHSCOPE_API_KEY` |
| **Kilo Code** | KiloCode-hosted models | Set `KILOCODE_API_KEY` |
| **Vercel AI Gateway** | Vercel AI Gateway routing | Set `AI_GATEWAY_API_KEY` |
| **Custom Endpoint** | VLLM, SGLang, or any OpenAI-compatible API | Set base URL + API key |
:::tip

View File

@@ -32,7 +32,8 @@ All variables go in `~/.hermes/.env`. You can also set them with `hermes config
| `KILOCODE_BASE_URL` | Override Kilo Code base URL (default: `https://api.kilo.ai/api/gateway`) |
| `ANTHROPIC_API_KEY` | Anthropic Console API key ([console.anthropic.com](https://console.anthropic.com/)) |
| `ANTHROPIC_TOKEN` | Manual or legacy Anthropic OAuth/setup-token override |
| `DASHSCOPE_API_KEY` | Alibaba Cloud DashScope API key for Qwen models via Anthropic-compatible API ([modelstudio.console.alibabacloud.com](https://modelstudio.console.alibabacloud.com/)) |
| `DASHSCOPE_API_KEY` | Alibaba Cloud DashScope API key for Qwen models ([modelstudio.console.alibabacloud.com](https://modelstudio.console.alibabacloud.com/)) |
| `DASHSCOPE_BASE_URL` | Custom DashScope base URL (default: international endpoint) |
| `CLAUDE_CODE_OAUTH_TOKEN` | Explicit Claude Code token override if you export one manually |
| `HERMES_MODEL` | Preferred model name (checked before `LLM_MODEL`, used by gateway) |
| `LLM_MODEL` | Default model name (fallback when not set in config.yaml) |
@@ -60,10 +61,13 @@ For native Anthropic auth, Hermes prefers Claude Code's own credential files whe
| Variable | Description |
|----------|-------------|
| `PARALLEL_API_KEY` | AI-native web search ([parallel.ai](https://parallel.ai/)) |
| `FIRECRAWL_API_KEY` | Web scraping ([firecrawl.dev](https://firecrawl.dev/)) |
| `FIRECRAWL_API_URL` | Custom Firecrawl API endpoint for self-hosted instances (optional) |
| `BROWSERBASE_API_KEY` | Browser automation ([browserbase.com](https://browserbase.com/)) |
| `BROWSERBASE_PROJECT_ID` | Browserbase project ID |
| `BROWSER_USE_API_KEY` | Browser Use cloud browser API key ([browser-use.com](https://browser-use.com/)) |
| `BROWSER_CDP_URL` | Chrome DevTools Protocol URL for local browser (set via `/browser connect`, e.g. `ws://localhost:9222`) |
| `BROWSER_INACTIVITY_TIMEOUT` | Browser session inactivity timeout in seconds |
| `FAL_KEY` | Image generation ([fal.ai](https://fal.ai/)) |
| `GROQ_API_KEY` | Groq Whisper STT API key ([groq.com](https://groq.com/)) |
@@ -173,6 +177,19 @@ For native Anthropic auth, Hermes prefers Claude Code's own credential files whe
| `EMAIL_ALLOW_ALL_USERS` | Allow all inbound email senders |
| `DINGTALK_CLIENT_ID` | DingTalk bot AppKey from developer portal ([open.dingtalk.com](https://open.dingtalk.com)) |
| `DINGTALK_CLIENT_SECRET` | DingTalk bot AppSecret from developer portal |
| `DINGTALK_ALLOWED_USERS` | Comma-separated DingTalk user IDs allowed to message the bot |
| `MATTERMOST_URL` | Mattermost server URL (e.g. `https://mm.example.com`) |
| `MATTERMOST_TOKEN` | Bot token or personal access token for Mattermost |
| `MATTERMOST_ALLOWED_USERS` | Comma-separated Mattermost user IDs allowed to message the bot |
| `MATTERMOST_HOME_CHANNEL` | Channel ID for proactive message delivery (cron, notifications) |
| `MATTERMOST_REPLY_MODE` | Reply style: `thread` (threaded replies) or `off` (flat messages, default) |
| `MATRIX_HOMESERVER` | Matrix homeserver URL (e.g. `https://matrix.org`) |
| `MATRIX_ACCESS_TOKEN` | Matrix access token for bot authentication |
| `MATRIX_USER_ID` | Matrix user ID (e.g. `@hermes:matrix.org`) — required for password login, optional with access token |
| `MATRIX_PASSWORD` | Matrix password (alternative to access token) |
| `MATRIX_ALLOWED_USERS` | Comma-separated Matrix user IDs allowed to message the bot (e.g. `@alice:matrix.org`) |
| `MATRIX_HOME_ROOM` | Room ID for proactive message delivery (e.g. `!abc123:matrix.org`) |
| `MATRIX_ENCRYPTION` | Enable end-to-end encryption (`true`/`false`, default: `false`) |
| `HASS_TOKEN` | Home Assistant Long-Lived Access Token (enables HA platform + tools) |
| `HASS_URL` | Home Assistant URL (default: `http://homeassistant.local:8123`) |
| `MESSAGING_CWD` | Working directory for terminal commands in messaging mode (default: `~`) |

View File

@@ -52,8 +52,9 @@ Type `/` in the CLI to open the autocomplete menu. Built-in commands are case-in
| Command | Description |
|---------|-------------|
| `/tools` | List available tools |
| `/tools [list\|disable\|enable] [name...]` | Manage tools: list available tools, or disable/enable specific tools for the current session. Disabling a tool removes it from the agent's toolset and triggers a session reset. |
| `/toolsets` | List available toolsets |
| `/browser [connect\|disconnect\|status]` | Manage local Chrome CDP connection. `connect` attaches browser tools to a running Chrome instance (default: `ws://localhost:9222`). `disconnect` detaches. `status` shows current connection. Auto-launches Chrome if no debugger is detected. |
| `/skills` | Search, install, inspect, or manage skills from online registries |
| `/cron` | Manage scheduled tasks (list, add/create, edit, pause, resume, run, remove) |
| `/reload-mcp` | Reload MCP servers from config.yaml |
@@ -118,7 +119,7 @@ The messaging gateway supports the following built-in commands inside Telegram,
## Notes
- `/skin`, `/tools`, `/toolsets`, `/config`, `/prompt`, `/cron`, `/skills`, `/platforms`, `/paste`, and `/verbose` are **CLI-only** commands.
- `/skin`, `/tools`, `/toolsets`, `/browser`, `/config`, `/prompt`, `/cron`, `/skills`, `/platforms`, `/paste`, and `/verbose` are **CLI-only** commands.
- `/status`, `/stop`, `/sethome`, `/resume`, and `/update` are **messaging-only** commands.
- `/background`, `/voice`, `/reload-mcp`, and `/rollback` work in **both** the CLI and the messaging gateway.
- `/voice join`, `/voice channel`, and `/voice leave` are only meaningful on Discord.

View File

@@ -72,6 +72,7 @@ You need at least one way to connect to an LLM. Use `hermes model` to switch pro
| **MiniMax China** | `MINIMAX_CN_API_KEY` in `~/.hermes/.env` (provider: `minimax-cn`) |
| **Alibaba Cloud** | `DASHSCOPE_API_KEY` in `~/.hermes/.env` (provider: `alibaba`, aliases: `dashscope`, `qwen`) |
| **Kilo Code** | `KILOCODE_API_KEY` in `~/.hermes/.env` (provider: `kilocode`) |
| **Alibaba Cloud** | `DASHSCOPE_API_KEY` in `~/.hermes/.env` (provider: `alibaba`) |
| **Custom Endpoint** | `hermes model` (saved in `config.yaml`) or `OPENAI_BASE_URL` + `OPENAI_API_KEY` in `~/.hermes/.env` |
:::info Codex Note
@@ -136,16 +137,20 @@ hermes chat --provider minimax --model MiniMax-Text-01
# MiniMax (China endpoint)
hermes chat --provider minimax-cn --model MiniMax-Text-01
# Requires: MINIMAX_CN_API_KEY in ~/.hermes/.env
# Alibaba Cloud / DashScope (Qwen models)
hermes chat --provider alibaba --model qwen-plus
# Requires: DASHSCOPE_API_KEY in ~/.hermes/.env
```
Or set the provider permanently in `config.yaml`:
```yaml
model:
provider: "zai" # or: kimi-coding, minimax, minimax-cn
provider: "zai" # or: kimi-coding, minimax, minimax-cn, alibaba
default: "glm-4-plus"
```
Base URLs can be overridden with `GLM_BASE_URL`, `KIMI_BASE_URL`, `MINIMAX_BASE_URL`, or `MINIMAX_CN_BASE_URL` environment variables.
Base URLs can be overridden with `GLM_BASE_URL`, `KIMI_BASE_URL`, `MINIMAX_BASE_URL`, `MINIMAX_CN_BASE_URL`, or `DASHSCOPE_BASE_URL` environment variables.
## Custom & Self-Hosted LLM Providers
@@ -873,6 +878,7 @@ This controls both the `text_to_speech` tool and spoken replies in voice mode (`
display:
tool_progress: all # off | new | all | verbose
skin: default # Built-in or custom CLI skin (see user-guide/features/skins)
theme_mode: auto # auto | light | dark — color scheme for skin-aware rendering
personality: "kawaii" # Legacy cosmetic field still surfaced in some summaries
compact: false # Compact output mode (less whitespace)
resume_display: full # full (show previous messages on resume) | minimal (one-liner only)
@@ -882,6 +888,18 @@ display:
background_process_notifications: all # all | result | error | off (gateway only)
```
### Theme mode
The `theme_mode` setting controls whether skins render in light or dark mode:
| Mode | Behavior |
|------|----------|
| `auto` (default) | Detects your terminal's background color automatically. Falls back to `dark` if detection fails. |
| `light` | Forces light-mode skin colors. Skins that define a `colors_light` override use those colors instead of the default dark-mode palette. |
| `dark` | Forces dark-mode skin colors. |
This works with any skin — built-in or custom. Skin authors can provide `colors_light` in their skin definition for optimal light-terminal appearance.
| Mode | What you see |
|------|-------------|
| `off` | Silent — just the final response |
@@ -1056,6 +1074,54 @@ browser:
record_sessions: false # Auto-record browser sessions as WebM videos to ~/.hermes/browser_recordings/
```
The browser toolset supports multiple providers. See the [Browser feature page](/docs/user-guide/features/browser) for details on Browserbase, Browser Use, and local Chrome CDP setup.
## Website Blocklist
Block specific domains from being accessed by the agent's web and browser tools:
```yaml
website_blocklist:
enabled: false # Enable URL blocking (default: false)
domains: # List of blocked domain patterns
- "*.internal.company.com"
- "admin.example.com"
- "*.local"
shared_files: # Load additional rules from external files
- "/etc/hermes/blocked-sites.txt"
```
When enabled, any URL matching a blocked domain pattern is rejected before the web or browser tool executes. This applies to `web_search`, `web_extract`, `browser_navigate`, and any tool that accesses URLs.
Domain rules support:
- Exact domains: `admin.example.com`
- Wildcard subdomains: `*.internal.company.com` (blocks all subdomains)
- TLD wildcards: `*.local`
Shared files contain one domain rule per line (blank lines and `#` comments are ignored). Missing or unreadable files log a warning but don't disable other web tools.
The policy is cached for 30 seconds, so config changes take effect quickly without restart.
## Smart Approvals
Control how Hermes handles potentially dangerous commands:
```yaml
approval_mode: ask # ask | smart | off
```
| Mode | Behavior |
|------|----------|
| `ask` (default) | Prompt the user before executing any flagged command. In the CLI, shows an interactive approval dialog. In messaging, queues a pending approval request. |
| `smart` | Use an auxiliary LLM to assess whether a flagged command is actually dangerous. Low-risk commands are auto-approved with session-level persistence. Genuinely risky commands are escalated to the user. |
| `off` | Skip all approval checks. Equivalent to `HERMES_YOLO_MODE=true`. **Use with caution.** |
Smart mode is particularly useful for reducing approval fatigue — it lets the agent work more autonomously on safe operations while still catching genuinely destructive commands.
:::warning
Setting `approval_mode: off` disables all safety checks for terminal commands. Only use this in trusted, sandboxed environments.
:::
## Checkpoints
Automatic filesystem snapshots before destructive file operations. See the [Checkpoints feature page](/docs/user-guide/features/checkpoints) for details.

View File

@@ -1,27 +1,30 @@
---
title: Browser Automation
description: Control cloud browsers with Browserbase integration for web interaction, form filling, scraping, and more.
description: Control browsers with multiple providers, local Chrome via CDP, or cloud browsers for web interaction, form filling, scraping, and more.
sidebar_label: Browser
sidebar_position: 5
---
# Browser Automation
Hermes Agent includes a full browser automation toolset that can run in two modes:
Hermes Agent includes a full browser automation toolset with multiple backend options:
- **Browserbase cloud mode** via [Browserbase](https://browserbase.com) for managed cloud browsers and anti-bot tooling
- **Browser Use cloud mode** via [Browser Use](https://browser-use.com) as an alternative cloud browser provider
- **Local Chrome via CDP** — connect browser tools to your own Chrome instance using `/browser connect`
- **Local browser mode** via the `agent-browser` CLI and a local Chromium installation
In both modes, the agent can navigate websites, interact with page elements, fill forms, and extract information.
In all modes, the agent can navigate websites, interact with page elements, fill forms, and extract information.
## Overview
The browser tools use the `agent-browser` CLI. In Browserbase mode, `agent-browser` connects to Browserbase cloud sessions. In local mode, it drives a local Chromium installation. Pages are represented as **accessibility trees** (text-based snapshots), making them ideal for LLM agents. Interactive elements get ref IDs (like `@e1`, `@e2`) that the agent uses for clicking and typing.
Pages are represented as **accessibility trees** (text-based snapshots), making them ideal for LLM agents. Interactive elements get ref IDs (like `@e1`, `@e2`) that the agent uses for clicking and typing.
Key capabilities:
- **Cloud execution** — no local browser needed
- **Built-in stealth** — random fingerprints, CAPTCHA solving, residential proxies
- **Multi-provider cloud execution** — Browserbase or Browser Use, no local browser needed
- **Local Chrome integration** — attach to your running Chrome via CDP for hands-on browsing
- **Built-in stealth** — random fingerprints, CAPTCHA solving, residential proxies (Browserbase)
- **Session isolation** — each task gets its own browser session
- **Automatic cleanup** — inactive sessions are closed after a timeout
- **Vision analysis** — screenshot + AI analysis for visual understanding
@@ -40,9 +43,48 @@ BROWSERBASE_PROJECT_ID=your-project-id-here
Get your credentials at [browserbase.com](https://browserbase.com).
### Browser Use cloud mode
To use Browser Use as your cloud browser provider, add:
```bash
# Add to ~/.hermes/.env
BROWSER_USE_API_KEY=***
```
Get your API key at [browser-use.com](https://browser-use.com). Browser Use provides a cloud browser via its REST API. If both Browserbase and Browser Use credentials are set, Browserbase takes priority.
### Local Chrome via CDP (`/browser connect`)
Instead of a cloud provider, you can attach Hermes browser tools to your own running Chrome instance via the Chrome DevTools Protocol (CDP). This is useful when you want to see what the agent is doing in real-time, interact with pages that require your own cookies/sessions, or avoid cloud browser costs.
In the CLI, use:
```
/browser connect # Connect to Chrome at ws://localhost:9222
/browser connect ws://host:port # Connect to a specific CDP endpoint
/browser status # Check current connection
/browser disconnect # Detach and return to cloud/local mode
```
If Chrome isn't already running with remote debugging, Hermes will attempt to auto-launch it with `--remote-debugging-port=9222`.
:::tip
To start Chrome manually with CDP enabled:
```bash
# Linux
google-chrome --remote-debugging-port=9222
# macOS
"/Applications/Google Chrome.app/Contents/MacOS/Google Chrome" --remote-debugging-port=9222
```
:::
When connected via CDP, all browser tools (`browser_navigate`, `browser_click`, etc.) operate on your live Chrome instance instead of spinning up a cloud session.
### Local browser mode
If you do **not** set Browserbase credentials, Hermes can still use the browser tools through a local Chromium install driven by `agent-browser`.
If you do **not** set any cloud credentials and don't use `/browser connect`, Hermes can still use the browser tools through a local Chromium install driven by `agent-browser`.
### Optional Environment Variables
@@ -232,10 +274,8 @@ If paid features aren't available on your plan, Hermes automatically falls back
## Limitations
- **Requires Browserbase account** — no local browser fallback
- **Requires `agent-browser` CLI** — must be installed via npm
- **Text-based interaction** — relies on accessibility tree, not pixel coordinates
- **Snapshot size** — large pages may be truncated or LLM-summarized at 8000 characters
- **Session timeout** — sessions expire based on your Browserbase plan settings
- **Cost** — each session consumes Browserbase credits; use `browser_close` when done
- **Session timeout** — cloud sessions expire based on your provider's plan settings
- **Cost** — cloud sessions consume provider credits; use `browser_close` when done. Use `/browser connect` for free local browsing.
- **No file downloads** — cannot download files from the browser

View File

@@ -1,178 +1,192 @@
---
sidebar_position: 10
title: "DingTalk"
description: "Set up Hermes Agent as a DingTalk bot using Stream Mode for real-time messaging"
description: "Set up Hermes Agent as a DingTalk chatbot"
---
# DingTalk Setup
Hermes connects to DingTalk through the [dingtalk-stream](https://pypi.org/project/dingtalk-stream/) SDK using Stream Mode — a WebSocket-based protocol that requires no public webhook URL. Messages arrive in real-time and responses are sent via the session webhook in markdown format.
Hermes Agent integrates with DingTalk (钉钉) as a chatbot, letting you chat with your AI assistant through direct messages or group chats. The bot connects via DingTalk's Stream Mode — a long-lived WebSocket connection that requires no public URL or webhook server — and replies using markdown-formatted messages through DingTalk's session webhook API.
DingTalk (钉钉) is Alibaba's enterprise communication platform with over 700 million registered users, making it the #1 business application in China. It combines messaging, video conferencing, task management, and workflow automation into a single platform used by millions of organizations.
Before setup, here's the part most people want to know: how Hermes behaves once it's in your DingTalk workspace.
:::info Dependencies
The DingTalk adapter requires additional Python packages:
## How Hermes Behaves
| Context | Behavior |
|---------|----------|
| **DMs (1:1 chat)** | Hermes responds to every message. No `@mention` needed. Each DM has its own session. |
| **Group chats** | Hermes responds when you `@mention` it. Without a mention, Hermes ignores the message. |
| **Shared groups with multiple users** | By default, Hermes isolates session history per user inside the group. Two people talking in the same group do not share one transcript unless you explicitly disable that. |
### Session Model in DingTalk
By default:
- each DM gets its own session
- each user in a shared group chat gets their own session inside that group
This is controlled by `config.yaml`:
```yaml
group_sessions_per_user: true
```
Set it to `false` only if you explicitly want one shared conversation for the entire group:
```yaml
group_sessions_per_user: false
```
This guide walks you through the full setup process — from creating your DingTalk bot to sending your first message.
## Prerequisites
Install the required Python packages:
```bash
pip install dingtalk-stream httpx
```
`httpx` is already a core Hermes dependency, so in practice you only need `dingtalk-stream`.
- `dingtalk-stream` — DingTalk's official SDK for Stream Mode (WebSocket-based real-time messaging)
- `httpx` — async HTTP client used for sending replies via session webhooks
## Step 1: Create a DingTalk App
1. Go to the [DingTalk Developer Console](https://open-dev.dingtalk.com/).
2. Log in with your DingTalk admin account.
3. Click **Application Development****Custom Apps****Create App via H5 Micro-App** (or **Robot** depending on your console version).
4. Fill in:
- **App Name**: e.g., `Hermes Agent`
- **Description**: optional
5. After creating, navigate to **Credentials & Basic Info** to find your **Client ID** (AppKey) and **Client Secret** (AppSecret). Copy both.
:::warning[Credentials shown only once]
The Client Secret is only displayed once when you create the app. If you lose it, you'll need to regenerate it. Never share these credentials publicly or commit them to Git.
:::
---
## Step 2: Enable the Robot Capability
## Prerequisites
- **DingTalk developer account** — register at [open-dev.dingtalk.com](https://open-dev.dingtalk.com)
- **An application created** on the DingTalk Open Platform with Robot (机器人) capability enabled
---
## Step 1: Create a DingTalk Application
1. Go to [open-dev.dingtalk.com](https://open-dev.dingtalk.com) and log in
2. Click **Create Application** (创建应用)
3. Fill in the application name and description
4. Under **Capabilities** (添加能力), enable **Robot** (机器人)
5. In the Robot configuration:
- Enable **Stream Mode** (Stream 模式) — this is critical, as it eliminates the need for a public webhook URL
- Set the bot name and avatar
6. Navigate to **Credentials & Basic Info** (凭证与基本信息) to find:
- **AppKey** — this is your `DINGTALK_CLIENT_ID`
- **AppSecret** — this is your `DINGTALK_CLIENT_SECRET`
7. Publish the application (发布)
1. In your app's settings page, go to **Add Capability****Robot**.
2. Enable the robot capability.
3. Under **Message Reception Mode**, select **Stream Mode** (recommended — no public URL needed).
:::tip
Stream Mode is strongly recommended over the legacy HTTP webhook approach. It works behind firewalls, NATs, and requires no public IP or domain — the SDK maintains a persistent WebSocket connection to DingTalk's servers.
Stream Mode is the recommended setup. It uses a long-lived WebSocket connection initiated from your machine, so you don't need a public IP, domain name, or webhook endpoint. This works behind NAT, firewalls, and on local machines.
:::
---
## Step 3: Find Your DingTalk User ID
## Step 2: Configure Hermes
Hermes Agent uses your DingTalk User ID to control who can interact with the bot. DingTalk User IDs are alphanumeric strings set by your organization's admin.
The easiest way:
To find yours:
1. Ask your DingTalk organization admin — User IDs are configured in the DingTalk admin console under **Contacts****Members**.
2. Alternatively, the bot logs the `sender_id` for each incoming message. Start the gateway, send the bot a message, then check the logs for your ID.
## Step 4: Configure Hermes Agent
### Option A: Interactive Setup (Recommended)
Run the guided setup command:
```bash
hermes gateway setup
```
Select **DingTalk** from the platform menu. The wizard will:
Select **DingTalk** when prompted, then paste your Client ID, Client Secret, and allowed user IDs when asked.
1. Check if `dingtalk-stream` is installed
2. Prompt for your AppKey (Client ID)
3. Prompt for your AppSecret (Client Secret)
4. Configure allowed users and access policies
### Option B: Manual Configuration
### Manual Configuration
Add to `~/.hermes/.env`:
Add the following to your `~/.hermes/.env` file:
```bash
# Required
DINGTALK_CLIENT_ID=your-app-key
DINGTALK_CLIENT_SECRET=your-app-secret
# Security (recommended)
DINGTALK_ALLOWED_USERS=user1_staff_id,user2_staff_id # Comma-separated DingTalk staff IDs
# Security: restrict who can interact with the bot
DINGTALK_ALLOWED_USERS=user-id-1
# Optional
DINGTALK_HOME_CHANNEL=user1_staff_id # Default delivery target for cron jobs
# Multiple allowed users (comma-separated)
# DINGTALK_ALLOWED_USERS=user-id-1,user-id-2
```
Then start the gateway:
Optional behavior settings in `~/.hermes/config.yaml`:
```yaml
group_sessions_per_user: true
```
- `group_sessions_per_user: true` keeps each participant's context isolated inside shared group chats
### Start the Gateway
Once configured, start the DingTalk gateway:
```bash
hermes gateway # Foreground
hermes gateway install # Install as a user service
sudo hermes gateway install --system # Linux only: boot-time system service
hermes gateway
```
---
The bot should connect to DingTalk's Stream Mode within a few seconds. Send it a message — either a DM or in a group where it's been added — to test.
## Access Control
### DM Access
DM access follows the same pattern as all other Hermes platforms:
1. **`DINGTALK_ALLOWED_USERS` set** → only those users can message
2. **No allowlist set** → unknown users get a DM pairing code (approve via `hermes pairing approve dingtalk CODE`)
3. **`DINGTALK_ALLOW_ALL_USERS=true`** → anyone can message (use with caution)
### Group Access
In group chats, the bot responds when @mentioned. Group access follows the same rules — only allowed users can trigger the bot, even in groups.
---
## Features
### Stream Mode (No Webhook URL)
Unlike traditional bot platforms that require a publicly accessible webhook endpoint, DingTalk's Stream Mode uses a persistent WebSocket connection initiated from your side. This means:
- **No public IP required** — works behind firewalls and NATs
- **No domain or SSL certificate needed** — the SDK handles the connection
- **Automatic reconnection** — if the connection drops, the adapter reconnects with exponential backoff (2s → 5s → 10s → 30s → 60s)
### Markdown Replies
Responses are sent in DingTalk's markdown format, which supports rich text formatting including headers, bold, italic, links, and code blocks.
### DM and Group Chat
The adapter supports both:
- **Direct Messages (1:1)** — private conversations with the bot
- **Group Chat** — the bot responds when @mentioned in a group
### Message Deduplication
The adapter tracks recently processed message IDs (up to 1,000 messages within a 5-minute window) to prevent duplicate processing if DingTalk redelivers a message.
### Auto-Reconnection
If the WebSocket connection drops, the adapter automatically reconnects using exponential backoff:
- Retry intervals: 2s, 5s, 10s, 30s, 60s
- Reconnection is transparent — no manual intervention needed
---
:::tip
You can run `hermes gateway` in the background or as a systemd service for persistent operation. See the deployment docs for details.
:::
## Troubleshooting
| Problem | Solution |
|---------|----------|
| **"dingtalk-stream not installed"** | Run `pip install dingtalk-stream httpx` in the Hermes environment |
| **"DINGTALK_CLIENT_ID not set"** | Set `DINGTALK_CLIENT_ID` and `DINGTALK_CLIENT_SECRET` in `~/.hermes/.env` |
| **Bot not responding** | Verify the application is published on open-dev.dingtalk.com and Stream Mode is enabled |
| **Connection keeps dropping** | Check network connectivity. The adapter will auto-reconnect with backoff. Check logs for specific error messages. |
| **Messages processed twice** | This is rare — the deduplication window handles most cases. If persistent, check that only one gateway instance is running. |
| **Bot responds to no one** | Configure `DINGTALK_ALLOWED_USERS`, use DM pairing, or explicitly allow all users through gateway policy if you want broader access. |
| **Group messages ignored** | Ensure the bot is @mentioned in group chats. Only @mentions trigger the bot in groups. |
### Bot is not responding to messages
---
**Cause**: The robot capability isn't enabled, or `DINGTALK_ALLOWED_USERS` doesn't include your User ID.
**Fix**: Verify the robot capability is enabled in your app settings and that Stream Mode is selected. Check that your User ID is in `DINGTALK_ALLOWED_USERS`. Restart the gateway.
### "dingtalk-stream not installed" error
**Cause**: The `dingtalk-stream` Python package is not installed.
**Fix**: Install it:
```bash
pip install dingtalk-stream httpx
```
### "DINGTALK_CLIENT_ID and DINGTALK_CLIENT_SECRET required"
**Cause**: The credentials aren't set in your environment or `.env` file.
**Fix**: Verify `DINGTALK_CLIENT_ID` and `DINGTALK_CLIENT_SECRET` are set correctly in `~/.hermes/.env`. The Client ID is your AppKey, and the Client Secret is your AppSecret from the DingTalk Developer Console.
### Stream disconnects / reconnection loops
**Cause**: Network instability, DingTalk platform maintenance, or credential issues.
**Fix**: The adapter automatically reconnects with exponential backoff (2s → 5s → 10s → 30s → 60s). Check that your credentials are valid and your app hasn't been deactivated. Verify your network allows outbound WebSocket connections.
### Bot is offline
**Cause**: The Hermes gateway isn't running, or it failed to connect.
**Fix**: Check that `hermes gateway` is running. Look at the terminal output for error messages. Common issues: wrong credentials, app deactivated, `dingtalk-stream` or `httpx` not installed.
### "No session_webhook available"
**Cause**: The bot tried to reply but doesn't have a session webhook URL. This typically happens if the webhook expired or the bot was restarted between receiving the message and sending the reply.
**Fix**: Send a new message to the bot — each incoming message provides a fresh session webhook for replies. This is a normal DingTalk limitation; the bot can only reply to messages it has received recently.
## Security
:::warning
**Always configure access controls.** The bot has terminal access by default. Without `DINGTALK_ALLOWED_USERS` or DM pairing, the gateway denies all incoming messages as a safety measure.
Always set `DINGTALK_ALLOWED_USERS` to restrict who can interact with the bot. Without it, the gateway denies all users by default as a safety measure. Only add User IDs of people you trust — authorized users have full access to the agent's capabilities, including tool use and system access.
:::
- Use DM pairing or explicit allowlists for safe onboarding of new users
- Keep your AppSecret confidential — treat it like a password
- The `DINGTALK_CLIENT_SECRET` in `~/.hermes/.env` should be readable only by the user running Hermes
- DingTalk's Stream Mode connection is encrypted via TLS
For more information on securing your Hermes Agent deployment, see the [Security Guide](../security.md).
---
## Notes
## Environment Variables Reference
| Variable | Required | Default | Description |
|----------|----------|---------|-------------|
| `DINGTALK_CLIENT_ID` | Yes | — | DingTalk application AppKey |
| `DINGTALK_CLIENT_SECRET` | Yes | — | DingTalk application AppSecret |
| `DINGTALK_ALLOWED_USERS` | No | — | Comma-separated DingTalk staff IDs |
| `DINGTALK_ALLOW_ALL_USERS` | No | `false` | Allow all users (not recommended) |
| `DINGTALK_HOME_CHANNEL` | No | — | Default delivery target for cron jobs |
- **Stream Mode**: No public URL, domain name, or webhook server needed. The connection is initiated from your machine via WebSocket, so it works behind NAT and firewalls.
- **Markdown responses**: Replies are formatted in DingTalk's markdown format for rich text display.
- **Message deduplication**: The adapter deduplicates messages with a 5-minute window to prevent processing the same message twice.
- **Auto-reconnection**: If the stream connection drops, the adapter automatically reconnects with exponential backoff.
- **Message length limit**: Responses are capped at 20,000 characters per message. Longer responses are truncated.

View File

@@ -1,12 +1,12 @@
---
sidebar_position: 1
title: "Messaging Gateway"
description: "Chat with Hermes from Telegram, Discord, Slack, WhatsApp, Signal, SMS, Email, DingTalk, Home Assistant, or your browser — architecture and setup overview"
description: "Chat with Hermes from Telegram, Discord, Slack, WhatsApp, Signal, SMS, Email, Home Assistant, Mattermost, Matrix, DingTalk, or your browser — architecture and setup overview"
---
# Messaging Gateway
Chat with Hermes from Telegram, Discord, Slack, WhatsApp, Signal, SMS, Email, DingTalk, Home Assistant, or your browser. The gateway is a single background process that connects to all your configured platforms, handles sessions, runs cron jobs, and delivers voice messages.
Chat with Hermes from Telegram, Discord, Slack, WhatsApp, Signal, SMS, Email, Home Assistant, Mattermost, Matrix, DingTalk, or your browser. The gateway is a single background process that connects to all your configured platforms, handles sessions, runs cron jobs, and delivers voice messages.
For the full voice feature set — including CLI microphone mode, spoken replies in messaging, and Discord voice-channel conversations — see [Voice Mode](/docs/user-guide/features/voice-mode) and [Use Voice Mode with Hermes](/docs/guides/use-voice-mode-with-hermes).
@@ -24,6 +24,8 @@ flowchart TB
sms[SMS]
em[Email]
ha[Home Assistant]
mm[Mattermost]
mx[Matrix]
dt[DingTalk]
end
@@ -40,6 +42,8 @@ flowchart TB
sms --> store
em --> store
ha --> store
mm --> store
mx --> store
dt --> store
store --> agent
cron --> store
@@ -133,9 +137,11 @@ Configure per-platform overrides in `~/.hermes/gateway.json`:
TELEGRAM_ALLOWED_USERS=123456789,987654321
DISCORD_ALLOWED_USERS=123456789012345678
SIGNAL_ALLOWED_USERS=+155****4567,+155****6543
DINGTALK_ALLOWED_USERS=staff_id_1,staff_id_2
SMS_ALLOWED_USERS=+155****4567,+155****6543
EMAIL_ALLOWED_USERS=trusted@example.com,colleague@work.com
MATTERMOST_ALLOWED_USERS=3uo8dkh1p7g1mfk49ear5fzs5c
MATRIX_ALLOWED_USERS=@alice:matrix.org
DINGTALK_ALLOWED_USERS=user-id-1
# Or allow
GATEWAY_ALLOWED_USERS=123456789,987654321
@@ -296,8 +302,10 @@ Each platform has its own toolset:
| Signal | `hermes-signal` | Full tools including terminal |
| SMS | `hermes-sms` | Full tools including terminal |
| Email | `hermes-email` | Full tools including terminal |
| DingTalk | `hermes-dingtalk` | Full tools including terminal |
| Home Assistant | `hermes-homeassistant` | Full tools + HA device control (ha_list_entities, ha_get_state, ha_call_service, ha_list_services) |
| Mattermost | `hermes-mattermost` | Full tools including terminal |
| Matrix | `hermes-matrix` | Full tools including terminal |
| DingTalk | `hermes-dingtalk` | Full tools including terminal |
## Next Steps
@@ -308,5 +316,7 @@ Each platform has its own toolset:
- [Signal Setup](signal.md)
- [SMS Setup (Twilio)](sms.md)
- [Email Setup](email.md)
- [DingTalk Setup](dingtalk.md)
- [Home Assistant Integration](homeassistant.md)
- [Mattermost Setup](mattermost.md)
- [Matrix Setup](matrix.md)
- [DingTalk Setup](dingtalk.md)

View File

@@ -277,6 +277,25 @@ Error messages from MCP tools are sanitized before being returned to the LLM. Th
- Bearer tokens
- `token=`, `key=`, `API_KEY=`, `password=`, `secret=` parameters
### Website Access Policy
You can restrict which websites the agent can access through its web and browser tools. This is useful for preventing the agent from accessing internal services, admin panels, or other sensitive URLs.
```yaml
# In ~/.hermes/config.yaml
website_blocklist:
enabled: true
domains:
- "*.internal.company.com"
- "admin.example.com"
shared_files:
- "/etc/hermes/blocked-sites.txt"
```
When a blocked URL is requested, the tool returns an error explaining the domain is blocked by policy. The blocklist is enforced across `web_search`, `web_extract`, `browser_navigate`, and all URL-capable tools.
See [Website Blocklist](/docs/user-guide/configuration#website-blocklist) in the configuration guide for full details.
### Context File Injection Protection
Context files (AGENTS.md, .cursorrules, SOUL.md) are scanned for prompt injection before being included in the system prompt. The scanner checks for:

View File

@@ -48,6 +48,9 @@ const sidebars: SidebarsConfig = {
'user-guide/messaging/signal',
'user-guide/messaging/email',
'user-guide/messaging/homeassistant',
'user-guide/messaging/mattermost',
'user-guide/messaging/matrix',
'user-guide/messaging/dingtalk',
],
},
{