Files
hermes-agent/tests/tools/test_approval_plugin_hooks.py

249 lines
9.3 KiB
Python
Raw Normal View History

"""Tests for pre_approval_request / post_approval_response plugin hooks.
These hooks fire in tools/approval.py::check_all_command_guards whenever a
dangerous command needs user approval. They are observer-only (return values
ignored) and must fire on BOTH the CLI-interactive path and the async gateway
path, so external tools like macOS notifiers can be alerted regardless of
which surface the user is on.
"""
from unittest.mock import patch
import pytest
import tools.approval as approval_module
from tools.approval import (
check_all_command_guards,
register_gateway_notify,
unregister_gateway_notify,
resolve_gateway_approval,
set_current_session_key,
clear_session,
)
@pytest.fixture
def isolated_session(monkeypatch):
    """Yield a dedicated session key and tear its approval state down afterwards.

    Setup binds the key through ``set_current_session_key`` and mirrors it in
    the ``HERMES_SESSION_KEY`` environment variable. Teardown restores the
    previous session-key binding (best effort) and calls ``clear_session``.
    """
    key = "test:session:approval_hooks"
    previous_binding = set_current_session_key(key)
    monkeypatch.setenv("HERMES_SESSION_KEY", key)
    # HERMES_YOLO_MODE must be absent, otherwise the guards under test could
    # be bypassed before any approval hook fires.
    monkeypatch.delenv("HERMES_YOLO_MODE", raising=False)

    try:
        yield key
    finally:
        # Restoring the binding is best effort: a failed reset must not stop
        # clear_session() from running.
        try:
            approval_module._approval_session_key.reset(previous_binding)
        except Exception:
            pass
        clear_session(key)
class TestCliPathFiresHooks:
    """Hook behaviour on the CLI-interactive approval path.

    HERMES_INTERACTIVE is set and the prompt_dangerous_approval() result
    (driven here through ``approval_callback``) decides the outcome.
    """

    @staticmethod
    def _enter_cli_manual_mode(monkeypatch):
        """Select the CLI surface and pin the approval mode to "manual"."""
        monkeypatch.setenv("HERMES_INTERACTIVE", "1")
        monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False)
        monkeypatch.delenv("HERMES_EXEC_ASK", raising=False)
        # Manual mode is what makes the guard actually reach the prompt site.
        monkeypatch.setattr(approval_module, "_get_approval_mode", lambda: "manual")

    @staticmethod
    def _first_kwargs(calls, wanted):
        """Return the kwargs of the first recorded call to hook ``wanted``."""
        return next(kw for name, kw in calls if name == wanted)

    def test_pre_and_post_fire_with_expected_kwargs(
        self, isolated_session, monkeypatch
    ):
        """Approving once fires both hooks with the CLI request details."""
        self._enter_cli_manual_mode(monkeypatch)
        dangerous_cmd = "rm -rf /tmp/test-hook"
        calls = []

        def record_hook(hook_name, **kwargs):
            calls.append((hook_name, kwargs))
            return []

        # Stand-in for the user picking "approve once" via the
        # approval_callback contract.
        def approve_once(command, description, *, allow_permanent=True):
            return "once"

        with patch("hermes_cli.plugins.invoke_hook", side_effect=record_hook):
            outcome = check_all_command_guards(
                dangerous_cmd, "local", approval_callback=approve_once
            )

        assert outcome["approved"] is True

        fired = [name for name, _ in calls]
        assert "pre_approval_request" in fired
        assert "post_approval_response" in fired

        pre = self._first_kwargs(calls, "pre_approval_request")
        assert pre["command"] == dangerous_cmd
        assert pre["surface"] == "cli"
        assert pre["session_key"] == isolated_session
        assert isinstance(pre["pattern_keys"], list)
        assert pre["pattern_key"]  # primary pattern must be non-empty
        assert pre["description"]

        post = self._first_kwargs(calls, "post_approval_response")
        assert post["choice"] == "once"
        assert post["surface"] == "cli"
        assert post["command"] == dangerous_cmd

    def test_deny_reported_to_post_hook(self, isolated_session, monkeypatch):
        """A denial is rejected and reported to the post hook as "deny"."""
        self._enter_cli_manual_mode(monkeypatch)
        calls = []

        def record_hook(hook_name, **kwargs):
            calls.append((hook_name, kwargs))
            return []

        def refuse(command, description, *, allow_permanent=True):
            return "deny"

        with patch("hermes_cli.plugins.invoke_hook", side_effect=record_hook):
            outcome = check_all_command_guards(
                "rm -rf /tmp/test-deny", "local", approval_callback=refuse
            )

        assert outcome["approved"] is False
        post = self._first_kwargs(calls, "post_approval_response")
        assert post["choice"] == "deny"

    def test_plugin_hook_crash_does_not_break_approval(
        self, isolated_session, monkeypatch
    ):
        """A raising plugin hook must not stop the approval from going through.

        Hooks are observer-only, so the safety-critical approval flow has to
        survive a plugin failure.
        """
        self._enter_cli_manual_mode(monkeypatch)

        def exploding_hook(hook_name, **kwargs):
            raise RuntimeError("plugin crashed")

        def approve_once(command, description, *, allow_permanent=True):
            return "once"

        with patch("hermes_cli.plugins.invoke_hook", side_effect=exploding_hook):
            outcome = check_all_command_guards(
                "rm -rf /tmp/test-crash", "local", approval_callback=approve_once
            )

        # The user's approval is honored even though every hook call raised.
        assert outcome["approved"] is True
