Compare commits

..

2 Commits

Author SHA1 Message Date
teknium1
e1369d1936 docs: add /insights to all help menus and documentation
- website/docs/reference/cli-commands.md: Added 'hermes insights' terminal
  command section with --days and --source flags, plus /insights slash command
  in the Conversation section
- website/docs/user-guide/cli.md: Added /insights to slash commands table
- website/docs/user-guide/messaging/index.md: Added /insights to gateway
  chat commands table
- website/docs/user-guide/sessions.md: Added cross-reference to hermes
  insights from the sessions stats section
2026-03-06 16:03:20 -08:00
teknium1
64133814a2 fix: restore all removed bundled skills + fix skills sync system
- Restored 21 skills removed in commits 757d012 and 740dd92:
  accelerate, audiocraft, code-review, faiss, flash-attention, gguf,
  grpo-rl-training, guidance, llava, nemo-curator, obliteratus, peft,
  pytorch-fsdp, pytorch-lightning, simpo, slime, stable-diffusion,
  tensorrt-llm, torchtitan, trl-fine-tuning, whisper

- Rewrote sync_skills() with proper update semantics:
  * New skills (not in manifest): copied to user dir
  * Existing skills (in manifest + on disk): updated via hash comparison
  * User-deleted skills (in manifest, not on disk): respected, not re-added
  * Stale manifest entries (removed from bundled): cleaned from manifest

- Added sync_skills() to CLI startup (cmd_chat) and gateway startup
  (start_gateway) — previously only ran during 'hermes update'

- Updated cmd_update output to show new/updated/cleaned counts

- Rewrote tests: 20 tests covering manifest CRUD, dir hashing, fresh
  install, user deletion respect, update detection, stale cleanup, and
  name collision handling

75 bundled skills total. 2002 tests pass.
2026-03-06 15:57:12 -08:00
14 changed files with 1051 additions and 1778 deletions

8
cli.py
View File

@@ -870,13 +870,7 @@ class HermesCLI:
or os.getenv("OPENAI_BASE_URL")
or os.getenv("OPENROUTER_BASE_URL", CLI_CONFIG["model"]["base_url"])
)
# Match key to resolved base_url: OpenRouter URL → prefer OPENROUTER_API_KEY,
# custom endpoint → prefer OPENAI_API_KEY (issue #560).
# Note: _ensure_runtime_credentials() re-resolves this before first use.
if "openrouter.ai" in self.base_url:
self.api_key = api_key or os.getenv("OPENROUTER_API_KEY") or os.getenv("OPENAI_API_KEY")
else:
self.api_key = api_key or os.getenv("OPENAI_API_KEY") or os.getenv("OPENROUTER_API_KEY")
self.api_key = api_key or os.getenv("OPENROUTER_API_KEY") or os.getenv("OPENAI_API_KEY")
self._nous_key_expires_at: Optional[str] = None
self._nous_key_source: Optional[str] = None
# Max turns priority: CLI arg > config file > env var > default

View File

