2026-04-20 20:53:20 -07:00
|
|
|
|
"""
|
|
|
|
|
|
Shell-script hooks bridge.
|
|
|
|
|
|
|
|
|
|
|
|
Reads the ``hooks:`` block from ``cli-config.yaml``, prompts the user for
|
|
|
|
|
|
consent on first use of each ``(event, command)`` pair, and registers
|
|
|
|
|
|
callbacks on the existing plugin hook manager so every existing
|
|
|
|
|
|
``invoke_hook()`` site dispatches to the configured shell scripts — with
|
|
|
|
|
|
zero changes to call sites.
|
|
|
|
|
|
|
|
|
|
|
|
Design notes
|
|
|
|
|
|
------------
|
|
|
|
|
|
* Python plugins and shell hooks compose naturally: both flow through
|
|
|
|
|
|
:func:`hermes_cli.plugins.invoke_hook` and its aggregators. Python
|
|
|
|
|
|
plugins are registered first (via ``discover_and_load()``) so their
|
|
|
|
|
|
block decisions win ties over shell-hook blocks.
|
|
|
|
|
|
* Subprocess execution uses ``shlex.split(os.path.expanduser(command))``
|
|
|
|
|
|
with ``shell=False`` — no shell injection footguns. Users that need
|
|
|
|
|
|
pipes/redirection wrap their logic in a script.
|
|
|
|
|
|
* First-use consent is gated by the allowlist under
|
|
|
|
|
|
``~/.hermes/shell-hooks-allowlist.json``. Non-TTY callers must pass
|
|
|
|
|
|
``accept_hooks=True`` (resolved from ``--accept-hooks``,
|
|
|
|
|
|
``HERMES_ACCEPT_HOOKS``, or ``hooks_auto_accept: true`` in config)
|
|
|
|
|
|
for registration to succeed without a prompt.
|
|
|
|
|
|
* Registration is idempotent — safe to invoke from both the CLI entry
|
|
|
|
|
|
point (``hermes_cli/main.py``) and the gateway entry point
|
|
|
|
|
|
(``gateway/run.py``).
|
|
|
|
|
|
|
|
|
|
|
|
Wire protocol
|
|
|
|
|
|
-------------
|
|
|
|
|
|
**stdin** (JSON, piped to the script)::
|
|
|
|
|
|
|
|
|
|
|
|
{
|
|
|
|
|
|
"hook_event_name": "pre_tool_call",
|
|
|
|
|
|
"tool_name": "terminal",
|
|
|
|
|
|
"tool_input": {"command": "rm -rf /"},
|
|
|
|
|
|
"session_id": "sess_abc123",
|
|
|
|
|
|
"cwd": "/home/user/project",
|
|
|
|
|
|
"extra": {...} # event-specific kwargs
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
**stdout** (JSON, optional — anything else is ignored)::
|
|
|
|
|
|
|
|
|
|
|
|
# Block a pre_tool_call (either shape accepted; normalised internally):
|
|
|
|
|
|
{"decision": "block", "reason": "Forbidden command"} # Claude-Code-style
|
|
|
|
|
|
{"action": "block", "message": "Forbidden command"} # Hermes-canonical
|
|
|
|
|
|
|
|
|
|
|
|
# Inject context for pre_llm_call:
|
|
|
|
|
|
{"context": "Today is Friday"}
|
|
|
|
|
|
|
|
|
|
|
|
# Silent no-op:
|
|
|
|
|
|
<empty or any non-matching JSON object>
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
|
|
import difflib
|
|
|
|
|
|
import json
|
|
|
|
|
|
import logging
|
|
|
|
|
|
import os
|
|
|
|
|
|
import re
|
|
|
|
|
|
import shlex
|
|
|
|
|
|
import subprocess
|
|
|
|
|
|
import sys
|
|
|
|
|
|
import tempfile
|
|
|
|
|
|
import threading
|
|
|
|
|
|
import time
|
|
|
|
|
|
from contextlib import contextmanager
|
|
|
|
|
|
from dataclasses import dataclass, field
|
|
|
|
|
|
from datetime import datetime, timezone
|
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
|
from typing import Any, Callable, Dict, Iterator, List, Optional, Set, Tuple
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
import fcntl # POSIX only; Windows falls back to best-effort without flock.
|
|
|
|
|
|
except ImportError: # pragma: no cover
|
|
|
|
|
|
fcntl = None # type: ignore[assignment]
|
|
|
|
|
|
|
|
|
|
|
|
from hermes_constants import get_hermes_home
|
|
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
DEFAULT_TIMEOUT_SECONDS = 60
|
|
|
|
|
|
MAX_TIMEOUT_SECONDS = 300
|
|
|
|
|
|
ALLOWLIST_FILENAME = "shell-hooks-allowlist.json"
|
|
|
|
|
|
|
|
|
|
|
|
# (event, matcher, command) triples that have been wired to the plugin
|
|
|
|
|
|
# manager in the current process. Matcher is part of the key because
|
|
|
|
|
|
# the same script can legitimately register for different matchers under
|
|
|
|
|
|
# the same event (e.g. one entry per tool the user wants to gate).
|
|
|
|
|
|
# Second registration attempts for the exact same triple become no-ops
|
|
|
|
|
|
# so the CLI and gateway can both call register_from_config() safely.
|
|
|
|
|
|
_registered: Set[Tuple[str, Optional[str], str]] = set()
|
|
|
|
|
|
_registered_lock = threading.Lock()
|
|
|
|
|
|
|
|
|
|
|
|
# Intra-process lock for allowlist read-modify-write on platforms that
|
|
|
|
|
|
# lack ``fcntl`` (non-POSIX). Kept separate from ``_registered_lock``
|
|
|
|
|
|
# because ``register_from_config`` already holds ``_registered_lock`` when
|
|
|
|
|
|
# it triggers ``_record_approval`` — reusing it here would self-deadlock
|
|
|
|
|
|
# (``threading.Lock`` is non-reentrant). POSIX callers use the sibling
|
|
|
|
|
|
# ``.lock`` file via ``fcntl.flock`` and bypass this.
|
|
|
|
|
|
_allowlist_write_lock = threading.Lock()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass
|
|
|
|
|
|
class ShellHookSpec:
|
|
|
|
|
|
"""Parsed and validated representation of a single ``hooks:`` entry."""
|
|
|
|
|
|
|
|
|
|
|
|
event: str
|
|
|
|
|
|
command: str
|
|
|
|
|
|
matcher: Optional[str] = None
|
|
|
|
|
|
timeout: int = DEFAULT_TIMEOUT_SECONDS
|
|
|
|
|
|
compiled_matcher: Optional[re.Pattern] = field(default=None, repr=False)
|
|
|
|
|
|
|
|
|
|
|
|
def __post_init__(self) -> None:
|
|
|
|
|
|
# Strip whitespace introduced by YAML quirks (e.g. multi-line string
|
|
|
|
|
|
# folding) — a matcher of " terminal" would otherwise silently fail
|
|
|
|
|
|
# to match "terminal" without any diagnostic.
|
|
|
|
|
|
if isinstance(self.matcher, str):
|
|
|
|
|
|
stripped = self.matcher.strip()
|
|
|
|
|
|
self.matcher = stripped if stripped else None
|
|
|
|
|
|
if self.matcher:
|
|
|
|
|
|
try:
|
|
|
|
|
|
self.compiled_matcher = re.compile(self.matcher)
|
|
|
|
|
|
except re.error as exc:
|
|
|
|
|
|
logger.warning(
|
|
|
|
|
|
"shell hook matcher %r is invalid (%s) — treating as "
|
|
|
|
|
|
"literal equality", self.matcher, exc,
|
|
|
|
|
|
)
|
|
|
|
|
|
self.compiled_matcher = None
|
|
|
|
|
|
|
|
|
|
|
|
def matches_tool(self, tool_name: Optional[str]) -> bool:
|
|
|
|
|
|
if not self.matcher:
|
|
|
|
|
|
return True
|
|
|
|
|
|
if tool_name is None:
|
|
|
|
|
|
return False
|
|
|
|
|
|
if self.compiled_matcher is not None:
|
|
|
|
|
|
return self.compiled_matcher.fullmatch(tool_name) is not None
|
|
|
|
|
|
# compiled_matcher is None only when the regex failed to compile,
|
|
|
|
|
|
# in which case we already warned and fall back to literal equality.
|
|
|
|
|
|
return tool_name == self.matcher
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
# Public API
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
def register_from_config(
|
|
|
|
|
|
cfg: Optional[Dict[str, Any]],
|
|
|
|
|
|
*,
|
|
|
|
|
|
accept_hooks: bool = False,
|
|
|
|
|
|
) -> List[ShellHookSpec]:
|
|
|
|
|
|
"""Register every configured shell hook on the plugin manager.
|
|
|
|
|
|
|
|
|
|
|
|
``cfg`` is the full parsed config dict (``hermes_cli.config.load_config``
|
|
|
|
|
|
output). The ``hooks:`` key is read out of it. Missing, empty, or
|
|
|
|
|
|
non-dict ``hooks`` is treated as zero configured hooks.
|
|
|
|
|
|
|
|
|
|
|
|
``accept_hooks=True`` skips the TTY consent prompt — the caller is
|
|
|
|
|
|
promising that the user has opted in via a flag, env var, or config
|
|
|
|
|
|
setting. ``HERMES_ACCEPT_HOOKS=1`` and ``hooks_auto_accept: true`` are
|
|
|
|
|
|
also honored inside this function so either CLI or gateway call sites
|
|
|
|
|
|
pick them up.
|
|
|
|
|
|
|
|
|
|
|
|
Returns the list of :class:`ShellHookSpec` entries that ended up wired
|
|
|
|
|
|
up on the plugin manager. Skipped entries (unknown events, malformed,
|
|
|
|
|
|
not allowlisted, already registered) are logged but not returned.
|
|
|
|
|
|
"""
|
|
|
|
|
|
if not isinstance(cfg, dict):
|
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
|
|
effective_accept = _resolve_effective_accept(cfg, accept_hooks)
|
|
|
|
|
|
|
|
|
|
|
|
specs = _parse_hooks_block(cfg.get("hooks"))
|
|
|
|
|
|
if not specs:
|
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
|
|
registered: List[ShellHookSpec] = []
|
|
|
|
|
|
|
|
|
|
|
|
# Import lazily — avoids circular imports at module-load time.
|
|
|
|
|
|
from hermes_cli.plugins import get_plugin_manager
|
|
|
|
|
|
|
|
|
|
|
|
manager = get_plugin_manager()
|
|
|
|
|
|
|
|
|
|
|
|
# Idempotence + allowlist read happen under the lock; the TTY
|
|
|
|
|
|
# prompt runs outside so other threads aren't parked on a blocking
|
|
|
|
|
|
# input(). Mutation re-takes the lock with a defensive idempotence
|
|
|
|
|
|
# re-check in case two callers ever race through the prompt.
|
|
|
|
|
|
for spec in specs:
|
|
|
|
|
|
key = (spec.event, spec.matcher, spec.command)
|
|
|
|
|
|
with _registered_lock:
|
|
|
|
|
|
if key in _registered:
|
|
|
|
|
|
continue
|
|
|
|
|
|
already_allowlisted = _is_allowlisted(spec.event, spec.command)
|
|
|
|
|
|
|
|
|
|
|
|
if not already_allowlisted:
|
|
|
|
|
|
if not _prompt_and_record(
|
|
|
|
|
|
spec.event, spec.command, accept_hooks=effective_accept,
|
|
|
|
|
|
):
|
|
|
|
|
|
logger.warning(
|
|
|
|
|
|
"shell hook for %s (%s) not allowlisted — skipped. "
|
|
|
|
|
|
"Use --accept-hooks / HERMES_ACCEPT_HOOKS=1 / "
|
|
|
|
|
|
"hooks_auto_accept: true, or approve at the TTY "
|
|
|
|
|
|
"prompt next run.",
|
|
|
|
|
|
spec.event, spec.command,
|
|
|
|
|
|
)
|
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
|
|
with _registered_lock:
|
|
|
|
|
|
if key in _registered:
|
|
|
|
|
|
continue
|
|
|
|
|
|
manager._hooks.setdefault(spec.event, []).append(_make_callback(spec))
|
|
|
|
|
|
_registered.add(key)
|
|
|
|
|
|
registered.append(spec)
|
|
|
|
|
|
logger.info(
|
|
|
|
|
|
"shell hook registered: %s -> %s (matcher=%s, timeout=%ds)",
|
|
|
|
|
|
spec.event, spec.command, spec.matcher, spec.timeout,
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
return registered
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def iter_configured_hooks(cfg: Optional[Dict[str, Any]]) -> List[ShellHookSpec]:
|
|
|
|
|
|
"""Return the parsed ``ShellHookSpec`` entries from config without
|
|
|
|
|
|
registering anything. Used by ``hermes hooks list`` and ``doctor``."""
|
|
|
|
|
|
if not isinstance(cfg, dict):
|
|
|
|
|
|
return []
|
|
|
|
|
|
return _parse_hooks_block(cfg.get("hooks"))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def reset_for_tests() -> None:
|
|
|
|
|
|
"""Clear the idempotence set. Test-only helper."""
|
|
|
|
|
|
with _registered_lock:
|
|
|
|
|
|
_registered.clear()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
# Config parsing
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
def _parse_hooks_block(hooks_cfg: Any) -> List[ShellHookSpec]:
|
|
|
|
|
|
"""Normalise the ``hooks:`` dict into a flat list of ``ShellHookSpec``.
|
|
|
|
|
|
|
|
|
|
|
|
Malformed entries warn-and-skip — we never raise from config parsing
|
|
|
|
|
|
because a broken hook must not crash the agent.
|
|
|
|
|
|
"""
|
|
|
|
|
|
from hermes_cli.plugins import VALID_HOOKS
|
|
|
|
|
|
|
|
|
|
|
|
if not isinstance(hooks_cfg, dict):
|
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
|
|
specs: List[ShellHookSpec] = []
|
|
|
|
|
|
|
|
|
|
|
|
for event_name, entries in hooks_cfg.items():
|
|
|
|
|
|
if event_name not in VALID_HOOKS:
|
|
|
|
|
|
suggestion = difflib.get_close_matches(
|
|
|
|
|
|
str(event_name), VALID_HOOKS, n=1, cutoff=0.6,
|
|
|
|
|
|
)
|
|
|
|
|
|
if suggestion:
|
|
|
|
|
|
logger.warning(
|
|
|
|
|
|
"unknown hook event %r in hooks: config — did you mean %r?",
|
|
|
|
|
|
event_name, suggestion[0],
|
|
|
|
|
|
)
|
|
|
|
|
|
else:
|
|
|
|
|
|
logger.warning(
|
|
|
|
|
|
"unknown hook event %r in hooks: config (valid: %s)",
|
|
|
|
|
|
event_name, ", ".join(sorted(VALID_HOOKS)),
|
|
|
|
|
|
)
|
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
|
|
if entries is None:
|
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
|
|
if not isinstance(entries, list):
|
|
|
|
|
|
logger.warning(
|
|
|
|
|
|
"hooks.%s must be a list of hook definitions; got %s",
|
|
|
|
|
|
event_name, type(entries).__name__,
|
|
|
|
|
|
)
|
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
|
|
for i, raw in enumerate(entries):
|
|
|
|
|
|
spec = _parse_single_entry(event_name, i, raw)
|
|
|
|
|
|
if spec is not None:
|
|
|
|
|
|
specs.append(spec)
|
|
|
|
|
|
|
|
|
|
|
|
return specs
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _parse_single_entry(
|
|
|
|
|
|
event: str, index: int, raw: Any,
|
|
|
|
|
|
) -> Optional[ShellHookSpec]:
|
|
|
|
|
|
if not isinstance(raw, dict):
|
|
|
|
|
|
logger.warning(
|
|
|
|
|
|
"hooks.%s[%d] must be a mapping with a 'command' key; got %s",
|
|
|
|
|
|
event, index, type(raw).__name__,
|
|
|
|
|
|
)
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
command = raw.get("command")
|
|
|
|
|
|
if not isinstance(command, str) or not command.strip():
|
|
|
|
|
|
logger.warning(
|
|
|
|
|
|
"hooks.%s[%d] is missing a non-empty 'command' field",
|
|
|
|
|
|
event, index,
|
|
|
|
|
|
)
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
matcher = raw.get("matcher")
|
|
|
|
|
|
if matcher is not None and not isinstance(matcher, str):
|
|
|
|
|
|
logger.warning(
|
|
|
|
|
|
"hooks.%s[%d].matcher must be a string regex; ignoring",
|
|
|
|
|
|
event, index,
|
|
|
|
|
|
)
|
|
|
|
|
|
matcher = None
|
|
|
|
|
|
|
|
|
|
|
|
if matcher is not None and event not in ("pre_tool_call", "post_tool_call"):
|
|
|
|
|
|
logger.warning(
|
|
|
|
|
|
"hooks.%s[%d].matcher=%r will be ignored at runtime — the "
|
|
|
|
|
|
"matcher field is only honored for pre_tool_call / "
|
|
|
|
|
|
"post_tool_call. The hook will fire on every %s event.",
|
|
|
|
|
|
event, index, matcher, event,
|
|
|
|
|
|
)
|
|
|
|
|
|
matcher = None
|
|
|
|
|
|
|
|
|
|
|
|
timeout_raw = raw.get("timeout", DEFAULT_TIMEOUT_SECONDS)
|
|
|
|
|
|
try:
|
|
|
|
|
|
timeout = int(timeout_raw)
|
|
|
|
|
|
except (TypeError, ValueError):
|
|
|
|
|
|
logger.warning(
|
|
|
|
|
|
"hooks.%s[%d].timeout must be an int (got %r); using default %ds",
|
|
|
|
|
|
event, index, timeout_raw, DEFAULT_TIMEOUT_SECONDS,
|
|
|
|
|
|
)
|
|
|
|
|
|
timeout = DEFAULT_TIMEOUT_SECONDS
|
|
|
|
|
|
|
|
|
|
|
|
if timeout < 1:
|
|
|
|
|
|
logger.warning(
|
|
|
|
|
|
"hooks.%s[%d].timeout must be >=1; using default %ds",
|
|
|
|
|
|
event, index, DEFAULT_TIMEOUT_SECONDS,
|
|
|
|
|
|
)
|
|
|
|
|
|
timeout = DEFAULT_TIMEOUT_SECONDS
|
|
|
|
|
|
|
|
|
|
|
|
if timeout > MAX_TIMEOUT_SECONDS:
|
|
|
|
|
|
logger.warning(
|
|
|
|
|
|
"hooks.%s[%d].timeout=%ds exceeds max %ds; clamping",
|
|
|
|
|
|
event, index, timeout, MAX_TIMEOUT_SECONDS,
|
|
|
|
|
|
)
|
|
|
|
|
|
timeout = MAX_TIMEOUT_SECONDS
|
|
|
|
|
|
|
|
|
|
|
|
return ShellHookSpec(
|
|
|
|
|
|
event=event,
|
|
|
|
|
|
command=command.strip(),
|
|
|
|
|
|
matcher=matcher,
|
|
|
|
|
|
timeout=timeout,
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
# Subprocess callback
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
_TOP_LEVEL_PAYLOAD_KEYS = {"tool_name", "args", "session_id", "parent_session_id"}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _spawn(spec: ShellHookSpec, stdin_json: str) -> Dict[str, Any]:
|
|
|
|
|
|
"""Run ``spec.command`` as a subprocess with ``stdin_json`` on stdin.
|
|
|
|
|
|
|
|
|
|
|
|
Returns a diagnostic dict with the same keys for every outcome
|
|
|
|
|
|
(``returncode``, ``stdout``, ``stderr``, ``timed_out``,
|
|
|
|
|
|
``elapsed_seconds``, ``error``). This is the single place the
|
|
|
|
|
|
subprocess is actually invoked — both the live callback path
|
|
|
|
|
|
(:func:`_make_callback`) and the CLI test helper (:func:`run_once`)
|
|
|
|
|
|
go through it.
|
|
|
|
|
|
"""
|
|
|
|
|
|
result: Dict[str, Any] = {
|
|
|
|
|
|
"returncode": None,
|
|
|
|
|
|
"stdout": "",
|
|
|
|
|
|
"stderr": "",
|
|
|
|
|
|
"timed_out": False,
|
|
|
|
|
|
"elapsed_seconds": 0.0,
|
|
|
|
|
|
"error": None,
|
|
|
|
|
|
}
|
|
|
|
|
|
try:
|
|
|
|
|
|
argv = shlex.split(os.path.expanduser(spec.command))
|
|
|
|
|
|
except ValueError as exc:
|
|
|
|
|
|
result["error"] = f"command {spec.command!r} cannot be parsed: {exc}"
|
|
|
|
|
|
return result
|
|
|
|
|
|
if not argv:
|
|
|
|
|
|
result["error"] = "empty command"
|
|
|
|
|
|
return result
|
|
|
|
|
|
|
|
|
|
|
|
t0 = time.monotonic()
|
|
|
|
|
|
try:
|
|
|
|
|
|
proc = subprocess.run(
|
|
|
|
|
|
argv,
|
|
|
|
|
|
input=stdin_json,
|
|
|
|
|
|
capture_output=True,
|
|
|
|
|
|
timeout=spec.timeout,
|
|
|
|
|
|
text=True,
|
|
|
|
|
|
shell=False,
|
|
|
|
|
|
)
|
|
|
|
|
|
except subprocess.TimeoutExpired:
|
|
|
|
|
|
result["timed_out"] = True
|
|
|
|
|
|
result["elapsed_seconds"] = round(time.monotonic() - t0, 3)
|
|
|
|
|
|
return result
|
|
|
|
|
|
except FileNotFoundError:
|
|
|
|
|
|
result["error"] = "command not found"
|
|
|
|
|
|
return result
|
|
|
|
|
|
except PermissionError:
|
|
|
|
|
|
result["error"] = "command not executable"
|
|
|
|
|
|
return result
|
|
|
|
|
|
except Exception as exc: # pragma: no cover — defensive
|
|
|
|
|
|
result["error"] = str(exc)
|
|
|
|
|
|
return result
|
|
|
|
|
|
|
|
|
|
|
|
result["returncode"] = proc.returncode
|
|
|
|
|
|
result["stdout"] = proc.stdout or ""
|
|
|
|
|
|
result["stderr"] = proc.stderr or ""
|
|
|
|
|
|
result["elapsed_seconds"] = round(time.monotonic() - t0, 3)
|
|
|
|
|
|
return result
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _make_callback(spec: ShellHookSpec) -> Callable[..., Optional[Dict[str, Any]]]:
|
|
|
|
|
|
"""Build the closure that ``invoke_hook()`` will call per firing."""
|
|
|
|
|
|
|
|
|
|
|
|
def _callback(**kwargs: Any) -> Optional[Dict[str, Any]]:
|
|
|
|
|
|
# Matcher gate — only meaningful for tool-scoped events.
|
|
|
|
|
|
if spec.event in ("pre_tool_call", "post_tool_call"):
|
|
|
|
|
|
if not spec.matches_tool(kwargs.get("tool_name")):
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
r = _spawn(spec, _serialize_payload(spec.event, kwargs))
|
|
|
|
|
|
|
|
|
|
|
|
if r["error"]:
|
|
|
|
|
|
logger.warning(
|
|
|
|
|
|
"shell hook failed (event=%s command=%s): %s",
|
|
|
|
|
|
spec.event, spec.command, r["error"],
|
|
|
|
|
|
)
|
|
|
|
|
|
return None
|
|
|
|
|
|
if r["timed_out"]:
|
|
|
|
|
|
logger.warning(
|
|
|
|
|
|
"shell hook timed out after %.2fs (event=%s command=%s)",
|
|
|
|
|
|
r["elapsed_seconds"], spec.event, spec.command,
|
|
|
|
|
|
)
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
stderr = r["stderr"].strip()
|
|
|
|
|
|
if stderr:
|
|
|
|
|
|
logger.debug(
|
|
|
|
|
|
"shell hook stderr (event=%s command=%s): %s",
|
|
|
|
|
|
spec.event, spec.command, stderr[:400],
|
|
|
|
|
|
)
|
|
|
|
|
|
# Non-zero exits: log but still parse stdout so scripts that
|
|
|
|
|
|
# signal failure via exit code can also return a block directive.
|
|
|
|
|
|
if r["returncode"] != 0:
|
|
|
|
|
|
logger.warning(
|
|
|
|
|
|
"shell hook exited %d (event=%s command=%s); stderr=%s",
|
|
|
|
|
|
r["returncode"], spec.event, spec.command, stderr[:400],
|
|
|
|
|
|
)
|
|
|
|
|
|
return _parse_response(spec.event, r["stdout"])
|
|
|
|
|
|
|
|
|
|
|
|
_callback.__name__ = f"shell_hook[{spec.event}:{spec.command}]"
|
|
|
|
|
|
_callback.__qualname__ = _callback.__name__
|
|
|
|
|
|
return _callback
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _serialize_payload(event: str, kwargs: Dict[str, Any]) -> str:
|
|
|
|
|
|
"""Render the stdin JSON payload. Unserialisable values are
|
|
|
|
|
|
stringified via ``default=str`` rather than dropped."""
|
|
|
|
|
|
extras = {k: v for k, v in kwargs.items() if k not in _TOP_LEVEL_PAYLOAD_KEYS}
|
|
|
|
|
|
try:
|
|
|
|
|
|
cwd = str(Path.cwd())
|
|
|
|
|
|
except OSError:
|
|
|
|
|
|
cwd = ""
|
|
|
|
|
|
payload = {
|
|
|
|
|
|
"hook_event_name": event,
|
|
|
|
|
|
"tool_name": kwargs.get("tool_name"),
|
|
|
|
|
|
"tool_input": kwargs.get("args") if isinstance(kwargs.get("args"), dict) else None,
|
|
|
|
|
|
"session_id": kwargs.get("session_id") or kwargs.get("parent_session_id") or "",
|
|
|
|
|
|
"cwd": cwd,
|
|
|
|
|
|
"extra": extras,
|
|
|
|
|
|
}
|
|
|
|
|
|
return json.dumps(payload, ensure_ascii=False, default=str)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _parse_response(event: str, stdout: str) -> Optional[Dict[str, Any]]:
|
|
|
|
|
|
"""Translate stdout JSON into a Hermes wire-shape dict.
|
|
|
|
|
|
|
|
|
|
|
|
For ``pre_tool_call`` the Claude-Code-style ``{"decision": "block",
|
|
|
|
|
|
"reason": "..."}`` payload is translated into the canonical Hermes
|
|
|
|
|
|
``{"action": "block", "message": "..."}`` shape expected by
|
|
|
|
|
|
:func:`hermes_cli.plugins.get_pre_tool_call_block_message`. This is
|
|
|
|
|
|
the single most important correctness invariant in this module —
|
|
|
|
|
|
skipping the translation silently breaks every ``pre_tool_call``
|
|
|
|
|
|
block directive.
|
|
|
|
|
|
|
|
|
|
|
|
For ``pre_llm_call``, ``{"context": "..."}`` is passed through
|
|
|
|
|
|
unchanged to match the existing plugin-hook contract.
|
|
|
|
|
|
|
|
|
|
|
|
Anything else returns ``None``.
|
|
|
|
|
|
"""
|
|
|
|
|
|
stdout = (stdout or "").strip()
|
|
|
|
|
|
if not stdout:
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
data = json.loads(stdout)
|
|
|
|
|
|
except json.JSONDecodeError:
|
|
|
|
|
|
logger.warning(
|
|
|
|
|
|
"shell hook stdout was not valid JSON (event=%s): %s",
|
|
|
|
|
|
event, stdout[:200],
|
|
|
|
|
|
)
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
if not isinstance(data, dict):
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
if event == "pre_tool_call":
|
|
|
|
|
|
if data.get("action") == "block":
|
|
|
|
|
|
message = data.get("message") or data.get("reason") or ""
|
|
|
|
|
|
if isinstance(message, str) and message:
|
|
|
|
|
|
return {"action": "block", "message": message}
|
|
|
|
|
|
if data.get("decision") == "block":
|
|
|
|
|
|
message = data.get("reason") or data.get("message") or ""
|
|
|
|
|
|
if isinstance(message, str) and message:
|
|
|
|
|
|
return {"action": "block", "message": message}
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
context = data.get("context")
|
|
|
|
|
|
if isinstance(context, str) and context.strip():
|
|
|
|
|
|
return {"context": context}
|
|
|
|
|
|
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
# Allowlist / consent
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
def allowlist_path() -> Path:
|
|
|
|
|
|
"""Path to the per-user shell-hook allowlist file."""
|
|
|
|
|
|
return get_hermes_home() / ALLOWLIST_FILENAME
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def load_allowlist() -> Dict[str, Any]:
|
|
|
|
|
|
"""Return the parsed allowlist, or an empty skeleton if absent."""
|
|
|
|
|
|
try:
|
|
|
|
|
|
raw = json.loads(allowlist_path().read_text())
|
|
|
|
|
|
except (FileNotFoundError, json.JSONDecodeError, OSError):
|
|
|
|
|
|
return {"approvals": []}
|
|
|
|
|
|
if not isinstance(raw, dict):
|
|
|
|
|
|
return {"approvals": []}
|
|
|
|
|
|
approvals = raw.get("approvals")
|
|
|
|
|
|
if not isinstance(approvals, list):
|
|
|
|
|
|
raw["approvals"] = []
|
|
|
|
|
|
return raw
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def save_allowlist(data: Dict[str, Any]) -> None:
|
|
|
|
|
|
"""Atomically persist the allowlist via per-process ``mkstemp`` +
|
|
|
|
|
|
``os.replace``. Cross-process read-modify-write races are handled
|
|
|
|
|
|
by :func:`_locked_update_approvals` (``fcntl.flock``). On OSError
|
|
|
|
|
|
the failure is logged; the in-process hook still registers but
|
|
|
|
|
|
the approval won't survive across runs."""
|
|
|
|
|
|
p = allowlist_path()
|
|
|
|
|
|
try:
|
|
|
|
|
|
p.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
fd, tmp_path = tempfile.mkstemp(
|
|
|
|
|
|
prefix=f"{p.name}.", suffix=".tmp", dir=str(p.parent),
|
|
|
|
|
|
)
|
|
|
|
|
|
try:
|
|
|
|
|
|
with os.fdopen(fd, "w") as fh:
|
|
|
|
|
|
fh.write(json.dumps(data, indent=2, sort_keys=True))
|
|
|
|
|
|
os.replace(tmp_path, p)
|
|
|
|
|
|
except Exception:
|
|
|
|
|
|
try:
|
|
|
|
|
|
os.unlink(tmp_path)
|
|
|
|
|
|
except OSError:
|
|
|
|
|
|
pass
|
|
|
|
|
|
raise
|
|
|
|
|
|
except OSError as exc:
|
|
|
|
|
|
logger.warning(
|
|
|
|
|
|
"Failed to persist shell hook allowlist to %s: %s. "
|
|
|
|
|
|
"The approval is in-memory for this run, but the next "
|
|
|
|
|
|
"startup will re-prompt (or skip registration on non-TTY "
|
|
|
|
|
|
"runs without --accept-hooks / HERMES_ACCEPT_HOOKS).",
|
|
|
|
|
|
p, exc,
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _is_allowlisted(event: str, command: str) -> bool:
|
|
|
|
|
|
data = load_allowlist()
|
|
|
|
|
|
return any(
|
|
|
|
|
|
isinstance(e, dict)
|
|
|
|
|
|
and e.get("event") == event
|
|
|
|
|
|
and e.get("command") == command
|
|
|
|
|
|
for e in data.get("approvals", [])
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@contextmanager
|
|
|
|
|
|
def _locked_update_approvals() -> Iterator[Dict[str, Any]]:
|
|
|
|
|
|
"""Serialise read-modify-write on the allowlist across processes.
|
|
|
|
|
|
|
|
|
|
|
|
Holds an exclusive ``flock`` on a sibling lock file for the duration
|
|
|
|
|
|
of the update so concurrent ``_record_approval``/``revoke`` callers
|
|
|
|
|
|
cannot clobber each other's changes (the race Codex reproduced with
|
|
|
|
|
|
20–50 simultaneous writers). Falls back to an in-process lock on
|
|
|
|
|
|
platforms without ``fcntl``.
|
|
|
|
|
|
"""
|
|
|
|
|
|
p = allowlist_path()
|
|
|
|
|
|
p.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
lock_path = p.with_suffix(p.suffix + ".lock")
|
|
|
|
|
|
|
|
|
|
|
|
if fcntl is None: # pragma: no cover — non-POSIX fallback
|
|
|
|
|
|
with _allowlist_write_lock:
|
|
|
|
|
|
data = load_allowlist()
|
|
|
|
|
|
yield data
|
|
|
|
|
|
save_allowlist(data)
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
with open(lock_path, "a+") as lock_fh:
|
|
|
|
|
|
fcntl.flock(lock_fh.fileno(), fcntl.LOCK_EX)
|
|
|
|
|
|
try:
|
|
|
|
|
|
data = load_allowlist()
|
|
|
|
|
|
yield data
|
|
|
|
|
|
save_allowlist(data)
|
|
|
|
|
|
finally:
|
|
|
|
|
|
fcntl.flock(lock_fh.fileno(), fcntl.LOCK_UN)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _prompt_and_record(
|
|
|
|
|
|
event: str, command: str, *, accept_hooks: bool,
|
|
|
|
|
|
) -> bool:
|
|
|
|
|
|
"""Decide whether to approve an unseen ``(event, command)`` pair.
|
|
|
|
|
|
Returns ``True`` iff the approval was granted and recorded.
|
|
|
|
|
|
"""
|
|
|
|
|
|
if accept_hooks:
|
|
|
|
|
|
_record_approval(event, command)
|
|
|
|
|
|
logger.info(
|
|
|
|
|
|
"shell hook auto-approved via --accept-hooks / env / config: "
|
|
|
|
|
|
"%s -> %s", event, command,
|
|
|
|
|
|
)
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
|
|
if not sys.stdin.isatty():
|
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
print(
|
|
|
|
|
|
f"\n⚠ Hermes is about to register a shell hook that will run a\n"
|
|
|
|
|
|
f" command on your behalf.\n\n"
|
|
|
|
|
|
f" Event: {event}\n"
|
|
|
|
|
|
f" Command: {command}\n\n"
|
|
|
|
|
|
f" Commands run with your full user credentials. Only approve\n"
|
|
|
|
|
|
f" commands you trust."
|
|
|
|
|
|
)
|
|
|
|
|
|
try:
|
|
|
|
|
|
answer = input("Allow this hook to run? [y/N]: ").strip().lower()
|
|
|
|
|
|
except (EOFError, KeyboardInterrupt):
|
|
|
|
|
|
print() # keep the terminal tidy after ^C
|
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
if answer in ("y", "yes"):
|
|
|
|
|
|
_record_approval(event, command)
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _record_approval(event: str, command: str) -> None:
|
|
|
|
|
|
entry = {
|
|
|
|
|
|
"event": event,
|
|
|
|
|
|
"command": command,
|
|
|
|
|
|
"approved_at": _utc_now_iso(),
|
|
|
|
|
|
"script_mtime_at_approval": script_mtime_iso(command),
|
|
|
|
|
|
}
|
|
|
|
|
|
with _locked_update_approvals() as data:
|
|
|
|
|
|
data["approvals"] = [
|
|
|
|
|
|
e for e in data.get("approvals", [])
|
|
|
|
|
|
if not (
|
|
|
|
|
|
isinstance(e, dict)
|
|
|
|
|
|
and e.get("event") == event
|
|
|
|
|
|
and e.get("command") == command
|
|
|
|
|
|
)
|
|
|
|
|
|
] + [entry]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _utc_now_iso() -> str:
|
|
|
|
|
|
return datetime.now(tz=timezone.utc).isoformat().replace("+00:00", "Z")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def revoke(command: str) -> int:
|
|
|
|
|
|
"""Remove every allowlist entry matching ``command``.
|
|
|
|
|
|
|
|
|
|
|
|
Returns the number of entries removed. Does not unregister any
|
|
|
|
|
|
callbacks that are already live on the plugin manager in the current
|
|
|
|
|
|
process — restart the CLI / gateway to drop them.
|
|
|
|
|
|
"""
|
|
|
|
|
|
with _locked_update_approvals() as data:
|
|
|
|
|
|
before = len(data.get("approvals", []))
|
|
|
|
|
|
data["approvals"] = [
|
|
|
|
|
|
e for e in data.get("approvals", [])
|
|
|
|
|
|
if not (isinstance(e, dict) and e.get("command") == command)
|
|
|
|
|
|
]
|
|
|
|
|
|
after = len(data["approvals"])
|
|
|
|
|
|
return before - after
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_SCRIPT_EXTENSIONS: Tuple[str, ...] = (
|
|
|
|
|
|
".sh", ".bash", ".zsh", ".fish",
|
|
|
|
|
|
".py", ".pyw",
|
|
|
|
|
|
".rb", ".pl", ".lua",
|
|
|
|
|
|
".js", ".mjs", ".cjs", ".ts",
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _command_script_path(command: str) -> str:
|
|
|
|
|
|
"""Return the script path from ``command`` for doctor / drift checks.
|
|
|
|
|
|
|
|
|
|
|
|
Prefers a token ending in a known script extension, then a token
|
|
|
|
|
|
containing ``/`` or leading ``~``, then the first token. Handles
|
|
|
|
|
|
``python3 /path/hook.py``, ``/usr/bin/env bash hook.sh``, and the
|
|
|
|
|
|
common bare-path form.
|
|
|
|
|
|
"""
|
|
|
|
|
|
try:
|
|
|
|
|
|
parts = shlex.split(command)
|
|
|
|
|
|
except ValueError:
|
|
|
|
|
|
return command
|
|
|
|
|
|
if not parts:
|
|
|
|
|
|
return command
|
|
|
|
|
|
for part in parts:
|
|
|
|
|
|
if part.lower().endswith(_SCRIPT_EXTENSIONS):
|
|
|
|
|
|
return part
|
|
|
|
|
|
for part in parts:
|
|
|
|
|
|
if "/" in part or part.startswith("~"):
|
|
|
|
|
|
return part
|
|
|
|
|
|
return parts[0]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
# Helpers for accept-hooks resolution
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
def _resolve_effective_accept(
|
|
|
|
|
|
cfg: Dict[str, Any], accept_hooks_arg: bool,
|
|
|
|
|
|
) -> bool:
|
|
|
|
|
|
"""Combine all three opt-in channels into a single boolean.
|
|
|
|
|
|
|
|
|
|
|
|
Precedence (any truthy source flips us on):
|
|
|
|
|
|
1. ``--accept-hooks`` flag (CLI) / explicit argument
|
|
|
|
|
|
2. ``HERMES_ACCEPT_HOOKS`` env var
|
|
|
|
|
|
3. ``hooks_auto_accept: true`` in ``cli-config.yaml``
|
|
|
|
|
|
"""
|
|
|
|
|
|
if accept_hooks_arg:
|
|
|
|
|
|
return True
|
|
|
|
|
|
env = os.environ.get("HERMES_ACCEPT_HOOKS", "").strip().lower()
|
|
|
|
|
|
if env in ("1", "true", "yes", "on"):
|
|
|
|
|
|
return True
|
|
|
|
|
|
cfg_val = cfg.get("hooks_auto_accept", False)
|
fix(shell_hooks): parse hooks_auto_accept as strict bool/string, not bool() (#16322)
`_resolve_effective_accept()` used `return bool(cfg_val)` for the
`hooks_auto_accept` config key. In Python, `bool("false")` is `True`,
so a user setting `hooks_auto_accept: "false"` (quoted YAML string)
in `config.yaml` would silently enable auto-approval of every shell
hook, bypassing the consent prompt entirely.
Replace the coercion with the same type-aware parsing already used for
the HERMES_ACCEPT_HOOKS env var three lines above: bool passthrough,
strings checked against {1,true,yes,on} case-insensitively, everything
else (including "false", None, 0, ints) rejected.
Add TestHooksAutoAcceptParsing guarding the regression across all four
value shapes (bool, string-truthy, string-falsy, missing/None).
Reported by @sprmn24 in #16244.
2026-04-26 20:48:35 -07:00
|
|
|
|
if isinstance(cfg_val, bool):
|
|
|
|
|
|
return cfg_val
|
|
|
|
|
|
if isinstance(cfg_val, str):
|
|
|
|
|
|
return cfg_val.strip().lower() in ("1", "true", "yes", "on")
|
|
|
|
|
|
return False
|
2026-04-20 20:53:20 -07:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
# Introspection (used by `hermes hooks` CLI)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
def allowlist_entry_for(event: str, command: str) -> Optional[Dict[str, Any]]:
|
|
|
|
|
|
"""Return the allowlist record for this pair, if any."""
|
|
|
|
|
|
for e in load_allowlist().get("approvals", []):
|
|
|
|
|
|
if (
|
|
|
|
|
|
isinstance(e, dict)
|
|
|
|
|
|
and e.get("event") == event
|
|
|
|
|
|
and e.get("command") == command
|
|
|
|
|
|
):
|
|
|
|
|
|
return e
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def script_mtime_iso(command: str) -> Optional[str]:
|
|
|
|
|
|
"""ISO-8601 mtime of the resolved script path, or ``None`` if the
|
|
|
|
|
|
script is missing."""
|
|
|
|
|
|
path = _command_script_path(command)
|
|
|
|
|
|
if not path:
|
|
|
|
|
|
return None
|
|
|
|
|
|
try:
|
|
|
|
|
|
expanded = os.path.expanduser(path)
|
|
|
|
|
|
return datetime.fromtimestamp(
|
|
|
|
|
|
os.path.getmtime(expanded), tz=timezone.utc,
|
|
|
|
|
|
).isoformat().replace("+00:00", "Z")
|
|
|
|
|
|
except OSError:
|
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def script_is_executable(command: str) -> bool:
|
|
|
|
|
|
"""Return ``True`` iff ``command`` is runnable as configured.
|
|
|
|
|
|
|
|
|
|
|
|
For a bare invocation (``/path/hook.sh``) the script itself must be
|
|
|
|
|
|
executable. For interpreter-prefixed commands (``python3
|
|
|
|
|
|
/path/hook.py``, ``/usr/bin/env bash hook.sh``) the script just has
|
|
|
|
|
|
to be readable — the interpreter doesn't care about the ``X_OK``
|
|
|
|
|
|
bit. Mirrors what ``_spawn`` would actually do at runtime."""
|
|
|
|
|
|
path = _command_script_path(command)
|
|
|
|
|
|
if not path:
|
|
|
|
|
|
return False
|
|
|
|
|
|
expanded = os.path.expanduser(path)
|
|
|
|
|
|
if not os.path.isfile(expanded):
|
|
|
|
|
|
return False
|
|
|
|
|
|
try:
|
|
|
|
|
|
argv = shlex.split(command)
|
|
|
|
|
|
except ValueError:
|
|
|
|
|
|
return False
|
|
|
|
|
|
is_bare_invocation = bool(argv) and argv[0] == path
|
|
|
|
|
|
required = os.X_OK if is_bare_invocation else os.R_OK
|
|
|
|
|
|
return os.access(expanded, required)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def run_once(
|
|
|
|
|
|
spec: ShellHookSpec, kwargs: Dict[str, Any],
|
|
|
|
|
|
) -> Dict[str, Any]:
|
|
|
|
|
|
"""Fire a single shell-hook invocation with a synthetic payload.
|
|
|
|
|
|
Used by ``hermes hooks test`` and ``hermes hooks doctor``.
|
|
|
|
|
|
|
|
|
|
|
|
``kwargs`` is the same dict that :func:`hermes_cli.plugins.invoke_hook`
|
|
|
|
|
|
would pass at runtime. It is routed through :func:`_serialize_payload`
|
|
|
|
|
|
so the synthetic stdin exactly matches what a real hook firing would
|
|
|
|
|
|
produce — otherwise scripts tested via ``hermes hooks test`` could
|
|
|
|
|
|
diverge silently from production behaviour.
|
|
|
|
|
|
|
|
|
|
|
|
Returns the :func:`_spawn` diagnostic dict plus a ``parsed`` field
|
|
|
|
|
|
holding the canonical Hermes-wire-shape response."""
|
|
|
|
|
|
stdin_json = _serialize_payload(spec.event, kwargs)
|
|
|
|
|
|
result = _spawn(spec, stdin_json)
|
|
|
|
|
|
result["parsed"] = _parse_response(spec.event, result["stdout"])
|
|
|
|
|
|
return result
|