class TestGatewayPathFiresHooks:
    """Hook behaviour on the async gateway approval path.

    HERMES_GATEWAY_SESSION is set and a gateway notify callback is registered.
    The agent thread blocks on the approval event until
    resolve_gateway_approval() is called from another thread (or the
    configured gateway_timeout expires).
    """

    def _run_gateway_guard(
        self, session_key, monkeypatch, command, *, gateway_timeout, resolve_with=None
    ):
        """Run ``check_all_command_guards`` on a worker thread via the gateway path.

        Args:
            session_key: Session the notify callback is registered under.
            monkeypatch: The test's pytest ``monkeypatch`` fixture.
            command: Command string handed to the guard.
            gateway_timeout: Value returned as ``gateway_timeout`` by the
                patched ``_get_approval_config`` (presumably seconds -- TODO
                confirm against tools/approval.py).
            resolve_with: Choice passed to ``resolve_gateway_approval`` once
                the notify callback has fired. ``None`` leaves the request
                unresolved so it can time out.

        Returns:
            A ``(result, captured)`` tuple: the guard's return value and the
            list of ``(hook_name, kwargs)`` pairs seen by the patched
            ``invoke_hook``.
        """
        import threading

        monkeypatch.delenv("HERMES_INTERACTIVE", raising=False)
        monkeypatch.setenv("HERMES_GATEWAY_SESSION", "1")
        monkeypatch.delenv("HERMES_EXEC_ASK", raising=False)
        monkeypatch.setattr(approval_module, "_get_approval_mode", lambda: "manual")
        # A short gateway_timeout makes a buggy test fail fast instead of hanging.
        monkeypatch.setattr(
            approval_module,
            "_get_approval_config",
            lambda: {"gateway_timeout": gateway_timeout},
        )

        captured = []

        def fake_invoke_hook(hook_name, **kwargs):
            captured.append((hook_name, kwargs))
            return []

        notify_seen = threading.Event()

        def notify_cb(approval_data):
            notify_seen.set()

        result_holder = {}

        def run_guard():
            with patch("hermes_cli.plugins.invoke_hook", side_effect=fake_invoke_hook):
                result_holder["result"] = check_all_command_guards(command, "local")

        worker = threading.Thread(target=run_guard, daemon=True)
        register_gateway_notify(session_key, notify_cb)
        try:
            worker.start()
            # Wait for the gateway callback to see the approval request.
            assert notify_seen.wait(timeout=5), "Gateway notify never fired"
            if resolve_with is not None:
                # The user answers from the "other thread" (simulating /approve).
                resolve_gateway_approval(session_key, resolve_with)
            worker.join(timeout=5)
            assert not worker.is_alive(), "Agent thread never unblocked"
        finally:
            # Always unregister, even when an assertion above fails, so the
            # callback cannot stay registered and leak into later tests.
            unregister_gateway_notify(session_key)

        # A missing result means the guard raised inside the worker thread;
        # fail with a clear message rather than a KeyError.
        assert "result" in result_holder, "check_all_command_guards raised in agent thread"
        return result_holder["result"], captured

    def test_pre_and_post_fire_on_gateway_surface(
        self, isolated_session, monkeypatch
    ):
        """Approving from another thread fires both hooks with surface "gateway"."""
        result, captured = self._run_gateway_guard(
            isolated_session,
            monkeypatch,
            "rm -rf /tmp/test-gateway-hook",
            gateway_timeout=10,
            resolve_with="once",
        )

        assert result["approved"] is True

        hook_names = [name for name, _ in captured]
        assert "pre_approval_request" in hook_names
        assert "post_approval_response" in hook_names

        pre_kwargs = next(kw for name, kw in captured if name == "pre_approval_request")
        assert pre_kwargs["surface"] == "gateway"
        assert pre_kwargs["command"] == "rm -rf /tmp/test-gateway-hook"

        post_kwargs = next(kw for name, kw in captured if name == "post_approval_response")
        assert post_kwargs["surface"] == "gateway"
        assert post_kwargs["choice"] == "once"

    def test_timeout_reports_timeout_choice(self, isolated_session, monkeypatch):
        """An unanswered request is rejected and reported as choice "timeout"."""
        # resolve_with is deliberately omitted -- the request must time out.
        result, captured = self._run_gateway_guard(
            isolated_session,
            monkeypatch,
            "rm -rf /tmp/test-gateway-timeout",
            gateway_timeout=1,
        )

        assert result["approved"] is False
        post_kwargs = next(kw for name, kw in captured if name == "post_approval_response")
        assert post_kwargs["choice"] == "timeout"