@@ -864,8 +864,6 @@ def _update_via_zip(args):
print(f" + {len(result['copied'])} new: {', '.join(result['copied'])}")
if result.get("updated"):
print(f"{len(result['updated'])} updated: {', '.join(result['updated'])}")
if result.get("user_modified"):
print(f" ~ {len(result['user_modified'])} user-modified (kept)")
if result.get("cleaned"):
print(f" {len(result['cleaned'])} removed from manifest")
if not result["copied"] and not result.get("updated"):
@@ -984,8 +982,6 @@ def cmd_update(args):
print(f" + {len(result['copied'])} new: {', '.join(result['copied'])}")
if result.get("updated"):
print(f"{len(result['updated'])} updated: {', '.join(result['updated'])}")
if result.get("user_modified"):
print(f" ~ {len(result['user_modified'])} user-modified (kept)")
if result.get("cleaned"):
print(f" {len(result['cleaned'])} removed from manifest")
if not result["copied"] and not result.get("updated"):
@@ -1219,15 +1215,7 @@ For more help on a command:
setup_parser = subparsers.add_parser(
"setup",
help="Interactive setup wizard",
description="Configure Hermes Agent with an interactive wizard. "
"Run a specific section: hermes setup model|terminal|gateway|tools|agent"
)
setup_parser.add_argument(
"section",
nargs="?",
choices=["model", "terminal", "gateway", "tools", "agent"],
default=None,
help="Run a specific setup section instead of the full wizard"
description="Configure Hermes Agent with an interactive wizard"
)
setup_parser.add_argument(
"--non-interactive",

View File

@@ -72,25 +72,12 @@ def _resolve_openrouter_runtime(
or OPENROUTER_BASE_URL
).rstrip("/")
# Choose API key based on whether the resolved base_url targets OpenRouter.
# When hitting OpenRouter, prefer OPENROUTER_API_KEY (issue #289).
# When hitting a custom endpoint, prefer OPENAI_API_KEY so the OpenRouter
# key doesn't leak to an unrelated provider (issue #560).
_is_openrouter_url = "openrouter.ai" in base_url
if _is_openrouter_url:
api_key = (
explicit_api_key
or os.getenv("OPENROUTER_API_KEY")
or os.getenv("OPENAI_API_KEY")
or ""
)
else:
api_key = (
explicit_api_key
or os.getenv("OPENAI_API_KEY")
or os.getenv("OPENROUTER_API_KEY")
or ""
)
api_key = (
explicit_api_key
or os.getenv("OPENROUTER_API_KEY")
or os.getenv("OPENAI_API_KEY")
or ""
)
source = "explicit" if (explicit_api_key or explicit_base_url) else "env/config"

File diff suppressed because it is too large Load Diff

View File

@@ -1,10 +1,7 @@
"""
Unified tool configuration for Hermes Agent.
`hermes tools` and `hermes setup tools` both enter this module.
Select a platform → toggle toolsets on/off → for newly enabled tools
that need API keys, run through provider-aware configuration.
Interactive tool configuration for Hermes Agent.
`hermes tools` — select a platform, then toggle toolsets on/off via checklist.
Saves per-platform tool configuration to ~/.hermes/config.yaml under
the `platform_toolsets` key.
"""
@@ -15,63 +12,9 @@ from typing import Dict, List, Set
import os
from hermes_cli.config import (
load_config, save_config, get_env_value, save_env_value,
get_hermes_home,
)
from hermes_cli.config import load_config, save_config, get_env_value, save_env_value
from hermes_cli.colors import Colors, color
PROJECT_ROOT = Path(__file__).parent.parent.resolve()
# ─── UI Helpers (shared with setup.py) ────────────────────────────────────────
def _print_info(text: str):
print(color(f" {text}", Colors.DIM))
def _print_success(text: str):
print(color(f"{text}", Colors.GREEN))
def _print_warning(text: str):
print(color(f"{text}", Colors.YELLOW))
def _print_error(text: str):
print(color(f"{text}", Colors.RED))
def _prompt(question: str, default: str = None, password: bool = False) -> str:
if default:
display = f"{question} [{default}]: "
else:
display = f"{question}: "
try:
if password:
import getpass
value = getpass.getpass(color(display, Colors.YELLOW))
else:
value = input(color(display, Colors.YELLOW))
return value.strip() or default or ""
except (KeyboardInterrupt, EOFError):
print()
return default or ""
def _prompt_yes_no(question: str, default: bool = True) -> bool:
default_str = "Y/n" if default else "y/N"
while True:
try:
value = input(color(f"{question} [{default_str}]: ", Colors.YELLOW)).strip().lower()
except (KeyboardInterrupt, EOFError):
print()
return default
if not value:
return default
if value in ('y', 'yes'):
return True
if value in ('n', 'no'):
return False
# ─── Toolset Registry ─────────────────────────────────────────────────────────
# Toolsets shown in the configurator, grouped for display.
# Each entry: (toolset_name, label, description)
# These map to keys in toolsets.py TOOLSETS dict.
@@ -106,181 +49,6 @@ PLATFORMS = {
}
# ─── Tool Categories (provider-aware configuration) ──────────────────────────
# Maps toolset keys to their provider options. When a toolset is newly enabled,
# we use this to show provider selection and prompt for the right API keys.
# Toolsets not in this map either need no config or use the simple fallback.
TOOL_CATEGORIES = {
"tts": {
"name": "Text-to-Speech",
"icon": "🔊",
"providers": [
{
"name": "Microsoft Edge TTS",
"tag": "Free - no API key needed",
"env_vars": [],
"tts_provider": "edge",
},
{
"name": "OpenAI TTS",
"tag": "Premium - high quality voices",
"env_vars": [
{"key": "VOICE_TOOLS_OPENAI_KEY", "prompt": "OpenAI API key", "url": "https://platform.openai.com/api-keys"},
],
"tts_provider": "openai",
},
{
"name": "ElevenLabs",
"tag": "Premium - most natural voices",
"env_vars": [
{"key": "ELEVENLABS_API_KEY", "prompt": "ElevenLabs API key", "url": "https://elevenlabs.io/app/settings/api-keys"},
],
"tts_provider": "elevenlabs",
},
],
},
"web": {
"name": "Web Search & Extract",
"icon": "🔍",
"providers": [
{
"name": "Firecrawl Cloud",
"tag": "Recommended - hosted service",
"env_vars": [
{"key": "FIRECRAWL_API_KEY", "prompt": "Firecrawl API key", "url": "https://firecrawl.dev"},
],
},
{
"name": "Firecrawl Self-Hosted",
"tag": "Free - run your own instance",
"env_vars": [
{"key": "FIRECRAWL_API_URL", "prompt": "Your Firecrawl instance URL (e.g., http://localhost:3002)"},
],
},
],
},
"image_gen": {
"name": "Image Generation",
"icon": "🎨",
"providers": [
{
"name": "FAL.ai",
"tag": "FLUX 2 Pro with auto-upscaling",
"env_vars": [
{"key": "FAL_KEY", "prompt": "FAL API key", "url": "https://fal.ai/dashboard/keys"},
],
},
],
},
"browser": {
"name": "Browser Automation",
"icon": "🌐",
"providers": [
{
"name": "Browserbase",
"tag": "Cloud browser with stealth mode",
"env_vars": [
{"key": "BROWSERBASE_API_KEY", "prompt": "Browserbase API key", "url": "https://browserbase.com"},
{"key": "BROWSERBASE_PROJECT_ID", "prompt": "Browserbase project ID"},
],
"post_setup": "browserbase",
},
],
},
"homeassistant": {
"name": "Smart Home",
"icon": "🏠",
"providers": [
{
"name": "Home Assistant",
"tag": "REST API integration",
"env_vars": [
{"key": "HASS_TOKEN", "prompt": "Home Assistant Long-Lived Access Token"},
{"key": "HASS_URL", "prompt": "Home Assistant URL", "default": "http://homeassistant.local:8123"},
],
},
],
},
"rl": {
"name": "RL Training",
"icon": "🧪",
"requires_python": (3, 11),
"providers": [
{
"name": "Tinker / Atropos",
"tag": "RL training platform",
"env_vars": [
{"key": "TINKER_API_KEY", "prompt": "Tinker API key", "url": "https://tinker-console.thinkingmachines.ai/keys"},
{"key": "WANDB_API_KEY", "prompt": "WandB API key", "url": "https://wandb.ai/authorize"},
],
"post_setup": "rl_training",
},
],
},
}
# Simple env-var requirements for toolsets NOT in TOOL_CATEGORIES.
# Used as a fallback for tools like vision/moa that just need an API key.
TOOLSET_ENV_REQUIREMENTS = {
"vision": [("OPENROUTER_API_KEY", "https://openrouter.ai/keys")],
"moa": [("OPENROUTER_API_KEY", "https://openrouter.ai/keys")],
}
# ─── Post-Setup Hooks ─────────────────────────────────────────────────────────
def _run_post_setup(post_setup_key: str):
"""Run post-setup hooks for tools that need extra installation steps."""
import shutil
if post_setup_key == "browserbase":
node_modules = PROJECT_ROOT / "node_modules" / "agent-browser"
if not node_modules.exists() and shutil.which("npm"):
_print_info(" Installing Node.js dependencies for browser tools...")
import subprocess
result = subprocess.run(
["npm", "install", "--silent"],
capture_output=True, text=True, cwd=str(PROJECT_ROOT)
)
if result.returncode == 0:
_print_success(" Node.js dependencies installed")
else:
_print_warning(" npm install failed - run manually: cd ~/.hermes/hermes-agent && npm install")
elif not node_modules.exists():
_print_warning(" Node.js not found - browser tools require: npm install (in hermes-agent directory)")
elif post_setup_key == "rl_training":
try:
__import__("tinker_atropos")
except ImportError:
tinker_dir = PROJECT_ROOT / "tinker-atropos"
if tinker_dir.exists() and (tinker_dir / "pyproject.toml").exists():
_print_info(" Installing tinker-atropos submodule...")
import subprocess
uv_bin = shutil.which("uv")
if uv_bin:
result = subprocess.run(
[uv_bin, "pip", "install", "-e", str(tinker_dir)],
capture_output=True, text=True
)
else:
result = subprocess.run(
[sys.executable, "-m", "pip", "install", "-e", str(tinker_dir)],
capture_output=True, text=True
)
if result.returncode == 0:
_print_success(" tinker-atropos installed")
else:
_print_warning(" tinker-atropos install failed - run manually:")
_print_info(' uv pip install -e "./tinker-atropos"')
else:
_print_warning(" tinker-atropos submodule not found - run:")
_print_info(" git submodule update --init --recursive")
_print_info(' uv pip install -e "./tinker-atropos"')
# ─── Platform / Toolset Helpers ───────────────────────────────────────────────
def _get_enabled_platforms() -> List[str]:
"""Return platform keys that are configured (have tokens or are CLI)."""
enabled = ["cli"]
@@ -329,28 +97,6 @@ def _save_platform_tools(config: dict, platform: str, enabled_toolset_keys: Set[
save_config(config)
def _toolset_has_keys(ts_key: str) -> bool:
"""Check if a toolset's required API keys are configured."""
# Check TOOL_CATEGORIES first (provider-aware)
cat = TOOL_CATEGORIES.get(ts_key)
if cat:
for provider in cat["providers"]:
env_vars = provider.get("env_vars", [])
if not env_vars:
return True # Free provider (e.g., Edge TTS)
if all(get_env_value(v["key"]) for v in env_vars):
return True
return False
# Fallback to simple requirements
requirements = TOOLSET_ENV_REQUIREMENTS.get(ts_key, [])
if not requirements:
return True
return all(get_env_value(var) for var, _ in requirements)
# ─── Menu Helpers ─────────────────────────────────────────────────────────────
def _prompt_choice(question: str, choices: list, default: int = 0) -> int:
"""Single-select menu (arrow keys)."""
print(color(question, Colors.YELLOW))
@@ -368,7 +114,7 @@ def _prompt_choice(question: str, choices: list, default: int = 0) -> int:
)
idx = menu.show()
if idx is None:
return default
sys.exit(0)
print()
return idx
except (ImportError, NotImplementedError):
@@ -386,7 +132,15 @@ def _prompt_choice(question: str, choices: list, default: int = 0) -> int:
return idx
except (ValueError, KeyboardInterrupt, EOFError):
print()
return default
sys.exit(0)
def _toolset_has_keys(ts_key: str) -> bool:
"""Check if a toolset's required API keys are configured."""
requirements = TOOLSET_ENV_REQUIREMENTS.get(ts_key, [])
if not requirements:
return True
return all(get_env_value(var) for var, _ in requirements)
def _prompt_toolset_checklist(platform_label: str, enabled: Set[str]) -> Set[str]:
@@ -396,8 +150,8 @@ def _prompt_toolset_checklist(platform_label: str, enabled: Set[str]) -> Set[str
labels = []
for ts_key, ts_label, ts_desc in CONFIGURABLE_TOOLSETS:
suffix = ""
if not _toolset_has_keys(ts_key) and (TOOL_CATEGORIES.get(ts_key) or TOOLSET_ENV_REQUIREMENTS.get(ts_key)):
suffix = " [no API key]"
if not _toolset_has_keys(ts_key) and TOOLSET_ENV_REQUIREMENTS.get(ts_key):
suffix = " no API key"
labels.append(f"{ts_label} ({ts_desc}){suffix}")
pre_selected_indices = [
@@ -548,294 +302,77 @@ def _prompt_toolset_checklist(platform_label: str, enabled: Set[str]) -> Set[str
return {CONFIGURABLE_TOOLSETS[i][0] for i in selected}
# ─── Provider-Aware Configuration ────────────────────────────────────────────
def _configure_toolset(ts_key: str, config: dict):
"""Configure a toolset - provider selection + API keys.
Uses TOOL_CATEGORIES for provider-aware config, falls back to simple
env var prompts for toolsets not in TOOL_CATEGORIES.
"""
cat = TOOL_CATEGORIES.get(ts_key)
if cat:
_configure_tool_category(ts_key, cat, config)
else:
# Simple fallback for vision, moa, etc.
_configure_simple_requirements(ts_key)
# Map toolset keys to the env vars they require and where to get them
TOOLSET_ENV_REQUIREMENTS = {
"web": [("FIRECRAWL_API_KEY", "https://firecrawl.dev/")],
"browser": [("BROWSERBASE_API_KEY", "https://browserbase.com/"),
("BROWSERBASE_PROJECT_ID", None)],
"vision": [("OPENROUTER_API_KEY", "https://openrouter.ai/keys")],
"image_gen": [("FAL_KEY", "https://fal.ai/")],
"moa": [("OPENROUTER_API_KEY", "https://openrouter.ai/keys")],
"tts": [], # Edge TTS is free, no key needed
"rl": [("TINKER_API_KEY", "https://tinker-console.thinkingmachines.ai/keys"),
("WANDB_API_KEY", "https://wandb.ai/authorize")],
"homeassistant": [("HASS_TOKEN", "Home Assistant > Profile > Long-Lived Access Tokens"),
("HASS_URL", None)],
}
def _configure_tool_category(ts_key: str, cat: dict, config: dict):
"""Configure a tool category with provider selection."""
icon = cat.get("icon", "")
name = cat["name"]
providers = cat["providers"]
def _check_and_prompt_requirements(newly_enabled: Set[str]):
"""Check if newly enabled toolsets have missing API keys and offer to set them up."""
for ts_key in sorted(newly_enabled):
requirements = TOOLSET_ENV_REQUIREMENTS.get(ts_key, [])
if not requirements:
continue
# Check Python version requirement
if cat.get("requires_python"):
req = cat["requires_python"]
if sys.version_info < req:
print()
_print_error(f" {name} requires Python {req[0]}.{req[1]}+ (current: {sys.version_info.major}.{sys.version_info.minor})")
_print_info(" Upgrade Python and reinstall to enable this tool.")
return
missing = [(var, url) for var, url in requirements if not get_env_value(var)]
if not missing:
continue
if len(providers) == 1:
# Single provider - configure directly
provider = providers[0]
print()
print(color(f" --- {icon} {name} ({provider['name']}) ---", Colors.CYAN))
if provider.get("tag"):
_print_info(f" {provider['tag']}")
_configure_provider(provider, config)
else:
# Multiple providers - let user choose
print()
print(color(f" --- {icon} {name} - Choose a provider ---", Colors.CYAN))
ts_label = next((l for k, l, _ in CONFIGURABLE_TOOLSETS if k == ts_key), ts_key)
print()
print(color(f"{ts_label} requires configuration:", Colors.YELLOW))
# Plain text labels only (no ANSI codes in menu items)
provider_choices = []
for p in providers:
tag = f" ({p['tag']})" if p.get("tag") else ""
configured = ""
env_vars = p.get("env_vars", [])
if not env_vars or all(get_env_value(v["key"]) for v in env_vars):
if p.get("tts_provider") and config.get("tts", {}).get("provider") == p["tts_provider"]:
configured = " [active]"
elif not env_vars:
configured = " [active]" if config.get("tts", {}).get("provider", "edge") == p.get("tts_provider", "") else ""
else:
configured = " [configured]"
provider_choices.append(f"{p['name']}{tag}{configured}")
# Detect current provider as default
default_idx = 0
for i, p in enumerate(providers):
if p.get("tts_provider") and config.get("tts", {}).get("provider") == p["tts_provider"]:
default_idx = i
break
env_vars = p.get("env_vars", [])
if env_vars and all(get_env_value(v["key"]) for v in env_vars):
default_idx = i
break
provider_idx = _prompt_choice(" Select provider:", provider_choices, default_idx)
_configure_provider(providers[provider_idx], config)
def _configure_provider(provider: dict, config: dict):
"""Configure a single provider - prompt for API keys and set config."""
env_vars = provider.get("env_vars", [])
# Set TTS provider in config if applicable
if provider.get("tts_provider"):
config.setdefault("tts", {})["provider"] = provider["tts_provider"]
if not env_vars:
_print_success(f" {provider['name']} - no configuration needed!")
return
# Prompt for each required env var
all_configured = True
for var in env_vars:
existing = get_env_value(var["key"])
if existing:
_print_success(f" {var['key']}: already configured")
# Don't ask to update - this is a new enable flow.
# Reconfigure is handled separately.
else:
url = var.get("url", "")
for var, url in missing:
if url:
_print_info(f" Get yours at: {url}")
default_val = var.get("default", "")
if default_val:
value = _prompt(f" {var.get('prompt', var['key'])}", default_val)
print(color(f" {var}", Colors.CYAN) + color(f" ({url})", Colors.DIM))
else:
value = _prompt(f" {var.get('prompt', var['key'])}", password=True)
print(color(f" {var}", Colors.CYAN))
if value:
save_env_value(var["key"], value)
_print_success(f" Saved")
else:
_print_warning(f" Skipped")
all_configured = False
# Run post-setup hooks if needed
if provider.get("post_setup") and all_configured:
_run_post_setup(provider["post_setup"])
if all_configured:
_print_success(f" {provider['name']} configured!")
def _configure_simple_requirements(ts_key: str):
"""Simple fallback for toolsets that just need env vars (no provider selection)."""
requirements = TOOLSET_ENV_REQUIREMENTS.get(ts_key, [])
if not requirements:
return
missing = [(var, url) for var, url in requirements if not get_env_value(var)]
if not missing:
return
ts_label = next((l for k, l, _ in CONFIGURABLE_TOOLSETS if k == ts_key), ts_key)
print()
print(color(f" {ts_label} requires configuration:", Colors.YELLOW))
for var, url in missing:
if url:
_print_info(f" Get key at: {url}")
value = _prompt(f" {var}", password=True)
if value and value.strip():
save_env_value(var, value.strip())
_print_success(f" Saved")
else:
_print_warning(f" Skipped")
def _reconfigure_tool(config: dict):
"""Let user reconfigure an existing tool's provider or API key."""
# Build list of configurable tools that are currently set up
configurable = []
for ts_key, ts_label, _ in CONFIGURABLE_TOOLSETS:
cat = TOOL_CATEGORIES.get(ts_key)
reqs = TOOLSET_ENV_REQUIREMENTS.get(ts_key)
if cat or reqs:
if _toolset_has_keys(ts_key):
configurable.append((ts_key, ts_label))
if not configurable:
_print_info("No configured tools to reconfigure.")
return
choices = [label for _, label in configurable]
choices.append("Cancel")
idx = _prompt_choice(" Which tool would you like to reconfigure?", choices, len(choices) - 1)
if idx >= len(configurable):
return # Cancel
ts_key, ts_label = configurable[idx]
cat = TOOL_CATEGORIES.get(ts_key)
if cat:
_configure_tool_category_for_reconfig(ts_key, cat, config)
else:
_reconfigure_simple_requirements(ts_key)
save_config(config)
def _configure_tool_category_for_reconfig(ts_key: str, cat: dict, config: dict):
"""Reconfigure a tool category - provider selection + API key update."""
icon = cat.get("icon", "")
name = cat["name"]
providers = cat["providers"]
if len(providers) == 1:
provider = providers[0]
print()
print(color(f" --- {icon} {name} ({provider['name']}) ---", Colors.CYAN))
_reconfigure_provider(provider, config)
else:
print()
print(color(f" --- {icon} {name} - Choose a provider ---", Colors.CYAN))
print()
try:
response = input(color(" Set up now? [Y/n] ", Colors.YELLOW)).strip().lower()
except (KeyboardInterrupt, EOFError):
print()
continue
provider_choices = []
for p in providers:
tag = f" ({p['tag']})" if p.get("tag") else ""
configured = ""
env_vars = p.get("env_vars", [])
if not env_vars or all(get_env_value(v["key"]) for v in env_vars):
if p.get("tts_provider") and config.get("tts", {}).get("provider") == p["tts_provider"]:
configured = " [active]"
elif not env_vars:
configured = ""
if response in ("", "y", "yes"):
for var, url in missing:
if url:
print(color(f" Get key at: {url}", Colors.DIM))
try:
import getpass
value = getpass.getpass(color(f" {var}: ", Colors.YELLOW))
except (KeyboardInterrupt, EOFError):
print()
break
if value.strip():
save_env_value(var, value.strip())
print(color(f" ✓ Saved", Colors.GREEN))
else:
configured = " [configured]"
provider_choices.append(f"{p['name']}{tag}{configured}")
default_idx = 0
for i, p in enumerate(providers):
if p.get("tts_provider") and config.get("tts", {}).get("provider") == p["tts_provider"]:
default_idx = i
break
env_vars = p.get("env_vars", [])
if env_vars and all(get_env_value(v["key"]) for v in env_vars):
default_idx = i
break
provider_idx = _prompt_choice(" Select provider:", provider_choices, default_idx)
_reconfigure_provider(providers[provider_idx], config)
def _reconfigure_provider(provider: dict, config: dict):
"""Reconfigure a provider - update API keys."""
env_vars = provider.get("env_vars", [])
if provider.get("tts_provider"):
config.setdefault("tts", {})["provider"] = provider["tts_provider"]
_print_success(f" TTS provider set to: {provider['tts_provider']}")
if not env_vars:
_print_success(f" {provider['name']} - no configuration needed!")
return
for var in env_vars:
existing = get_env_value(var["key"])
if existing:
_print_info(f" {var['key']}: configured ({existing[:8]}...)")
url = var.get("url", "")
if url:
_print_info(f" Get yours at: {url}")
default_val = var.get("default", "")
value = _prompt(f" {var.get('prompt', var['key'])} (Enter to keep current)", password=not default_val)
if value and value.strip():
save_env_value(var["key"], value.strip())
_print_success(f" Updated")
print(color(f" Skipped", Colors.DIM))
else:
_print_info(f" Kept current")
print(color(" Skipped — configure later with 'hermes setup'", Colors.DIM))
def _reconfigure_simple_requirements(ts_key: str):
"""Reconfigure simple env var requirements."""
requirements = TOOLSET_ENV_REQUIREMENTS.get(ts_key, [])
if not requirements:
return
ts_label = next((l for k, l, _ in CONFIGURABLE_TOOLSETS if k == ts_key), ts_key)
print()
print(color(f" {ts_label}:", Colors.CYAN))
for var, url in requirements:
existing = get_env_value(var)
if existing:
_print_info(f" {var}: configured ({existing[:8]}...)")
if url:
_print_info(f" Get key at: {url}")
value = _prompt(f" {var} (Enter to keep current)", password=True)
if value and value.strip():
save_env_value(var, value.strip())
_print_success(f" Updated")
else:
_print_info(f" Kept current")
# ─── Main Entry Point ─────────────────────────────────────────────────────────
def tools_command(args=None):
"""Entry point for `hermes tools` and `hermes setup tools`."""
def tools_command(args):
"""Entry point for `hermes tools`."""
config = load_config()
enabled_platforms = _get_enabled_platforms()
print()
print(color("⚕ Hermes Tool Configuration", Colors.CYAN, Colors.BOLD))
print(color(" Enable or disable tools per platform.", Colors.DIM))
print(color(" Tools that need API keys will be configured when enabled.", Colors.DIM))
print()
# Build platform choices
@@ -843,27 +380,21 @@ def tools_command(args=None):
platform_keys = []
for pkey in enabled_platforms:
pinfo = PLATFORMS[pkey]
# Count currently enabled toolsets
current = _get_platform_tools(config, pkey)
count = len(current)
total = len(CONFIGURABLE_TOOLSETS)
platform_choices.append(f"Configure {pinfo['label']} ({count}/{total} enabled)")
platform_keys.append(pkey)
platform_choices.append("Reconfigure an existing tool's provider or API key")
platform_choices.append("Done")
platform_choices.append("Done — save and exit")
while True:
idx = _prompt_choice("Select an option:", platform_choices, default=0)
idx = _prompt_choice("Select a platform to configure:", platform_choices, default=0)
# "Done" selected
if idx == len(platform_keys) + 1:
break
# "Reconfigure" selected
if idx == len(platform_keys):
_reconfigure_tool(config)
print()
continue
break
pkey = platform_keys[idx]
pinfo = PLATFORMS[pkey]
@@ -887,15 +418,11 @@ def tools_command(args=None):
label = next((l for k, l, _ in CONFIGURABLE_TOOLSETS if k == ts), ts)
print(color(f" - {label}", Colors.RED))
# Configure newly enabled toolsets that need API keys
# Prompt for missing API keys on newly enabled toolsets
if added:
for ts_key in sorted(added):
if TOOL_CATEGORIES.get(ts_key) or TOOLSET_ENV_REQUIREMENTS.get(ts_key):
if not _toolset_has_keys(ts_key):
_configure_toolset(ts_key, config)
_check_and_prompt_requirements(added)
_save_platform_tools(config, pkey, new_enabled)
save_config(config)
print(color(f" ✓ Saved {pinfo['label']} configuration", Colors.GREEN))
else:
print(color(f" No changes to {pinfo['label']}", Colors.DIM))

View File

@@ -225,18 +225,6 @@ def get_tool_definitions(
# Ask the registry for schemas (only returns tools whose check_fn passes)
filtered_tools = registry.get_definitions(tools_to_include, quiet=quiet_mode)
# Rebuild execute_code schema to only list sandbox tools that are actually
# enabled. Without this, the model sees "web_search is available in
# execute_code" even when the user disabled the web toolset (#560-discord).
if "execute_code" in tools_to_include:
from tools.code_execution_tool import SANDBOX_ALLOWED_TOOLS, build_execute_code_schema
sandbox_enabled = SANDBOX_ALLOWED_TOOLS & tools_to_include
dynamic_schema = build_execute_code_schema(sandbox_enabled)
for i, td in enumerate(filtered_tools):
if td.get("function", {}).get("name") == "execute_code":
filtered_tools[i] = {"type": "function", "function": dynamic_schema}
break
if not quiet_mode:
if filtered_tools:
tool_names = [t["function"]["name"] for t in filtered_tools]

View File

@@ -3707,33 +3707,13 @@ class AIAgent:
# Check if response only has think block with no actual content after it
if not self._has_content_after_think_block(final_response):
# If the previous turn already delivered real content alongside
# tool calls (e.g. "You're welcome!" + memory save), the model
# has nothing more to say. Use the earlier content immediately
# instead of wasting API calls on retries that won't help.
fallback = getattr(self, '_last_content_with_tools', None)
if fallback:
logger.debug("Empty follow-up after tool calls — using prior turn content as final response")
self._last_content_with_tools = None
self._empty_content_retries = 0
for i in range(len(messages) - 1, -1, -1):
msg = messages[i]
if msg.get("role") == "assistant" and msg.get("tool_calls"):
tool_names = []
for tc in msg["tool_calls"]:
fn = tc.get("function", {})
tool_names.append(fn.get("name", "unknown"))
msg["content"] = f"Calling the {', '.join(tool_names)} tool{'s' if len(tool_names) > 1 else ''}..."
break
final_response = self._strip_think_blocks(fallback).strip()
break
# No fallback available — this is a genuine empty response.
# Retry in case the model just had a bad generation.
# Track retries for empty-after-think responses
if not hasattr(self, '_empty_content_retries'):
self._empty_content_retries = 0
self._empty_content_retries += 1
# Show the reasoning/thinking content so the user can see
# what the model was thinking even though content is empty
reasoning_text = self._extract_reasoning(assistant_message)
print(f"{self.log_prefix}⚠️ Response only contains think block with no content after it")
if reasoning_text:

View File

@@ -172,11 +172,7 @@ class TestBuildSkillsSystemPrompt:
class TestBuildContextFilesPrompt:
def test_empty_dir_returns_empty(self, tmp_path):
from unittest.mock import patch
fake_home = tmp_path / "fake_home"
fake_home.mkdir()
with patch("pathlib.Path.home", return_value=fake_home):
result = build_context_files_prompt(cwd=str(tmp_path))
result = build_context_files_prompt(cwd=str(tmp_path))
assert result == ""
def test_loads_agents_md(self, tmp_path):

View File

@@ -12,21 +12,8 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
def _make_cli(**kwargs):
"""Create a HermesCLI instance with minimal mocking."""
import cli as _cli_mod
from cli import HermesCLI
_clean_config = {
"model": {
"default": "anthropic/claude-opus-4.6",
"base_url": "https://openrouter.ai/api/v1",
"provider": "auto",
},
"display": {"compact": False, "tool_progress": "all"},
"agent": {},
"terminal": {"env_type": "local"},
}
with patch("cli.get_tool_definitions", return_value=[]), \
patch.dict("os.environ", {"LLM_MODEL": ""}, clear=False), \
patch.dict(_cli_mod.__dict__, {"CLI_CONFIG": _clean_config}):
with patch("cli.get_tool_definitions", return_value=[]):
return HermesCLI(**kwargs)

View File

@@ -121,43 +121,6 @@ def test_openai_key_used_when_no_openrouter_key(monkeypatch):
assert resolved["api_key"] == "sk-openai-fallback"
def test_custom_endpoint_prefers_openai_key(monkeypatch):
"""Custom endpoint should use OPENAI_API_KEY, not OPENROUTER_API_KEY.
Regression test for #560: when base_url is a non-OpenRouter endpoint,
OPENROUTER_API_KEY was being sent as the auth header instead of OPENAI_API_KEY.
"""
monkeypatch.setattr(rp, "resolve_provider", lambda *a, **k: "openrouter")
monkeypatch.setattr(rp, "_get_model_config", lambda: {})
monkeypatch.setenv("OPENAI_BASE_URL", "https://api.z.ai/api/coding/paas/v4")
monkeypatch.delenv("OPENROUTER_BASE_URL", raising=False)
monkeypatch.setenv("OPENAI_API_KEY", "sk-zai-correct-key")
monkeypatch.setenv("OPENROUTER_API_KEY", "sk-or-wrong-key-for-zai")
resolved = rp.resolve_runtime_provider(requested="custom")
assert resolved["base_url"] == "https://api.z.ai/api/coding/paas/v4"
assert resolved["api_key"] == "sk-zai-correct-key"
def test_custom_endpoint_auto_provider_prefers_openai_key(monkeypatch):
"""Auto provider with non-OpenRouter base_url should prefer OPENAI_API_KEY.
Same as #560 but via 'hermes model' flow which sets provider to 'auto'.
"""
monkeypatch.setattr(rp, "resolve_provider", lambda *a, **k: "openrouter")
monkeypatch.setattr(rp, "_get_model_config", lambda: {})
monkeypatch.setenv("OPENAI_BASE_URL", "https://my-vllm-server.example.com/v1")
monkeypatch.delenv("OPENROUTER_BASE_URL", raising=False)
monkeypatch.setenv("OPENAI_API_KEY", "sk-vllm-key")
monkeypatch.setenv("OPENROUTER_API_KEY", "sk-or-should-not-leak")
resolved = rp.resolve_runtime_provider(requested="auto")
assert resolved["base_url"] == "https://my-vllm-server.example.com/v1"
assert resolved["api_key"] == "sk-vllm-key"
def test_resolve_requested_provider_precedence(monkeypatch):
monkeypatch.setenv("HERMES_INFERENCE_PROVIDER", "nous")
monkeypatch.setattr(rp, "_get_model_config", lambda: {"provider": "openai-codex"})

View File

@@ -17,62 +17,42 @@ from tools.skills_sync import (
class TestReadWriteManifest:
def test_read_missing_manifest(self, tmp_path):
with patch(
"tools.skills_sync.MANIFEST_FILE",
with patch.object(
__import__("tools.skills_sync", fromlist=["MANIFEST_FILE"]),
"MANIFEST_FILE",
tmp_path / "nonexistent",
):
result = _read_manifest()
assert result == {}
assert result == set()
def test_write_and_read_roundtrip_v2(self, tmp_path):
def test_write_and_read_roundtrip(self, tmp_path):
manifest_file = tmp_path / ".bundled_manifest"
entries = {"skill-a": "abc123", "skill-b": "def456", "skill-c": "789012"}
names = {"skill-a", "skill-b", "skill-c"}
with patch("tools.skills_sync.MANIFEST_FILE", manifest_file):
_write_manifest(entries)
_write_manifest(names)
result = _read_manifest()
assert result == entries
assert result == names
def test_write_manifest_sorted(self, tmp_path):
manifest_file = tmp_path / ".bundled_manifest"
entries = {"zebra": "hash1", "alpha": "hash2", "middle": "hash3"}
names = {"zebra", "alpha", "middle"}
with patch("tools.skills_sync.MANIFEST_FILE", manifest_file):
_write_manifest(entries)
_write_manifest(names)
lines = manifest_file.read_text().strip().splitlines()
names = [line.split(":")[0] for line in lines]
assert names == ["alpha", "middle", "zebra"]
def test_read_v1_manifest_migration(self, tmp_path):
"""v1 format (plain names, no hashes) should be read with empty hashes."""
manifest_file = tmp_path / ".bundled_manifest"
manifest_file.write_text("skill-a\nskill-b\n")
with patch("tools.skills_sync.MANIFEST_FILE", manifest_file):
result = _read_manifest()
assert result == {"skill-a": "", "skill-b": ""}
assert lines == ["alpha", "middle", "zebra"]
def test_read_manifest_ignores_blank_lines(self, tmp_path):
manifest_file = tmp_path / ".bundled_manifest"
manifest_file.write_text("skill-a:hash1\n\n \nskill-b:hash2\n")
manifest_file.write_text("skill-a\n\n \nskill-b\n")
with patch("tools.skills_sync.MANIFEST_FILE", manifest_file):
result = _read_manifest()
assert result == {"skill-a": "hash1", "skill-b": "hash2"}
def test_read_manifest_mixed_v1_v2(self, tmp_path):
"""Manifest with both v1 and v2 lines (shouldn't happen but handle gracefully)."""
manifest_file = tmp_path / ".bundled_manifest"
manifest_file.write_text("old-skill\nnew-skill:abc123\n")
with patch("tools.skills_sync.MANIFEST_FILE", manifest_file):
result = _read_manifest()
assert result == {"old-skill": "", "new-skill": "abc123"}
assert result == {"skill-a", "skill-b"}
class TestDirHash:
@@ -107,10 +87,13 @@ class TestDirHash:
class TestDiscoverBundledSkills:
def test_finds_skills_with_skill_md(self, tmp_path):
# Create two skills
(tmp_path / "category" / "skill-a").mkdir(parents=True)
(tmp_path / "category" / "skill-a" / "SKILL.md").write_text("# Skill A")
(tmp_path / "skill-b").mkdir()
(tmp_path / "skill-b" / "SKILL.md").write_text("# Skill B")
# A directory without SKILL.md — should NOT be found
(tmp_path / "not-a-skill").mkdir()
(tmp_path / "not-a-skill" / "README.md").write_text("Not a skill")
@@ -157,198 +140,105 @@ class TestSyncSkills:
(bundled / "old-skill" / "SKILL.md").write_text("# Old")
return bundled
def _patches(self, bundled, skills_dir, manifest_file):
"""Return context manager stack for patching sync globals."""
from contextlib import ExitStack
stack = ExitStack()
stack.enter_context(patch("tools.skills_sync._get_bundled_dir", return_value=bundled))
stack.enter_context(patch("tools.skills_sync.SKILLS_DIR", skills_dir))
stack.enter_context(patch("tools.skills_sync.MANIFEST_FILE", manifest_file))
return stack
def test_fresh_install_copies_all(self, tmp_path):
bundled = self._setup_bundled(tmp_path)
skills_dir = tmp_path / "user_skills"
manifest_file = skills_dir / ".bundled_manifest"
with self._patches(bundled, skills_dir, manifest_file):
with patch("tools.skills_sync._get_bundled_dir", return_value=bundled), \
patch("tools.skills_sync.SKILLS_DIR", skills_dir), \
patch("tools.skills_sync.MANIFEST_FILE", manifest_file):
result = sync_skills(quiet=True)
assert len(result["copied"]) == 2
assert result["total_bundled"] == 2
assert result["updated"] == []
assert result["user_modified"] == []
assert result["cleaned"] == []
assert (skills_dir / "category" / "new-skill" / "SKILL.md").exists()
assert (skills_dir / "old-skill" / "SKILL.md").exists()
# DESCRIPTION.md should also be copied
assert (skills_dir / "category" / "DESCRIPTION.md").exists()
def test_fresh_install_records_origin_hashes(self, tmp_path):
"""After fresh install, manifest should have v2 format with hashes."""
bundled = self._setup_bundled(tmp_path)
skills_dir = tmp_path / "user_skills"
manifest_file = skills_dir / ".bundled_manifest"
with self._patches(bundled, skills_dir, manifest_file):
sync_skills(quiet=True)
manifest = _read_manifest()
assert "new-skill" in manifest
assert "old-skill" in manifest
# Hashes should be non-empty MD5 strings
assert len(manifest["new-skill"]) == 32
assert len(manifest["old-skill"]) == 32
def test_user_deleted_skill_not_re_added(self, tmp_path):
"""Skill in manifest but not on disk = user deleted it. Don't re-add."""
bundled = self._setup_bundled(tmp_path)
skills_dir = tmp_path / "user_skills"
manifest_file = skills_dir / ".bundled_manifest"
skills_dir.mkdir(parents=True)
# old-skill is in manifest (v2 format) but NOT on disk
old_hash = _dir_hash(bundled / "old-skill")
manifest_file.write_text(f"old-skill:{old_hash}\n")
# old-skill is in manifest but NOT on disk (user deleted it)
manifest_file.write_text("old-skill\n")
with self._patches(bundled, skills_dir, manifest_file):
with patch("tools.skills_sync._get_bundled_dir", return_value=bundled), \
patch("tools.skills_sync.SKILLS_DIR", skills_dir), \
patch("tools.skills_sync.MANIFEST_FILE", manifest_file):
result = sync_skills(quiet=True)
# new-skill should be copied, old-skill should be skipped
assert "new-skill" in result["copied"]
assert "old-skill" not in result["copied"]
assert "old-skill" not in result.get("updated", [])
assert not (skills_dir / "old-skill").exists()
def test_unmodified_skill_gets_updated(self, tmp_path):
"""Skill in manifest + on disk + user hasn't modified = update from bundled."""
def test_existing_skill_gets_updated(self, tmp_path):
"""Skill in manifest AND on disk with changed content = updated."""
bundled = self._setup_bundled(tmp_path)
skills_dir = tmp_path / "user_skills"
manifest_file = skills_dir / ".bundled_manifest"
# Simulate: user has old version that was synced from an older bundled
# Pre-create old-skill on disk with DIFFERENT content
user_skill = skills_dir / "old-skill"
user_skill.mkdir(parents=True)
(user_skill / "SKILL.md").write_text("# Old v1")
old_origin_hash = _dir_hash(user_skill)
(user_skill / "SKILL.md").write_text("# Old version from last sync")
# Mark it in the manifest
manifest_file.write_text("old-skill\n")
# Record origin hash = hash of what was synced (the old version)
manifest_file.write_text(f"old-skill:{old_origin_hash}\n")
# Now bundled has a newer version ("# Old" != "# Old v1")
with self._patches(bundled, skills_dir, manifest_file):
with patch("tools.skills_sync._get_bundled_dir", return_value=bundled), \
patch("tools.skills_sync.SKILLS_DIR", skills_dir), \
patch("tools.skills_sync.MANIFEST_FILE", manifest_file):
result = sync_skills(quiet=True)
# Should be updated because user copy matches origin (unmodified)
# old-skill should be updated
assert "old-skill" in result["updated"]
assert (user_skill / "SKILL.md").read_text() == "# Old"
def test_user_modified_skill_not_overwritten(self, tmp_path):
"""Skill modified by user should NOT be overwritten even if bundled changed."""
bundled = self._setup_bundled(tmp_path)
skills_dir = tmp_path / "user_skills"
manifest_file = skills_dir / ".bundled_manifest"
# Simulate: user had the old version synced, then modified it
user_skill = skills_dir / "old-skill"
user_skill.mkdir(parents=True)
(user_skill / "SKILL.md").write_text("# Old v1")
old_origin_hash = _dir_hash(user_skill)
# Record origin hash from what was originally synced
manifest_file.write_text(f"old-skill:{old_origin_hash}\n")
# User modifies their copy
(user_skill / "SKILL.md").write_text("# My custom version")
with self._patches(bundled, skills_dir, manifest_file):
result = sync_skills(quiet=True)
# Should NOT update — user modified it
assert "old-skill" in result["user_modified"]
assert "old-skill" not in result.get("updated", [])
assert (user_skill / "SKILL.md").read_text() == "# My custom version"
def test_unchanged_skill_not_updated(self, tmp_path):
"""Skill in sync (user == bundled == origin) = no action needed."""
"""Skill in manifest AND on disk with same content = skipped."""
bundled = self._setup_bundled(tmp_path)
skills_dir = tmp_path / "user_skills"
manifest_file = skills_dir / ".bundled_manifest"
# Copy bundled to user dir (simulating perfect sync state)
# Pre-create old-skill on disk with SAME content
user_skill = skills_dir / "old-skill"
user_skill.mkdir(parents=True)
(user_skill / "SKILL.md").write_text("# Old")
origin_hash = _dir_hash(user_skill)
manifest_file.write_text(f"old-skill:{origin_hash}\n")
# Mark it in the manifest
manifest_file.write_text("old-skill\n")
with self._patches(bundled, skills_dir, manifest_file):
with patch("tools.skills_sync._get_bundled_dir", return_value=bundled), \
patch("tools.skills_sync.SKILLS_DIR", skills_dir), \
patch("tools.skills_sync.MANIFEST_FILE", manifest_file):
result = sync_skills(quiet=True)
# Should be skipped, not updated
assert "old-skill" not in result.get("updated", [])
assert "old-skill" not in result.get("user_modified", [])
assert result["skipped"] >= 1
def test_v1_manifest_migration_sets_baseline(self, tmp_path):
"""v1 manifest entries (no hash) should set baseline from user's current copy."""
bundled = self._setup_bundled(tmp_path)
skills_dir = tmp_path / "user_skills"
manifest_file = skills_dir / ".bundled_manifest"
# Pre-create skill on disk
user_skill = skills_dir / "old-skill"
user_skill.mkdir(parents=True)
(user_skill / "SKILL.md").write_text("# Old modified by user")
# v1 manifest (no hashes)
manifest_file.write_text("old-skill\n")
with self._patches(bundled, skills_dir, manifest_file):
result = sync_skills(quiet=True)
# Should skip (migration baseline set), NOT update
assert "old-skill" not in result.get("updated", [])
assert "old-skill" not in result.get("user_modified", [])
# Now check manifest was upgraded to v2 with user's hash as baseline
manifest = _read_manifest()
assert len(manifest["old-skill"]) == 32 # MD5 hash
def test_v1_migration_then_bundled_update_detected(self, tmp_path):
"""After v1 migration, a subsequent sync should detect bundled updates."""
bundled = self._setup_bundled(tmp_path)
skills_dir = tmp_path / "user_skills"
manifest_file = skills_dir / ".bundled_manifest"
# User has the SAME content as bundled (in sync)
user_skill = skills_dir / "old-skill"
user_skill.mkdir(parents=True)
(user_skill / "SKILL.md").write_text("# Old")
# v1 manifest
manifest_file.write_text("old-skill\n")
with self._patches(bundled, skills_dir, manifest_file):
# First sync: migration — sets baseline
sync_skills(quiet=True)
# Now change bundled content
(bundled / "old-skill" / "SKILL.md").write_text("# Old v2 — improved")
# Second sync: should detect bundled changed + user unmodified → update
result = sync_skills(quiet=True)
assert "old-skill" in result["updated"]
assert (user_skill / "SKILL.md").read_text() == "# Old v2 — improved"
def test_stale_manifest_entries_cleaned(self, tmp_path):
"""Skills in manifest that no longer exist in bundled dir get cleaned."""
bundled = self._setup_bundled(tmp_path)
skills_dir = tmp_path / "user_skills"
manifest_file = skills_dir / ".bundled_manifest"
skills_dir.mkdir(parents=True)
manifest_file.write_text("old-skill:abc123\nremoved-skill:def456\n")
# Add a stale entry that doesn't exist in bundled
manifest_file.write_text("old-skill\nremoved-skill\n")
with self._patches(bundled, skills_dir, manifest_file):
with patch("tools.skills_sync._get_bundled_dir", return_value=bundled), \
patch("tools.skills_sync.SKILLS_DIR", skills_dir), \
patch("tools.skills_sync.MANIFEST_FILE", manifest_file):
result = sync_skills(quiet=True)
assert "removed-skill" in result["cleaned"]
# Verify manifest no longer has removed-skill
with patch("tools.skills_sync.MANIFEST_FILE", manifest_file):
manifest = _read_manifest()
assert "removed-skill" not in manifest
@@ -359,111 +249,20 @@ class TestSyncSkills:
skills_dir = tmp_path / "user_skills"
manifest_file = skills_dir / ".bundled_manifest"
# Pre-create the skill dir with user content (not in manifest)
user_skill = skills_dir / "category" / "new-skill"
user_skill.mkdir(parents=True)
(user_skill / "SKILL.md").write_text("# User modified")
with self._patches(bundled, skills_dir, manifest_file):
with patch("tools.skills_sync._get_bundled_dir", return_value=bundled), \
patch("tools.skills_sync.SKILLS_DIR", skills_dir), \
patch("tools.skills_sync.MANIFEST_FILE", manifest_file):
result = sync_skills(quiet=True)
# Should not overwrite user's version
assert (user_skill / "SKILL.md").read_text() == "# User modified"
def test_nonexistent_bundled_dir(self, tmp_path):
with patch("tools.skills_sync._get_bundled_dir", return_value=tmp_path / "nope"):
result = sync_skills(quiet=True)
assert result == {
"copied": [], "updated": [], "skipped": 0,
"user_modified": [], "cleaned": [], "total_bundled": 0,
}
def test_failed_copy_does_not_poison_manifest(self, tmp_path):
"""If copytree fails, the skill must NOT be added to the manifest.
Otherwise the next sync treats it as 'user deleted' and never retries.
"""
bundled = self._setup_bundled(tmp_path)
skills_dir = tmp_path / "user_skills"
manifest_file = skills_dir / ".bundled_manifest"
with self._patches(bundled, skills_dir, manifest_file):
# Patch copytree to fail for new-skill
original_copytree = __import__("shutil").copytree
def failing_copytree(src, dst, *a, **kw):
if "new-skill" in str(src):
raise OSError("Simulated disk full")
return original_copytree(src, dst, *a, **kw)
with patch("shutil.copytree", side_effect=failing_copytree):
result = sync_skills(quiet=True)
# new-skill should NOT be in copied (it failed)
assert "new-skill" not in result["copied"]
# Critical: new-skill must NOT be in the manifest
manifest = _read_manifest()
assert "new-skill" not in manifest, (
"Failed copy was recorded in manifest — next sync will "
"treat it as 'user deleted' and never retry"
)
# Now run sync again (copytree works this time) — it should retry
result2 = sync_skills(quiet=True)
assert "new-skill" in result2["copied"]
assert (skills_dir / "category" / "new-skill" / "SKILL.md").exists()
def test_failed_update_does_not_destroy_user_copy(self, tmp_path):
"""If copytree fails during update, the user's existing copy must survive."""
bundled = self._setup_bundled(tmp_path)
skills_dir = tmp_path / "user_skills"
manifest_file = skills_dir / ".bundled_manifest"
# Start with old synced version
user_skill = skills_dir / "old-skill"
user_skill.mkdir(parents=True)
(user_skill / "SKILL.md").write_text("# Old v1")
old_hash = _dir_hash(user_skill)
manifest_file.write_text(f"old-skill:{old_hash}\n")
with self._patches(bundled, skills_dir, manifest_file):
# Patch copytree to fail (rmtree succeeds, copytree fails)
original_copytree = __import__("shutil").copytree
def failing_copytree(src, dst, *a, **kw):
if "old-skill" in str(src):
raise OSError("Simulated write failure")
return original_copytree(src, dst, *a, **kw)
with patch("shutil.copytree", side_effect=failing_copytree):
result = sync_skills(quiet=True)
# old-skill should NOT be in updated (it failed)
assert "old-skill" not in result.get("updated", [])
# The skill directory should still exist (rmtree destroyed it
# but copytree failed to replace it — this is data loss)
assert user_skill.exists(), (
"Update failure destroyed user's skill copy without replacing it"
)
def test_update_records_new_origin_hash(self, tmp_path):
"""After updating a skill, the manifest should record the new bundled hash."""
bundled = self._setup_bundled(tmp_path)
skills_dir = tmp_path / "user_skills"
manifest_file = skills_dir / ".bundled_manifest"
# Start with old synced version
user_skill = skills_dir / "old-skill"
user_skill.mkdir(parents=True)
(user_skill / "SKILL.md").write_text("# Old v1")
old_hash = _dir_hash(user_skill)
manifest_file.write_text(f"old-skill:{old_hash}\n")
with self._patches(bundled, skills_dir, manifest_file):
sync_skills(quiet=True) # updates to "# Old"
manifest = _read_manifest()
# New origin hash should match the bundled version
new_bundled_hash = _dir_hash(bundled / "old-skill")
assert manifest["old-skill"] == new_bundled_hash
assert manifest["old-skill"] != old_hash
assert result == {"copied": [], "updated": [], "skipped": 0, "cleaned": [], "total_bundled": 0}

View File

@@ -592,55 +592,9 @@ def _load_config() -> dict:
# OpenAI Function-Calling Schema
# ---------------------------------------------------------------------------
# Per-tool documentation lines for the execute_code description.
# Ordered to match the canonical display order.
_TOOL_DOC_LINES = [
("web_search",
" web_search(query: str, limit: int = 5) -> dict\n"
" Returns {\"data\": {\"web\": [{\"url\", \"title\", \"description\"}, ...]}}"),
("web_extract",
" web_extract(urls: list[str]) -> dict\n"
" Returns {\"results\": [{\"url\", \"content\", \"error\"}, ...]} where content is markdown"),
("read_file",
" read_file(path: str, offset: int = 1, limit: int = 500) -> dict\n"
" Lines are 1-indexed. Returns {\"content\": \"...\", \"total_lines\": N}"),
("write_file",
" write_file(path: str, content: str) -> dict\n"
" Always overwrites the entire file."),
("search_files",
" search_files(pattern: str, target=\"content\", path=\".\", file_glob=None, limit=50) -> dict\n"
" target: \"content\" (search inside files) or \"files\" (find files by name). Returns {\"matches\": [...]}"),
("patch",
" patch(path: str, old_string: str, new_string: str, replace_all: bool = False) -> dict\n"
" Replaces old_string with new_string in the file."),
("terminal",
" terminal(command: str, timeout=None, workdir=None) -> dict\n"
" Foreground only (no background/pty). Returns {\"output\": \"...\", \"exit_code\": N}"),
]
def build_execute_code_schema(enabled_sandbox_tools: set = None) -> dict:
"""Build the execute_code schema with description listing only enabled tools.
When tools are disabled via ``hermes tools`` (e.g. web is turned off),
the schema description should NOT mention web_search / web_extract —
otherwise the model thinks they are available and keeps trying to use them.
"""
if enabled_sandbox_tools is None:
enabled_sandbox_tools = SANDBOX_ALLOWED_TOOLS
# Build tool documentation lines for only the enabled tools
tool_lines = "\n".join(
doc for name, doc in _TOOL_DOC_LINES if name in enabled_sandbox_tools
)
# Build example import list from enabled tools
import_examples = [n for n in ("web_search", "terminal") if n in enabled_sandbox_tools]
if not import_examples:
import_examples = sorted(enabled_sandbox_tools)[:2]
import_str = ", ".join(import_examples) + ", ..."
description = (
EXECUTE_CODE_SCHEMA = {
"name": "execute_code",
"description": (
"Run a Python script that can call Hermes tools programmatically. "
"Use this when you need 3+ tool calls with processing logic between them, "
"need to filter/reduce large tool outputs before they enter your context, "
@@ -649,8 +603,21 @@ def build_execute_code_schema(enabled_sandbox_tools: set = None) -> dict:
"Use normal tool calls instead when: single tool call with no processing, "
"you need to see the full result and apply complex reasoning, "
"or the task requires interactive user input.\n\n"
f"Available via `from hermes_tools import ...`:\n\n"
f"{tool_lines}\n\n"
"Available via `from hermes_tools import ...`:\n\n"
" web_search(query: str, limit: int = 5) -> dict\n"
" Returns {\"data\": {\"web\": [{\"url\", \"title\", \"description\"}, ...]}}\n"
" web_extract(urls: list[str]) -> dict\n"
" Returns {\"results\": [{\"url\", \"content\", \"error\"}, ...]} where content is markdown\n"
" read_file(path: str, offset: int = 1, limit: int = 500) -> dict\n"
" Lines are 1-indexed. Returns {\"content\": \"...\", \"total_lines\": N}\n"
" write_file(path: str, content: str) -> dict\n"
" Always overwrites the entire file.\n"
" search_files(pattern: str, target=\"content\", path=\".\", file_glob=None, limit=50) -> dict\n"
" target: \"content\" (search inside files) or \"files\" (find files by name). Returns {\"matches\": [...]}\n"
" patch(path: str, old_string: str, new_string: str, replace_all: bool = False) -> dict\n"
" Replaces old_string with new_string in the file.\n"
" terminal(command: str, timeout=None, workdir=None) -> dict\n"
" Foreground only (no background/pty). Returns {\"output\": \"...\", \"exit_code\": N}\n\n"
"Limits: 5-minute timeout, 50KB stdout cap, max 50 tool calls per script. "
"terminal() is foreground-only (no background or pty).\n\n"
"Print your final result to stdout. Use Python stdlib (json, re, math, csv, "
@@ -659,30 +626,22 @@ def build_execute_code_schema(enabled_sandbox_tools: set = None) -> dict:
" json_parse(text: str) — json.loads with strict=False; use for terminal() output with control chars\n"
" shell_quote(s: str) — shlex.quote(); use when interpolating dynamic strings into shell commands\n"
" retry(fn, max_attempts=3, delay=2) — retry with exponential backoff for transient failures"
)
return {
"name": "execute_code",
"description": description,
"parameters": {
"type": "object",
"properties": {
"code": {
"type": "string",
"description": (
"Python code to execute. Import tools with "
f"`from hermes_tools import {import_str}` "
"and print your final result to stdout."
),
},
),
"parameters": {
"type": "object",
"properties": {
"code": {
"type": "string",
"description": (
"Python code to execute. Import tools with "
"`from hermes_tools import web_search, terminal, ...` "
"and print your final result to stdout."
),
},
"required": ["code"],
},
}
# Default schema used at registration time (all sandbox tools listed)
EXECUTE_CODE_SCHEMA = build_execute_code_schema()
"required": ["code"],
},
}
# --- Registry ---

View File

@@ -174,14 +174,7 @@ def _run_single_child(
child_start = time.monotonic()
# When no explicit toolsets given, inherit from parent's enabled toolsets
# so disabled tools (e.g. web) don't leak to subagents.
if toolsets:
child_toolsets = _strip_blocked_tools(toolsets)
elif parent_agent and getattr(parent_agent, "enabled_toolsets", None):
child_toolsets = _strip_blocked_tools(parent_agent.enabled_toolsets)
else:
child_toolsets = _strip_blocked_tools(DEFAULT_TOOLSETS)
child_toolsets = _strip_blocked_tools(toolsets or DEFAULT_TOOLSETS)
child_prompt = _build_child_system_prompt(goal, context)
@@ -500,7 +493,7 @@ DELEGATE_TASK_SCHEMA = {
"items": {"type": "string"},
"description": (
"Toolsets to enable for this subagent. "
"Default: inherits your enabled toolsets. "
"Default: ['terminal', 'file', 'web']. "
"Common patterns: ['terminal', 'file'] for code work, "
"['web'] for research, ['terminal', 'file', 'web'] for "
"full-stack tasks."

View File

@@ -3,22 +3,16 @@
Skills Sync -- Manifest-based seeding and updating of bundled skills.
Copies bundled skills from the repo's skills/ directory into ~/.hermes/skills/
and uses a manifest to track which skills have been synced and their origin hash.
and uses a manifest to track which skills have been offered.
Manifest format (v2): each line is "skill_name:origin_hash" where origin_hash
is the MD5 of the bundled skill at the time it was last synced to the user dir.
Old v1 manifests (plain names without hashes) are auto-migrated.
Update logic:
- NEW skills (not in manifest): copied to user dir, origin hash recorded.
- EXISTING skills (in manifest, present in user dir):
* If user copy matches origin hash: user hasn't modified it → safe to
update from bundled if bundled changed. New origin hash recorded.
* If user copy differs from origin hash: user customized it → SKIP.
- DELETED by user (in manifest, absent from user dir): respected, not re-added.
Behavior:
- NEW skills (not in manifest): copied to user dir, added to manifest.
- EXISTING skills (in manifest, present in user dir): UPDATED from bundled.
- DELETED by user (in manifest, absent from user dir): respected -- not re-added.
- REMOVED from bundled (in manifest, gone from repo): cleaned from manifest.
The manifest lives at ~/.hermes/skills/.bundled_manifest.
The manifest lives at ~/.hermes/skills/.bundled_manifest and is a simple
newline-delimited list of skill names that have been offered to the user.
"""
import hashlib
@@ -26,7 +20,7 @@ import logging
import os
import shutil
from pathlib import Path
from typing import Dict, List, Tuple
from typing import List, Tuple
logger = logging.getLogger(__name__)
@@ -41,38 +35,27 @@ def _get_bundled_dir() -> Path:
return Path(__file__).parent.parent / "skills"
def _read_manifest() -> Dict[str, str]:
"""
Read the manifest as a dict of {skill_name: origin_hash}.
Handles both v1 (plain names) and v2 (name:hash) formats.
v1 entries get an empty hash string which triggers migration on next sync.
"""
def _read_manifest() -> set:
"""Read the set of skill names already offered to the user."""
if not MANIFEST_FILE.exists():
return {}
return set()
try:
result = {}
for line in MANIFEST_FILE.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line:
continue
if ":" in line:
# v2 format: name:hash
name, _, hash_val = line.partition(":")
result[name.strip()] = hash_val.strip()
else:
# v1 format: plain name — empty hash triggers migration
result[line] = ""
return result
return set(
line.strip()
for line in MANIFEST_FILE.read_text(encoding="utf-8").splitlines()
if line.strip()
)
except (OSError, IOError):
return {}
return set()
def _write_manifest(entries: Dict[str, str]):
"""Write the manifest file in v2 format (name:hash)."""
def _write_manifest(names: set):
"""Write the manifest file."""
MANIFEST_FILE.parent.mkdir(parents=True, exist_ok=True)
lines = [f"{name}:{hash_val}" for name, hash_val in sorted(entries.items())]
MANIFEST_FILE.write_text("\n".join(lines) + "\n", encoding="utf-8")
MANIFEST_FILE.write_text(
"\n".join(sorted(names)) + "\n",
encoding="utf-8",
)
def _discover_bundled_skills(bundled_dir: Path) -> List[Tuple[str, Path]]:
@@ -122,16 +105,18 @@ def sync_skills(quiet: bool = False) -> dict:
"""
Sync bundled skills into ~/.hermes/skills/ using the manifest.
- NEW skills (not in manifest): copied to user dir, added to manifest.
- EXISTING skills (in manifest, present in user dir): updated from bundled.
- DELETED by user (in manifest, absent from user dir): respected, not re-added.
- REMOVED from bundled (in manifest, gone from repo): cleaned from manifest.
Returns:
dict with keys: copied (list), updated (list), skipped (int),
user_modified (list), cleaned (list), total_bundled (int)
cleaned (list), total_bundled (int)
"""
bundled_dir = _get_bundled_dir()
if not bundled_dir.exists():
return {
"copied": [], "updated": [], "skipped": 0,
"user_modified": [], "cleaned": [], "total_bundled": 0,
}
return {"copied": [], "updated": [], "skipped": 0, "cleaned": [], "total_bundled": 0}
SKILLS_DIR.mkdir(parents=True, exist_ok=True)
manifest = _read_manifest()
@@ -140,88 +125,52 @@ def sync_skills(quiet: bool = False) -> dict:
copied = []
updated = []
user_modified = []
skipped = 0
for skill_name, skill_src in bundled_skills:
dest = _compute_relative_dest(skill_src, bundled_dir)
bundled_hash = _dir_hash(skill_src)
if skill_name not in manifest:
# ── New skill never offered before ──
# New skill -- never offered before
try:
if dest.exists():
# User already has a skill with the same name — don't overwrite
# User already has a skill with the same name (unlikely but possible)
skipped += 1
manifest[skill_name] = bundled_hash
else:
dest.parent.mkdir(parents=True, exist_ok=True)
shutil.copytree(skill_src, dest)
copied.append(skill_name)
manifest[skill_name] = bundled_hash
if not quiet:
print(f" + {skill_name}")
except (OSError, IOError) as e:
if not quiet:
print(f" ! Failed to copy {skill_name}: {e}")
# Do NOT add to manifest — next sync should retry
manifest.add(skill_name)
elif dest.exists():
# ── Existing skill in manifest AND on disk ──
origin_hash = manifest.get(skill_name, "")
user_hash = _dir_hash(dest)
if not origin_hash:
# v1 migration: no origin hash recorded. Set baseline from
# user's current copy so future syncs can detect modifications.
manifest[skill_name] = user_hash
if user_hash == bundled_hash:
skipped += 1 # already in sync
else:
# Can't tell if user modified or bundled changed — be safe
skipped += 1
continue
if user_hash != origin_hash:
# User modified this skill — don't overwrite their changes
user_modified.append(skill_name)
if not quiet:
print(f" ~ {skill_name} (user-modified, skipping)")
continue
# User copy matches origin — check if bundled has a newer version
if bundled_hash != origin_hash:
# Existing skill in manifest AND on disk -- check for updates
src_hash = _dir_hash(skill_src)
dst_hash = _dir_hash(dest)
if src_hash != dst_hash:
try:
# Move old copy to a backup so we can restore on failure
backup = dest.with_suffix(".bak")
shutil.move(str(dest), str(backup))
try:
shutil.copytree(skill_src, dest)
manifest[skill_name] = bundled_hash
updated.append(skill_name)
if not quiet:
print(f"{skill_name} (updated)")
# Remove backup after successful copy
shutil.rmtree(backup, ignore_errors=True)
except (OSError, IOError):
# Restore from backup
if backup.exists() and not dest.exists():
shutil.move(str(backup), str(dest))
raise
shutil.rmtree(dest)
shutil.copytree(skill_src, dest)
updated.append(skill_name)
if not quiet:
print(f"{skill_name} (updated)")
except (OSError, IOError) as e:
if not quiet:
print(f" ! Failed to update {skill_name}: {e}")
else:
skipped += 1 # bundled unchanged, user unchanged
skipped += 1
else:
# ── In manifest but not on disk user deleted it ──
# In manifest but not on disk -- user deleted it, respect that
skipped += 1
# Clean stale manifest entries (skills removed from bundled dir)
cleaned = sorted(set(manifest.keys()) - bundled_names)
for name in cleaned:
del manifest[name]
cleaned = sorted(manifest - bundled_names)
manifest -= set(cleaned)
# Also copy DESCRIPTION.md files for categories (if not already present)
for desc_md in bundled_dir.rglob("DESCRIPTION.md"):
@@ -240,7 +189,6 @@ def sync_skills(quiet: bool = False) -> dict:
"copied": copied,
"updated": updated,
"skipped": skipped,
"user_modified": user_modified,
"cleaned": cleaned,
"total_bundled": len(bundled_skills),
}
@@ -249,13 +197,6 @@ def sync_skills(quiet: bool = False) -> dict:
if __name__ == "__main__":
print("Syncing bundled skills into ~/.hermes/skills/ ...")
result = sync_skills(quiet=False)
parts = [
f"{len(result['copied'])} new",
f"{len(result['updated'])} updated",
f"{result['skipped']} unchanged",
]
if result["user_modified"]:
parts.append(f"{len(result['user_modified'])} user-modified (kept)")
if result["cleaned"]:
parts.append(f"{len(result['cleaned'])} cleaned from manifest")
print(f"\nDone: {', '.join(parts)}. {result['total_bundled']} total bundled.")
print(f"\nDone: {len(result['copied'])} new, {len(result['updated'])} updated, "
f"{result['skipped']} unchanged, {len(result['cleaned'])} cleaned from manifest, "
f"{result['total_bundled']} total bundled.")