mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-28 23:11:37 +08:00
Compare commits
6 Commits
skill/gith
...
security/b
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5bcdf54962 | ||
|
|
8e5f06ceac | ||
|
|
f1db999735 | ||
|
|
2fafcf42e8 | ||
|
|
0475e5e9f2 | ||
|
|
ee3db462cf |
@@ -687,6 +687,15 @@ def _resolve_api_key_provider() -> Tuple[Optional[OpenAI], Optional[str]]:
|
|||||||
if pconfig.auth_type != "api_key":
|
if pconfig.auth_type != "api_key":
|
||||||
continue
|
continue
|
||||||
if provider_id == "anthropic":
|
if provider_id == "anthropic":
|
||||||
|
# Only try anthropic when the user has explicitly configured it.
|
||||||
|
# Without this gate, Claude Code credentials get silently used
|
||||||
|
# as auxiliary fallback when the user's primary provider fails.
|
||||||
|
try:
|
||||||
|
from hermes_cli.auth import is_provider_explicitly_configured
|
||||||
|
if not is_provider_explicitly_configured("anthropic"):
|
||||||
|
continue
|
||||||
|
except ImportError:
|
||||||
|
pass
|
||||||
return _try_anthropic()
|
return _try_anthropic()
|
||||||
|
|
||||||
pool_present, entry = _select_pool_entry(provider_id)
|
pool_present, entry = _select_pool_entry(provider_id)
|
||||||
|
|||||||
@@ -1059,6 +1059,17 @@ def _seed_from_singletons(provider: str, entries: List[PooledCredential]) -> Tup
|
|||||||
auth_store = _load_auth_store()
|
auth_store = _load_auth_store()
|
||||||
|
|
||||||
if provider == "anthropic":
|
if provider == "anthropic":
|
||||||
|
# Only auto-discover external credentials (Claude Code, Hermes PKCE)
|
||||||
|
# when the user has explicitly configured anthropic as their provider.
|
||||||
|
# Without this gate, auxiliary client fallback chains silently read
|
||||||
|
# ~/.claude/.credentials.json without user consent. See PR #4210.
|
||||||
|
try:
|
||||||
|
from hermes_cli.auth import is_provider_explicitly_configured
|
||||||
|
if not is_provider_explicitly_configured("anthropic"):
|
||||||
|
return changed, active_sources
|
||||||
|
except ImportError:
|
||||||
|
pass
|
||||||
|
|
||||||
from agent.anthropic_adapter import read_claude_code_credentials, read_hermes_oauth_credentials
|
from agent.anthropic_adapter import read_claude_code_credentials, read_hermes_oauth_credentials
|
||||||
|
|
||||||
for source_name, creds in (
|
for source_name, creds in (
|
||||||
@@ -1066,6 +1077,13 @@ def _seed_from_singletons(provider: str, entries: List[PooledCredential]) -> Tup
|
|||||||
("claude_code", read_claude_code_credentials()),
|
("claude_code", read_claude_code_credentials()),
|
||||||
):
|
):
|
||||||
if creds and creds.get("accessToken"):
|
if creds and creds.get("accessToken"):
|
||||||
|
# Check if user explicitly removed this source
|
||||||
|
try:
|
||||||
|
from hermes_cli.auth import is_source_suppressed
|
||||||
|
if is_source_suppressed(provider, source_name):
|
||||||
|
continue
|
||||||
|
except ImportError:
|
||||||
|
pass
|
||||||
active_sources.add(source_name)
|
active_sources.add(source_name)
|
||||||
changed |= _upsert_entry(
|
changed |= _upsert_entry(
|
||||||
entries,
|
entries,
|
||||||
|
|||||||
@@ -704,6 +704,27 @@ def write_credential_pool(provider_id: str, entries: List[Dict[str, Any]]) -> Pa
|
|||||||
return _save_auth_store(auth_store)
|
return _save_auth_store(auth_store)
|
||||||
|
|
||||||
|
|
||||||
|
def suppress_credential_source(provider_id: str, source: str) -> None:
|
||||||
|
"""Mark a credential source as suppressed so it won't be re-seeded."""
|
||||||
|
with _auth_store_lock():
|
||||||
|
auth_store = _load_auth_store()
|
||||||
|
suppressed = auth_store.setdefault("suppressed_sources", {})
|
||||||
|
provider_list = suppressed.setdefault(provider_id, [])
|
||||||
|
if source not in provider_list:
|
||||||
|
provider_list.append(source)
|
||||||
|
_save_auth_store(auth_store)
|
||||||
|
|
||||||
|
|
||||||
|
def is_source_suppressed(provider_id: str, source: str) -> bool:
|
||||||
|
"""Check if a credential source has been suppressed by the user."""
|
||||||
|
try:
|
||||||
|
auth_store = _load_auth_store()
|
||||||
|
suppressed = auth_store.get("suppressed_sources", {})
|
||||||
|
return source in suppressed.get(provider_id, [])
|
||||||
|
except Exception:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
def get_provider_auth_state(provider_id: str) -> Optional[Dict[str, Any]]:
|
def get_provider_auth_state(provider_id: str) -> Optional[Dict[str, Any]]:
|
||||||
"""Return persisted auth state for a provider, or None."""
|
"""Return persisted auth state for a provider, or None."""
|
||||||
auth_store = _load_auth_store()
|
auth_store = _load_auth_store()
|
||||||
@@ -716,6 +737,57 @@ def get_active_provider() -> Optional[str]:
|
|||||||
return auth_store.get("active_provider")
|
return auth_store.get("active_provider")
|
||||||
|
|
||||||
|
|
||||||
|
def is_provider_explicitly_configured(provider_id: str) -> bool:
|
||||||
|
"""Return True only if the user has explicitly configured this provider.
|
||||||
|
|
||||||
|
Checks:
|
||||||
|
1. active_provider in auth.json matches
|
||||||
|
2. model.provider in config.yaml matches
|
||||||
|
3. Provider-specific env vars are set (e.g. ANTHROPIC_API_KEY)
|
||||||
|
|
||||||
|
This is used to gate auto-discovery of external credentials (e.g.
|
||||||
|
Claude Code's ~/.claude/.credentials.json) so they are never used
|
||||||
|
without the user's explicit choice. See PR #4210 for the same
|
||||||
|
pattern applied to the setup wizard gate.
|
||||||
|
"""
|
||||||
|
normalized = (provider_id or "").strip().lower()
|
||||||
|
|
||||||
|
# 1. Check auth.json active_provider
|
||||||
|
try:
|
||||||
|
auth_store = _load_auth_store()
|
||||||
|
active = (auth_store.get("active_provider") or "").strip().lower()
|
||||||
|
if active and active == normalized:
|
||||||
|
return True
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# 2. Check config.yaml model.provider
|
||||||
|
try:
|
||||||
|
from hermes_cli.config import load_config
|
||||||
|
cfg = load_config()
|
||||||
|
model_cfg = cfg.get("model")
|
||||||
|
if isinstance(model_cfg, dict):
|
||||||
|
cfg_provider = (model_cfg.get("provider") or "").strip().lower()
|
||||||
|
if cfg_provider == normalized:
|
||||||
|
return True
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# 3. Check provider-specific env vars
|
||||||
|
# Exclude CLAUDE_CODE_OAUTH_TOKEN — it's set by Claude Code itself,
|
||||||
|
# not by the user explicitly configuring anthropic in Hermes.
|
||||||
|
_IMPLICIT_ENV_VARS = {"CLAUDE_CODE_OAUTH_TOKEN"}
|
||||||
|
pconfig = PROVIDER_REGISTRY.get(normalized)
|
||||||
|
if pconfig and pconfig.auth_type == "api_key":
|
||||||
|
for env_var in pconfig.api_key_env_vars:
|
||||||
|
if env_var in _IMPLICIT_ENV_VARS:
|
||||||
|
continue
|
||||||
|
if has_usable_secret(os.getenv(env_var, "")):
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
def clear_provider_auth(provider_id: Optional[str] = None) -> bool:
|
def clear_provider_auth(provider_id: Optional[str] = None) -> bool:
|
||||||
"""
|
"""
|
||||||
Clear auth state for a provider. Used by `hermes logout`.
|
Clear auth state for a provider. Used by `hermes logout`.
|
||||||
|
|||||||
@@ -347,8 +347,11 @@ def auth_remove_command(args) -> None:
|
|||||||
print("Cleared Hermes Anthropic OAuth credentials")
|
print("Cleared Hermes Anthropic OAuth credentials")
|
||||||
|
|
||||||
elif removed.source == "claude_code" and provider == "anthropic":
|
elif removed.source == "claude_code" and provider == "anthropic":
|
||||||
print("Note: Claude Code credentials live in ~/.claude/.credentials.json")
|
from hermes_cli.auth import suppress_credential_source
|
||||||
print(" Remove them manually if you want to deauthorize Claude Code.")
|
suppress_credential_source(provider, "claude_code")
|
||||||
|
print("Suppressed claude_code credential — it will not be re-seeded.")
|
||||||
|
print("Note: Claude Code credentials still live in ~/.claude/.credentials.json")
|
||||||
|
print("Run `hermes auth add anthropic` to re-enable if needed.")
|
||||||
|
|
||||||
|
|
||||||
def auth_reset_command(args) -> None:
|
def auth_reset_command(args) -> None:
|
||||||
|
|||||||
@@ -1111,3 +1111,45 @@ class TestCallLlmPaymentFallback:
|
|||||||
task="compression",
|
task="compression",
|
||||||
messages=[{"role": "user", "content": "hello"}],
|
messages=[{"role": "user", "content": "hello"}],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Gate: _resolve_api_key_provider must skip anthropic when not configured
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def test_resolve_api_key_provider_skips_unconfigured_anthropic(monkeypatch):
|
||||||
|
"""_resolve_api_key_provider must not try anthropic when user never configured it."""
|
||||||
|
from collections import OrderedDict
|
||||||
|
from hermes_cli.auth import ProviderConfig
|
||||||
|
|
||||||
|
# Build a minimal registry with only "anthropic" so the loop is guaranteed
|
||||||
|
# to reach it without being short-circuited by earlier providers.
|
||||||
|
fake_registry = OrderedDict({
|
||||||
|
"anthropic": ProviderConfig(
|
||||||
|
id="anthropic",
|
||||||
|
name="Anthropic",
|
||||||
|
auth_type="api_key",
|
||||||
|
inference_base_url="https://api.anthropic.com",
|
||||||
|
api_key_env_vars=("ANTHROPIC_API_KEY",),
|
||||||
|
),
|
||||||
|
})
|
||||||
|
|
||||||
|
called = []
|
||||||
|
|
||||||
|
def mock_try_anthropic():
|
||||||
|
called.append("anthropic")
|
||||||
|
return None, None
|
||||||
|
|
||||||
|
monkeypatch.setattr("agent.auxiliary_client._try_anthropic", mock_try_anthropic)
|
||||||
|
monkeypatch.setattr("hermes_cli.auth.PROVIDER_REGISTRY", fake_registry)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"hermes_cli.auth.is_provider_explicitly_configured",
|
||||||
|
lambda pid: False,
|
||||||
|
)
|
||||||
|
|
||||||
|
from agent.auxiliary_client import _resolve_api_key_provider
|
||||||
|
_resolve_api_key_provider()
|
||||||
|
|
||||||
|
assert "anthropic" not in called, \
|
||||||
|
"_try_anthropic() should not be called when anthropic is not explicitly configured"
|
||||||
|
|||||||
@@ -567,6 +567,7 @@ def test_singleton_seed_does_not_clobber_manual_oauth_entry(tmp_path, monkeypatc
|
|||||||
monkeypatch.delenv("ANTHROPIC_API_KEY", raising=False)
|
monkeypatch.delenv("ANTHROPIC_API_KEY", raising=False)
|
||||||
monkeypatch.delenv("ANTHROPIC_TOKEN", raising=False)
|
monkeypatch.delenv("ANTHROPIC_TOKEN", raising=False)
|
||||||
monkeypatch.delenv("CLAUDE_CODE_OAUTH_TOKEN", raising=False)
|
monkeypatch.delenv("CLAUDE_CODE_OAUTH_TOKEN", raising=False)
|
||||||
|
monkeypatch.setattr("hermes_cli.auth.is_provider_explicitly_configured", lambda pid: True)
|
||||||
_write_auth_store(
|
_write_auth_store(
|
||||||
tmp_path,
|
tmp_path,
|
||||||
{
|
{
|
||||||
@@ -1043,3 +1044,30 @@ def test_release_lease_decrements_counter(tmp_path, monkeypatch):
|
|||||||
|
|
||||||
pool.release_lease("cred-1")
|
pool.release_lease("cred-1")
|
||||||
assert pool._active_leases.get("cred-1", 0) == 0
|
assert pool._active_leases.get("cred-1", 0) == 0
|
||||||
|
|
||||||
|
|
||||||
|
def test_load_pool_does_not_seed_claude_code_when_anthropic_not_configured(tmp_path, monkeypatch):
|
||||||
|
"""Claude Code credentials must not be auto-seeded when the user never selected anthropic."""
|
||||||
|
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
|
||||||
|
_write_auth_store(tmp_path, {"version": 1, "credential_pool": {}})
|
||||||
|
|
||||||
|
# Claude Code credentials exist on disk
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"agent.anthropic_adapter.read_claude_code_credentials",
|
||||||
|
lambda: {"accessToken": "sk-ant...oken", "refreshToken": "rt", "expiresAt": 9999999999999},
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"agent.anthropic_adapter.read_hermes_oauth_credentials",
|
||||||
|
lambda: None,
|
||||||
|
)
|
||||||
|
# User configured kimi-coding, NOT anthropic
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"hermes_cli.auth.is_provider_explicitly_configured",
|
||||||
|
lambda pid: pid == "kimi-coding",
|
||||||
|
)
|
||||||
|
|
||||||
|
from agent.credential_pool import load_pool
|
||||||
|
pool = load_pool("anthropic")
|
||||||
|
|
||||||
|
# Should NOT have seeded the claude_code entry
|
||||||
|
assert pool.entries() == []
|
||||||
|
|||||||
@@ -657,3 +657,41 @@ def test_auth_remove_manual_entry_does_not_touch_env(tmp_path, monkeypatch):
|
|||||||
|
|
||||||
# .env should be untouched
|
# .env should be untouched
|
||||||
assert env_path.read_text() == "SOME_KEY=some-value\n"
|
assert env_path.read_text() == "SOME_KEY=some-value\n"
|
||||||
|
|
||||||
|
|
||||||
|
def test_auth_remove_claude_code_suppresses_reseed(tmp_path, monkeypatch):
|
||||||
|
"""Removing a claude_code credential must prevent it from being re-seeded."""
|
||||||
|
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
|
||||||
|
monkeypatch.delenv("ANTHROPIC_API_KEY", raising=False)
|
||||||
|
monkeypatch.delenv("ANTHROPIC_TOKEN", raising=False)
|
||||||
|
monkeypatch.delenv("CLAUDE_CODE_OAUTH_TOKEN", raising=False)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"agent.credential_pool._seed_from_singletons",
|
||||||
|
lambda provider, entries: (False, {"claude_code"}),
|
||||||
|
)
|
||||||
|
hermes_home = tmp_path / "hermes"
|
||||||
|
hermes_home.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
auth_store = {
|
||||||
|
"version": 1,
|
||||||
|
"credential_pool": {
|
||||||
|
"anthropic": [{
|
||||||
|
"id": "cc1",
|
||||||
|
"label": "claude_code",
|
||||||
|
"auth_type": "oauth",
|
||||||
|
"priority": 0,
|
||||||
|
"source": "claude_code",
|
||||||
|
"access_token": "sk-ant-oat01-token",
|
||||||
|
}]
|
||||||
|
},
|
||||||
|
}
|
||||||
|
(hermes_home / "auth.json").write_text(json.dumps(auth_store))
|
||||||
|
|
||||||
|
from types import SimpleNamespace
|
||||||
|
from hermes_cli.auth_commands import auth_remove_command
|
||||||
|
auth_remove_command(SimpleNamespace(provider="anthropic", target="1"))
|
||||||
|
|
||||||
|
updated = json.loads((hermes_home / "auth.json").read_text())
|
||||||
|
suppressed = updated.get("suppressed_sources", {})
|
||||||
|
assert "anthropic" in suppressed
|
||||||
|
assert "claude_code" in suppressed["anthropic"]
|
||||||
|
|||||||
78
tests/hermes_cli/test_auth_provider_gate.py
Normal file
78
tests/hermes_cli/test_auth_provider_gate.py
Normal file
@@ -0,0 +1,78 @@
|
|||||||
|
"""Tests for is_provider_explicitly_configured()."""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
|
def _write_config(tmp_path, config: dict) -> None:
|
||||||
|
hermes_home = tmp_path / "hermes"
|
||||||
|
hermes_home.mkdir(parents=True, exist_ok=True)
|
||||||
|
import yaml
|
||||||
|
(hermes_home / "config.yaml").write_text(yaml.dump(config))
|
||||||
|
|
||||||
|
|
||||||
|
def _write_auth_store(tmp_path, payload: dict) -> None:
|
||||||
|
hermes_home = tmp_path / "hermes"
|
||||||
|
hermes_home.mkdir(parents=True, exist_ok=True)
|
||||||
|
(hermes_home / "auth.json").write_text(json.dumps(payload, indent=2))
|
||||||
|
|
||||||
|
|
||||||
|
def test_returns_false_when_no_config(tmp_path, monkeypatch):
|
||||||
|
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
|
||||||
|
(tmp_path / "hermes").mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
from hermes_cli.auth import is_provider_explicitly_configured
|
||||||
|
assert is_provider_explicitly_configured("anthropic") is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_returns_true_when_active_provider_matches(tmp_path, monkeypatch):
|
||||||
|
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
|
||||||
|
_write_auth_store(tmp_path, {
|
||||||
|
"version": 1,
|
||||||
|
"providers": {},
|
||||||
|
"active_provider": "anthropic",
|
||||||
|
})
|
||||||
|
|
||||||
|
from hermes_cli.auth import is_provider_explicitly_configured
|
||||||
|
assert is_provider_explicitly_configured("anthropic") is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_returns_true_when_config_provider_matches(tmp_path, monkeypatch):
|
||||||
|
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
|
||||||
|
_write_config(tmp_path, {"model": {"provider": "anthropic", "default": "claude-sonnet-4-6"}})
|
||||||
|
|
||||||
|
from hermes_cli.auth import is_provider_explicitly_configured
|
||||||
|
assert is_provider_explicitly_configured("anthropic") is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_returns_false_when_config_provider_is_different(tmp_path, monkeypatch):
|
||||||
|
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
|
||||||
|
_write_config(tmp_path, {"model": {"provider": "kimi-coding", "default": "kimi-k2"}})
|
||||||
|
_write_auth_store(tmp_path, {
|
||||||
|
"version": 1,
|
||||||
|
"providers": {},
|
||||||
|
"active_provider": None,
|
||||||
|
})
|
||||||
|
|
||||||
|
from hermes_cli.auth import is_provider_explicitly_configured
|
||||||
|
assert is_provider_explicitly_configured("anthropic") is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_returns_true_when_anthropic_env_var_set(tmp_path, monkeypatch):
|
||||||
|
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
|
||||||
|
monkeypatch.setenv("ANTHROPIC_API_KEY", "sk-ant-api03-realkey")
|
||||||
|
(tmp_path / "hermes").mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
from hermes_cli.auth import is_provider_explicitly_configured
|
||||||
|
assert is_provider_explicitly_configured("anthropic") is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_claude_code_oauth_token_does_not_count_as_explicit(tmp_path, monkeypatch):
|
||||||
|
"""CLAUDE_CODE_OAUTH_TOKEN is set by Claude Code, not the user — must not gate."""
|
||||||
|
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
|
||||||
|
monkeypatch.setenv("CLAUDE_CODE_OAUTH_TOKEN", "sk-ant-oat01-auto-token")
|
||||||
|
(tmp_path / "hermes").mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
from hermes_cli.auth import is_provider_explicitly_configured
|
||||||
|
assert is_provider_explicitly_configured("anthropic") is False
|
||||||
@@ -649,3 +649,172 @@ class TestNormalizationBypass:
|
|||||||
assert dangerous is False
|
assert dangerous is False
|
||||||
|
|
||||||
|
|
||||||
|
class TestHeredocScriptExecution:
|
||||||
|
"""Script execution via heredoc bypasses the -e/-c flag patterns.
|
||||||
|
|
||||||
|
`python3 << 'EOF'` feeds arbitrary code through stdin without any
|
||||||
|
flag that the original patterns check for. See security audit Test 3.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def test_python3_heredoc_detected(self):
|
||||||
|
# The heredoc body also contains `rm -rf /` which fires the
|
||||||
|
# "delete in root path" pattern first (patterns are ordered).
|
||||||
|
# The heredoc pattern also matches — either detection is correct.
|
||||||
|
cmd = "python3 << 'EOF'\nimport os; os.system('rm -rf /')\nEOF"
|
||||||
|
dangerous, _, desc = detect_dangerous_command(cmd)
|
||||||
|
assert dangerous is True
|
||||||
|
|
||||||
|
def test_python_heredoc_detected(self):
|
||||||
|
cmd = 'python << "PYEOF"\nprint("pwned")\nPYEOF'
|
||||||
|
dangerous, _, desc = detect_dangerous_command(cmd)
|
||||||
|
assert dangerous is True
|
||||||
|
|
||||||
|
def test_perl_heredoc_detected(self):
|
||||||
|
cmd = "perl <<'END'\nsystem('whoami');\nEND"
|
||||||
|
dangerous, _, desc = detect_dangerous_command(cmd)
|
||||||
|
assert dangerous is True
|
||||||
|
|
||||||
|
def test_ruby_heredoc_detected(self):
|
||||||
|
cmd = "ruby <<RUBY\n`rm -rf /`\nRUBY"
|
||||||
|
dangerous, _, desc = detect_dangerous_command(cmd)
|
||||||
|
assert dangerous is True
|
||||||
|
|
||||||
|
def test_node_heredoc_detected(self):
|
||||||
|
cmd = "node << 'JS'\nrequire('child_process').execSync('whoami')\nJS"
|
||||||
|
dangerous, _, desc = detect_dangerous_command(cmd)
|
||||||
|
assert dangerous is True
|
||||||
|
|
||||||
|
def test_python3_dash_c_still_detected(self):
|
||||||
|
"""Existing -c pattern must not regress."""
|
||||||
|
cmd = "python3 -c 'import os; os.system(\"rm -rf /\")'"
|
||||||
|
dangerous, _, _ = detect_dangerous_command(cmd)
|
||||||
|
assert dangerous is True
|
||||||
|
|
||||||
|
def test_safe_python_not_flagged(self):
|
||||||
|
"""Plain 'python3 script.py' without heredoc or -c must stay safe."""
|
||||||
|
cmd = "python3 my_script.py"
|
||||||
|
dangerous, _, _ = detect_dangerous_command(cmd)
|
||||||
|
assert dangerous is False
|
||||||
|
|
||||||
|
|
||||||
|
class TestPgrepKillExpansion:
|
||||||
|
"""kill -9 $(pgrep hermes) bypasses the pkill/killall name-matching
|
||||||
|
pattern because the command substitution is opaque to regex.
|
||||||
|
|
||||||
|
See security audit Test 7.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def test_kill_dollar_pgrep_detected(self):
|
||||||
|
cmd = 'kill -9 $(pgrep -f "hermes.*gateway")'
|
||||||
|
dangerous, _, desc = detect_dangerous_command(cmd)
|
||||||
|
assert dangerous is True
|
||||||
|
assert "pgrep" in desc.lower()
|
||||||
|
|
||||||
|
def test_kill_backtick_pgrep_detected(self):
|
||||||
|
cmd = "kill -9 `pgrep hermes`"
|
||||||
|
dangerous, _, desc = detect_dangerous_command(cmd)
|
||||||
|
assert dangerous is True
|
||||||
|
|
||||||
|
def test_kill_dollar_pgrep_no_flags(self):
|
||||||
|
cmd = "kill $(pgrep gateway)"
|
||||||
|
dangerous, _, _ = detect_dangerous_command(cmd)
|
||||||
|
assert dangerous is True
|
||||||
|
|
||||||
|
def test_pkill_hermes_still_detected(self):
|
||||||
|
"""Existing pkill pattern must not regress."""
|
||||||
|
cmd = "pkill -9 hermes"
|
||||||
|
dangerous, _, _ = detect_dangerous_command(cmd)
|
||||||
|
assert dangerous is True
|
||||||
|
|
||||||
|
def test_safe_kill_pid_not_flagged(self):
|
||||||
|
"""A plain 'kill 12345' (literal PID, no expansion) must stay safe."""
|
||||||
|
cmd = "kill 12345"
|
||||||
|
dangerous, _, _ = detect_dangerous_command(cmd)
|
||||||
|
assert dangerous is False
|
||||||
|
|
||||||
|
|
||||||
|
class TestGitDestructiveOps:
|
||||||
|
"""git reset --hard, push --force, clean -f, branch -D can destroy
|
||||||
|
work and rewrite shared history. Not covered by rm/chmod patterns.
|
||||||
|
|
||||||
|
See security audit Test 6.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def test_git_reset_hard_detected(self):
|
||||||
|
cmd = "git reset --hard HEAD~3"
|
||||||
|
dangerous, _, desc = detect_dangerous_command(cmd)
|
||||||
|
assert dangerous is True
|
||||||
|
assert "reset" in desc.lower() or "hard" in desc.lower()
|
||||||
|
|
||||||
|
def test_git_push_force_detected(self):
|
||||||
|
cmd = "git push --force origin main"
|
||||||
|
dangerous, _, desc = detect_dangerous_command(cmd)
|
||||||
|
assert dangerous is True
|
||||||
|
assert "force" in desc.lower()
|
||||||
|
|
||||||
|
def test_git_push_dash_f_detected(self):
|
||||||
|
cmd = "git push -f origin main"
|
||||||
|
dangerous, _, desc = detect_dangerous_command(cmd)
|
||||||
|
assert dangerous is True
|
||||||
|
|
||||||
|
def test_git_clean_force_detected(self):
|
||||||
|
cmd = "git clean -fd"
|
||||||
|
dangerous, _, desc = detect_dangerous_command(cmd)
|
||||||
|
assert dangerous is True
|
||||||
|
assert "clean" in desc.lower()
|
||||||
|
|
||||||
|
def test_git_branch_force_delete_detected(self):
|
||||||
|
cmd = "git branch -D feature-branch"
|
||||||
|
dangerous, _, desc = detect_dangerous_command(cmd)
|
||||||
|
assert dangerous is True
|
||||||
|
|
||||||
|
def test_safe_git_status_not_flagged(self):
|
||||||
|
cmd = "git status"
|
||||||
|
dangerous, _, _ = detect_dangerous_command(cmd)
|
||||||
|
assert dangerous is False
|
||||||
|
|
||||||
|
def test_safe_git_push_not_flagged(self):
|
||||||
|
"""Normal push without --force must not be flagged."""
|
||||||
|
cmd = "git push origin main"
|
||||||
|
dangerous, _, _ = detect_dangerous_command(cmd)
|
||||||
|
assert dangerous is False
|
||||||
|
|
||||||
|
def test_git_branch_lowercase_d_also_flagged(self):
|
||||||
|
"""git branch -d triggers approval too — IGNORECASE is global.
|
||||||
|
|
||||||
|
This is intentional: -d is safer than -D but an approval prompt
|
||||||
|
for branch deletion is reasonable. The user can still approve.
|
||||||
|
"""
|
||||||
|
cmd = "git branch -d feature-branch"
|
||||||
|
dangerous, _, _ = detect_dangerous_command(cmd)
|
||||||
|
assert dangerous is True
|
||||||
|
|
||||||
|
|
||||||
|
class TestChmodExecuteCombo:
|
||||||
|
"""chmod +x && ./ is the two-step social engineering pattern where a
|
||||||
|
script is first made executable then immediately run. The script
|
||||||
|
content may contain dangerous commands invisible to pattern matching.
|
||||||
|
|
||||||
|
See security audit Test 4.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def test_chmod_and_execute_detected(self):
|
||||||
|
cmd = "chmod +x /tmp/cleanup.sh && ./cleanup.sh"
|
||||||
|
dangerous, _, desc = detect_dangerous_command(cmd)
|
||||||
|
assert dangerous is True
|
||||||
|
assert "chmod" in desc.lower() or "execution" in desc.lower()
|
||||||
|
|
||||||
|
def test_chmod_semicolon_execute_detected(self):
|
||||||
|
cmd = "chmod +x script.sh; ./script.sh"
|
||||||
|
dangerous, _, _ = detect_dangerous_command(cmd)
|
||||||
|
# Semicolon variant — pattern uses && but full-string match
|
||||||
|
# on chmod +x should still trigger even without the && ./
|
||||||
|
assert dangerous is True
|
||||||
|
|
||||||
|
def test_safe_chmod_without_execute_not_flagged(self):
|
||||||
|
"""chmod +x alone without immediate execution must not be flagged."""
|
||||||
|
cmd = "chmod +x script.sh"
|
||||||
|
dangerous, _, _ = detect_dangerous_command(cmd)
|
||||||
|
assert dangerous is False
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -5,6 +5,8 @@ from contextlib import contextmanager
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from unittest.mock import patch
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
from tools.skill_manager_tool import (
|
from tools.skill_manager_tool import (
|
||||||
_validate_name,
|
_validate_name,
|
||||||
_validate_category,
|
_validate_category,
|
||||||
@@ -330,6 +332,25 @@ word word
|
|||||||
result = _patch_skill("nonexistent", "old", "new")
|
result = _patch_skill("nonexistent", "old", "new")
|
||||||
assert result["success"] is False
|
assert result["success"] is False
|
||||||
|
|
||||||
|
def test_patch_supporting_file_symlink_escape_blocked(self, tmp_path):
|
||||||
|
outside_file = tmp_path / "outside.txt"
|
||||||
|
outside_file.write_text("old text here")
|
||||||
|
|
||||||
|
with _skill_dir(tmp_path):
|
||||||
|
_create_skill("my-skill", VALID_SKILL_CONTENT)
|
||||||
|
link = tmp_path / "my-skill" / "references" / "evil.md"
|
||||||
|
link.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
try:
|
||||||
|
link.symlink_to(outside_file)
|
||||||
|
except OSError:
|
||||||
|
pytest.skip("Symlinks not supported")
|
||||||
|
|
||||||
|
result = _patch_skill("my-skill", "old text", "new text", file_path="references/evil.md")
|
||||||
|
|
||||||
|
assert result["success"] is False
|
||||||
|
assert "boundary" in result["error"].lower()
|
||||||
|
assert outside_file.read_text() == "old text here"
|
||||||
|
|
||||||
|
|
||||||
class TestDeleteSkill:
|
class TestDeleteSkill:
|
||||||
def test_delete_existing(self, tmp_path):
|
def test_delete_existing(self, tmp_path):
|
||||||
@@ -375,6 +396,25 @@ class TestWriteFile:
|
|||||||
result = _write_file("my-skill", "secret/evil.py", "malicious")
|
result = _write_file("my-skill", "secret/evil.py", "malicious")
|
||||||
assert result["success"] is False
|
assert result["success"] is False
|
||||||
|
|
||||||
|
def test_write_symlink_escape_blocked(self, tmp_path):
|
||||||
|
outside_dir = tmp_path / "outside"
|
||||||
|
outside_dir.mkdir()
|
||||||
|
|
||||||
|
with _skill_dir(tmp_path):
|
||||||
|
_create_skill("my-skill", VALID_SKILL_CONTENT)
|
||||||
|
link = tmp_path / "my-skill" / "references" / "escape"
|
||||||
|
link.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
try:
|
||||||
|
link.symlink_to(outside_dir, target_is_directory=True)
|
||||||
|
except OSError:
|
||||||
|
pytest.skip("Symlinks not supported")
|
||||||
|
|
||||||
|
result = _write_file("my-skill", "references/escape/owned.md", "malicious")
|
||||||
|
|
||||||
|
assert result["success"] is False
|
||||||
|
assert "boundary" in result["error"].lower()
|
||||||
|
assert not (outside_dir / "owned.md").exists()
|
||||||
|
|
||||||
|
|
||||||
class TestRemoveFile:
|
class TestRemoveFile:
|
||||||
def test_remove_existing_file(self, tmp_path):
|
def test_remove_existing_file(self, tmp_path):
|
||||||
@@ -391,6 +431,27 @@ class TestRemoveFile:
|
|||||||
result = _remove_file("my-skill", "references/nope.md")
|
result = _remove_file("my-skill", "references/nope.md")
|
||||||
assert result["success"] is False
|
assert result["success"] is False
|
||||||
|
|
||||||
|
def test_remove_symlink_escape_blocked(self, tmp_path):
|
||||||
|
outside_dir = tmp_path / "outside"
|
||||||
|
outside_dir.mkdir()
|
||||||
|
outside_file = outside_dir / "keep.txt"
|
||||||
|
outside_file.write_text("content")
|
||||||
|
|
||||||
|
with _skill_dir(tmp_path):
|
||||||
|
_create_skill("my-skill", VALID_SKILL_CONTENT)
|
||||||
|
link = tmp_path / "my-skill" / "references" / "escape"
|
||||||
|
link.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
try:
|
||||||
|
link.symlink_to(outside_dir, target_is_directory=True)
|
||||||
|
except OSError:
|
||||||
|
pytest.skip("Symlinks not supported")
|
||||||
|
|
||||||
|
result = _remove_file("my-skill", "references/escape/keep.txt")
|
||||||
|
|
||||||
|
assert result["success"] is False
|
||||||
|
assert "boundary" in result["error"].lower()
|
||||||
|
assert outside_file.exists()
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# skill_manage dispatcher
|
# skill_manage dispatcher
|
||||||
|
|||||||
@@ -99,10 +99,30 @@ DANGEROUS_PATTERNS = [
|
|||||||
(r'\bnohup\b.*gateway\s+run\b', "start gateway outside systemd (use 'systemctl --user restart hermes-gateway')"),
|
(r'\bnohup\b.*gateway\s+run\b', "start gateway outside systemd (use 'systemctl --user restart hermes-gateway')"),
|
||||||
# Self-termination protection: prevent agent from killing its own process
|
# Self-termination protection: prevent agent from killing its own process
|
||||||
(r'\b(pkill|killall)\b.*\b(hermes|gateway|cli\.py)\b', "kill hermes/gateway process (self-termination)"),
|
(r'\b(pkill|killall)\b.*\b(hermes|gateway|cli\.py)\b', "kill hermes/gateway process (self-termination)"),
|
||||||
|
# Self-termination via kill + command substitution (pgrep/pidof).
|
||||||
|
# The name-based pattern above catches `pkill hermes` but not
|
||||||
|
# `kill -9 $(pgrep -f hermes)` because the substitution is opaque
|
||||||
|
# to regex at detection time. Catch the structural pattern instead.
|
||||||
|
(r'\bkill\b.*\$\(\s*pgrep\b', "kill process via pgrep expansion (self-termination)"),
|
||||||
|
(r'\bkill\b.*`\s*pgrep\b', "kill process via backtick pgrep expansion (self-termination)"),
|
||||||
# File copy/move/edit into sensitive system paths
|
# File copy/move/edit into sensitive system paths
|
||||||
(r'\b(cp|mv|install)\b.*\s/etc/', "copy/move file into /etc/"),
|
(r'\b(cp|mv|install)\b.*\s/etc/', "copy/move file into /etc/"),
|
||||||
(r'\bsed\s+-[^\s]*i.*\s/etc/', "in-place edit of system config"),
|
(r'\bsed\s+-[^\s]*i.*\s/etc/', "in-place edit of system config"),
|
||||||
(r'\bsed\s+--in-place\b.*\s/etc/', "in-place edit of system config (long flag)"),
|
(r'\bsed\s+--in-place\b.*\s/etc/', "in-place edit of system config (long flag)"),
|
||||||
|
# Script execution via heredoc — bypasses the -e/-c flag patterns above.
|
||||||
|
# `python3 << 'EOF'` feeds arbitrary code via stdin without -c/-e flags.
|
||||||
|
(r'\b(python[23]?|perl|ruby|node)\s+<<', "script execution via heredoc"),
|
||||||
|
# Git destructive operations that can lose uncommitted work or rewrite
|
||||||
|
# shared history. Not captured by rm/chmod/etc patterns.
|
||||||
|
(r'\bgit\s+reset\s+--hard\b', "git reset --hard (destroys uncommitted changes)"),
|
||||||
|
(r'\bgit\s+push\b.*--force\b', "git force push (rewrites remote history)"),
|
||||||
|
(r'\bgit\s+push\b.*-f\b', "git force push short flag (rewrites remote history)"),
|
||||||
|
(r'\bgit\s+clean\s+-[^\s]*f', "git clean with force (deletes untracked files)"),
|
||||||
|
(r'\bgit\s+branch\s+-D\b', "git branch force delete"),
|
||||||
|
# Script execution after chmod +x — catches the two-step pattern where
|
||||||
|
# a script is first made executable then immediately run. The script
|
||||||
|
# content may contain dangerous commands that individual patterns miss.
|
||||||
|
(r'\bchmod\s+\+x\b.*[;&|]+\s*\./', "chmod +x followed by immediate execution"),
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -40,7 +40,7 @@ import shutil
|
|||||||
import tempfile
|
import tempfile
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from hermes_constants import get_hermes_home
|
from hermes_constants import get_hermes_home
|
||||||
from typing import Dict, Any, Optional
|
from typing import Dict, Any, Optional, Tuple
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -240,6 +240,20 @@ def _validate_file_path(file_path: str) -> Optional[str]:
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _resolve_skill_target(skill_dir: Path, file_path: str) -> Tuple[Optional[Path], Optional[str]]:
|
||||||
|
"""Resolve a supporting-file path and ensure it stays within the skill directory."""
|
||||||
|
target = skill_dir / file_path
|
||||||
|
try:
|
||||||
|
resolved = target.resolve(strict=False)
|
||||||
|
skill_dir_resolved = skill_dir.resolve()
|
||||||
|
resolved.relative_to(skill_dir_resolved)
|
||||||
|
except ValueError:
|
||||||
|
return None, "Path escapes skill directory boundary."
|
||||||
|
except OSError as e:
|
||||||
|
return None, f"Invalid file path '{file_path}': {e}"
|
||||||
|
return target, None
|
||||||
|
|
||||||
|
|
||||||
def _atomic_write_text(file_path: Path, content: str, encoding: str = "utf-8") -> None:
|
def _atomic_write_text(file_path: Path, content: str, encoding: str = "utf-8") -> None:
|
||||||
"""
|
"""
|
||||||
Atomically write text content to a file.
|
Atomically write text content to a file.
|
||||||
@@ -394,7 +408,9 @@ def _patch_skill(
|
|||||||
err = _validate_file_path(file_path)
|
err = _validate_file_path(file_path)
|
||||||
if err:
|
if err:
|
||||||
return {"success": False, "error": err}
|
return {"success": False, "error": err}
|
||||||
target = skill_dir / file_path
|
target, err = _resolve_skill_target(skill_dir, file_path)
|
||||||
|
if err:
|
||||||
|
return {"success": False, "error": err}
|
||||||
else:
|
else:
|
||||||
# Patching SKILL.md
|
# Patching SKILL.md
|
||||||
target = skill_dir / "SKILL.md"
|
target = skill_dir / "SKILL.md"
|
||||||
@@ -500,7 +516,9 @@ def _write_file(name: str, file_path: str, file_content: str) -> Dict[str, Any]:
|
|||||||
if not existing:
|
if not existing:
|
||||||
return {"success": False, "error": f"Skill '{name}' not found. Create it first with action='create'."}
|
return {"success": False, "error": f"Skill '{name}' not found. Create it first with action='create'."}
|
||||||
|
|
||||||
target = existing["path"] / file_path
|
target, err = _resolve_skill_target(existing["path"], file_path)
|
||||||
|
if err:
|
||||||
|
return {"success": False, "error": err}
|
||||||
target.parent.mkdir(parents=True, exist_ok=True)
|
target.parent.mkdir(parents=True, exist_ok=True)
|
||||||
# Back up for rollback
|
# Back up for rollback
|
||||||
original_content = target.read_text(encoding="utf-8") if target.exists() else None
|
original_content = target.read_text(encoding="utf-8") if target.exists() else None
|
||||||
@@ -533,7 +551,9 @@ def _remove_file(name: str, file_path: str) -> Dict[str, Any]:
|
|||||||
return {"success": False, "error": f"Skill '{name}' not found."}
|
return {"success": False, "error": f"Skill '{name}' not found."}
|
||||||
skill_dir = existing["path"]
|
skill_dir = existing["path"]
|
||||||
|
|
||||||
target = skill_dir / file_path
|
target, err = _resolve_skill_target(skill_dir, file_path)
|
||||||
|
if err:
|
||||||
|
return {"success": False, "error": err}
|
||||||
if not target.exists():
|
if not target.exists():
|
||||||
# List what's actually there for the model to see
|
# List what's actually there for the model to see
|
||||||
available = []
|
available = []
|
||||||
|
|||||||
Reference in New Issue
Block a user