mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-28 15:01:34 +08:00
Compare commits
1 Commits
opencode-p
...
feat/secre
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c1ef64a0ac |
@@ -40,6 +40,8 @@ _PREFIX_PATTERNS = [
|
||||
r"sk_[A-Za-z0-9_]{10,}", # ElevenLabs TTS key (sk_ underscore, not sk- dash)
|
||||
r"tvly-[A-Za-z0-9]{10,}", # Tavily search API key
|
||||
r"exa_[A-Za-z0-9]{10,}", # Exa search API key
|
||||
r"AC[a-fA-F0-9]{32}", # Twilio Account SID
|
||||
r"SK[a-fA-F0-9]{32}", # Twilio API Key SID / Secret SID-like identifiers
|
||||
]
|
||||
|
||||
# ENV assignment patterns: KEY=value where KEY contains a secret-like name
|
||||
@@ -68,6 +70,17 @@ _TELEGRAM_RE = re.compile(
|
||||
r"(bot)?(\d{8,}):([-A-Za-z0-9_]{30,})",
|
||||
)
|
||||
|
||||
# JWTs: three base64url-ish segments separated by dots.
|
||||
# Keep threshold moderately high to avoid redacting short dotted identifiers.
|
||||
_JWT_RE = re.compile(
|
||||
r"\b([A-Za-z0-9_-]{12,}\.[A-Za-z0-9_-]{12,}\.[A-Za-z0-9_-]{12,})\b"
|
||||
)
|
||||
|
||||
# Twilio auth tokens are commonly plain 32-char lowercase hex strings.
|
||||
# This may also match some MD5-like identifiers, but we prefer false positives
|
||||
# over leaking a credential into model context.
|
||||
_TWILIO_AUTH_TOKEN_RE = re.compile(r"\b([a-f0-9]{32})\b")
|
||||
|
||||
# Private key blocks: -----BEGIN RSA PRIVATE KEY----- ... -----END RSA PRIVATE KEY-----
|
||||
_PRIVATE_KEY_RE = re.compile(
|
||||
r"-----BEGIN[A-Z ]*PRIVATE KEY-----[\s\S]*?-----END[A-Z ]*PRIVATE KEY-----"
|
||||
@@ -140,6 +153,12 @@ def redact_sensitive_text(text: str) -> str:
|
||||
return f"{prefix}{digits}:***"
|
||||
text = _TELEGRAM_RE.sub(_redact_telegram, text)
|
||||
|
||||
# JWTs
|
||||
text = _JWT_RE.sub(lambda m: _mask_token(m.group(1)), text)
|
||||
|
||||
# Twilio auth tokens / bare 32-char lowercase hex tokens
|
||||
text = _TWILIO_AUTH_TOKEN_RE.sub(lambda m: _mask_token(m.group(1)), text)
|
||||
|
||||
# Private key blocks
|
||||
text = _PRIVATE_KEY_RE.sub("[REDACTED PRIVATE KEY]", text)
|
||||
|
||||
|
||||
4
cli.py
4
cli.py
@@ -492,6 +492,7 @@ from cron import get_job
|
||||
from tools.terminal_tool import cleanup_all_environments as _cleanup_all_terminals
|
||||
from tools.terminal_tool import set_sudo_password_callback, set_approval_callback
|
||||
from tools.skills_tool import set_secret_capture_callback
|
||||
from tools.secrets_tool import set_secrets_request_callback
|
||||
from hermes_cli.callbacks import prompt_for_secret
|
||||
from tools.browser_tool import _emergency_cleanup_all_sessions as _cleanup_all_browsers
|
||||
|
||||
@@ -5587,6 +5588,7 @@ class HermesCLI:
|
||||
# Single-query and direct chat callers do not go through run(), so
|
||||
# register secure secret capture here as well.
|
||||
set_secret_capture_callback(self._secret_capture_callback)
|
||||
set_secrets_request_callback(self._secret_capture_callback)
|
||||
|
||||
# Refresh provider credentials if needed (handles key rotation transparently)
|
||||
if not self._ensure_runtime_credentials():
|
||||
@@ -6292,6 +6294,7 @@ class HermesCLI:
|
||||
set_sudo_password_callback(self._sudo_password_callback)
|
||||
set_approval_callback(self._approval_callback)
|
||||
set_secret_capture_callback(self._secret_capture_callback)
|
||||
set_secrets_request_callback(self._secret_capture_callback)
|
||||
|
||||
# Ensure tirith security scanner is available (downloads if needed).
|
||||
# Warn the user if tirith is enabled in config but not available,
|
||||
@@ -7495,6 +7498,7 @@ class HermesCLI:
|
||||
set_sudo_password_callback(None)
|
||||
set_approval_callback(None)
|
||||
set_secret_capture_callback(None)
|
||||
set_secrets_request_callback(None)
|
||||
# Flush + shut down Honcho async writer (drains queue before exit)
|
||||
if self.agent and getattr(self.agent, '_honcho', None):
|
||||
try:
|
||||
|
||||
@@ -150,6 +150,7 @@ def _discover_tools():
|
||||
"tools.tts_tool",
|
||||
"tools.todo_tool",
|
||||
"tools.memory_tool",
|
||||
"tools.secrets_tool",
|
||||
"tools.session_search_tool",
|
||||
"tools.clarify_tool",
|
||||
"tools.code_execution_tool",
|
||||
|
||||
@@ -52,6 +52,21 @@ class TestKnownPrefixes:
|
||||
result = redact_sensitive_text("fal_abc123def456ghi789jkl")
|
||||
assert "abc123def456" not in result
|
||||
|
||||
def test_twilio_account_sid(self):
|
||||
sid = "AC" + ("1" * 16) + ("a" * 16)
|
||||
result = redact_sensitive_text(sid)
|
||||
assert sid not in result
|
||||
|
||||
def test_twilio_auth_token_bare(self):
|
||||
token = ("0" * 16) + ("a" * 16)
|
||||
result = redact_sensitive_text(token)
|
||||
assert token not in result
|
||||
|
||||
def test_jwt_bare(self):
|
||||
jwt = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhIjoiYiIsImMiOiJkIiwicm9sZSI6ImFkbWluIn0.c2lnbmF0dXJlMTIzNDU2Nzg5MGFiY2RlZg"
|
||||
result = redact_sensitive_text(jwt)
|
||||
assert "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9" not in result
|
||||
|
||||
def test_short_token_fully_masked(self):
|
||||
result = redact_sensitive_text("key=sk-short1234567")
|
||||
assert "***" in result
|
||||
|
||||
@@ -207,6 +207,46 @@ Generate some audio.
|
||||
assert len(calls) == 1
|
||||
assert calls[0][0] == "TENOR_API_KEY"
|
||||
|
||||
def test_requires_secrets_alias_triggers_secure_capture(self, tmp_path, monkeypatch):
|
||||
monkeypatch.delenv("TENOR_API_KEY", raising=False)
|
||||
calls = []
|
||||
|
||||
def fake_secret_callback(var_name, prompt, metadata=None):
|
||||
calls.append((var_name, prompt, metadata))
|
||||
os.environ[var_name] = "stored-in-test"
|
||||
return {
|
||||
"success": True,
|
||||
"stored_as": var_name,
|
||||
"validated": False,
|
||||
"skipped": False,
|
||||
}
|
||||
|
||||
monkeypatch.setattr(
|
||||
skills_tool_module,
|
||||
"_secret_capture_callback",
|
||||
fake_secret_callback,
|
||||
raising=False,
|
||||
)
|
||||
|
||||
with patch("tools.skills_tool.SKILLS_DIR", tmp_path):
|
||||
_make_skill(
|
||||
tmp_path,
|
||||
"test-skill",
|
||||
frontmatter_extra=(
|
||||
"requires_secrets:\n"
|
||||
" - key: TENOR_API_KEY\n"
|
||||
" description: Tenor API key\n"
|
||||
" instructions: Find it in the Tenor dashboard\n"
|
||||
),
|
||||
)
|
||||
scan_skill_commands()
|
||||
msg = build_skill_invocation_message("/test-skill", "do stuff")
|
||||
|
||||
assert msg is not None
|
||||
assert len(calls) == 1
|
||||
assert calls[0][0] == "TENOR_API_KEY"
|
||||
assert "tenor" in (calls[0][2].get("required_for", "") or "").lower()
|
||||
|
||||
def test_gateway_still_loads_skill_but_returns_setup_guidance(
|
||||
self, tmp_path, monkeypatch
|
||||
):
|
||||
|
||||
@@ -88,6 +88,7 @@ class TestBackwardCompat:
|
||||
# Should contain well-known tools
|
||||
assert "web_search" in names
|
||||
assert "terminal" in names
|
||||
assert "secrets" in names
|
||||
|
||||
def test_get_toolset_for_tool(self):
|
||||
result = get_toolset_for_tool("web_search")
|
||||
|
||||
@@ -805,5 +805,14 @@ for i in range(15000):
|
||||
self.assertIn("total", output)
|
||||
|
||||
|
||||
def test_execute_code_redacts_sensitive_output(monkeypatch):
|
||||
from tools.code_execution_tool import execute_code
|
||||
monkeypatch.setenv("OPENAI_API_KEY", "sk-test-secret-1234567890")
|
||||
result = json.loads(execute_code("import os; print(os.getenv('OPENAI_API_KEY'))", task_id="test-redact"))
|
||||
assert result["status"] == "success"
|
||||
assert "sk-test-secret-1234567890" not in result["output"]
|
||||
assert "***" in result["output"] or "..." in result["output"]
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
97
tests/tools/test_secrets_tool.py
Normal file
97
tests/tools/test_secrets_tool.py
Normal file
@@ -0,0 +1,97 @@
|
||||
import json
|
||||
import os
|
||||
|
||||
from tools.env_passthrough import clear_env_passthrough, get_all_passthrough
|
||||
from tools.secrets_tool import secrets_tool, set_secrets_request_callback
|
||||
|
||||
|
||||
def setup_function(_fn):
|
||||
clear_env_passthrough()
|
||||
set_secrets_request_callback(None)
|
||||
|
||||
|
||||
def test_list_returns_names_only(monkeypatch):
|
||||
monkeypatch.setenv("OPENAI_API_KEY", "sk-test-secret")
|
||||
monkeypatch.setenv("PATH", "/usr/bin")
|
||||
result = json.loads(secrets_tool({"action": "list"}))
|
||||
assert "OPENAI_API_KEY" in result["secrets"]
|
||||
assert "PATH" not in result["secrets"]
|
||||
assert "sk-test-secret" not in json.dumps(result)
|
||||
|
||||
|
||||
def test_check_splits_configured_and_missing(monkeypatch):
|
||||
monkeypatch.setenv("OPENAI_API_KEY", "sk-test-secret")
|
||||
result = json.loads(secrets_tool({"action": "check", "keys": ["OPENAI_API_KEY", "MISSING_API_KEY"]}))
|
||||
assert result["configured"] == ["OPENAI_API_KEY"]
|
||||
assert result["missing"] == ["MISSING_API_KEY"]
|
||||
assert result["rejected"] == []
|
||||
|
||||
|
||||
def test_check_rejects_non_secret_like_vars(monkeypatch):
|
||||
monkeypatch.setenv("PATH", "/usr/bin")
|
||||
monkeypatch.setenv("HOME", "/tmp")
|
||||
monkeypatch.setenv("SSH_AUTH_SOCK", "/tmp/agent.sock")
|
||||
monkeypatch.setenv("SESSION_COOKIE_NAME", "sid")
|
||||
monkeypatch.setenv("OPENAI_API_KEY", "sk-test-secret")
|
||||
result = json.loads(secrets_tool({"action": "check", "keys": ["PATH", "HOME", "SSH_AUTH_SOCK", "SESSION_COOKIE_NAME", "OPENAI_API_KEY"]}))
|
||||
assert result["configured"] == ["OPENAI_API_KEY"]
|
||||
assert sorted(result["rejected"]) == ["HOME", "PATH", "SESSION_COOKIE_NAME", "SSH_AUTH_SOCK"]
|
||||
|
||||
|
||||
def test_request_uses_secure_callback():
|
||||
calls = []
|
||||
|
||||
def fake_callback(var_name, prompt, metadata=None):
|
||||
calls.append((var_name, prompt, metadata))
|
||||
return {"success": True, "skipped": False, "message": "stored"}
|
||||
|
||||
set_secrets_request_callback(fake_callback)
|
||||
result = json.loads(secrets_tool({
|
||||
"action": "request",
|
||||
"key": "TENOR_API_KEY",
|
||||
"description": "Tenor API key",
|
||||
"instructions": "Find it in Tenor dashboard",
|
||||
"prompt": "Enter Tenor API key",
|
||||
}))
|
||||
assert result["success"] is True
|
||||
assert result["stored"] is True
|
||||
assert calls[0][0] == "TENOR_API_KEY"
|
||||
assert calls[0][1] == "Enter Tenor API key"
|
||||
assert calls[0][2]["description"] == "Tenor API key"
|
||||
assert calls[0][2]["instructions"] == "Find it in Tenor dashboard"
|
||||
|
||||
|
||||
def test_request_without_callback_returns_hint():
|
||||
result = json.loads(secrets_tool({"action": "request", "key": "TENOR_API_KEY"}))
|
||||
assert result["success"] is False
|
||||
assert "local cli" in result["hint"].lower()
|
||||
|
||||
|
||||
def test_delete_clears_env(monkeypatch):
|
||||
monkeypatch.setenv("OPENAI_API_KEY", "sk-test-secret")
|
||||
result = json.loads(secrets_tool({"action": "delete", "key": "OPENAI_API_KEY"}))
|
||||
assert result["success"] is True
|
||||
assert "OPENAI_API_KEY" not in os.environ
|
||||
|
||||
|
||||
def test_delete_removes_key_from_env_file(tmp_path, monkeypatch):
|
||||
hermes_home = tmp_path / ".hermes"
|
||||
hermes_home.mkdir()
|
||||
env_path = hermes_home / ".env"
|
||||
env_path.write_text("OPENAI_API_KEY=sk-test-secret\nKEEP_ME=value\n", encoding="utf-8")
|
||||
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
||||
monkeypatch.setenv("OPENAI_API_KEY", "sk-test-secret")
|
||||
|
||||
result = json.loads(secrets_tool({"action": "delete", "key": "OPENAI_API_KEY"}))
|
||||
assert result["success"] is True
|
||||
content = env_path.read_text(encoding="utf-8")
|
||||
assert "OPENAI_API_KEY=" not in content
|
||||
assert "KEEP_ME=value" in content
|
||||
|
||||
|
||||
def test_inject_registers_existing_keys(monkeypatch):
|
||||
monkeypatch.setenv("OPENAI_API_KEY", "sk-test-secret")
|
||||
result = json.loads(secrets_tool({"action": "inject", "keys": ["OPENAI_API_KEY", "MISSING_API_KEY"]}))
|
||||
assert result["injected"] == ["OPENAI_API_KEY"]
|
||||
assert result["missing"] == ["MISSING_API_KEY"]
|
||||
assert "OPENAI_API_KEY" in get_all_passthrough()
|
||||
@@ -597,6 +597,13 @@ def execute_code(
|
||||
stderr_text = strip_ansi(stderr_text)
|
||||
|
||||
# Build response
|
||||
try:
|
||||
from agent.redact import redact_sensitive_text
|
||||
stdout_text = redact_sensitive_text(stdout_text)
|
||||
stderr_text = redact_sensitive_text(stderr_text)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
result: Dict[str, Any] = {
|
||||
"status": status,
|
||||
"output": stdout_text,
|
||||
|
||||
@@ -363,6 +363,10 @@ def patch_tool(mode: str = "replace", path: str = None, old_string: str = None,
|
||||
|
||||
result_dict = result.to_dict()
|
||||
result_json = json.dumps(result_dict, ensure_ascii=False)
|
||||
try:
|
||||
result_json = redact_sensitive_text(result_json)
|
||||
except Exception:
|
||||
pass
|
||||
# Hint when old_string not found — saves iterations where the agent
|
||||
# retries with stale content instead of re-reading the file.
|
||||
if result_dict.get("error") and "Could not find" in str(result_dict["error"]):
|
||||
|
||||
322
tools/secrets_tool.py
Normal file
322
tools/secrets_tool.py
Normal file
@@ -0,0 +1,322 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Secrets tool — secure secret lifecycle management.
|
||||
|
||||
Phase 1 implementation for issue #410. Provides an agent-facing interface for:
|
||||
- listing configured secret names (never values)
|
||||
- checking which keys are configured vs missing
|
||||
- requesting secure input (CLI only for now)
|
||||
- deleting secrets from ~/.hermes/.env
|
||||
- registering env_passthrough for the next sandboxed subprocess
|
||||
|
||||
Important: secret values never enter the LLM context. The `request` action
|
||||
handles capture internally via the existing secure secret callback path.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List
|
||||
|
||||
from tools.env_passthrough import get_all_passthrough, register_env_passthrough
|
||||
from tools.registry import registry
|
||||
from tools.skills_tool import SKILLS_DIR, _parse_frontmatter, _get_required_environment_variables
|
||||
from hermes_cli.config import OPTIONAL_ENV_VARS, get_env_value, load_env, save_env_value, get_env_path
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_ENV_VAR_NAME_RE = re.compile(r"^[A-Z][A-Z0-9_]*$")
|
||||
_SECRET_CAPTURE_CALLBACK = None
|
||||
_SECRET_SUFFIXES = (
|
||||
"_API_KEY",
|
||||
"_TOKEN",
|
||||
"_SECRET",
|
||||
"_PASSWORD",
|
||||
"_PASSWD",
|
||||
"_PRIVATE_KEY",
|
||||
"_ACCESS_KEY",
|
||||
"_AUTH_TOKEN",
|
||||
"_REFRESH_TOKEN",
|
||||
"_CLIENT_SECRET",
|
||||
"_BOT_TOKEN",
|
||||
"_APP_TOKEN",
|
||||
)
|
||||
_SECRET_EXACT_NAMES = {
|
||||
"TWILIO_ACCOUNT_SID",
|
||||
"TWILIO_AUTH_TOKEN",
|
||||
"TWILIO_PHONE_NUMBER",
|
||||
}
|
||||
_KNOWN_PASSWORD_VARS = {
|
||||
name for name, info in OPTIONAL_ENV_VARS.items() if isinstance(info, dict) and info.get("password")
|
||||
}
|
||||
|
||||
|
||||
def _is_secret_like_name(name: str) -> bool:
|
||||
name = _normalize_key_name(name)
|
||||
if not name:
|
||||
return False
|
||||
if name in _SECRET_EXACT_NAMES:
|
||||
return True
|
||||
if name in _KNOWN_PASSWORD_VARS:
|
||||
return True
|
||||
return name.endswith(_SECRET_SUFFIXES)
|
||||
|
||||
|
||||
def set_secrets_request_callback(callback) -> None:
|
||||
global _SECRET_CAPTURE_CALLBACK
|
||||
_SECRET_CAPTURE_CALLBACK = callback
|
||||
|
||||
|
||||
def _normalize_key_name(key: str) -> str:
|
||||
key = str(key or "").strip().upper()
|
||||
if not _ENV_VAR_NAME_RE.match(key):
|
||||
return ""
|
||||
return key
|
||||
|
||||
|
||||
def _configured_secret_names() -> List[str]:
|
||||
env_snapshot = load_env()
|
||||
names = []
|
||||
for name, value in env_snapshot.items():
|
||||
if value and _is_secret_like_name(name):
|
||||
names.append(name)
|
||||
for name, value in os.environ.items():
|
||||
if value and _is_secret_like_name(name) and name not in names:
|
||||
names.append(name)
|
||||
return sorted(names)
|
||||
|
||||
|
||||
def _delete_env_key(key: str) -> None:
|
||||
env_path = get_env_path()
|
||||
if env_path.exists():
|
||||
try:
|
||||
lines = env_path.read_text(encoding="utf-8", errors="replace").splitlines(True)
|
||||
kept = [line for line in lines if not line.strip().startswith(f"{key}=")]
|
||||
env_path.write_text("".join(kept), encoding="utf-8")
|
||||
except Exception:
|
||||
# Fall back to blanking the key if removal fails for any reason.
|
||||
save_env_value(key, "")
|
||||
os.environ.pop(key, None)
|
||||
return
|
||||
os.environ.pop(key, None)
|
||||
|
||||
|
||||
|
||||
def _required_secrets_for_skills() -> Dict[str, List[str]]:
|
||||
result: Dict[str, List[str]] = {}
|
||||
configured = set(_configured_secret_names())
|
||||
|
||||
if not SKILLS_DIR.exists():
|
||||
return result
|
||||
|
||||
for skill_md in SKILLS_DIR.rglob("SKILL.md"):
|
||||
try:
|
||||
content = skill_md.read_text(encoding="utf-8")
|
||||
frontmatter, _body = _parse_frontmatter(content)
|
||||
required_entries = _get_required_environment_variables(frontmatter)
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
missing = sorted(
|
||||
{
|
||||
_normalize_key_name(entry.get("name"))
|
||||
for entry in required_entries
|
||||
if _normalize_key_name(entry.get("name")) not in configured
|
||||
}
|
||||
)
|
||||
missing = [name for name in missing if name]
|
||||
if missing:
|
||||
result[skill_md.parent.name] = missing
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def _action_list(_args: Dict[str, Any], **_kwargs) -> str:
|
||||
return json.dumps(
|
||||
{
|
||||
"secrets": _configured_secret_names(),
|
||||
"missing_for_skills": _required_secrets_for_skills(),
|
||||
"passthrough_registered": sorted(get_all_passthrough()),
|
||||
},
|
||||
ensure_ascii=False,
|
||||
)
|
||||
|
||||
|
||||
def _action_check(args: Dict[str, Any], **_kwargs) -> str:
|
||||
keys = args.get("keys") or []
|
||||
if not isinstance(keys, list):
|
||||
return json.dumps({"error": "keys must be a list"})
|
||||
|
||||
configured, missing, rejected = [], [], []
|
||||
for item in keys:
|
||||
name = _normalize_key_name(item)
|
||||
if not name:
|
||||
continue
|
||||
if not _is_secret_like_name(name):
|
||||
rejected.append(name)
|
||||
continue
|
||||
if get_env_value(name):
|
||||
configured.append(name)
|
||||
else:
|
||||
missing.append(name)
|
||||
|
||||
return json.dumps({"configured": configured, "missing": missing, "rejected": rejected}, ensure_ascii=False)
|
||||
|
||||
|
||||
def _action_request(args: Dict[str, Any], **_kwargs) -> str:
|
||||
key = _normalize_key_name(args.get("key"))
|
||||
if not key:
|
||||
return json.dumps({"error": "A valid key is required"})
|
||||
if not _is_secret_like_name(key):
|
||||
return json.dumps({"error": f"{key} is not a supported secret-like variable name"})
|
||||
|
||||
description = str(args.get("description") or "").strip()
|
||||
instructions = str(args.get("instructions") or "").strip()
|
||||
prompt = str(args.get("prompt") or f"Enter value for {key}").strip()
|
||||
|
||||
if _SECRET_CAPTURE_CALLBACK is None:
|
||||
hint = "Use the local CLI to be prompted securely, or set it manually in ~/.hermes/.env."
|
||||
try:
|
||||
if os.getenv("HERMES_GATEWAY_SESSION") or os.getenv("HERMES_SESSION_PLATFORM"):
|
||||
from gateway.platforms.base import GATEWAY_SECRET_CAPTURE_UNSUPPORTED_MESSAGE
|
||||
hint = GATEWAY_SECRET_CAPTURE_UNSUPPORTED_MESSAGE
|
||||
except Exception:
|
||||
pass
|
||||
return json.dumps(
|
||||
{
|
||||
"success": False,
|
||||
"key": key,
|
||||
"error": "Secure secret entry is not available in this surface.",
|
||||
"hint": hint,
|
||||
},
|
||||
ensure_ascii=False,
|
||||
)
|
||||
|
||||
metadata = {}
|
||||
if description:
|
||||
metadata["description"] = description
|
||||
if instructions:
|
||||
metadata["instructions"] = instructions
|
||||
|
||||
try:
|
||||
result = _SECRET_CAPTURE_CALLBACK(key, prompt, metadata)
|
||||
except Exception as e:
|
||||
logger.warning("Secret capture callback failed for %s", key, exc_info=True)
|
||||
return json.dumps({"success": False, "key": key, "error": str(e)}, ensure_ascii=False)
|
||||
|
||||
payload = {
|
||||
"success": bool(isinstance(result, dict) and result.get("success")),
|
||||
"key": key,
|
||||
"stored": bool(isinstance(result, dict) and result.get("success") and not result.get("skipped")),
|
||||
"skipped": bool(isinstance(result, dict) and result.get("skipped")),
|
||||
"message": (result or {}).get("message") if isinstance(result, dict) else None,
|
||||
}
|
||||
return json.dumps(payload, ensure_ascii=False)
|
||||
|
||||
|
||||
def _action_delete(args: Dict[str, Any], **_kwargs) -> str:
|
||||
key = _normalize_key_name(args.get("key"))
|
||||
if not key:
|
||||
return json.dumps({"error": "A valid key is required"})
|
||||
if not _is_secret_like_name(key):
|
||||
return json.dumps({"error": f"{key} is not a supported secret-like variable name"})
|
||||
|
||||
_delete_env_key(key)
|
||||
return json.dumps({"success": True, "deleted": key}, ensure_ascii=False)
|
||||
|
||||
|
||||
def _action_inject(args: Dict[str, Any], **_kwargs) -> str:
|
||||
keys = args.get("keys") or []
|
||||
if not isinstance(keys, list):
|
||||
return json.dumps({"error": "keys must be a list"})
|
||||
|
||||
to_register = []
|
||||
missing = []
|
||||
rejected = []
|
||||
for item in keys:
|
||||
name = _normalize_key_name(item)
|
||||
if not name:
|
||||
continue
|
||||
if not _is_secret_like_name(name):
|
||||
rejected.append(name)
|
||||
continue
|
||||
if get_env_value(name):
|
||||
to_register.append(name)
|
||||
else:
|
||||
missing.append(name)
|
||||
|
||||
if to_register:
|
||||
register_env_passthrough(to_register)
|
||||
|
||||
return json.dumps(
|
||||
{
|
||||
"success": True,
|
||||
"injected": sorted(set(to_register)),
|
||||
"missing": sorted(set(missing)),
|
||||
"rejected": sorted(set(rejected)),
|
||||
},
|
||||
ensure_ascii=False,
|
||||
)
|
||||
|
||||
|
||||
def secrets_tool(args: Dict[str, Any], **kwargs) -> str:
|
||||
action = str(args.get("action") or "").strip().lower()
|
||||
if action == "list":
|
||||
return _action_list(args, **kwargs)
|
||||
if action == "check":
|
||||
return _action_check(args, **kwargs)
|
||||
if action == "request":
|
||||
return _action_request(args, **kwargs)
|
||||
if action == "delete":
|
||||
return _action_delete(args, **kwargs)
|
||||
if action == "inject":
|
||||
return _action_inject(args, **kwargs)
|
||||
return json.dumps({"error": f"Unknown action: {action}"})
|
||||
|
||||
|
||||
registry.register(
|
||||
name="secrets",
|
||||
toolset="secrets",
|
||||
description="Securely manage API keys and other credentials without exposing values to the model. List configured secret names, check which keys are missing, request secure input in CLI, delete secrets, or register secrets for env passthrough.",
|
||||
emoji="🔐",
|
||||
schema={
|
||||
"name": "secrets",
|
||||
"description": "Secure secret lifecycle management. Secret values are never returned to the model.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"action": {
|
||||
"type": "string",
|
||||
"enum": ["list", "check", "request", "delete", "inject"],
|
||||
"description": "The secrets action to perform.",
|
||||
},
|
||||
"keys": {
|
||||
"type": "array",
|
||||
"items": {"type": "string"},
|
||||
"description": "Secret names to check or inject.",
|
||||
},
|
||||
"key": {
|
||||
"type": "string",
|
||||
"description": "Single secret name for request/delete.",
|
||||
},
|
||||
"description": {
|
||||
"type": "string",
|
||||
"description": "Human-readable description shown during secure prompt.",
|
||||
},
|
||||
"instructions": {
|
||||
"type": "string",
|
||||
"description": "Optional user instructions for where to find the secret.",
|
||||
},
|
||||
"prompt": {
|
||||
"type": "string",
|
||||
"description": "Custom secure prompt text.",
|
||||
},
|
||||
},
|
||||
"required": ["action"],
|
||||
},
|
||||
},
|
||||
handler=secrets_tool,
|
||||
)
|
||||
@@ -198,11 +198,34 @@ def _get_required_environment_variables(
|
||||
) -> List[Dict[str, Any]]:
|
||||
setup = _normalize_setup_metadata(frontmatter)
|
||||
required_raw = frontmatter.get("required_environment_variables")
|
||||
requires_secrets_raw = frontmatter.get("requires_secrets")
|
||||
|
||||
if isinstance(required_raw, dict):
|
||||
required_raw = [required_raw]
|
||||
if not isinstance(required_raw, list):
|
||||
required_raw = []
|
||||
|
||||
if isinstance(requires_secrets_raw, dict):
|
||||
requires_secrets_raw = [requires_secrets_raw]
|
||||
if not isinstance(requires_secrets_raw, list):
|
||||
requires_secrets_raw = []
|
||||
|
||||
# `requires_secrets` is a friendlier alias for skill authors. Normalize it
|
||||
# into the existing required_environment_variables pipeline so prompt,
|
||||
# validation, env passthrough, and gateway hints all work unchanged.
|
||||
for item in requires_secrets_raw:
|
||||
if isinstance(item, str):
|
||||
required_raw.append({"name": item})
|
||||
elif isinstance(item, dict):
|
||||
required_raw.append(
|
||||
{
|
||||
"name": item.get("key") or item.get("name") or item.get("env_var"),
|
||||
"prompt": item.get("prompt") or item.get("description") or item.get("label"),
|
||||
"help": item.get("instructions") or item.get("help") or item.get("provider_url") or item.get("url"),
|
||||
"required_for": item.get("required_for") or item.get("description"),
|
||||
}
|
||||
)
|
||||
|
||||
required: List[Dict[str, Any]] = []
|
||||
seen: set[str] = set()
|
||||
|
||||
|
||||
@@ -48,8 +48,8 @@ _HERMES_CORE_TOOLS = [
|
||||
"browser_vision", "browser_console",
|
||||
# Text-to-speech
|
||||
"text_to_speech",
|
||||
# Planning & memory
|
||||
"todo", "memory",
|
||||
# Planning, memory, secrets
|
||||
"todo", "memory", "secrets",
|
||||
# Session history search
|
||||
"session_search",
|
||||
# Clarifying questions
|
||||
|
||||
Reference in New Issue
Block a user