diff --git a/hermes_cli/plugins.py b/hermes_cli/plugins.py index 7eb9a400c91..3d514ddc126 100644 --- a/hermes_cli/plugins.py +++ b/hermes_cli/plugins.py @@ -79,6 +79,20 @@ VALID_HOOKS: Set[str] = { # {"action": "allow"} / None -> normal dispatch # Kwargs: event: MessageEvent, gateway: GatewayRunner, session_store. "pre_gateway_dispatch", + # Approval lifecycle hooks. Fired by tools/approval.py when a dangerous + # command needs user approval -- fires BOTH for CLI-interactive prompts + # and for gateway/ACP approvals (Telegram, Discord, Slack, TUI, etc.). + # Observers only: return values are ignored. Plugins cannot veto or + # pre-answer an approval from these hooks (use pre_tool_call to block + # a tool before it reaches approval). + # + # Kwargs for pre_approval_request: + # command: str, description: str, pattern_key: str, pattern_keys: list[str], + # session_key: str, surface: "cli" | "gateway" + # Kwargs for post_approval_response: same as above plus + # choice: "once" | "session" | "always" | "deny" | "timeout" + "pre_approval_request", + "post_approval_response", } ENTRY_POINTS_GROUP = "hermes_agent.plugins" diff --git a/tests/tools/test_approval_plugin_hooks.py b/tests/tools/test_approval_plugin_hooks.py new file mode 100644 index 00000000000..29489cf8778 --- /dev/null +++ b/tests/tools/test_approval_plugin_hooks.py @@ -0,0 +1,248 @@ +"""Tests for pre_approval_request / post_approval_response plugin hooks. + +These hooks fire in tools/approval.py::check_all_command_guards whenever a +dangerous command needs user approval. They are observer-only (return values +ignored) and must fire on BOTH the CLI-interactive path and the async gateway +path, so external tools like macOS notifiers can be alerted regardless of +which surface the user is on. +""" +from unittest.mock import patch + +import pytest + +import tools.approval as approval_module +from tools.approval import ( + check_all_command_guards, + register_gateway_notify, + unregister_gateway_notify, + resolve_gateway_approval, + set_current_session_key, + clear_session, +) + + +@pytest.fixture +def isolated_session(monkeypatch): + """Give each test a fresh session_key and clean approval-state.""" + session_key = "test:session:approval_hooks" + token = set_current_session_key(session_key) + monkeypatch.setenv("HERMES_SESSION_KEY", session_key) + # Make sure we don't skip guards via yolo / approvals.mode=off + monkeypatch.delenv("HERMES_YOLO_MODE", raising=False) + try: + yield session_key + finally: + try: + approval_module._approval_session_key.reset(token) + except Exception: + pass + clear_session(session_key) + + +class TestCliPathFiresHooks: + """CLI-interactive approval path: HERMES_INTERACTIVE is set, the + prompt_dangerous_approval() result decides the outcome.""" + + def test_pre_and_post_fire_with_expected_kwargs( + self, isolated_session, monkeypatch + ): + monkeypatch.setenv("HERMES_INTERACTIVE", "1") + monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False) + monkeypatch.delenv("HERMES_EXEC_ASK", raising=False) + # approvals.mode=manual so we actually reach the prompt site + monkeypatch.setattr(approval_module, "_get_approval_mode", lambda: "manual") + + captured = [] + + def fake_invoke_hook(hook_name, **kwargs): + captured.append((hook_name, kwargs)) + return [] + + # Force the user to "approve once" via the approval_callback contract + def cb(command, description, *, allow_permanent=True): + return "once" + + with patch("hermes_cli.plugins.invoke_hook", side_effect=fake_invoke_hook): + result = check_all_command_guards( + "rm -rf /tmp/test-hook", "local", approval_callback=cb, + ) + + assert result["approved"] is True + + hook_names = [c[0] for c in captured] + assert "pre_approval_request" in hook_names + assert "post_approval_response" in hook_names + + pre_kwargs = next(kw for name, kw in captured if name == "pre_approval_request") + assert pre_kwargs["command"] == "rm -rf /tmp/test-hook" + assert pre_kwargs["surface"] == "cli" + assert pre_kwargs["session_key"] == isolated_session + assert isinstance(pre_kwargs["pattern_keys"], list) + assert pre_kwargs["pattern_key"] # non-empty primary pattern + assert pre_kwargs["description"] + + post_kwargs = next(kw for name, kw in captured if name == "post_approval_response") + assert post_kwargs["choice"] == "once" + assert post_kwargs["surface"] == "cli" + assert post_kwargs["command"] == "rm -rf /tmp/test-hook" + + def test_deny_reported_to_post_hook(self, isolated_session, monkeypatch): + monkeypatch.setenv("HERMES_INTERACTIVE", "1") + monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False) + monkeypatch.delenv("HERMES_EXEC_ASK", raising=False) + monkeypatch.setattr(approval_module, "_get_approval_mode", lambda: "manual") + + captured = [] + + def fake_invoke_hook(hook_name, **kwargs): + captured.append((hook_name, kwargs)) + return [] + + def cb(command, description, *, allow_permanent=True): + return "deny" + + with patch("hermes_cli.plugins.invoke_hook", side_effect=fake_invoke_hook): + result = check_all_command_guards( + "rm -rf /tmp/test-deny", "local", approval_callback=cb, + ) + + assert result["approved"] is False + post_kwargs = next(kw for name, kw in captured if name == "post_approval_response") + assert post_kwargs["choice"] == "deny" + + def test_plugin_hook_crash_does_not_break_approval( + self, isolated_session, monkeypatch + ): + """A crashing plugin must never prevent the approval flow from + reaching the user. Hooks are observer-only and safety-critical + behavior must be preserved.""" + monkeypatch.setenv("HERMES_INTERACTIVE", "1") + monkeypatch.delenv("HERMES_GATEWAY_SESSION", raising=False) + monkeypatch.delenv("HERMES_EXEC_ASK", raising=False) + monkeypatch.setattr(approval_module, "_get_approval_mode", lambda: "manual") + + def boom(hook_name, **kwargs): + raise RuntimeError("plugin crashed") + + def cb(command, description, *, allow_permanent=True): + return "once" + + with patch("hermes_cli.plugins.invoke_hook", side_effect=boom): + result = check_all_command_guards( + "rm -rf /tmp/test-crash", "local", approval_callback=cb, + ) + + # User's approval was still honored despite the plugin crashing + assert result["approved"] is True + + +class TestGatewayPathFiresHooks: + """Async gateway approval path: HERMES_GATEWAY_SESSION is set and a + gateway notify callback is registered. The agent thread blocks on the + approval event until resolve_gateway_approval() is called from another + thread.""" + + def test_pre_and_post_fire_on_gateway_surface( + self, isolated_session, monkeypatch + ): + import threading + + monkeypatch.delenv("HERMES_INTERACTIVE", raising=False) + monkeypatch.setenv("HERMES_GATEWAY_SESSION", "1") + monkeypatch.delenv("HERMES_EXEC_ASK", raising=False) + monkeypatch.setattr(approval_module, "_get_approval_mode", lambda: "manual") + # Short gateway_timeout so a buggy test fails fast instead of hanging + monkeypatch.setattr( + approval_module, "_get_approval_config", lambda: {"gateway_timeout": 10} + ) + + captured = [] + + def fake_invoke_hook(hook_name, **kwargs): + captured.append((hook_name, kwargs)) + return [] + + notify_seen = threading.Event() + + def notify_cb(approval_data): + notify_seen.set() + + register_gateway_notify(isolated_session, notify_cb) + result_holder = {} + + def run_guard(): + with patch("hermes_cli.plugins.invoke_hook", side_effect=fake_invoke_hook): + result_holder["result"] = check_all_command_guards( + "rm -rf /tmp/test-gateway-hook", "local", + ) + + t = threading.Thread(target=run_guard, daemon=True) + t.start() + + # Wait for the gateway callback to see the approval request + assert notify_seen.wait(timeout=5), "Gateway notify never fired" + + # User approves from the "other thread" (simulating /approve command) + resolve_gateway_approval(isolated_session, "once") + + t.join(timeout=5) + assert not t.is_alive(), "Agent thread never unblocked" + unregister_gateway_notify(isolated_session) + + assert result_holder["result"]["approved"] is True + + hook_names = [c[0] for c in captured] + assert "pre_approval_request" in hook_names + assert "post_approval_response" in hook_names + + pre_kwargs = next(kw for name, kw in captured if name == "pre_approval_request") + assert pre_kwargs["surface"] == "gateway" + assert pre_kwargs["command"] == "rm -rf /tmp/test-gateway-hook" + + post_kwargs = next(kw for name, kw in captured if name == "post_approval_response") + assert post_kwargs["surface"] == "gateway" + assert post_kwargs["choice"] == "once" + + def test_timeout_reports_timeout_choice(self, isolated_session, monkeypatch): + import threading + + monkeypatch.delenv("HERMES_INTERACTIVE", raising=False) + monkeypatch.setenv("HERMES_GATEWAY_SESSION", "1") + monkeypatch.delenv("HERMES_EXEC_ASK", raising=False) + monkeypatch.setattr(approval_module, "_get_approval_mode", lambda: "manual") + monkeypatch.setattr( + approval_module, "_get_approval_config", lambda: {"gateway_timeout": 1} + ) + + captured = [] + + def fake_invoke_hook(hook_name, **kwargs): + captured.append((hook_name, kwargs)) + return [] + + notify_seen = threading.Event() + + def notify_cb(approval_data): + notify_seen.set() + + register_gateway_notify(isolated_session, notify_cb) + result_holder = {} + + def run_guard(): + with patch("hermes_cli.plugins.invoke_hook", side_effect=fake_invoke_hook): + result_holder["result"] = check_all_command_guards( + "rm -rf /tmp/test-gateway-timeout", "local", + ) + + t = threading.Thread(target=run_guard, daemon=True) + t.start() + assert notify_seen.wait(timeout=5) + # Deliberately do NOT resolve -- let it time out + t.join(timeout=5) + assert not t.is_alive() + unregister_gateway_notify(isolated_session) + + assert result_holder["result"]["approved"] is False + + post_kwargs = next(kw for name, kw in captured if name == "post_approval_response") + assert post_kwargs["choice"] == "timeout" diff --git a/tools/approval.py b/tools/approval.py index c31c764d6ff..aac0406d544 100644 --- a/tools/approval.py +++ b/tools/approval.py @@ -30,6 +30,32 @@ _approval_session_key: contextvars.ContextVar[str] = contextvars.ContextVar( ) +def _fire_approval_hook(hook_name: str, **kwargs) -> None: + """Invoke a plugin lifecycle hook for the approval system. + + Lazy-imports the plugin manager to avoid circular imports (approval.py is + imported very early, long before plugins are discovered). Never raises -- + plugin errors are logged and swallowed. + + Only fires for the two approval-specific hooks in VALID_HOOKS: + pre_approval_request, post_approval_response. + """ + try: + from hermes_cli.plugins import invoke_hook + except Exception: + # Plugin system not available in this execution context + # (e.g. bare tool-only imports, minimal test environments). + return + try: + invoke_hook(hook_name, **kwargs) + except Exception as exc: + # invoke_hook() already swallows per-callback errors, so reaching here + # means the dispatch layer itself failed. Log and move on -- approval + # flow is safety-critical, plugin observability is not. + logger.debug("Approval hook %s dispatch failed: %s", hook_name, exc) + + + def set_current_session_key(session_key: str) -> contextvars.Token[str]: """Bind the active approval session key to the current context.""" return _approval_session_key.set(session_key or "") @@ -1002,6 +1028,19 @@ def check_all_command_guards(command: str, env_type: str, with _lock: _gateway_queues.setdefault(session_key, []).append(entry) + # Notify plugins that an approval is being requested. Fires before + # the gateway notify callback so observers (e.g. macOS notifier + # plugins, audit logs, Slack alerts) get the event in real time. + _fire_approval_hook( + "pre_approval_request", + command=command, + description=combined_desc, + pattern_key=primary_key, + pattern_keys=list(all_keys), + session_key=session_key, + surface="gateway", + ) + # Notify the user (bridges sync agent thread → async gateway) try: notify_cb(approval_data) @@ -1067,6 +1106,24 @@ def check_all_command_guards(command: str, env_type: str, _gateway_queues.pop(session_key, None) choice = entry.result + # Normalize outcome for the post hook. Unresolved (timeout) and + # None both mean the user never responded; report that explicitly + # so plugins can distinguish timeout from explicit deny. + _outcome = ( + "timeout" if not resolved + else (choice if choice else "timeout") + ) + _fire_approval_hook( + "post_approval_response", + command=command, + description=combined_desc, + pattern_key=primary_key, + pattern_keys=list(all_keys), + session_key=session_key, + surface="gateway", + choice=_outcome, + ) + if not resolved or choice is None or choice == "deny": reason = "timed out" if not resolved else "denied by user" return { @@ -1111,9 +1168,28 @@ def check_all_command_guards(command: str, env_type: str, # CLI interactive: single combined prompt # Hide [a]lways when any tirith warning is present + _fire_approval_hook( + "pre_approval_request", + command=command, + description=combined_desc, + pattern_key=primary_key, + pattern_keys=list(all_keys), + session_key=session_key, + surface="cli", + ) choice = prompt_dangerous_approval(command, combined_desc, allow_permanent=not has_tirith, approval_callback=approval_callback) + _fire_approval_hook( + "post_approval_response", + command=command, + description=combined_desc, + pattern_key=primary_key, + pattern_keys=list(all_keys), + session_key=session_key, + surface="cli", + choice=choice, + ) if choice == "deny": return { diff --git a/website/docs/user-guide/features/hooks.md b/website/docs/user-guide/features/hooks.md index ba77e535f1c..3d81027ed60 100644 --- a/website/docs/user-guide/features/hooks.md +++ b/website/docs/user-guide/features/hooks.md @@ -248,6 +248,8 @@ def register(ctx): | [`on_session_reset`](#on_session_reset) | Gateway swaps in a fresh session key (e.g. `/new`, `/reset`) | ignored | | [`subagent_stop`](#subagent_stop) | A `delegate_task` child has exited | ignored | | [`pre_gateway_dispatch`](#pre_gateway_dispatch) | Gateway received a user message, before auth + dispatch | `{"action": "skip" \| "rewrite" \| "allow", ...}` to influence flow | +| [`pre_approval_request`](#pre_approval_request) | Dangerous command needs user approval, before the prompt/notification is sent | ignored | +| [`post_approval_response`](#post_approval_response) | User responded to an approval prompt (or it timed out) | ignored | --- @@ -775,6 +777,97 @@ def register(ctx): --- +### `pre_approval_request` + +Fires **immediately before** an approval request is shown to the user — covers every surface: interactive CLI, the Ink TUI, gateway platforms (Telegram, Discord, Slack, WhatsApp, Matrix, etc.), and ACP clients (VS Code, Zed, JetBrains). + +This is the right place to wire a custom notifier — for example, a macOS menu-bar app that pops an allow/deny notification, or an audit log that records every approval request with context. + +**Callback signature:** + +```python +def my_callback( + command: str, + description: str, + pattern_key: str, + pattern_keys: list[str], + session_key: str, + surface: str, + **kwargs, +): +``` + +| Parameter | Type | Description | +|-----------|------|-------------| +| `command` | `str` | The shell command awaiting approval | +| `description` | `str` | Human-readable reason(s) the command is flagged (combined when multiple patterns match) | +| `pattern_key` | `str` | Primary pattern key that triggered the approval (e.g. `"rm_rf"`, `"sudo"`) | +| `pattern_keys` | `list[str]` | All pattern keys that matched | +| `session_key` | `str` | Session identifier, useful for scoping notifications per-chat | +| `surface` | `str` | `"cli"` for interactive CLI/TUI prompts, `"gateway"` for async platform approvals | + +**Return value:** ignored. Hooks here are observer-only; they cannot veto or pre-answer the approval. Use [`pre_tool_call`](#pre_tool_call) to block a tool before it reaches the approval system. + +**Use cases:** Desktop notifications, push alerts, audit logging, Slack webhooks, escalation routing, metrics. + +**Example — desktop notification on macOS:** + +```python +import subprocess + +def notify_approval(command, description, session_key, **kwargs): + title = "Hermes needs approval" + body = f"{description}: {command[:80]}" + subprocess.Popen([ + "osascript", "-e", + f'display notification "{body}" with title "{title}"', + ]) + +def register(ctx): + ctx.register_hook("pre_approval_request", notify_approval) +``` + +--- + +### `post_approval_response` + +Fires **after** the user responds to an approval prompt (or the prompt times out). + +**Callback signature:** + +```python +def my_callback( + command: str, + description: str, + pattern_key: str, + pattern_keys: list[str], + session_key: str, + surface: str, + choice: str, + **kwargs, +): +``` + +Same kwargs as `pre_approval_request`, plus: + +| Parameter | Type | Description | +|-----------|------|-------------| +| `choice` | `str` | One of `"once"`, `"session"`, `"always"`, `"deny"`, or `"timeout"` | + +**Return value:** ignored. + +**Use cases:** Close the matching desktop notification, record the final decision in an audit log, update metrics, roll forward a rate limiter. + +```python +def log_decision(command, choice, session_key, **kwargs): + logger.info("approval %s: %s for session %s", choice, command[:60], session_key) + +def register(ctx): + ctx.register_hook("post_approval_response", log_decision) +``` + +--- + ## Shell Hooks Declare shell-script hooks in your `cli-config.yaml` and Hermes will run them as subprocesses whenever the corresponding plugin-hook event fires — in both CLI and gateway sessions. No Python plugin authoring required.