Compare commits

11 Commits

Author SHA1 Message Date
emozilla
ebee077f9f docs(telemetry): align observability docs with the trimmed schema
Match the docs to the code after the dead-schema cut and span layer:
  - List the actual tel_* tables (runs, spans, model_calls, tool_calls,
    error_events) instead of a vague "indexed tel_* tables".
  - Add a "Traces and spans" section: a run = one session, each call is a child
    span under the run root in tel_spans keyed by span_id, reconstructable as a
    connected run -> calls tree. Note subagent cross-run lineage isn't recorded.
  - Fix stale "tool failure rates by category" -> "by tool" (categories were
    removed; insights groups by raw tool name).
  - OTLP: state plainly that events export as per-event spans and the tel_spans
    parent/timing linkage isn't reconstructed into connected SpanContexts yet,
    matching the exporter's own docstring.
  - README: "telemetry plane" -> "telemetry system" (stale rename miss); mention
    spans.

Config reference verified to match DEFAULT_CONFIG exactly (9 keys).
2026-06-27 01:27:21 -04:00
emozilla
0ebdd48f9d refactor(telemetry): cut dead schema; tests assert what's actually written
Self-review after the #51714 feedback found the reviewer's dead-table finding
was not isolated — the schema advertised far more than the code populates, and
our own tests hid it by hand-feeding fields production never sends. Make the
surface honest by subtraction.

Schema (10 tel_* tables -> 5):
  - Delete tel_gateway_events, tel_cron_events, tel_skill_events,
    tel_memory_events, tel_feedback_events — declared, never written, never read.
  - Drop columns nothing populates: tel_runs.{profile_id,estimated_cost_usd,
    cost_status}; tel_model_calls.{ttft_ms,estimated_cost_usd,cost_status,
    cost_source,end_reason,retry_count}; tel_tool_calls.{backend,retry_count,
    approval}; tel_spans.attrs_json. Cost duplicated the existing sessions
    billing columns and was always NULL here.
  - events.py / emitter _TABLE_COLUMNS / OTLP _span_attrs / rollup / preview
    display all trimmed to match.

Correctness:
  - end_reason no longer hardcodes "completed". Production finalize callers pass
    `reason` (shutdown/session_expired/session_reset); _coarse_end_reason now
    reads it and maps accordingly.
  - Fix a latent bug the trim exposed: the model_call hook passed end_reason= to
    ModelCallEvent, which the @_safe wrapper was silently swallowing — so
    tel_model_calls dropped every row in real runs. Now writes correctly.

Tests:
  - Stop hand-feeding estimated_cost_usd / turn_exit_reason that no production
    call site sends. Finalize is now driven with the real `reason` kwarg, and
    assertions cover only fields that are actually populated. This is what let
    the model_call drop hide — the suite graded on a fictional contract.

Net: a smaller system that does what it says. Verified end-to-end over the real
dispatch path (runs + connected span tree + model/tool rows populate; dead
tables gone). 160 telemetry/state/insights tests green.
2026-06-27 01:21:41 -04:00
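
The dropped-row bug described in this commit can be reproduced in isolation. The sketch below is not the plugin's code: `_safe`, `ModelCallEvent`, and the two `on_model_call_*` hooks are simplified, hypothetical stand-ins. It assumes only that the wrapper catches every exception and that the event dataclass does not declare an `end_reason` field.

```python
import functools
import logging
from dataclasses import dataclass
from typing import Optional

logger = logging.getLogger("telemetry-sketch")

written = []  # stands in for rows reaching tel_model_calls


def _safe(fn):
    """Hypothetical catch-all hook wrapper: log at debug level and swallow."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except Exception:
            logger.debug("hook failed", exc_info=True)
            return None
    return wrapper


@dataclass
class ModelCallEvent:
    span_id: str
    run_id: str
    latency_ms: Optional[int] = None  # no end_reason field is declared


@_safe
def on_model_call_buggy(span_id, run_id, latency_ms):
    # The unknown keyword raises TypeError before anything is appended,
    # and the wrapper hides it from the caller.
    written.append(
        ModelCallEvent(span_id, run_id, latency_ms=latency_ms, end_reason="stop")
    )


@_safe
def on_model_call_fixed(span_id, run_id, latency_ms):
    written.append(ModelCallEvent(span_id, run_id, latency_ms=latency_ms))


on_model_call_buggy("s1", "r1", 120)  # returns None; no exception surfaces
rows_after_buggy = len(written)
on_model_call_fixed("s2", "r1", 95)
rows_after_fixed = len(written)
```

A test that calls the hook and only checks that nothing raised passes in both cases, so the assertion has to be on the rows actually written.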
emozilla
d474307cb8 feat(telemetry): write tel_spans — reconstructable run -> calls trace
Addresses the review on #51714: the trace/span layer was declared but unwired —
tel_spans was never written, call rows had no timestamp, and nothing set parent
lineage, so the store was metrics-only and couldn't reconstruct a trace.

Wire the span layer (keeping the praised star-schema shape):
  - New SpanEvent (span_id/trace_id/run_id/parent_span_id/name/kind/start_ns/end_ns)
    mapped into tel_spans via the emitter's _TABLE_COLUMNS.
  - The plugin mints a root span per run and, on each model/tool call, emits a
    SpanEvent (timing + parent = the run's root) keyed by the SAME span_id as the
    detail row, so tel_model_calls / tel_tool_calls JOIN to their span.
  - Call hooks fire on completion, so end_ns = now and start_ns is reconstructed
    from the measured latency/duration. The run's root span is emitted at finalize
    with the true run start/end.

Result: tel_spans is a connected, single-trace_id, run -> calls tree a desktop
waterfall (or any reader) can render directly, ordered by start_ns. Existing
metrics rows (tel_runs/model_calls/tool_calls) are unchanged.

OTLP: spans now flow to the exporter with their trace/parent/timing attributes.
The exporter still emits one OTel span per event rather than reconstructing OTel
SpanContexts into a connected trace tree; that projection is left for a follow-up
and the module docstring now says so plainly instead of over-claiming.

Adds test_spans_trace.py (connected-tree + detail-row JOIN) over the real dispatch
path. Accurate (pre-hook) start times, real OTLP SpanContexts, and subagent
cross-run lineage remain follow-ups.
2026-06-27 00:52:40 -04:00
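
As a rough illustration of the span layout this commit describes, the sketch below builds an in-memory database, derives `start_ns` from the completion time and measured latency, and reads the run back as a waterfall. The table definitions are an assumption: column names follow the emitter's `_TABLE_COLUMNS`, but the types, the primary keys, and the helper `call_span` are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE tel_spans (
    span_id TEXT PRIMARY KEY, trace_id TEXT, run_id TEXT, parent_span_id TEXT,
    name TEXT, kind TEXT, start_ns INTEGER, end_ns INTEGER, status TEXT
);
CREATE TABLE tel_model_calls (
    span_id TEXT PRIMARY KEY, run_id TEXT, provider TEXT, model TEXT,
    latency_ms INTEGER
);
""")


def call_span(span_id, root_id, name, kind, end_ns, elapsed_ms):
    """Hooks fire on completion: end_ns is 'now', start_ns is derived."""
    start_ns = end_ns - elapsed_ms * 1_000_000
    return (span_id, "trace-1", "run-1", root_id, name, kind, start_ns, end_ns, "ok")


run_start = 1_000_000_000
rows = [
    # Root span: parent_span_id is NULL, written at finalize with true start/end.
    ("root", "trace-1", "run-1", None, "run", "run",
     run_start, run_start + 900_000_000, "ok"),
    call_span("m1", "root", "model", "model", run_start + 400_000_000, 300),
    call_span("t1", "root", "web_search", "tool", run_start + 700_000_000, 200),
]
conn.executemany("INSERT INTO tel_spans VALUES (?,?,?,?,?,?,?,?,?)", rows)
conn.execute(
    "INSERT INTO tel_model_calls VALUES ('m1','run-1','anthropic','claude-opus-4',300)"
)

# One trace_id, ordered by start_ns; detail rows JOIN on the shared span_id.
waterfall = conn.execute(
    """
    SELECT s.span_id, s.kind, s.parent_span_id, s.start_ns - ? AS offset_ns, m.model
    FROM tel_spans AS s
    LEFT JOIN tel_model_calls AS m ON m.span_id = s.span_id
    WHERE s.trace_id = 'trace-1'
    ORDER BY s.start_ns
    """,
    (run_start,),
).fetchall()
```

Because `start_ns` is reconstructed after the fact, it inherits any error in the measured latency; that is the "accurate (pre-hook) start times" follow-up the commit message mentions.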
emozilla
26ede9150c docs(telemetry): clarify reserved subagent-lineage hooks
The subagent_start/stop hooks are registered but no-op. The prior comment implied
subagents need no handling because they inherit via contextvars — misleading, since
a delegated child runs on a separate thread with its own session id and trace.

Clarify the real situation: a subagent's model/tool calls are already captured as
their own tel_runs row via the child's run_conversation, so nothing is lost. These
hooks are reserved for recording parent->child lineage (needs a tel_runs.parent_run_id
column), deferred until a consumer needs the delegation tree. Comment-only.
2026-06-26 23:28:50 -04:00
emozilla
0615a39632 fix(telemetry): aggregate requires local telemetry to be on
Aggregate metrics are derived from the local tel_* tables — they're a coarsened
view of local data, not an independent capture path. With telemetry.local=false
nothing is written, so an aggregate opt-in had nothing to aggregate, yet
may_upload_aggregate() returned True and `status` showed "Aggregate metrics: on".
The config could claim a state it couldn't fulfill.

Gate aggregate on local being enabled:
  - may_upload_aggregate() now requires local_enabled AND allow_aggregate AND
    consent_state == aggregate.
  - `telemetry status` computes aggregate_enabled the same way and, when consent is
    aggregate but local is off, prints "inert: local telemetry is off — nothing to
    aggregate" instead of the opt-in hint.

Happy path is unchanged (local on + consent aggregate -> on). Adds policy and CLI
tests for the inert combo.
2026-06-26 15:10:38 -04:00
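
A minimal sketch of the three-way gate this commit describes, assuming a plain nested config dict. The function name and the `telemetry.*` keys come from the commit message; the literal `"aggregate"` consent value, the `allow_aggregate` default of `False`, and the `aggregate_status` helper are assumptions made for illustration, not the project's code.

```python
from typing import Any, Dict

CONSENT_AGGREGATE = "aggregate"  # assumed value of the consent constant


def may_upload_aggregate(config: Dict[str, Any]) -> bool:
    """True only when local is on AND aggregate is allowed AND consent is given."""
    tel = config.get("telemetry") or {}
    local_enabled = bool(tel.get("local", True))           # default-on locally
    allow_aggregate = bool(tel.get("allow_aggregate", False))  # assumed default
    consent = tel.get("consent_state")
    return local_enabled and allow_aggregate and consent == CONSENT_AGGREGATE


def aggregate_status(config: Dict[str, Any]) -> str:
    """Hypothetical status label: 'on', 'inert' (consented but local off), or 'off'."""
    tel = config.get("telemetry") or {}
    if may_upload_aggregate(config):
        return "on"
    consented = tel.get("consent_state") == CONSENT_AGGREGATE
    if consented and not tel.get("local", True):
        return "inert"
    return "off"


happy = {"telemetry": {"local": True, "allow_aggregate": True,
                       "consent_state": "aggregate"}}
inert = {"telemetry": {"local": False, "allow_aggregate": True,
                       "consent_state": "aggregate"}}
```

Computing the status label from the same predicate as the gate is what keeps the CLI from claiming a state the config cannot fulfill.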
emozilla
9737728872 test(telemetry): end-to-end plugin dispatch coverage
The existing hook tests call the plugin's _on_* callbacks directly, which passes
even if the bundled plugin stops auto-loading or a hook name drifts from what core
fires — real runs would go dark while the suite stays green.

Add test_plugin_e2e.py, which drives the real dispatch chain through public entry
points only (discover_plugins -> invoke_hook -> registered callback -> emitter ->
tel_* tables), exactly as core does:

  - one completed turn produces tel_runs / tel_model_calls / tel_tool_calls rows
    with real provider/model/tool values and correct counts;
  - telemetry.local=false means the plugin does not load and nothing is written.

Verified robust against test ordering (singleton resets for the plugin manager and
the emitter in the fixture).
2026-06-26 15:01:10 -04:00
emozilla
1e5e61b4be refactor(telemetry): drop "plane" terminology
Rename the telemetry tiers away from the borrowed control-plane/data-plane
jargon to plain language, across code, CLI output, config, and docs:

  - "local plane"        -> "local telemetry"
  - "aggregate plane"    -> "aggregate metrics"
  - "trajectories plane" -> "trajectories" / "telemetry.trajectories"
  - "three planes with a hard wall" -> "three settings, isolated from each other"

User-facing `hermes telemetry status` now reads "Local telemetry: on" /
"Aggregate metrics: off" / "Content export: off (trajectories disabled)".
The OTLP resource attribute key telemetry.plane is renamed to telemetry.scope
(wire-level identifier; nothing consumes it yet).

No behavior change — wording only. Status renders identically apart from the
labels; tests updated to match the new strings.
2026-06-25 23:39:17 -04:00
emozilla
ea53752eff refactor(telemetry): drop policy.resolve(); read config directly
policy.resolve() / TelemetryDecision was a read-only projection used only by
`hermes telemetry status` for display. The actual behavior gates already read
telemetry.* straight from config: the emitter (whether to write) and the plugin
loader (whether to auto-load) each call .get("local", True) on the loaded config,
never through policy.

Make config the single chokepoint the status command reads too: it now resolves
local/allow_aggregate/consent_state inline from the loaded config, the same way
the other gates do. policy.py keeps only what config can't express on its own —
the consent constants, ensure_install_id(), and may_upload_aggregate(config) as a
pure function (the gate a future uploader must consult). resolve() and the
TelemetryDecision dataclass are removed; policy.py drops 107 -> 70 lines.

No behavior change: status renders identically, and the default-on local plane is
still defaulted in DEFAULT_CONFIG plus a fail-safe .get(..., True) at each gate.
2026-06-25 21:32:30 -04:00
emozilla
7aa6726c06 Merge branch 'main' into feat/telemetry-observability 2026-06-25 21:15:46 -04:00
Que0x
b8fc8c908b fix(approval): fold Windows absolute home paths in dangerous-command detection
The detector folds absolute home / Hermes-home prefixes into their canonical
~/ and ~/.hermes/ forms so static patterns catch /home/alice/.bashrc the same
way they catch ~/.bashrc (abd69b81). On native Windows this fold never fired,
so terminal commands writing to shell startup files, ~/.ssh/authorized_keys,
or ~/.hermes/config.yaml / .env returned "safe" and skipped the approval
prompt — and config.yaml carries the approval policy itself.

Two compounding causes:

1. The fold ran after the backslash-escape strip (r\m -> rm), which dissolves
   the backslash separators in a Windows path (C:\Users\alice\.bashrc ->
   C:Usersalice...) before the fold could match. It now runs before the strip.
2. The fold only recognized POSIX absolute paths and only the home prefix,
   leaving multi-segment backslash suffixes (\.ssh\authorized_keys) to be
   mangled by the strip.

Consolidated into _home_prefix_fold_regex / _fold_home_prefixes: match a home
prefix with either separator, capture the rest of the path token, and
normalize its separators to / so multi-segment patterns match. The
degenerate-path guard generalizes count("/") >= 2 to "at least two components
below the root" (also rejecting a bare drive root C:\). HOME is consulted
directly because Windows' expanduser ignores it; the more specific Hermes home
is folded first, longest candidate first, so neither fold clobbers the other.

POSIX behavior unchanged; the r\m -> rm anti-obfuscation strip still runs.
Adds TestWindowsAbsolutePathFolding, which monkeypatches a Windows-style
HOME/HERMES_HOME so the behavior is also exercised on the CI runner.
2026-06-25 17:49:39 -07:00
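
The ordering problem in cause 1 can be seen with a small, self-contained sketch. `fold_home_prefix` and `strip_backslash_escapes` below are hypothetical simplifications written for this illustration; they are not the detector's `_fold_home_prefixes`, and they omit the degenerate-path guard and the Hermes-home fold.

```python
import re


def fold_home_prefix(command: str, home: str) -> str:
    """Rewrite absolute paths under `home` to ~/..., accepting / or \\ separators."""
    parts = [p for p in re.split(r"[\\/]+", home) if p]
    if not parts:
        return command
    sep = r"[\\/]+"
    lead = sep if home.startswith(("/", "\\")) else ""
    prefix = lead + sep.join(re.escape(p) for p in parts)
    # The lookahead stops C:\Users\al from matching inside C:\Users\alice.
    pattern = re.compile(prefix + r"(?![^\s'\"\\/])(?P<rest>[^\s'\"]*)")

    def _fold(match):
        # Normalize the remaining separators so multi-segment patterns match.
        return "~" + match.group("rest").replace("\\", "/")

    return pattern.sub(_fold, command)


def strip_backslash_escapes(command: str) -> str:
    """Anti-obfuscation strip: r\\m -> rm."""
    return re.sub(r"\\(.)", r"\1", command)


home = r"C:\Users\alice"
cmd = r"echo key >> C:\Users\alice\.ssh\authorized_keys"

# Strip first: the separators dissolve and the fold has nothing to match.
wrong_order = fold_home_prefix(strip_backslash_escapes(cmd), home)
# Fold first: the path is canonical before the strip runs.
right_order = strip_backslash_escapes(fold_home_prefix(cmd, home))
```

With the fold running first, a static pattern written for `~/.ssh/authorized_keys` sees the same token on Windows as on POSIX, and the strip still handles inputs such as `r\m`.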
emozilla
ccfa079252 feat(telemetry): local-first telemetry & observability
Add a built-in telemetry system that records what the agent does — workflows,
model calls, tool calls, errors — to the local machine, powers `/insights`, and
can export to an operator-chosen destination. Default-on locally; nothing leaves
the machine unless the user exports it or opts into the aggregate plane.

Three planes with a hard wall between them:
  - local: full-fidelity observability (real model/provider/tool names), on by
    default, never leaves the machine.
  - aggregate: opt-in metadata, default off. No uploader ships — consent is
    recorded via telemetry.consent_state, and `preview` shows what would be
    produced, computed locally.
  - trajectories: full message content, opt-in, exported only to the operator's
    own destination.

Mechanism:
  - Bundled `telemetry` plugin registers observational lifecycle hooks
    (on_session_start / post_api_request / post_tool_call / on_session_finalize).
    No core call sites are edited; hooks already carry the data.
  - Fire-and-forget emitter: emit() returns in microseconds, never blocks or
    raises into a model/tool call. A daemon thread writes events to an
    append-only JSONL log and the tel_* tables in state.db (its own sqlite
    connection, separate from SessionDB).
  - tel_runs / tel_model_calls / tel_tool_calls live in the declarative
    SCHEMA_SQL and are reconciled automatically; SCHEMA_VERSION 16 -> 17.
  - metrics derives rollups for /usage and /insights; rollup builds per-run
    summaries for `hermes telemetry preview`.

Consent is config, not a parallel command surface. The config file is the root
of trust: set telemetry.consent_state with `hermes config set`, or pin any
telemetry.* key (including allow_aggregate) via managed scope, which overrides
the user's value per key. `hermes telemetry` exposes only what config cannot:
status (report), preview (query), and export.

Export:
  - exporter_bulk writes telemetry (and, when the trajectories plane is enabled,
    session content) to ndjson/json.
  - otlp_exporter streams spans to a configured OpenTelemetry Collector over
    OTLP/HTTP. The SDK is an optional extra (hermes-agent[otlp]), lazily
    installed via tools.lazy_deps on first use.
  - Secrets are always redacted on every export path
    (redact_sensitive_text(force=True)); content export is gated by the
    trajectories plane, and PII scrubbing follows telemetry.content_redaction.
    OTLP auth headers reference environment variable names, never inline values.

No outbound emission to Nous. The aggregate uploader is intentionally not built.
2026-06-24 02:14:02 -04:00
46 changed files with 4503 additions and 224 deletions


@@ -81,6 +81,19 @@ def _bar_chart(values: List[int], max_width: int = 20) -> List[str]:
    return ["█" * max(1, int(v / peak * max_width)) if v > 0 else "" for v in values]


def _fmt_ms(ms: float) -> str:
    """Compact human duration from milliseconds (e.g. 850ms, 2.4s, 1.5m)."""
    try:
        ms = float(ms or 0)
    except (TypeError, ValueError):
        return "0ms"
    if ms < 1000:
        return f"{int(ms)}ms"
    if ms < 60_000:
        return f"{ms / 1000:.1f}s"
    return f"{ms / 60_000:.1f}m"


class InsightsEngine:
"""
Analyzes session history and produces usage insights.
@@ -138,6 +151,7 @@ class InsightsEngine:
},
"activity": {},
"top_sessions": [],
"telemetry": {},
}
# Compute insights
@@ -148,6 +162,7 @@ class InsightsEngine:
skills = self._compute_skill_breakdown(skill_usage)
activity = self._compute_activity_patterns(sessions)
top_sessions = self._compute_top_sessions(sessions)
telemetry = self._compute_telemetry(cutoff)
return {
"days": days,
@@ -161,8 +176,37 @@ class InsightsEngine:
"skills": skills,
"activity": activity,
"top_sessions": top_sessions,
"telemetry": telemetry,
}
# =========================================================================
# Telemetry (observability) — from the tel_* tables (local telemetry)
# =========================================================================
    def _compute_telemetry(self, cutoff: float) -> Dict[str, Any]:
        """Roll up the local telemetry tables for the same window.

        Reuses the engine's existing connection. Fully fail-soft: if the tel_*
        tables are empty or absent (telemetry.local disabled, fresh install), this
        returns an empty dict and the renderer skips the section.
        """
        try:
            from agent.telemetry import metrics
        except Exception:
            return {}
        try:
            since_ns = int(cutoff * 1e9)
            if not metrics.has_data(conn=self._conn):
                return {}
            return {
                "workflows": metrics.workflow_summary(since_ns=since_ns, conn=self._conn),
                "model_calls": metrics.model_call_summary(since_ns=since_ns, conn=self._conn),
                "tool_calls": metrics.tool_call_summary(conn=self._conn),
                "errors": metrics.error_summary(conn=self._conn),
            }
        except Exception:
            return {}

# =========================================================================
# Data gathering (SQL queries)
# =========================================================================
@@ -852,8 +896,80 @@ class InsightsEngine:
lines.append(f" {ts['label']:<20} {ts['value']:<18} ({ts['date']}, {ts['session_id']})")
lines.append("")
# Telemetry / observability (local telemetry) — only when data exists
tel = report.get("telemetry") or {}
if tel:
self._append_telemetry_section(lines, tel)
return "\n".join(lines)
def _append_telemetry_section(self, lines: List[str], tel: Dict[str, Any]) -> None:
"""Render the observability rollups (workflows, tools, providers, errors)."""
wf = tel.get("workflows", {})
mc = tel.get("model_calls", {})
tc = tel.get("tool_calls", {})
errs = tel.get("errors", {}).get("by_class", {})
lines.append(" 📡 Observability (local telemetry)")
lines.append(" " + "─" * 56)
total_runs = wf.get("total_runs", 0)
if total_runs:
sr = wf.get("success_rate", 0.0) * 100
p50 = wf.get("duration_ms_p50", 0)
p95 = wf.get("duration_ms_p95", 0)
lines.append(
f" Workflows: {total_runs:,} Success: {sr:.1f}% "
f"Duration p50/p95: {_fmt_ms(p50)} / {_fmt_ms(p95)}"
)
by_entry = wf.get("by_entrypoint", {})
if by_entry:
entry_str = ", ".join(
f"{k}: {v}" for k, v in sorted(by_entry.items(), key=lambda x: -x[1])
)
lines.append(f" Entrypoints: {entry_str}")
# Tool reliability
if tc.get("total"):
fail_pct = tc.get("failure_rate", 0.0) * 100
lines.append(
f" Tool calls: {tc['total']:,} Failure rate: {fail_pct:.1f}%"
)
tools = tc.get("by_tool", {})
fails = tc.get("failures_by_tool", {})
top = sorted(tools.items(), key=lambda x: -x[1])[:6]
if top:
parts = []
for name, n in top:
f = fails.get(name, 0)
parts.append(f"{name}: {n}" + (f" ({f} failed)" if f else ""))
lines.append(" " + " ".join(parts))
# Provider / model mix + cache (real names)
by_provider = mc.get("by_provider", {})
if by_provider:
prov_str = ", ".join(
f"{k}: {v}" for k, v in sorted(by_provider.items(), key=lambda x: -x[1])
)
lines.append(f" Providers: {prov_str}")
by_model = mc.get("by_model", {})
if by_model:
model_str = ", ".join(
f"{k}: {v}" for k, v in sorted(by_model.items(), key=lambda x: -x[1])[:8]
)
cache = mc.get("cache_hit_rate", 0.0) * 100
suffix = f" Cache hit: {cache:.1f}%" if cache else ""
lines.append(f" Models: {model_str}{suffix}")
# Error classes
if errs:
err_str = ", ".join(
f"{k}: {v}" for k, v in sorted(errs.items(), key=lambda x: -x[1])[:6]
)
lines.append(f" Errors: {err_str}")
lines.append("")
def format_gateway(self, report: Dict) -> str:
"""Format the insights report for gateway/messaging (shorter)."""
if report.get("empty"):


@@ -0,0 +1,30 @@
"""Hermes telemetry & observability.
Local-first observability, on by default. The ``telemetry`` plugin registers Hermes
lifecycle hooks and hands typed events to the fire-and-forget ``emitter`` (queue ->
background writer -> JSONL + state.db ``tel_*`` index). The emitter never blocks or
raises into a model/tool call (the hot-path invariant).
Events record the observed model ids, provider names, and tool names. ``metrics``
derives rollups for /usage and /insights; ``rollup`` builds the per-run summaries shown
by ``hermes telemetry preview``. ``redaction`` + ``exporter_bulk`` + ``otlp_exporter``
handle export to an operator-chosen destination. ``policy`` holds the consent
constants and the aggregate upload gate (no uploader ships).
"""
from __future__ import annotations
from . import emitter, events, metrics, policy, spans
emit = emitter.emit
get_emitter = emitter.get_emitter
__all__ = [
"emitter",
"events",
"metrics",
"policy",
"spans",
"emit",
"get_emitter",
]

agent/telemetry/emitter.py (new file, 318 lines added)

@@ -0,0 +1,318 @@
"""Local telemetry emitter: fire-and-forget queue + background writer.

The emitter is the single seam between instrumentation (the telemetry plugin's hook
callbacks) and durable storage. Its contract is the hot-path invariant:

    ``emit()`` MUST return in O(microseconds), MUST NOT block on disk/network, and
    MUST NEVER raise into the caller. A telemetry failure is logged locally and
    dropped; it can never affect a model call, a tool call, or a session.

Mechanism:
  * ``emit(event)`` does a non-blocking ``queue.put_nowait`` wrapped in a broad
    ``except Exception``. On a full queue it drops the *oldest* event and counts
    the drop.
  * A daemon thread drains the queue and writes each event to two places:
      1. the append-only JSONL log (source of truth)
      2. the ``tel_*`` SQLite tables in state.db (rebuildable index)
  * The writer uses its own sqlite connection to state.db, separate from SessionDB,
    so telemetry writes never contend with or corrupt session writes.

Local telemetry only. Nothing here uploads anywhere.
"""
from __future__ import annotations
import json
import logging
import queue
import sqlite3
import threading
import time
from pathlib import Path
from typing import Any, Dict, Optional
logger = logging.getLogger(__name__)
_MAX_QUEUE = 10_000 # ring-buffer depth; oldest dropped when full
_DRAIN_BATCH = 256
def _default_dir() -> Path:
"""Resolve the telemetry dir under the active HERMES_HOME (profile-safe)."""
from hermes_constants import get_hermes_home
return get_hermes_home() / "telemetry"
def _default_db_path() -> Path:
"""Resolve state.db under the active HERMES_HOME (profile-safe)."""
from hermes_constants import get_hermes_home
return get_hermes_home() / "state.db"
# Map a telemetry event dict (its "event" tag) to (table, column-ordered insert).
# Only the columns the indexer knows about are written; unknown keys are ignored,
# so an event carrying extra fields never breaks the insert.
_TABLE_COLUMNS: Dict[str, tuple] = {
"run": (
"tel_runs",
("run_id", "trace_id", "session_id", "entrypoint",
"platform", "start_ns", "end_ns", "end_reason",
"model_call_count", "tool_call_count", "error_count"),
),
"span": (
"tel_spans",
("span_id", "trace_id", "run_id", "parent_span_id", "name", "kind",
"start_ns", "end_ns", "status"),
),
"model_call": (
"tel_model_calls",
("span_id", "run_id", "provider", "model", "base_url",
"input_tokens", "output_tokens", "cache_read_tokens",
"cache_write_tokens", "reasoning_tokens", "latency_ms"),
),
"tool_call": (
"tel_tool_calls",
("span_id", "run_id", "tool_name", "duration_ms", "result_class"),
),
"error": (
"tel_error_events",
("run_id", "error_class", "subsystem", "recovery", "ts_ns"),
),
}
class TelemetryEmitter:
"""Owns the queue, the writer thread, and the telemetry sqlite connection."""
def __init__(
self,
*,
events_path: Optional[Path] = None,
db_path: Optional[Path] = None,
enabled: bool = True,
) -> None:
self._dir = (events_path.parent if events_path else _default_dir())
self._events_path = events_path or (self._dir / "events.jsonl")
self._db_path = db_path or _default_db_path()
self._enabled = enabled
self._q: "queue.Queue[Dict[str, Any]]" = queue.Queue(maxsize=_MAX_QUEUE)
self._dropped = 0
self._written = 0
self._stop = threading.Event()
self._started = False
self._lock = threading.Lock()
self._conn: Optional[sqlite3.Connection] = None
self._thread: Optional[threading.Thread] = None
# Optional live subscribers (e.g. OTLP exporter). Called from the writer
# thread AFTER durable writes, fully fail-isolated — a subscriber that
# raises or blocks can never affect the JSONL/SQLite source of truth or
# the hot path. Each subscriber is callable(batch: list[dict]).
self._subscribers: list = []
# ── public API (hot path) ───────────────────────────────────────────────
    def emit(self, event: Any) -> None:
        """Enqueue an event. Never blocks, never raises.

        ``event`` may be a dataclass with ``to_dict()`` or a plain dict.
        """
        if not self._enabled:
            return
        try:
            payload = event.to_dict() if hasattr(event, "to_dict") else dict(event)
            payload.setdefault("ts_ns", time.time_ns())
            self._ensure_started()
            try:
                self._q.put_nowait(payload)
            except queue.Full:
                # Drop oldest to make room: bounded memory, newest-wins.
                try:
                    self._q.get_nowait()
                    self._dropped += 1
                    self._q.put_nowait(payload)
                except Exception:
                    self._dropped += 1
        except Exception:  # the hot-path invariant: never propagate
            logger.debug("telemetry emit failed", exc_info=True)

# ── lifecycle ───────────────────────────────────────────────────────────
def _ensure_started(self) -> None:
if self._started:
return
with self._lock:
if self._started:
return
try:
self._dir.mkdir(parents=True, exist_ok=True)
except Exception:
logger.debug("telemetry dir create failed", exc_info=True)
self._thread = threading.Thread(
target=self._run, name="hermes-telemetry-writer", daemon=True
)
self._thread.start()
self._started = True
def _open_conn(self) -> Optional[sqlite3.Connection]:
if self._conn is not None:
return self._conn
try:
conn = sqlite3.connect(str(self._db_path), isolation_level=None, timeout=5.0)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=5000")
self._conn = conn
except Exception:
logger.debug("telemetry db open failed", exc_info=True)
self._conn = None
return self._conn
def _run(self) -> None:
while not self._stop.is_set():
try:
first = self._q.get(timeout=0.5)
except queue.Empty:
continue
batch = [first]
while len(batch) < _DRAIN_BATCH:
try:
batch.append(self._q.get_nowait())
except queue.Empty:
break
self._write_batch(batch)
def _write_batch(self, batch) -> None:
# JSONL append (source of truth) — best effort.
try:
with open(self._events_path, "a", encoding="utf-8") as fh:
for ev in batch:
fh.write(json.dumps(ev, ensure_ascii=False) + "\n")
except Exception:
logger.debug("telemetry jsonl append failed", exc_info=True)
# SQLite index — best effort, per-event so one bad row can't lose the batch.
conn = self._open_conn()
if conn is None:
return
for ev in batch:
try:
self._index_one(conn, ev)
self._written += 1
except Exception:
logger.debug("telemetry index row failed", exc_info=True)
# Live fan-out (e.g. OTLP) — AFTER durable writes, fully fail-isolated.
# A slow/raising subscriber never affects JSONL/SQLite or the hot path.
for sub in self._subscribers:
try:
sub(batch)
except Exception:
logger.debug("telemetry subscriber failed", exc_info=True)
def subscribe(self, callback) -> None:
"""Register a live batch subscriber (callable(batch: list[dict])).
Called from the writer thread after durable writes. Used by the OTLP
exporter for continuous streaming. Fail-isolated; never on the hot path.
"""
if callback not in self._subscribers:
self._subscribers.append(callback)
def unsubscribe(self, callback) -> None:
try:
self._subscribers.remove(callback)
except ValueError:
pass
def _index_one(self, conn: sqlite3.Connection, ev: Dict[str, Any]) -> None:
kind = ev.get("event")
spec = _TABLE_COLUMNS.get(kind)
if spec is None:
return
table, cols = spec
values = [ev.get(c) for c in cols]
placeholders = ", ".join("?" for _ in cols)
collist = ", ".join(cols)
conn.execute(
f"INSERT OR REPLACE INTO {table} ({collist}) VALUES ({placeholders})",
values,
)
# ── introspection / shutdown (tests, CLI) ───────────────────────────────
def flush(self, timeout: float = 2.0) -> None:
"""Block until the queue drains (test/CLI helper, NOT the hot path)."""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
if self._q.empty():
# give the writer a tick to finish the in-flight batch
time.sleep(0.05)
if self._q.empty():
return
time.sleep(0.02)
def stats(self) -> Dict[str, int]:
return {
"queued": self._q.qsize(),
"written": self._written,
"dropped": self._dropped,
}
def close(self) -> None:
self._stop.set()
if self._thread is not None:
self._thread.join(timeout=2.0)
if self._conn is not None:
try:
self._conn.close()
except Exception:
pass
self._conn = None
self._started = False
# ── process-wide singleton ──────────────────────────────────────────────────
_EMITTER: Optional[TelemetryEmitter] = None
_EMITTER_LOCK = threading.Lock()
def get_emitter() -> TelemetryEmitter:
"""Return the process-wide emitter, honoring telemetry.local config."""
global _EMITTER
if _EMITTER is not None:
return _EMITTER
with _EMITTER_LOCK:
if _EMITTER is None:
enabled = _local_enabled()
_EMITTER = TelemetryEmitter(enabled=enabled)
return _EMITTER
def _local_enabled() -> bool:
try:
from hermes_cli.config import load_config
cfg = load_config()
tel = cfg.get("telemetry") if isinstance(cfg, dict) else {}
return bool((tel or {}).get("local", True))
except Exception:
return True
def emit(event: Any) -> None:
"""Module-level convenience: emit via the singleton."""
get_emitter().emit(event)
def reset_emitter_for_tests(emitter: Optional[TelemetryEmitter] = None) -> None:
"""Swap the singleton (tests only)."""
global _EMITTER
with _EMITTER_LOCK:
if _EMITTER is not None and emitter is not _EMITTER:
try:
_EMITTER.close()
except Exception:
pass
_EMITTER = emitter
__all__ = [
"TelemetryEmitter",
"get_emitter",
"emit",
"reset_emitter_for_tests",
]
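
The drop-oldest behaviour of `emit()` can be exercised on its own. The sketch below is a stripped-down stand-in with no writer thread and no storage; `DropOldestBuffer` and its `drain` method are hypothetical names used only for this illustration.

```python
import queue


class DropOldestBuffer:
    """Bounded buffer that evicts the oldest item instead of blocking or raising."""

    def __init__(self, maxsize: int) -> None:
        self._q = queue.Queue(maxsize=maxsize)
        self.dropped = 0

    def emit(self, item) -> None:
        try:
            try:
                self._q.put_nowait(item)
            except queue.Full:
                # Make room by discarding the oldest entry, then retry once.
                self._q.get_nowait()
                self.dropped += 1
                self._q.put_nowait(item)
        except Exception:
            # Never propagate: a lost event is counted, not raised.
            self.dropped += 1

    def drain(self) -> list:
        items = []
        while True:
            try:
                items.append(self._q.get_nowait())
            except queue.Empty:
                return items


buf = DropOldestBuffer(maxsize=3)
for i in range(5):
    buf.emit(i)
remaining = buf.drain()
```

Here the two oldest items are evicted and the three newest survive. With a concurrent consumer the eviction and the retry can race, which is why the retry sits inside its own catch-all rather than assuming the second `put_nowait` succeeds.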

agent/telemetry/events.py (new file, 111 lines added)

@@ -0,0 +1,111 @@
"""Typed local telemetry events.

These dataclasses are the rows written to the local JSONL log and the ``tel_*``
SQLite tables. They record the values observed for each run (model id, provider, tool
name, token counts, durations) and stay on the machine unless explicitly exported.
"""

from __future__ import annotations

import time
from dataclasses import dataclass, field, asdict
from typing import Any, Dict, Optional


# ── local telemetry events (real values) ────────────────────────────────────

def _now_ns() -> int:
    return time.time_ns()


@dataclass(slots=True)
class RunEvent:
    """One top-level workflow execution (a trace root). A run spans one session."""

    run_id: str
    trace_id: str
    entrypoint: str
    session_id: Optional[str] = None
    platform: Optional[str] = None
    start_ns: int = field(default_factory=_now_ns)
    end_ns: Optional[int] = None
    end_reason: Optional[str] = None
    model_call_count: int = 0
    tool_call_count: int = 0
    error_count: int = 0

    def to_dict(self) -> Dict[str, Any]:
        return {"event": "run", **asdict(self)}


@dataclass(slots=True)
class ModelCallEvent:
    span_id: str
    run_id: str
    provider: Optional[str] = None  # raw provider, e.g. "anthropic"
    model: Optional[str] = None  # raw model id, e.g. "claude-opus-4"
    base_url: Optional[str] = None
    input_tokens: int = 0
    output_tokens: int = 0
    cache_read_tokens: int = 0
    cache_write_tokens: int = 0
    reasoning_tokens: int = 0
    latency_ms: Optional[int] = None

    def to_dict(self) -> Dict[str, Any]:
        return {"event": "model_call", **asdict(self)}


@dataclass(slots=True)
class ToolCallEvent:
    span_id: str
    run_id: str
    tool_name: Optional[str] = None  # raw tool name, e.g. "web_search"
    duration_ms: Optional[int] = None
    result_class: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        return {"event": "tool_call", **asdict(self)}


@dataclass(slots=True)
class SpanEvent:
    """A timed span: the timing/lineage backbone of a trace.

    One row per run (the root, ``parent_span_id=None``) and one per model/tool call
    (``parent_span_id`` = the run's root span). Detail rows in ``tel_model_calls`` /
    ``tel_tool_calls`` share the ``span_id`` and are joined here for ordering and
    placement on a timeline.
    """

    span_id: str
    trace_id: str
    run_id: str
    name: str
    kind: str  # "run" | "model" | "tool"
    start_ns: int
    end_ns: Optional[int] = None
    parent_span_id: Optional[str] = None
    status: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        return {"event": "span", **asdict(self)}


@dataclass(slots=True)
class ErrorEvent:
    run_id: Optional[str]
    error_class: str
    subsystem: str
    recovery: Optional[str] = None
    ts_ns: int = field(default_factory=_now_ns)

    def to_dict(self) -> Dict[str, Any]:
        return {"event": "error", **asdict(self)}


__all__ = [
"RunEvent",
"ModelCallEvent",
"ToolCallEvent",
"SpanEvent",
"ErrorEvent",
]
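
To show how a tagged event dict turns into an indexed row, here is a minimal sketch of the tag-to-columns projection. It mirrors the idea of the emitter's `_TABLE_COLUMNS` lookup but is not that code: `TABLE_COLUMNS`, `index_one`, the trimmed `ToolCallEvent`, and the table definition (in particular `span_id` as primary key) are assumptions made for illustration.

```python
import sqlite3
from dataclasses import asdict, dataclass
from typing import Any, Dict, Optional

# Event tag -> (table, ordered columns). Keys outside this tuple are ignored.
TABLE_COLUMNS = {
    "tool_call": (
        "tel_tool_calls",
        ("span_id", "run_id", "tool_name", "duration_ms", "result_class"),
    ),
}


@dataclass
class ToolCallEvent:
    span_id: str
    run_id: str
    tool_name: Optional[str] = None
    duration_ms: Optional[int] = None
    result_class: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        return {"event": "tool_call", **asdict(self)}


def index_one(conn: sqlite3.Connection, ev: Dict[str, Any]) -> bool:
    """Insert one event into its table; return False for unknown event tags."""
    spec = TABLE_COLUMNS.get(ev.get("event"))
    if spec is None:
        return False
    table, cols = spec
    placeholders = ", ".join("?" for _ in cols)
    conn.execute(
        f"INSERT OR REPLACE INTO {table} ({', '.join(cols)}) VALUES ({placeholders})",
        [ev.get(c) for c in cols],
    )
    return True


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tel_tool_calls (span_id TEXT PRIMARY KEY, run_id TEXT, "
    "tool_name TEXT, duration_ms INTEGER, result_class TEXT)"
)

first = ToolCallEvent("s1", "r1", "web_search", 120, "ok").to_dict()
first["ts_ns"] = 1  # extra key: not a column, silently skipped
index_one(conn, first)
index_one(conn, ToolCallEvent("s1", "r1", "web_search", 250, "ok").to_dict())
unknown_indexed = index_one(conn, {"event": "unknown"})
rows = conn.execute("SELECT span_id, duration_ms FROM tel_tool_calls").fetchall()
```

Under the primary-key assumption, `INSERT OR REPLACE` makes re-indexing the same `span_id` idempotent, which is consistent with treating the SQLite tables as an index rebuildable from the JSONL log.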


@@ -0,0 +1,139 @@
"""Export telemetry (and optionally session content) to a file or stream.
Two data domains, both written to an operator-chosen destination:
* Telemetry: the tel_* rows + events.jsonl (structural observability).
* Content (opt-in via telemetry.trajectories): sessions + messages, with every
content field (message body, reasoning, raw tool-call args) passed through the
redaction pipeline (secrets always stripped; PII per content_redaction).
Formats: ndjson (default) and json. OTLP streaming export lives in otlp_exporter.py.
Content export is gated by ``redaction.content_export_enabled``.
"""
from __future__ import annotations
import json
import sqlite3
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, TextIO
from . import redaction
_TEL_TABLES = (
"tel_runs", "tel_model_calls", "tel_tool_calls", "tel_error_events",
)
def _open(db_path: Optional[Path]) -> sqlite3.Connection:
if db_path is None:
from hermes_constants import get_hermes_home
db_path = get_hermes_home() / "state.db"
c = sqlite3.connect(str(db_path), timeout=5.0)
c.row_factory = sqlite3.Row
return c
def _iter_telemetry(conn: sqlite3.Connection, since_ns: Optional[int]) -> Iterator[Dict[str, Any]]:
for table in _TEL_TABLES:
# Only tel_runs is windowed by since_ns here; the other tables are exported in full.
if table == "tel_runs" and since_ns:
rows = conn.execute(
f"SELECT * FROM {table} WHERE start_ns >= ?", (int(since_ns),)
).fetchall()
else:
rows = conn.execute(f"SELECT * FROM {table}").fetchall()
for r in rows:
d = dict(r)
d["_kind"] = table
yield d
def _iter_content(
db_path: Optional[Path],
*,
config: Optional[Dict[str, Any]],
include_content: bool,
) -> Iterator[Dict[str, Any]]:
"""Yield session records. Message bodies are included only when trajectories is on."""
from hermes_state import SessionDB
content_mode = redaction.content_mode_for(config)
db = SessionDB(db_path=db_path) if db_path else SessionDB()
try:
for session in db.export_all():
msgs = session.get("messages", []) or []
red_msgs = [
redaction.redact_message(
m, content_mode=content_mode, include_content=include_content
)
for m in msgs
]
# Session-level metadata is structural: always keep ids/model/counts. The
# free-text title is added below only when content is included.
out = {
"_kind": "session",
"id": session.get("id"),
"source": session.get("source"),
"model": session.get("model"),
"started_at": session.get("started_at"),
"ended_at": session.get("ended_at"),
"message_count": session.get("message_count"),
"tool_call_count": session.get("tool_call_count"),
"messages": red_msgs,
}
if include_content and session.get("title"):
out["title"] = redaction.redact_for_export(
session["title"], content_mode=content_mode
)
yield out
finally:
db.close()
def export(
out: TextIO,
*,
fmt: str = "ndjson",
since_ns: Optional[int] = None,
include_content: bool = False,
config: Optional[Dict[str, Any]] = None,
db_path: Optional[Path] = None,
) -> Dict[str, int]:
"""Write telemetry (+ optional content) to ``out``. Returns counts.
``include_content`` is honored only when telemetry.trajectories is enabled in
``config``; otherwise content is forced off and only structural data is written.
"""
# Trajectories gate: a flag cannot override the config setting.
content_allowed = include_content and redaction.content_export_enabled(config)
counts = {"telemetry": 0, "sessions": 0, "content_included": int(content_allowed)}
conn = _open(db_path)
records: List[Dict[str, Any]] = []
try:
for rec in _iter_telemetry(conn, since_ns):
counts["telemetry"] += 1
if fmt == "ndjson":
out.write(json.dumps(rec, ensure_ascii=False) + "\n")
else:
records.append(rec)
finally:
conn.close()
# Content/session domain (separate connection via SessionDB).
for rec in _iter_content(db_path, config=config, include_content=content_allowed):
counts["sessions"] += 1
if fmt == "ndjson":
out.write(json.dumps(rec, ensure_ascii=False) + "\n")
else:
records.append(rec)
if fmt != "ndjson":
json.dump({"records": records}, out, ensure_ascii=False, indent=2)
return counts
__all__ = ["export"]
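
Review note: in `ndjson` mode every record is one JSON object per line, tagged with `_kind` (a `tel_*` table name or `"session"`). A minimal sketch of consuming such a stream, assuming only that tagging; `count_by_kind` and the sample lines are hypothetical.

```python
import io
import json
from collections import Counter
from typing import TextIO


def count_by_kind(stream: TextIO) -> Counter:
    """Count exported records per `_kind`, skipping blank lines."""
    counts: Counter = Counter()
    for line in stream:
        line = line.strip()
        if not line:
            continue
        record = json.loads(line)
        counts[record.get("_kind", "unknown")] += 1
    return counts


# Hypothetical export output: two telemetry rows and one session record.
sample = io.StringIO(
    '{"_kind": "tel_runs", "run_id": "r1"}\n'
    '{"_kind": "tel_tool_calls", "run_id": "r1", "tool_name": "web_search"}\n'
    '{"_kind": "session", "id": "sess-1", "messages": []}\n'
)
kinds = count_by_kind(sample)
print(dict(kinds))
```

Reading line by line keeps memory flat for large exports, which is the main reason to prefer `ndjson` over the single-document `json` format.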

agent/telemetry/metrics.py Normal file

@@ -0,0 +1,219 @@
"""Derive metric rollups from the local telemetry tables.
Reads the ``tel_*`` tables in state.db and returns aggregates for /usage, /insights,
and local dashboards. Metrics are computed by querying the event log rather than being
emitted on the hot path.
Each function accepts either an open caller-owned ``conn`` (reused, not closed) or a
``db_path`` (opened and closed internally). InsightsEngine passes its existing
connection; a standalone dashboard passes a path.
"""
from __future__ import annotations
import sqlite3
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional
@contextmanager
def _cursor(
    conn: Optional[sqlite3.Connection], db_path: Optional[Path]
) -> Iterator[sqlite3.Connection]:
    """Yield a Row-factory connection. Closes it only if we opened it."""
    if conn is not None:
        # Caller-owned connection: borrow it and restore its row_factory afterwards.
        prev_factory = conn.row_factory
        conn.row_factory = sqlite3.Row
        try:
            yield conn
        finally:
            conn.row_factory = prev_factory
        return
    if db_path is None:
        from hermes_constants import get_hermes_home
        db_path = get_hermes_home() / "state.db"
    c = sqlite3.connect(str(db_path), timeout=5.0)
    c.row_factory = sqlite3.Row
    try:
        yield c
    finally:
        c.close()
def _since_clause(since_ns: Optional[int], col: str = "start_ns") -> str:
return f" WHERE {col} >= {int(since_ns)}" if since_ns else ""
def workflow_summary(
db_path: Optional[Path] = None,
since_ns: Optional[int] = None,
*,
conn: Optional[sqlite3.Connection] = None,
) -> Dict[str, Any]:
"""Run-level counters + duration percentiles (local telemetry, exact)."""
with _cursor(conn, db_path) as c:
where = _since_clause(since_ns)
total = c.execute(f"SELECT COUNT(*) n FROM tel_runs{where}").fetchone()["n"]
by_reason = {
r["end_reason"] or "unknown": r["n"]
for r in c.execute(
f"SELECT end_reason, COUNT(*) n FROM tel_runs{where} GROUP BY end_reason"
).fetchall()
}
by_entry = {
r["entrypoint"] or "unknown": r["n"]
for r in c.execute(
f"SELECT entrypoint, COUNT(*) n FROM tel_runs{where} GROUP BY entrypoint"
).fetchall()
}
dur_where = (where + " AND end_ns IS NOT NULL") if where else " WHERE end_ns IS NOT NULL"
durations = [
(r["end_ns"] - r["start_ns"]) / 1e6
for r in c.execute(
f"SELECT start_ns, end_ns FROM tel_runs{dur_where}"
).fetchall()
]
return {
"total_runs": total,
"by_end_reason": by_reason,
"by_entrypoint": by_entry,
"duration_ms_p50": _pct(durations, 50),
"duration_ms_p95": _pct(durations, 95),
"success_rate": round(by_reason.get("completed", 0) / total, 4) if total else 0.0,
}
def model_call_summary(
    db_path: Optional[Path] = None,
    since_ns: Optional[int] = None,
    *,
    conn: Optional[sqlite3.Connection] = None,
) -> Dict[str, Any]:
    """Per-provider/model call counts, token totals, and average latency."""
    # Honor since_ns by windowing on the owning run's start_ns, matching
    # workflow_summary; previously the argument was accepted but ignored.
    where = ""
    params: tuple = ()
    if since_ns:
        where = " WHERE run_id IN (SELECT run_id FROM tel_runs WHERE start_ns >= ?)"
        params = (int(since_ns),)
    with _cursor(conn, db_path) as c:
        rows = c.execute(
            "SELECT provider, model, COUNT(*) n, "
            "SUM(input_tokens) inp, SUM(output_tokens) outp, "
            "SUM(cache_read_tokens) cache, AVG(latency_ms) avg_latency "
            "FROM tel_model_calls" + where + " GROUP BY provider, model",
            params,
        ).fetchall()
    by_provider: Dict[str, int] = {}
    by_model: Dict[str, int] = {}
    tokens = {"input": 0, "output": 0, "cache_read": 0}
    breakdown: List[Dict[str, Any]] = []
    for r in rows:
        prov = r["provider"] or "unknown"
        mdl = r["model"] or "unknown"
        by_provider[prov] = by_provider.get(prov, 0) + r["n"]
        by_model[mdl] = by_model.get(mdl, 0) + r["n"]
        tokens["input"] += r["inp"] or 0
        tokens["output"] += r["outp"] or 0
        tokens["cache_read"] += r["cache"] or 0
        breakdown.append({
            "provider": r["provider"],
            "model": r["model"],
            "calls": r["n"],
            "avg_latency_ms": round(r["avg_latency"] or 0, 1),
        })
    # Hit rate = cached input tokens over all input tokens (cached + uncached).
    cache_total = tokens["cache_read"] + tokens["input"]
    return {
        "by_provider": by_provider,
        "by_model": by_model,
        "tokens": tokens,
        "cache_hit_rate": round(tokens["cache_read"] / cache_total, 4) if cache_total else 0.0,
        "breakdown": breakdown,
    }
def tool_call_summary(
db_path: Optional[Path] = None,
*,
conn: Optional[sqlite3.Connection] = None,
) -> Dict[str, Any]:
with _cursor(conn, db_path) as c:
by_tool = {
r["tool_name"] or "unknown": r["n"]
for r in c.execute(
"SELECT tool_name, COUNT(*) n FROM tel_tool_calls GROUP BY tool_name"
).fetchall()
}
fails = {
r["tool_name"] or "unknown": r["n"]
for r in c.execute(
"SELECT tool_name, COUNT(*) n FROM tel_tool_calls "
"WHERE result_class IN ('error','timeout','blocked') GROUP BY tool_name"
).fetchall()
}
total = sum(by_tool.values())
total_fail = sum(fails.values())
return {
"by_tool": by_tool,
"failures_by_tool": fails,
"total": total,
"failure_rate": round(total_fail / total, 4) if total else 0.0,
}
def error_summary(
db_path: Optional[Path] = None,
*,
conn: Optional[sqlite3.Connection] = None,
) -> Dict[str, Any]:
with _cursor(conn, db_path) as c:
return {
"by_class": {
r["error_class"] or "unknown": r["n"]
for r in c.execute(
"SELECT error_class, COUNT(*) n FROM tel_error_events GROUP BY error_class"
).fetchall()
},
}
def _pct(values: List[float], p: int) -> float:
    """Percentile by linear interpolation between the two nearest ranks."""
    if not values:
        return 0.0
    s = sorted(values)
    k = (len(s) - 1) * (p / 100)
    lo = int(k)
    hi = min(lo + 1, len(s) - 1)
    frac = k - lo
    return round(s[lo] + (s[hi] - s[lo]) * frac, 2)
def overview(
db_path: Optional[Path] = None,
since_ns: Optional[int] = None,
*,
conn: Optional[sqlite3.Connection] = None,
) -> Dict[str, Any]:
"""One call for a dashboard: all the rollups."""
return {
"workflows": workflow_summary(db_path, since_ns, conn=conn),
"model_calls": model_call_summary(db_path, since_ns, conn=conn),
"tool_calls": tool_call_summary(db_path, conn=conn),
"errors": error_summary(db_path, conn=conn),
}
def has_data(
db_path: Optional[Path] = None,
*,
conn: Optional[sqlite3.Connection] = None,
) -> bool:
"""True when any telemetry runs exist (cheap guard for /insights rendering)."""
try:
with _cursor(conn, db_path) as c:
return c.execute("SELECT 1 FROM tel_runs LIMIT 1").fetchone() is not None
except Exception:
return False
__all__ = [
"workflow_summary",
"model_call_summary",
"tool_call_summary",
"error_summary",
"overview",
"has_data",
]
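
Review note: `duration_ms_p50` / `duration_ms_p95` come from `_pct`, which interpolates linearly between the two nearest ranks. A small worked sketch with the same arithmetic; the function is copied under the name `pct` and the sample durations are made up.

```python
from typing import List


def pct(values: List[float], p: int) -> float:
    """Same arithmetic as _pct: fractional rank k, then linear interpolation."""
    if not values:
        return 0.0
    s = sorted(values)
    k = (len(s) - 1) * (p / 100)  # fractional rank into the sorted list
    lo = int(k)
    hi = min(lo + 1, len(s) - 1)
    frac = k - lo
    return round(s[lo] + (s[hi] - s[lo]) * frac, 2)


durations_ms = [40.0, 10.0, 30.0, 20.0]  # hypothetical run durations
p50 = pct(durations_ms, 50)  # rank 1.5: halfway between 20 and 30
p95 = pct(durations_ms, 95)  # rank 2.85: 85% of the way from 30 to 40
single = pct([7.0], 95)      # one sample: every percentile is that sample
empty = pct([], 50)          # no samples: defined as 0.0
print(p50, p95, single, empty)
```

Note that an empty input reports 0.0 rather than `None`, so a dashboard cannot tell "no completed runs" from "instant runs" on this value alone; `total_runs` disambiguates.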


@@ -0,0 +1,289 @@
"""Export telemetry to an OpenTelemetry Collector over OTLP/HTTP.
Maps the local tel_* events to OTel spans and sends them to the endpoint configured
under ``telemetry.export.otlp``. Lets an operator stream Hermes telemetry into their
own observability stack.
Notes:
* The destination is operator-configured; this module only sends to that endpoint.
It does not import or interact with any aggregate-metrics path.
* ``opentelemetry-sdk`` + ``opentelemetry-exporter-otlp-proto-http`` are an optional
extra (``pip install hermes-agent[otlp]``), imported lazily so the dependency is
only required when OTLP export is actually used.
* ``headers_env`` maps a header name to an environment variable name; values are read
from the environment at export time and never logged or stored.
* The continuous subscriber runs in the emitter's writer thread after durable writes
and is fail-isolated, so an export error cannot affect a run.
Each event is exported as a span carrying its recorded attributes (provider, model,
tokens, duration, etc.). The timing/parent linkage captured in tel_spans
(trace_id/span_id/parent_span_id/start_ns/end_ns) is not yet reconstructed into OTel
SpanContexts here, so spans currently arrive as independent records rather than a
connected trace tree; building the connected-trace projection is tracked separately.
Spans carry structural telemetry by default. Message content is included only when
trajectories is enabled, and always passes through the export redaction pipeline.
"""
from __future__ import annotations
import logging
import os
import sqlite3
from pathlib import Path
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
class OTLPUnavailable(RuntimeError):
"""Raised when the optional OpenTelemetry SDK isn't installed."""
def _require_sdk(*, auto_install: bool = True, prompt: bool = True):
"""Import the OTel SDK, lazily installing it on first use if needed.
Routes through tools.lazy_deps (feature 'export.otlp') so a missing SDK
triggers the standard venv install flow — same as every other optional
backend — gated by security.allow_lazy_installs and TTY-prompted. Falls back
to OTLPUnavailable (with a manual install hint) when the SDK can't be made
importable (lazy installs disabled, install failed, or auto_install=False).
``auto_install``: attempt the lazy install when missing (default True).
``prompt``: ask before installing when interactive (default True); pass
False from non-interactive contexts like the continuous streamer.
"""
if auto_install:
try:
from tools.lazy_deps import ensure as _lazy_ensure
_lazy_ensure("export.otlp", prompt=prompt)
except ImportError:
pass # lazy_deps unavailable — fall through to the import attempt
except Exception:
# FeatureUnavailable (lazy installs disabled / declined / failed) —
# fall through; the import below raises OTLPUnavailable with the hint.
pass
try:
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import Resource
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
OTLPSpanExporter,
)
from opentelemetry.trace import SpanKind
return {
"TracerProvider": TracerProvider,
"BatchSpanProcessor": BatchSpanProcessor,
"Resource": Resource,
"OTLPSpanExporter": OTLPSpanExporter,
"SpanKind": SpanKind,
}
except Exception as e: # ImportError or partial install
raise OTLPUnavailable(
"OTLP export requires the optional dependency. Install with:\n"
" pip install 'hermes-agent[otlp]'\n"
f"(import error: {e})"
) from e
def _resolve_headers(headers_env: Optional[Dict[str, str]]) -> Dict[str, str]:
"""Resolve {header_name: ENV_VAR_NAME} -> {header_name: value} from env.
The config stores environment variable names, not secret values; values are read
from the environment here. Missing variables are skipped (and noted at debug level
without the value).
"""
resolved: Dict[str, str] = {}
for header_name, env_name in (headers_env or {}).items():
val = os.environ.get(str(env_name))
if val:
resolved[str(header_name)] = val
else:
logger.debug("OTLP header %s: env var %s not set; skipping",
header_name, env_name)
return resolved
def _otlp_config(config: Dict[str, Any]) -> Dict[str, Any]:
tel = (config or {}).get("telemetry") or {}
export = tel.get("export") or {}
return export.get("otlp") or {}
def build_exporter(config: Dict[str, Any]):
"""Construct an OTLP span exporter from config. Raises OTLPUnavailable if no SDK."""
sdk = _require_sdk()
otlp = _otlp_config(config)
endpoint = otlp.get("endpoint")
if not endpoint:
raise ValueError("telemetry.export.otlp.endpoint is not set")
headers = _resolve_headers(otlp.get("headers_env"))
return sdk["OTLPSpanExporter"](endpoint=endpoint, headers=headers or None)
def _make_provider(config: Dict[str, Any]):
sdk = _require_sdk()
resource = sdk["Resource"].create({
"service.name": "hermes-agent",
"telemetry.scope": "local", # never aggregate metrics
})
provider = sdk["TracerProvider"](resource=resource)
processor = sdk["BatchSpanProcessor"](build_exporter(config))
provider.add_span_processor(processor)
return provider, processor
# ── event -> span attribute mapping (real values) ───────────────────────────
def _span_attrs(ev: Dict[str, Any]) -> Dict[str, Any]:
"""Span attributes for an event — the real recorded values (local telemetry)."""
kind = ev.get("event")
attrs: Dict[str, Any] = {"hermes.event": kind or "unknown"}
keep_by_kind = {
"run": ("entrypoint", "platform", "end_reason",
"model_call_count", "tool_call_count", "error_count"),
"span": ("trace_id", "run_id", "parent_span_id", "name", "kind",
"start_ns", "end_ns", "status"),
"model_call": ("provider", "model", "base_url",
"input_tokens", "output_tokens", "cache_read_tokens",
"cache_write_tokens", "reasoning_tokens", "latency_ms"),
"tool_call": ("tool_name", "duration_ms", "result_class"),
"error": ("error_class", "subsystem", "recovery"),
}
for col in keep_by_kind.get(kind, ()): # type: ignore[arg-type]
v = ev.get(col)
if v is not None:
attrs[f"hermes.{col}"] = v
return attrs
def export_batch(provider, batch: List[Dict[str, Any]]) -> int:
"""Map a batch of events to OTel spans. Returns spans created."""
tracer = provider.get_tracer("hermes.telemetry")
n = 0
for ev in batch:
try:
name = f"hermes.{ev.get('event', 'event')}"
span = tracer.start_span(name, attributes=_span_attrs(ev))
span.end()
n += 1
except Exception:
logger.debug("OTLP span map failed", exc_info=True)
return n
# ── one-shot drain (export current local rows) ──────────────────────────────
def export_once(
config: Dict[str, Any],
*,
db_path: Optional[Path] = None,
since_ns: Optional[int] = None,
) -> int:
"""Drain the local tel_* tables to the configured OTLP endpoint once."""
provider, processor = _make_provider(config)
try:
rows = _read_events(db_path, since_ns)
total = export_batch(provider, rows)
processor.force_flush()
return total
finally:
try:
provider.shutdown()
except Exception:
pass
def _read_events(db_path: Optional[Path], since_ns: Optional[int]) -> List[Dict[str, Any]]:
if db_path is None:
from hermes_constants import get_hermes_home
db_path = get_hermes_home() / "state.db"
c = sqlite3.connect(str(db_path), timeout=5.0)
c.row_factory = sqlite3.Row
out: List[Dict[str, Any]] = []
try:
table_event = {
"tel_runs": "run", "tel_spans": "span",
"tel_model_calls": "model_call",
"tel_tool_calls": "tool_call", "tel_error_events": "error",
}
for table, evkind in table_event.items():
where = ""
if table == "tel_runs" and since_ns:
where = f" WHERE start_ns >= {int(since_ns)}"
for r in c.execute(f"SELECT * FROM {table}{where}").fetchall():
d = dict(r)
d["event"] = evkind
out.append(d)
finally:
c.close()
return out
# ── continuous streaming subscriber ─────────────────────────────────────────
class OTLPStreamer:
"""A live subscriber that pushes each emitter batch to OTLP as it lands.
Register with ``emitter.subscribe(streamer)``. Fail-isolated by the emitter.
"""
def __init__(self, config: Dict[str, Any]):
self._provider, self._processor = _make_provider(config)
self.exported = 0
def __call__(self, batch: List[Dict[str, Any]]) -> None:
self.exported += export_batch(self._provider, batch)
def shutdown(self) -> None:
try:
self._processor.force_flush()
self._provider.shutdown()
except Exception:
pass
def is_available() -> bool:
"""True when the OTel SDK is already importable. Does NOT auto-install —
this is a pure check (e.g. for status display)."""
try:
_require_sdk(auto_install=False)
return True
except OTLPUnavailable:
return False
def is_enabled(config: Dict[str, Any]) -> bool:
otlp = _otlp_config(config)
return bool(otlp.get("enabled") and otlp.get("endpoint"))
def start_streaming(config: Dict[str, Any]) -> Optional[OTLPStreamer]:
"""If OTLP is enabled, attach a streamer to the singleton emitter.
Non-interactive context (startup): attempts a lazy install with prompt=False
so a configured-but-missing SDK is installed once (gated by
security.allow_lazy_installs), then streams. If it still can't load, logs and
no-ops — never blocks or raises into startup.
"""
if not is_enabled(config):
return None
try:
_require_sdk(prompt=False)
except OTLPUnavailable:
logger.warning("telemetry.export.otlp.enabled but the OTel SDK could not "
"be installed/imported; install 'hermes-agent[otlp]'")
return None
from agent.telemetry.emitter import get_emitter
streamer = OTLPStreamer(config)
get_emitter().subscribe(streamer)
return streamer
__all__ = [
"OTLPUnavailable",
"OTLPStreamer",
"build_exporter",
"export_once",
"export_batch",
"is_available",
"is_enabled",
"start_streaming",
]
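
Review note: the exporter reads `telemetry.export.otlp` and resolves `headers_env` from environment variable names at export time. A minimal sketch of that config shape and lookup, re-implemented standalone so it runs without Hermes or the OTel SDK. The endpoint URL, header names, and `EXAMPLE_*` variable names are placeholders, and `otlp_section` / `resolve_headers` are illustrative stand-ins for `_otlp_config` / `_resolve_headers`.

```python
import os
from typing import Any, Dict, Optional

# Hypothetical config fragment; all values are placeholders.
config: Dict[str, Any] = {
    "telemetry": {
        "export": {
            "otlp": {
                "enabled": True,
                "endpoint": "http://localhost:4318/v1/traces",
                "headers_env": {
                    "Authorization": "EXAMPLE_OTLP_AUTH",
                    "X-Team": "EXAMPLE_OTLP_TEAM",
                },
            }
        }
    }
}


def otlp_section(cfg: Dict[str, Any]) -> Dict[str, Any]:
    """Walk telemetry -> export -> otlp, tolerating missing or null levels."""
    tel = (cfg or {}).get("telemetry") or {}
    return (tel.get("export") or {}).get("otlp") or {}


def resolve_headers(headers_env: Optional[Dict[str, str]]) -> Dict[str, str]:
    """Map header name -> value read from the named env var; unset vars are skipped."""
    resolved: Dict[str, str] = {}
    for header_name, env_name in (headers_env or {}).items():
        value = os.environ.get(str(env_name))
        if value:
            resolved[str(header_name)] = value
    return resolved


os.environ["EXAMPLE_OTLP_AUTH"] = "Bearer demo-token"  # demo value only
os.environ.pop("EXAMPLE_OTLP_TEAM", None)              # deliberately unset

otlp = otlp_section(config)
enabled = bool(otlp.get("enabled") and otlp.get("endpoint"))
headers = resolve_headers(otlp.get("headers_env"))
print(enabled, sorted(headers))
```

Storing variable names rather than values means the config file can be shared or committed without leaking credentials; an unset variable simply drops that header.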

agent/telemetry/policy.py Normal file

@@ -0,0 +1,72 @@
"""Telemetry consent posture and the aggregate-metrics gate.
Consent is a single config field, ``telemetry.consent_state``:
* "unknown" — no choice recorded; never uploads (the default).
* "local" — declined aggregate metrics; local telemetry only.
* "aggregate" — opted in to aggregate metrics.
The config file is the source of truth: set ``telemetry.consent_state`` with
``hermes config set`` (or a managed-scope pin). Callers that gate behavior read
``telemetry.*`` directly from config; this module only provides the consent
constants, the install-id helper, and the upload gate a future uploader must
consult.
``allow_aggregate`` is the hard gate. An administrator pins
``telemetry.allow_aggregate: false`` through the managed-scope layer
(``/etc/hermes/config.yaml``), which takes precedence over the user's config; when
it is false, aggregate metrics are off regardless of ``consent_state``.
"""
from __future__ import annotations
import uuid
from typing import Any, Dict
CONSENT_UNKNOWN = "unknown"
CONSENT_LOCAL = "local"
CONSENT_AGGREGATE = "aggregate"
VALID_CONSENT_STATES = {CONSENT_UNKNOWN, CONSENT_LOCAL, CONSENT_AGGREGATE}
def _telemetry_cfg(config: Dict[str, Any]) -> Dict[str, Any]:
cfg = config.get("telemetry") if isinstance(config, dict) else None
return cfg if isinstance(cfg, dict) else {}
def ensure_install_id(config: Dict[str, Any]) -> str:
"""Return a stable install id, minting one if the config slot is empty.
Does not persist — the caller writes the returned value back to config.yaml. A
fresh uuid4 is used; clearing ``telemetry.install_id`` (e.g. with
``hermes config set telemetry.install_id ""``) causes the next call to mint anew.
"""
tel = _telemetry_cfg(config)
existing = tel.get("install_id")
if isinstance(existing, str) and existing.strip():
return existing
return str(uuid.uuid4())
def may_upload_aggregate(config: Dict[str, Any]) -> bool:
"""Whether aggregate metrics may upload — the gate a future uploader consults.
Aggregate metrics are derived from the local telemetry tables, so they require
local telemetry to be on. True only when local telemetry is enabled, the admin
hard gate allows it, and the user has opted in via ``telemetry.consent_state``.
"""
tel = _telemetry_cfg(config)
local_enabled = bool(tel.get("local", True))
allow_aggregate = bool(tel.get("allow_aggregate", True))
state = tel.get("consent_state", CONSENT_UNKNOWN)
return local_enabled and allow_aggregate and state == CONSENT_AGGREGATE
__all__ = [
"CONSENT_UNKNOWN",
"CONSENT_LOCAL",
"CONSENT_AGGREGATE",
"VALID_CONSENT_STATES",
"may_upload_aggregate",
"ensure_install_id",
]
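
Review note: the upload gate is a three-way AND of local telemetry, the admin `allow_aggregate` pin, and `consent_state == "aggregate"`. A standalone sketch of the resulting truth table, with the gate logic copied so it runs on its own; the sample configs are illustrative.

```python
from typing import Any, Dict


def may_upload_aggregate(config: Dict[str, Any]) -> bool:
    """Copy of the gate: local on AND admin allows AND user opted in."""
    tel = config.get("telemetry") if isinstance(config, dict) else None
    tel = tel if isinstance(tel, dict) else {}
    local_enabled = bool(tel.get("local", True))
    allow_aggregate = bool(tel.get("allow_aggregate", True))
    state = tel.get("consent_state", "unknown")
    return local_enabled and allow_aggregate and state == "aggregate"


cases = {
    "defaults (consent unknown)": {},
    "declined": {"telemetry": {"consent_state": "local"}},
    "opted in": {"telemetry": {"consent_state": "aggregate"}},
    "opted in, admin pin off": {
        "telemetry": {"consent_state": "aggregate", "allow_aggregate": False}
    },
    "opted in, local telemetry off": {
        "telemetry": {"consent_state": "aggregate", "local": False}
    },
}
results = {label: may_upload_aggregate(cfg) for label, cfg in cases.items()}
for label, allowed in results.items():
    print(f"{label}: {allowed}")
```

Only the plain opted-in case passes; every default resolves to "no upload", which is the property a future uploader relies on.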


@@ -0,0 +1,187 @@
"""Redaction applied to telemetry data on export.
Two independent controls:
* Secrets are always redacted, on every export and in every mode; no setting
disables this. Wraps ``agent/redact.py::redact_sensitive_text(force=True)``.
* Whether message bodies, reasoning, and raw tool arguments are exportable at all is
governed by the trajectories setting (``telemetry.trajectories.enabled``, default
off, admin-pinnable), not by a redaction mode. With trajectories off, content is
dropped. With it on, content is exportable and ``content_redaction`` (none|pii)
controls how much is scrubbed; secrets are still always stripped.
This applies to the local and trajectory export paths. It is unrelated to any
aggregate-metrics path.
"""
from __future__ import annotations
import re
from typing import Any, Dict, List, Optional
# Content-redaction strengths for any content that IS exported.
CONTENT_NONE = "none" # no PII scrubbing; secrets are still always stripped
CONTENT_PII = "pii" # codec-aware PII redaction on exported content
CONTENT_MODES = {CONTENT_NONE, CONTENT_PII}
# ── PII patterns (applied only in CONTENT_PII mode, on content that is exported) ──
_EMAIL_RE = re.compile(r"[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}")
# E.164-ish and common separators; conservative to avoid nuking code/IDs.
_PHONE_RE = re.compile(
r"(?<!\w)(?:\+?\d{1,3}[\s.\-]?)?(?:\(\d{2,4}\)[\s.\-]?)?\d{3}[\s.\-]?\d{3,4}(?:[\s.\-]?\d{2,4})?(?!\w)"
)
# Long opaque hex/uuid-ish user identifiers.
_UUID_RE = re.compile(r"\b[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}\b")
def _secret_redact(text: Optional[str]) -> Optional[str]:
"""Always-on secret redaction. force=True so user config can't disable it."""
if text is None:
return None
try:
from agent.redact import redact_sensitive_text
return redact_sensitive_text(str(text), force=True)
except Exception:
# Fail CLOSED: if the redactor can't run, do not emit the raw string.
return "[redaction-unavailable]"
def _pii_redact(text: str) -> str:
text = _EMAIL_RE.sub("[email]", text)
text = _UUID_RE.sub("[id]", text)
text = _PHONE_RE.sub("[phone]", text)
return text
def redact_for_export(
text: Optional[str],
*,
content_mode: str = CONTENT_NONE,
) -> Optional[str]:
"""Redact a single content string for export.
Secrets are ALWAYS stripped. Then PII is stripped when content_mode is 'pii'.
Callers gate *whether content is exported at all* via telemetry.trajectories
(see ``content_export_enabled``); this function only scrubs content that the
caller has already decided to export.
"""
redacted = _secret_redact(text)
if redacted is None:
return None
if content_mode == CONTENT_PII:
redacted = _pii_redact(redacted)
return redacted
def content_export_enabled(config: Optional[Dict[str, Any]]) -> bool:
"""True only when telemetry.trajectories is explicitly enabled.
This is the consent gate for exporting message bodies / reasoning / raw tool
args. Default off. Admin-pinnable via managed scope (telemetry.trajectories.enabled).
"""
try:
tel = (config or {}).get("telemetry") or {}
traj = tel.get("trajectories") or {}
return bool(traj.get("enabled", False))
except Exception:
return False
def content_mode_for(config: Optional[Dict[str, Any]]) -> str:
try:
tel = (config or {}).get("telemetry") or {}
mode = tel.get("content_redaction", CONTENT_NONE)
return mode if mode in CONTENT_MODES else CONTENT_NONE
except Exception:
return CONTENT_NONE
# ── Codec-aware message redaction (NeMo pattern) ─────────────────────────────
# Redact the right fields of a provider message shape rather than regex-blasting
# the whole blob. Structure (roles, names, counts) is preserved; only the
# free-text content fields are scrubbed.
def redact_message(
msg: Dict[str, Any],
*,
content_mode: str = CONTENT_NONE,
include_content: bool = False,
) -> Dict[str, Any]:
"""Redact one chat message dict for export.
When include_content is False (trajectories off), content/reasoning/tool-arg
fields are dropped — only structural fields (role, tool name, counts) remain.
When True, those fields are kept but passed through redact_for_export.
"""
role = msg.get("role")
out: Dict[str, Any] = {"role": role}
# Always-structural fields.
if msg.get("tool_name") is not None:
out["tool_name"] = msg.get("tool_name")
if msg.get("name") is not None:
out["name"] = msg.get("name")
if not include_content:
# Structural only: record presence/size, not bytes.
c = msg.get("content")
if c is not None:
out["content_chars"] = len(str(c))
if msg.get("reasoning_content"):
out["reasoning_chars"] = len(str(msg["reasoning_content"]))
if msg.get("tool_calls"):
out["tool_call_count"] = _count_tool_calls(msg["tool_calls"])
return out
# Content included (trajectories enabled): scrub then keep.
if msg.get("content") is not None:
out["content"] = redact_for_export(msg["content"], content_mode=content_mode)
if msg.get("reasoning_content"):
out["reasoning_content"] = redact_for_export(
msg["reasoning_content"], content_mode=content_mode
)
if msg.get("tool_calls"):
out["tool_calls"] = _redact_tool_calls(msg["tool_calls"], content_mode=content_mode)
return out
def _count_tool_calls(tool_calls: Any) -> int:
try:
import json
tc = json.loads(tool_calls) if isinstance(tool_calls, str) else tool_calls
return len(tc) if isinstance(tc, list) else (1 if tc else 0)
except Exception:
return 0
def _redact_tool_calls(tool_calls: Any, *, content_mode: str) -> Any:
"""Redact raw tool-call arguments (free text) while keeping function names."""
import json
try:
tc = json.loads(tool_calls) if isinstance(tool_calls, str) else tool_calls
except Exception:
return "[unparseable-tool-calls]"
if not isinstance(tc, list):
return []
out: List[Dict[str, Any]] = []
for call in tc:
if not isinstance(call, dict):
continue
fn = (call.get("function") or {}) if isinstance(call.get("function"), dict) else {}
name = fn.get("name") or call.get("name")
args = fn.get("arguments")
red_args = redact_for_export(args, content_mode=content_mode) if args is not None else None
out.append({"name": name, "arguments": red_args})
return out
__all__ = [
"CONTENT_NONE",
"CONTENT_PII",
"CONTENT_MODES",
"redact_for_export",
"content_export_enabled",
"content_mode_for",
"redact_message",
]
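
Review note: redaction has two visible effects on a message: with trajectories off only sizes survive, and with `content_redaction: pii` exported text has emails and UUID-like ids replaced. A standalone sketch of both, assuming the email and UUID patterns as defined in this file; the phone pattern and the always-on secret pass are left out for brevity, and `pii_redact` / `structural_only` are simplified stand-ins rather than the module's functions.

```python
import re
from typing import Any, Dict

# Patterns copied from this module (email and UUID only).
EMAIL_RE = re.compile(r"[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}")
UUID_RE = re.compile(
    r"\b[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}\b"
)


def pii_redact(text: str) -> str:
    """Replace emails, then UUID-like ids, with fixed placeholders."""
    text = EMAIL_RE.sub("[email]", text)
    return UUID_RE.sub("[id]", text)


def structural_only(msg: Dict[str, Any]) -> Dict[str, Any]:
    """Trajectories-off shape: keep the role and record content size, not bytes."""
    out: Dict[str, Any] = {"role": msg.get("role")}
    if msg.get("content") is not None:
        out["content_chars"] = len(str(msg["content"]))
    return out


text = "Contact alice@example.com about job 123e4567-e89b-12d3-a456-426614174000."
scrubbed = pii_redact(text)
shape = structural_only({"role": "user", "content": "hello"})
print(scrubbed)
print(shape)
```

Recording `content_chars` instead of the text keeps message-size statistics available to dashboards without exporting any of the body.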

agent/telemetry/rollup.py Normal file

@@ -0,0 +1,144 @@
"""Build per-run summary events from the local telemetry tables.
Reads the ``tel_*`` tables and projects each completed run into a summary dict holding
the recorded values: entrypoint, platform, models used (with provider), tool names,
call and error counts, token totals, and duration. Powers ``hermes telemetry preview``.
No cross-run aggregation or bucketing is applied here.
"""
from __future__ import annotations
import platform
import sqlite3
from pathlib import Path
from typing import Any, Dict, List, Optional
def _os_family() -> str:
s = platform.system().lower()
if s.startswith("lin"):
return "linux"
if s == "darwin":
return "macos"
if s.startswith("win"):
return "windows"
return "other"
def _hermes_version() -> str:
try:
from hermes_cli import __version__
return str(__version__)
except Exception:
return "0.0.0"
def _open(db_path: Optional[Path], conn: Optional[sqlite3.Connection]):
if conn is not None:
prev = conn.row_factory
conn.row_factory = sqlite3.Row
return conn, prev, False
if db_path is None:
from hermes_constants import get_hermes_home
db_path = get_hermes_home() / "state.db"
c = sqlite3.connect(str(db_path), timeout=5.0)
c.row_factory = sqlite3.Row
return c, None, True
def _run_events(c: sqlite3.Connection, since_ns: Optional[int]) -> List[Dict[str, Any]]:
"""Project completed runs into per-run summary dicts."""
where = " WHERE end_ns IS NOT NULL"
if since_ns:
where += f" AND start_ns >= {int(since_ns)}"
rows = c.execute(
"SELECT run_id, entrypoint, platform, end_reason, start_ns, end_ns, "
"model_call_count, tool_call_count, error_count "
"FROM tel_runs" + where
).fetchall()
events: List[Dict[str, Any]] = []
for r in rows:
# Models actually used in this run (real ids), with token totals.
models = [
{"provider": m["provider"], "model": m["model"],
"calls": m["n"], "input_tokens": int(m["inp"] or 0),
"output_tokens": int(m["outp"] or 0)}
for m in c.execute(
"SELECT provider, model, COUNT(*) n, SUM(input_tokens) inp, "
"SUM(output_tokens) outp FROM tel_model_calls WHERE run_id = ? "
"GROUP BY provider, model ORDER BY n DESC",
(r["run_id"],),
).fetchall()
]
tools = [
row["tool_name"]
for row in c.execute(
"SELECT DISTINCT tool_name FROM tel_tool_calls WHERE run_id = ?",
(r["run_id"],),
).fetchall()
if row["tool_name"]
]
trow = c.execute(
"SELECT SUM(input_tokens) inp, SUM(output_tokens) outp "
"FROM tel_model_calls WHERE run_id = ?",
(r["run_id"],),
).fetchone()
duration_ms = (r["end_ns"] - r["start_ns"]) / 1e6 if r["end_ns"] else None
events.append({
"event_name": "workflow_completed",
"run_id": r["run_id"],
"entrypoint": r["entrypoint"] or "cli",
"platform": r["platform"],
"end_reason": r["end_reason"] or "completed",
"models_used": models,
"tools_used": tools,
"model_call_count": r["model_call_count"] or 0,
"tool_call_count": r["tool_call_count"] or 0,
"error_count": r["error_count"] or 0,
"duration_ms": round(duration_ms, 1) if duration_ms is not None else None,
"input_tokens": int((trow["inp"] if trow else 0) or 0),
"output_tokens": int((trow["outp"] if trow else 0) or 0),
})
return events
def build_aggregate_events(
    *,
    install_id: str,
    db_path: Optional[Path] = None,
    since_ns: Optional[int] = None,
    conn: Optional[sqlite3.Connection] = None,
    include_heartbeat: bool = True,
) -> List[Dict[str, Any]]:
    """Return per-run summary events plus an optional heartbeat."""
    c, prev_factory, owned = _open(db_path, conn)
    try:
        events = _run_events(c, since_ns)
        if include_heartbeat:
            events.append({
                "event_name": "heartbeat",
                "install_id": install_id,
                "hermes_version": _hermes_version(),
                "os_family": _os_family(),
                "entrypoint": "cli",
            })
        return events
    finally:
        # Close connections we opened; restore the row factory on borrowed ones.
        if owned:
            c.close()
        elif prev_factory is not None:
            c.row_factory = prev_factory


def summarize(events: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Counts by event_name + field coverage, for status/preview output."""
    by_name: Dict[str, int] = {}
    fields = set()
    for e in events:
        name = e.get("event_name", "?")
        by_name[name] = by_name.get(name, 0) + 1
        fields.update(e.keys())
    return {"total": len(events), "by_event_name": by_name, "fields_present": sorted(fields)}


__all__ = ["build_aggregate_events", "summarize"]

agent/telemetry/spans.py (new file, 83 lines)

@@ -0,0 +1,83 @@
"""Trace / run / span id propagation via contextvars.
Telemetry events share IDs so a workflow can be reconstructed: one ``trace_id`` per
workflow, one ``run_id`` per top-level execution, ``span_id`` per timed operation, and
``parent_span_id`` for nesting. These live in contextvars so async tool calls and
spawned subagents inherit the lineage automatically.
Provides helpers to start/clear a run context and mint child span ids. The telemetry
plugin sets the run context on session start and reads it in each hook callback.
Nothing here writes to storage — it only carries ids.
"""
from __future__ import annotations

import contextvars
import uuid
from dataclasses import dataclass
from typing import Optional

_trace_id: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar(
    "hermes_tel_trace_id", default=None
)
_run_id: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar(
    "hermes_tel_run_id", default=None
)
_parent_span_id: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar(
    "hermes_tel_parent_span_id", default=None
)


def new_id() -> str:
    return uuid.uuid4().hex


@dataclass(slots=True)
class RunContext:
    trace_id: str
    run_id: str


def start_run(trace_id: Optional[str] = None, run_id: Optional[str] = None) -> RunContext:
    """Begin a run context, minting ids when not supplied. Sets contextvars."""
    tid = trace_id or new_id()
    rid = run_id or new_id()
    _trace_id.set(tid)
    _run_id.set(rid)
    _parent_span_id.set(None)
    return RunContext(trace_id=tid, run_id=rid)


def current_trace_id() -> Optional[str]:
    return _trace_id.get()


def current_run_id() -> Optional[str]:
    return _run_id.get()


def current_parent_span_id() -> Optional[str]:
    return _parent_span_id.get()


def new_span_id() -> str:
    """Mint a span id (does not alter the parent pointer)."""
    return new_id()


def clear_run() -> None:
    _trace_id.set(None)
    _run_id.set(None)
    _parent_span_id.set(None)


__all__ = [
    "RunContext",
    "new_id",
    "start_run",
    "current_trace_id",
    "current_run_id",
    "current_parent_span_id",
    "new_span_id",
    "clear_run",
]


@@ -217,9 +217,7 @@ class CodexEventProjector:
def _project_mcp_tool_call(self, item: dict, item_id: str) -> ProjectionResult:
server = item.get("server") or "mcp"
tool = item.get("tool") or "unknown"
-# Mirror the native MCP tool-name convention (mcp__server__tool) so the
-# deterministic call_id input stays consistent with registration names.
-call_id = _deterministic_call_id(f"mcp__{server}__{tool}", item_id)
+call_id = _deterministic_call_id(f"mcp_{server}_{tool}", item_id)
args = item.get("arguments") or {}
if not isinstance(args, dict):
args = {"arguments": args}


@@ -1169,6 +1169,49 @@ display:
# # Routing/delivery still uses the original values internally.
# redact_pii: false
# =============================================================================
# Telemetry & Observability
# =============================================================================
# Three settings, isolated from each other:
#   local:        full-fidelity observability you own. Default ON.
#   aggregate:    opt-in metadata to Nous (no uploader ships yet). Default OFF.
#   trajectories: full message content, exportable to your own destination. Separate consent. Default OFF.
#
# Local telemetry records real values (actual models, providers, tool names) —
# your own data, on your machine. Aggregate metrics are opt-in and have no
# uploader today; if one ships it would summarize at that egress boundary.
#
# Enterprise locking: any telemetry.* key can be pinned by an administrator via
# the managed-scope layer (/etc/hermes/config.yaml), which wins over the user's
# value. To hard-forbid egress on a locked-down deployment, pin
# `telemetry.allow_aggregate: false` there.
# telemetry:
#   # Local telemetry: event log + SQLite index in state.db. Never leaves your
#   # machine unless you export it or opt into aggregate metrics.
#   local: true
#   # Hard gate for aggregate metrics. When false, aggregate metrics are off
#   # regardless of consent_state. Pin false via managed scope to forbid egress.
#   allow_aggregate: true
#   # Aggregate-metrics consent (the opt-in). No uploader ships yet.
#   # unknown (no choice made; never uploads) | local (declined) | aggregate (opted in)
#   consent_state: unknown
#   # Stable install id (aggregate metrics only). Empty = mint on first use;
#   # clear it to rotate.
#   install_id: ""
#   # Local event-log retention before rotation (days).
#   retention_days: 90
#   # Keep secret redaction on even at full local capture.
#   redact_secrets: true
#   # Content redaction for exports / support bundles: none | pii.
#   content_redaction: none
#   # Exporters. The OTLP exporter sends spans to a configured Collector endpoint;
#   # header values reference environment variable names, not inline secrets.
#   export:
#     otlp:
#       enabled: false
#       endpoint: null
#       headers_env: {}   # e.g. {Authorization: MY_OTLP_TOKEN_ENVVAR}
# =============================================================================
# Shell-script hooks
# =============================================================================


@@ -307,6 +307,13 @@ nested agent work or security lifecycle events.
## Existing Consumers
The bundled **`telemetry`** plugin is the built-in local-first telemetry system: it
records runs, spans, model calls, and tool calls to a local event log + `state.db`, powers
`hermes insights`, and supports export to your own file or OpenTelemetry Collector. It
is on by default and never sends data to Nous unless you opt in. See
[`telemetry.md`](./telemetry.md) for the full feature, the `hermes telemetry` commands,
and enterprise export.
The bundled Langfuse plugin demonstrates direct hook-based observability for
turns, provider requests, and tool calls.


@@ -0,0 +1,228 @@
# Telemetry & Observability
Hermes ships with a built-in, local-first telemetry system. It records what your
agent does — workflows, model calls, tool calls, errors — to your own machine, powers
`/insights`, and (when *you* enable it) exports everything to your own observability
stack. It is private by default and never sends your data to Nous unless you explicitly
opt in to anonymous aggregate metrics.
This page explains the whole feature: the three telemetry settings, what's captured, the
`hermes telemetry` commands, and how an enterprise streams or exports all of its data
to its own infrastructure.
> Looking for the **plugin hook contract** (how to write your own observer plugin)?
> That's in [`README.md`](./README.md). This page is about the built-in telemetry
> system and its CLI.
## The three settings
Telemetry has three settings, isolated from each other:
| Setting | What it holds | Default | Destination |
| --- | --- | --- | --- |
| **local** | Full-fidelity observability — runs, model/tool calls (real model & provider names), durations, errors | **on** | your machine only |
| **aggregate** | Opt-in metadata to Nous (no uploader ships yet) | **off** (opt-in) | Nous, only if you enable it |
| **trajectories** | Full message content / reasoning / raw tool args | **off** (opt-in) | your own export destinations only |
Local telemetry is the one you'll use day to day — it records the real values that
happened (actual model ids, providers, tool names). Aggregate metrics are the only
thing that could ever leave for Nous; they are opt-in, default-off, and have no uploader
today. Trajectories unlock full-content export to *your own* destinations — never wired
to Nous.
## Local telemetry: on-by-default observability
Local telemetry is implemented as a bundled `telemetry` plugin that listens to Hermes
lifecycle hooks (model calls, tool calls, session start/finalize) and writes events to:
- an append-only JSONL log at `~/.hermes/telemetry/events.jsonl` (the source of truth)
- indexed `tel_*` tables in `state.db` (a rebuildable index for fast queries):
`tel_runs`, `tel_spans`, `tel_model_calls`, `tel_tool_calls`, `tel_error_events`
Writes are fire-and-forget on a background thread: telemetry can never block, slow, or
fail a model call or tool call. If local telemetry is disabled (`telemetry.local: false`)
the plugin does not load at all.
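The fire-and-forget behaviour described above can be sketched as a bounded queue drained by a daemon thread. This is an illustration of the pattern only, not the plugin's actual writer: the class name `BackgroundEventWriter`, the `max_pending` limit, and the drop-on-full policy are all assumptions.

```python
import json
import queue
import threading
from pathlib import Path


class BackgroundEventWriter:
    """Append events to a JSONL file from a daemon thread (illustrative only)."""

    def __init__(self, path: Path, max_pending: int = 1000) -> None:
        self._path = path
        self._queue: queue.Queue = queue.Queue(maxsize=max_pending)
        self._thread = threading.Thread(target=self._drain, daemon=True)
        self._thread.start()

    def emit(self, event: dict) -> bool:
        """Enqueue without blocking; drop the event if the queue is full."""
        try:
            self._queue.put_nowait(event)
            return True
        except queue.Full:
            return False

    def _drain(self) -> None:
        while True:
            event = self._queue.get()
            try:
                if event is None:  # sentinel: stop the worker
                    return
                with self._path.open("a", encoding="utf-8") as fh:
                    fh.write(json.dumps(event) + "\n")
            except Exception:
                pass  # a telemetry failure must never reach the caller
            finally:
                self._queue.task_done()

    def close(self) -> None:
        """Flush pending events and stop the worker thread."""
        self._queue.put(None)
        self._thread.join()
```

The caller only ever touches `emit()`, which returns immediately whether or not the disk write later succeeds, so a slow or failing log cannot stall a model or tool call.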
### Traces and spans
A **run** is one session (from `on_session_start` to `on_session_finalize`). Each run gets
a root span in `tel_spans`. Every model and tool call within the run is recorded as a child
span whose `parent_span_id` is the run's root span and whose `span_id` is shared with its
detail row in `tel_model_calls` / `tel_tool_calls`. A run therefore reconstructs as a
connected `run -> calls` tree, ordered by `start_ns` and joinable to the per-call detail,
which is the shape a trace viewer or any OpenTelemetry backend can render.
Call spans are timed from the measured latency/duration the hooks report; cross-run
subagent lineage (linking a delegated child run to its parent) is not yet recorded.
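As a rough illustration of that reconstruction, the sketch below rebuilds one run's tree from a `tel_spans` table in an in-memory SQLite database. The column list follows the `tel_spans` schema in this change; the helper `run_tree` and the sample `name` / `kind` values are made up for the example.

```python
import sqlite3

SCHEMA = """
CREATE TABLE tel_spans (
    span_id TEXT PRIMARY KEY,
    trace_id TEXT NOT NULL,
    run_id TEXT NOT NULL,
    parent_span_id TEXT,
    name TEXT NOT NULL,
    kind TEXT,
    start_ns INTEGER NOT NULL,
    end_ns INTEGER,
    status TEXT
);
"""


def run_tree(conn: sqlite3.Connection, run_id: str) -> dict:
    """Return {"root": row, "children": [rows]} for one run, ordered by start_ns."""
    conn.row_factory = sqlite3.Row
    rows = conn.execute(
        "SELECT span_id, parent_span_id, name, kind, start_ns, end_ns "
        "FROM tel_spans WHERE run_id = ? ORDER BY start_ns",
        (run_id,),
    ).fetchall()
    # The root span is the one without a parent; calls hang directly off it.
    root = next(r for r in rows if r["parent_span_id"] is None)
    children = [dict(r) for r in rows if r["parent_span_id"] == root["span_id"]]
    return {"root": dict(root), "children": children}


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
conn.executemany(
    "INSERT INTO tel_spans VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
    [
        # Hypothetical sample rows, inserted out of order on purpose.
        ("s0", "t1", "r1", None, "run", "run", 0, 900, "ok"),
        ("s2", "t1", "r1", "s0", "web_search", "tool", 400, 700, "ok"),
        ("s1", "t1", "r1", "s0", "model_call", "model", 100, 300, "ok"),
    ],
)
tree = run_tree(conn, "r1")
print([c["name"] for c in tree["children"]])
```

Because every call span shares its `span_id` with a detail row, each child here could be joined to `tel_model_calls` or `tel_tool_calls` for the per-call fields.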
### Seeing your local data
```bash
hermes insights # usage report — now includes an "Observability" section
hermes telemetry status # settings, consent, export posture, local data volume
```
The `insights` Observability section shows workflow counts and success rate, duration
p50/p95, tool failure rates by tool, provider/model mix, and cache hit rate —
all computed locally with exact values.
## `hermes telemetry` commands
```text
hermes telemetry status Show settings, consent state, export posture, local volume
hermes telemetry preview Show the aggregate events that would be produced (local)
hermes telemetry export Export local telemetry to a file/stream or OTLP endpoint
```
Consent and the install id are plain config, not separate verbs — set them with
`hermes config set` (or a managed-scope pin):
```bash
hermes config set telemetry.consent_state aggregate # opt in to aggregate metrics
hermes config set telemetry.consent_state local # opt out (local telemetry stays on)
hermes config set telemetry.install_id "" # reset the install id (mints a new one)
```
### Aggregate metrics (opt-in)
Aggregate metrics are **off by default** and have **no uploader today** — nothing is
sent to Nous. Consent lives in `telemetry.consent_state` (`unknown` / `local` /
`aggregate`); setting it to `aggregate` only records the opt-in, so that it is already in
place if an uploader ships later. Any such uploader would summarize at that egress boundary.
`hermes telemetry preview` shows your recent runs as they'd be summarized — computed and
shown **locally only**, with the real model and tool names from your own telemetry. It's
a local inspection surface, not an upload.
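The gate that decides whether aggregate metrics count as enabled can be sketched as follows. It mirrors the three conditions `hermes telemetry status` evaluates (local telemetry on, `allow_aggregate` true, consent set to `aggregate`); the function name and the literal state strings are written out here for illustration rather than imported from the policy module.

```python
VALID_CONSENT_STATES = {"unknown", "local", "aggregate"}


def aggregate_enabled(telemetry_cfg: dict) -> bool:
    """All three conditions must hold for aggregate metrics to be considered on."""
    consent = telemetry_cfg.get("consent_state", "unknown")
    if consent not in VALID_CONSENT_STATES:
        consent = "unknown"  # unrecognised values are treated as "no choice made"
    return (
        bool(telemetry_cfg.get("local", True))
        and bool(telemetry_cfg.get("allow_aggregate", True))
        and consent == "aggregate"
    )
```

Note that a pinned `allow_aggregate: false` wins regardless of consent, which is what makes it usable as a fleet-wide hard stop.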
## Enterprise: getting all of your data
Everything below sends data to **your own** destination — a file, your SIEM, or your own
OpenTelemetry Collector. None of it goes to Nous.
### Bulk export to a file
```bash
# Structural telemetry only (default — no message content)
hermes telemetry export --out telemetry.ndjson
# JSON instead of NDJSON, last 7 days only
hermes telemetry export --out dump.json --format json --since 7
```
By default the export is **structural** — runs, model/tool-call metadata, session shells
with message *counts* but no message bodies.
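An NDJSON export is one JSON object per line, so it can be post-processed with a few lines of standard-library Python. The sketch below is a generic reader; the field names in the sample are hypothetical, since the exact record shape of the export is not specified here.

```python
import io
import json
from collections import Counter
from typing import Iterator, TextIO


def iter_ndjson(fh: TextIO) -> Iterator[dict]:
    """Yield one decoded object per non-empty line."""
    for line in fh:
        line = line.strip()
        if line:
            yield json.loads(line)


def count_by(fh: TextIO, field: str) -> Counter:
    """Count records by the value of ``field`` (missing values count as "?")."""
    return Counter(str(rec.get(field, "?")) for rec in iter_ndjson(fh))


# Hypothetical sample; real export records may use different field names.
sample = io.StringIO(
    '{"tool_name": "web_search"}\n'
    '{"tool_name": "web_search"}\n'
    '\n'
    '{"model": "claude-opus-4"}\n'
)
counts = count_by(sample, "tool_name")
```

For a real file, pass `open("telemetry.ndjson", encoding="utf-8")` instead of the `StringIO` sample.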
### Including content (trajectories)
To export full message content, enable trajectories. This is a deliberate, separate
consent — it's how an enterprise opts into exporting work-product content to its own
store:
```yaml
# config.yaml
telemetry:
  trajectories:
    enabled: true          # unlocks content export to YOUR destination
  content_redaction: pii   # "none" | "pii"
```
```bash
hermes telemetry export --out full.ndjson --include-content
```
`--include-content` is a no-op unless trajectories are enabled — the config setting
governs, not the flag.
### Live streaming to your OpenTelemetry Collector / SIEM (OTLP)
Hermes can stream telemetry to your own OTLP endpoint. This requires the optional `otlp`
extra:
```bash
pip install 'hermes-agent[otlp]'
```
```yaml
# config.yaml
telemetry:
  export:
    otlp:
      enabled: true
      endpoint: "https://collector.your-corp.internal:4318/v1/traces"
      headers_env:   # secrets by reference: env var NAMES, not values
        Authorization: MY_OTLP_TOKEN_ENVVAR
```
Set the referenced environment variable, then run the export:
```bash
hermes telemetry export --otlp # drain current telemetry to your collector
```
The token value lives only in the environment variable named by `headers_env`. The
config holds the *name* of an environment variable rather than the secret itself; the
value is read at export time and is never written to config or logged.
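The by-reference lookup can be pictured as a small mapping step at export time. This is a sketch of the idea, not the exporter's code: `resolve_headers` is a hypothetical helper, and skipping unset variables (rather than raising) is an assumption.

```python
import os
from typing import Dict, Mapping


def resolve_headers(
    headers_env: Mapping[str, str],
    environ: Mapping[str, str] = os.environ,
) -> Dict[str, str]:
    """Map {header: ENV_VAR_NAME} to {header: value}, skipping unset variables."""
    resolved: Dict[str, str] = {}
    for header, env_name in headers_env.items():
        value = environ.get(str(env_name))
        if value:  # unset or empty variables produce no header
            resolved[header] = value
    return resolved
```

Only the resolved dictionary ever holds the secret, and it exists just for the duration of the export call.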
Each telemetry event is exported as an OTel span carrying its recorded attributes
(provider, model, tokens, duration, etc.). The `tel_spans` timing/parent linkage is not
yet reconstructed into connected OTel `SpanContext`s, so spans currently arrive as
independent records rather than a connected trace tree; that projection is planned.
## Redaction
Two independent controls govern what content looks like on export:
| Control | Values | Effect |
| --- | --- | --- |
| Secret redaction | always on | API keys, tokens, auth headers, connection strings are **always** stripped on every export path. Cannot be disabled. |
| `content_redaction` | `none` \| `pii` | When content is exported, `pii` additionally redacts emails, phone numbers, and id-shaped strings. |
Secret redaction is always on — even at full content fidelity — because a SIEM or
warehouse full of live credentials is a bigger attack target than the data it holds. It
fails closed: if the redactor can't run, the raw string is not emitted.
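"Fails closed" means the error path emits a placeholder instead of the original text. A minimal sketch of that behaviour follows; the bearer-token pattern and the placeholder strings are illustrative assumptions, and the real redactor's rules are not shown in this document.

```python
import re
from typing import Callable

# Illustrative pattern only; it covers a single secret shape.
_BEARER = re.compile(r"(?i)\bbearer\s+[A-Za-z0-9._\-]+")


def redact_secrets(text: str) -> str:
    """Replace bearer tokens with a fixed marker."""
    return _BEARER.sub("Bearer [REDACTED]", text)


def safe_emit(text: str, redactor: Callable[[str], str] = redact_secrets) -> str:
    """Return redacted text, or a placeholder if redaction itself fails."""
    try:
        return redactor(text)
    except Exception:
        return "[REDACTION FAILED]"  # fail closed: never fall back to the raw string
```

The important design choice is in the `except` branch: a broken redactor costs you the record's content, never the secret.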
## Configuration reference
```yaml
telemetry:
  local: true               # local telemetry (default on)
  allow_aggregate: true     # hard gate; pin false to forbid aggregate metrics entirely
  consent_state: unknown    # aggregate opt-in: unknown | local | aggregate
  install_id: ""            # stable anon id; "" mints one; clear to rotate
  retention_days: 90        # local event-log retention
  redact_secrets: true      # always-on secret redaction (kept on by design)
  content_redaction: none   # none | pii
  trajectories:
    enabled: false          # unlocks full-content export to your destination
  export:
    otlp:
      enabled: false
      endpoint: null
      headers_env: {}       # {HeaderName: ENV_VAR_NAME}
```
### Enterprise policy via managed scope
Any `telemetry.*` key can be pinned by an administrator through Hermes' managed-scope
layer (`/etc/hermes/config.yaml`), which wins over the user's value on a per-key basis.
There is no telemetry-specific policy block — to lock down a fleet, pin the keys you care
about. Common examples:
- `telemetry.allow_aggregate: false` — aggregate metrics stay off even if
`consent_state` is set to `aggregate`.
- `telemetry.export.otlp.endpoint` — point every install at the corporate collector.
- `telemetry.trajectories.enabled` — centrally decide whether content export is allowed.
When a key is managed, attempts to change it are rejected by managed scope with a message
naming the source. `hermes telemetry status` shows the current export posture (endpoint
host, whether the auth env var is set, content gate, redaction modes) — it never prints
secret values.
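Per-key precedence can be illustrated by flattening both layers to dotted keys and letting managed values overwrite user values. This is a sketch under assumptions: the helpers `flatten` and `effective` are hypothetical, and Hermes' actual managed-scope merge may differ in detail.

```python
from typing import Any, Dict, Set, Tuple


def flatten(cfg: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
    """Flatten nested dicts into dotted keys, e.g. {"a": {"b": 1}} -> {"a.b": 1}."""
    flat: Dict[str, Any] = {}
    for key, value in cfg.items():
        dotted = f"{prefix}{key}"
        if isinstance(value, dict) and value:
            flat.update(flatten(value, dotted + "."))
        else:
            flat[dotted] = value
    return flat


def effective(
    user: Dict[str, Any], managed: Dict[str, Any]
) -> Tuple[Dict[str, Any], Set[str]]:
    """Return (effective flat config, set of keys pinned by managed scope)."""
    user_flat, managed_flat = flatten(user), flatten(managed)
    # Managed values are applied last, so they win key by key.
    return {**user_flat, **managed_flat}, set(managed_flat)
```

With a user config that sets `consent_state: aggregate` and a managed file that pins only `allow_aggregate: false`, the user's consent value survives but the hard gate stays closed.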
## Privacy summary
- Local telemetry never leaves your machine.
- Aggregate metrics (the only thing that could go to Nous) are opt-in, default-off,
and have no uploader today — nothing is sent.
- All export surfaces (file, OTLP) point at *your* destinations.
- Secrets are always redacted on export; content export is off until you enable
trajectories; PII redaction is a knob.


@@ -1866,7 +1866,61 @@ DEFAULT_CONFIG = {
"privacy": {
"redact_pii": False, # When True, hash user IDs and strip phone numbers from LLM context
},
# Telemetry & observability. Three settings, isolated from each other:
# local: full-fidelity local observability the user owns (real
# models, providers, tool names). Default ON.
# aggregate: opt-in metadata to Nous (no uploader ships yet). Default OFF.
# trajectories: full message content / reasoning / raw tool args, exportable to the
# user's own destination. Separate consent (telemetry.trajectories.enabled). Default OFF.
#
# Enterprise locking: any telemetry.* key can be pinned by an administrator via
# the managed-scope layer (/etc/hermes/config.yaml), which wins over the user's
# value on a per-key basis. There is no telemetry-specific policy block — pin
# e.g. `telemetry.allow_aggregate: false` in managed scope to hard-forbid egress.
"telemetry": {
# Local telemetry: event log + SQLite index in state.db. The user's own data;
# never leaves the machine unless they export it or opt into aggregate metrics.
"local": True,
# Hard gate for aggregate metrics. When False, aggregate metrics are off
# regardless of consent_state. Intended to be pinned False by an administrator
# via managed scope on locked-down or air-gapped deployments. Default True
# (consent_state still governs the opt-in).
"allow_aggregate": True,
# Aggregate-metrics consent — the single source of truth for the opt-in:
# "unknown" (no choice made — never uploads), "local" (declined),
# "aggregate" (opted in). Set with `hermes config set telemetry.consent_state`
# or a managed-scope pin. Non-interactive installs sit at "unknown".
"consent_state": "unknown",
# Stable install identifier (aggregate metrics only). Empty string means "mint a
# fresh UUID on first use"; clear it to rotate. Never sent by local telemetry.
"install_id": "",
# Local event-log retention before rotation (days). Local telemetry only.
"retention_days": 90,
# Keep secret redaction on even at full local capture (a SIEM full of live
# credentials is a bigger attack target). Admin may override via managed scope.
"redact_secrets": True,
# Content redaction for exports / support bundles: "none" | "pii".
"content_redaction": "none",
# Trajectories: full message content / reasoning / raw tool args.
# Off by default. When enabled, full content becomes exportable to the
# configured destination — always secret-redacted, and PII-redacted per
# content_redaction. This is the consent gate for content export and is
# admin-pinnable via managed scope. It does not enable any upload.
"trajectories": {
"enabled": False,
},
# Exporters. The OTLP exporter sends to a configured Collector endpoint;
# endpoint headers reference environment variable names rather than inline
# secrets.
"export": {
"otlp": {
"enabled": False,
"endpoint": None,
"headers_env": {}, # {"Authorization": "MY_OTLP_TOKEN_ENVVAR"}
},
},
},
# Text-to-speech configuration
# Each provider supports an optional `max_text_length:` override for the
# per-request input-character cap. Omit it to use the provider's documented


@@ -295,6 +295,7 @@ from hermes_cli.subcommands.memory import build_memory_parser
from hermes_cli.subcommands.acp import build_acp_parser
from hermes_cli.subcommands.tools import build_tools_parser
from hermes_cli.subcommands.insights import build_insights_parser
from hermes_cli.subcommands.telemetry import build_telemetry_parser
from hermes_cli.subcommands.skills import build_skills_parser
from hermes_cli.subcommands.pairing import build_pairing_parser
from hermes_cli.subcommands.plugins import build_plugins_parser
@@ -11590,7 +11591,7 @@ _BUILTIN_SUBCOMMANDS = frozenset(
"project", "proxy",
"prompt-size",
"send", "sessions", "setup",
-"skills", "slack", "status", "tools", "uninstall", "update",
+"skills", "slack", "status", "telemetry", "tools", "uninstall", "update",
"version", "webhook", "whatsapp", "whatsapp-cloud", "chat", "secrets", "security",
# Help-ish invocations — plugin commands not being listed in
# top-level --help is an acceptable trade-off for skipping an
@@ -12022,6 +12023,195 @@ def cmd_insights(args):
print(f"Error generating insights: {e}")
def cmd_telemetry(args):
"""Local-only telemetry control + inspection. No uploader exists yet."""
import json as _json
import time
from hermes_cli.config import load_config, save_config
from agent.telemetry import policy, rollup, metrics
action = getattr(args, "telemetry_action", None) or "status"
config = load_config()
tel = config.get("telemetry") if isinstance(config.get("telemetry"), dict) else {}
local_enabled = bool(tel.get("local", True))
allow_aggregate = bool(tel.get("allow_aggregate", True))
consent_state = tel.get("consent_state", policy.CONSENT_UNKNOWN)
if consent_state not in policy.VALID_CONSENT_STATES:
consent_state = policy.CONSENT_UNKNOWN
aggregate_enabled = (local_enabled and allow_aggregate
and consent_state == policy.CONSENT_AGGREGATE)
install_id = policy.ensure_install_id(config)
def _persist_install_id():
# Make sure a minted id is written back so it stays stable.
config.setdefault("telemetry", {})["install_id"] = install_id
save_config(config)
if action == "status":
print("Telemetry status")
print("─" * 56)
print(f" Local telemetry: {'on' if local_enabled else 'off'} "
f"(telemetry.local)")
print(f" Aggregate metrics: {'on' if aggregate_enabled else 'off'} "
f"(opt-in; consent_state={consent_state})")
if consent_state == policy.CONSENT_AGGREGATE and not local_enabled:
print(" ⚠ inert: local telemetry is off — nothing to aggregate")
elif consent_state != policy.CONSENT_AGGREGATE and allow_aggregate:
print(" opt in: hermes config set telemetry.consent_state aggregate")
if not allow_aggregate:
print(" ⚠ allow_aggregate is false (egress hard-disabled)")
print(f" Install id: {install_id}")
print(" Upload: DISABLED — no server yet. Aggregate metrics are "
"computed locally only.")
print()
# Export posture — where YOUR data can flow (never Nous). Values only;
# lock-state is managed scope's concern, surfaced by its own tooling.
try:
from agent.telemetry import otlp_exporter, redaction
otlp = otlp_exporter._otlp_config(config)
print(" Export")
if otlp.get("enabled") and otlp.get("endpoint"):
_host = str(otlp.get("endpoint"))
print(f" OTLP: enabled → {_host}")
# Show the header name + env var + whether it's set, never the value.
hdrs = otlp.get("headers_env") or {}
if hdrs:
import os as _os
for _h, _env in hdrs.items():
_state = "set" if _os.environ.get(str(_env)) else "NOT set"
print(f" auth: header '{_h}' ← ${_env} ({_state})")
_sdk = "installed" if otlp_exporter.is_available() else "MISSING — pip install 'hermes-agent[otlp]'"
print(f" SDK: {_sdk}")
else:
print(" OTLP: disabled (telemetry.export.otlp.enabled)")
# Content gate (telemetry.trajectories) + redaction posture.
if redaction.content_export_enabled(config):
print(f" Content export: on (trajectories enabled) — message "
f"content exportable")
else:
print(" Content export: off (trajectories disabled) — structural "
"telemetry only")
print(" Secret redaction: on (always)")
print(f" PII redaction: {redaction.content_mode_for(config)}")
print(" Bulk export: hermes telemetry export --out FILE [--otlp]")
except Exception:
pass
print()
# Local data volume
try:
ov = metrics.overview()
runs = ov["workflows"]["total_runs"]
mcs = sum(ov["model_calls"]["by_provider"].values())
tcs = ov["tool_calls"]["total"]
print(f" Local data: {runs:,} workflows · {mcs:,} model calls · {tcs:,} tool calls")
except Exception:
print(" Local data: (none yet)")
print("\n Inspect what would be shared: hermes telemetry preview")
return
if action == "preview":
_persist_install_id()
since_ns = int((time.time() - args.days * 86400) * 1e9)
events = rollup.build_aggregate_events(
install_id=install_id, since_ns=since_ns
)
summary = rollup.summarize(events)
print("Telemetry preview — computed locally, NOT uploaded")
print("─" * 56)
print(f" Window: last {args.days} days Events: {summary['total']}")
for name, n in sorted(summary["by_event_name"].items()):
print(f" {name}: {n}")
print(" Shows the actual models, tools, and counts from local telemetry.")
print()
shown = events[: args.limit]
if getattr(args, "json", False):
print(_json.dumps(shown, indent=2))
else:
for e in shown:
if e.get("event_name") == "heartbeat":
print(f" • heartbeat hermes={e.get('hermes_version')} os={e.get('os_family')}")
continue
bits = [f"{e.get('event_name')}"]
for k in ("entrypoint", "platform", "end_reason", "duration_ms"):
if e.get(k) is not None:
bits.append(f"{k}={e[k]}")
models = e.get("models_used") or []
if models:
bits.append("models=" + ",".join(
f"{m.get('model') or m.get('provider') or '?'}" for m in models))
if e.get("tools_used"):
bits.append("tools=" + ",".join(e["tools_used"]))
print("" + " ".join(bits))
if len(events) > len(shown):
print(f" ... and {len(events) - len(shown)} more (use --limit)")
return
if action == "export":
import sys as _sys
from agent.telemetry import exporter_bulk, redaction
since_ns = int((time.time() - args.since * 86400) * 1e9) if getattr(args, "since", 0) else None
want_content = getattr(args, "include_content", False)
# OTLP path: stream spans to the configured Collector endpoint.
if getattr(args, "otlp", False):
from agent.telemetry import otlp_exporter
if not otlp_exporter.is_enabled(config):
print("telemetry.export.otlp is not enabled/endpoint not set. "
"Set telemetry.export.otlp.enabled + .endpoint.", file=_sys.stderr)
return
# The OTel SDK is an optional dep; export_once() lazily installs it on
# first use (gated by security.allow_lazy_installs, TTY-prompted),
# same as every other optional backend. OTLPUnavailable is the
# fallback when it can't be installed.
try:
n = otlp_exporter.export_once(config, since_ns=since_ns)
except otlp_exporter.OTLPUnavailable as e:
print(str(e), file=_sys.stderr)
return
except Exception as e:
print(f"OTLP export failed: {e}", file=_sys.stderr)
return
print(f"Exported {n} spans to the OTLP endpoint "
f"({(otlp_exporter._otlp_config(config) or {}).get('endpoint')}).",
file=_sys.stderr)
return
content_ok = redaction.content_export_enabled(config)
if want_content and not content_ok:
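# Warnings go to stderr so stdout stays clean for `--out -`.
print("⚠ --include-content ignored: telemetry.trajectories.enabled is false.", file=_sys.stderr)
print(" Enable trajectories (admin/config) to export message content.", file=_sys.stderr)
print(" Exporting structural telemetry only.", file=_sys.stderr)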
if not getattr(args, "out", None):
print("--out is required (or use --otlp).", file=_sys.stderr)
return
out_path = args.out
if out_path == "-":
counts = exporter_bulk.export(
_sys.stdout, fmt=args.fmt, since_ns=since_ns,
include_content=want_content, config=config,
)
else:
with open(out_path, "w", encoding="utf-8") as fh:
counts = exporter_bulk.export(
fh, fmt=args.fmt, since_ns=since_ns,
include_content=want_content, config=config,
)
# status to stderr so stdout stays clean for `--out -`
msg = (f"Exported {counts['telemetry']} telemetry records"
+ (f" + {counts['sessions']} sessions" if counts["sessions"] else "")
+ (" (content INCLUDED, redacted)" if counts["content_included"]
else " (structural only)")
+ (f" -> {out_path}" if out_path != "-" else ""))
print(msg, file=_sys.stderr)
return
print(f"Unknown telemetry action: {action}")
print("Use: status | preview | export")
def cmd_skills(args):
# Route 'config' action to skills_config module
if getattr(args, "skills_action", None) == "config":
@@ -13081,6 +13271,9 @@ def main():
# =========================================================================
build_insights_parser(subparsers, cmd_insights=cmd_insights)
# telemetry command (parser in hermes_cli/subcommands/telemetry.py)
build_telemetry_parser(subparsers, cmd_telemetry=cmd_telemetry)
# =========================================================================
# claw command (parser built in hermes_cli/subcommands/claw.py)
# =========================================================================


@@ -204,6 +204,23 @@ def _env_enabled(name: str) -> bool:
return env_var_enabled(name)
def _telemetry_local_enabled() -> bool:
    """True when local telemetry is enabled (telemetry.local, default true).

    Gates auto-loading of the bundled ``telemetry`` plugin. Reads config defensively
    so a malformed/missing telemetry section defaults to on (the design default).
    """
    try:
        from hermes_cli.config import load_config

        config = load_config()
        tel = cfg_get(config, "telemetry", default={})
        if not isinstance(tel, dict):
            return True
        return bool(tel.get("local", True))
    except Exception:
        return True
def _get_disabled_plugins() -> set:
"""Read the disabled plugins list from config.yaml.
@@ -1326,6 +1343,24 @@ class PluginManager:
self._load_plugin(manifest)
continue
# The bundled telemetry plugin auto-loads when local telemetry is
# enabled (telemetry.local, default true). It's observational
# (lifecycle hooks -> local event log), writes nothing to the
# network, and powers /usage and /insights, so it should be on out
# of the box without an opt-in. A user who sets telemetry.local:
# false opts out and it is skipped like any disabled plugin.
if (
manifest.source == "bundled"
and (manifest.key or manifest.name) == "telemetry"
):
if _telemetry_local_enabled():
self._load_plugin(manifest)
else:
loaded = LoadedPlugin(manifest=manifest, enabled=False)
loaded.error = "disabled (telemetry.local is false)"
self._plugins[lookup_key] = loaded
continue
# Everything else (standalone, user-installed backends,
# entry-point plugins) is opt-in via plugins.enabled.
# Accept both the path-derived key and the legacy bare name


@@ -0,0 +1,54 @@
"""``hermes telemetry`` subcommand parser.
Telemetry control and inspection. ``preview`` shows the per-run summary events that
would be produced for aggregate metrics; there is no uploader, so it terminates as a
local view.
The handler is injected to avoid importing ``main`` (mirrors the insights subcommand).
"""
from __future__ import annotations

from typing import Callable


def build_telemetry_parser(subparsers, *, cmd_telemetry: Callable) -> None:
    """Attach the ``telemetry`` subcommand (with actions) to ``subparsers``."""
    p = subparsers.add_parser(
        "telemetry",
        help="Inspect local telemetry and export it",
        description=(
            "Local-first telemetry. Local telemetry records observability on this "
            "machine. Aggregate metrics are opt-in (set telemetry.consent_state via "
            "`hermes config set`); they have no uploader and are shown only via `preview`."
        ),
    )
    sub = p.add_subparsers(dest="telemetry_action")
    sub.add_parser("status", help="Show telemetry settings, consent state, and local data volume")

    prev = sub.add_parser(
        "preview",
        help="Show the aggregate events that would be produced (computed locally, not uploaded)",
    )
    prev.add_argument("--days", type=int, default=30, help="Window to roll up (default: 30)")
    prev.add_argument("--limit", type=int, default=10, help="Max events to print (default: 10)")
    prev.add_argument("--json", action="store_true", help="Print raw JSON events")

    exp = sub.add_parser(
        "export",
        help="Export local telemetry (and optional content) to a file, stream, or OTLP endpoint",
    )
    exp.add_argument("--out", help="Output file path (use - for stdout). Not needed with --otlp.")
    exp.add_argument("--format", dest="fmt", choices=["ndjson", "json"], default="ndjson",
                     help="Output format (default: ndjson)")
    exp.add_argument("--since", type=int, default=0,
                     help="Only telemetry from the last N days (0 = all)")
    exp.add_argument("--include-content", action="store_true",
                     help="Include session/message content (requires telemetry.trajectories.enabled). "
                          "Secrets always redacted; PII per telemetry.content_redaction.")
    exp.add_argument("--otlp", action="store_true",
                     help="Export to the configured OTLP endpoint (telemetry.export.otlp.*) "
                          "instead of a file. Requires the optional 'otlp' extra.")

    p.set_defaults(func=cmd_telemetry)


@@ -120,7 +120,7 @@ T = TypeVar("T")
DEFAULT_DB_PATH = get_hermes_home() / "state.db"
-SCHEMA_VERSION = 16
+SCHEMA_VERSION = 17 # v17: tel_* telemetry tables (local observability): runs, spans, model/tool calls, errors
# ---------------------------------------------------------------------------
# WAL-compatibility fallback
@@ -603,6 +603,76 @@ CREATE INDEX IF NOT EXISTS idx_sessions_parent ON sessions(parent_session_id);
CREATE INDEX IF NOT EXISTS idx_sessions_started ON sessions(started_at DESC);
CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, timestamp);
CREATE INDEX IF NOT EXISTS idx_compression_locks_expires ON compression_locks(expires_at);
-- ── Telemetry (local observability) ─────────────────────────────────────────
-- Event/span layer co-located in state.db so runs JOIN to sessions/messages.
-- The append-only JSONL log at ~/.hermes/telemetry/events.jsonl is the source
-- of truth; these tables are a rebuildable index. They hold the user's own data
-- (real model ids, provider/tool names) and never leave the machine unless the
-- user exports them. A "run" is one session (from on_session_start to
-- on_session_finalize); model/tool calls within it are recorded as spans.
CREATE TABLE IF NOT EXISTS tel_runs (
run_id TEXT PRIMARY KEY,
trace_id TEXT NOT NULL,
session_id TEXT,
entrypoint TEXT NOT NULL,
platform TEXT,
start_ns INTEGER NOT NULL,
end_ns INTEGER,
end_reason TEXT,
model_call_count INTEGER DEFAULT 0,
tool_call_count INTEGER DEFAULT 0,
error_count INTEGER DEFAULT 0,
schema_v INTEGER NOT NULL DEFAULT 1
);
CREATE TABLE IF NOT EXISTS tel_spans (
span_id TEXT PRIMARY KEY,
trace_id TEXT NOT NULL,
run_id TEXT NOT NULL,
parent_span_id TEXT,
name TEXT NOT NULL,
kind TEXT,
start_ns INTEGER NOT NULL,
end_ns INTEGER,
status TEXT
);
CREATE TABLE IF NOT EXISTS tel_model_calls (
span_id TEXT PRIMARY KEY,
run_id TEXT NOT NULL,
provider TEXT, -- raw provider, e.g. "anthropic"
model TEXT, -- raw model id, e.g. "claude-opus-4"
base_url TEXT,
input_tokens INTEGER DEFAULT 0,
output_tokens INTEGER DEFAULT 0,
cache_read_tokens INTEGER DEFAULT 0,
cache_write_tokens INTEGER DEFAULT 0,
reasoning_tokens INTEGER DEFAULT 0,
latency_ms INTEGER
);
CREATE TABLE IF NOT EXISTS tel_tool_calls (
span_id TEXT PRIMARY KEY,
run_id TEXT NOT NULL,
tool_name TEXT, -- raw tool name (e.g. "web_search")
duration_ms INTEGER,
result_class TEXT
);
CREATE TABLE IF NOT EXISTS tel_error_events (
id INTEGER PRIMARY KEY AUTOINCREMENT, run_id TEXT, ts_ns INTEGER NOT NULL,
error_class TEXT, subsystem TEXT, recovery TEXT -- enums only; NO raw message/stack
);
CREATE INDEX IF NOT EXISTS ix_tel_runs_trace ON tel_runs(trace_id);
CREATE INDEX IF NOT EXISTS ix_tel_runs_session ON tel_runs(session_id);
CREATE INDEX IF NOT EXISTS ix_tel_runs_start ON tel_runs(start_ns);
CREATE INDEX IF NOT EXISTS ix_tel_spans_run ON tel_spans(run_id);
CREATE INDEX IF NOT EXISTS ix_tel_spans_trace ON tel_spans(trace_id);
CREATE INDEX IF NOT EXISTS ix_tel_model_run ON tel_model_calls(run_id);
CREATE INDEX IF NOT EXISTS ix_tel_tool_run ON tel_tool_calls(run_id);
"""
# Indexes that reference columns added in later schema versions must be
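
Reviewer note: the run -> calls tree described in the schema comment can be read back with a single query over `tel_spans`. The following is a minimal sketch, assuming only the `tel_spans` columns shown in this hunk. The in-memory database, the sample rows, and the helper name `span_tree` are illustrative and not part of the change.

```python
import sqlite3

# Copy of the tel_spans DDL from the hunk above (illustration only).
DDL = """
CREATE TABLE tel_spans (
    span_id TEXT PRIMARY KEY,
    trace_id TEXT NOT NULL,
    run_id TEXT NOT NULL,
    parent_span_id TEXT,
    name TEXT NOT NULL,
    kind TEXT,
    start_ns INTEGER NOT NULL,
    end_ns INTEGER,
    status TEXT
);
"""


def span_tree(conn, run_id):
    """Return (root_row, child_rows) for one run, children ordered by start time."""
    rows = conn.execute(
        "SELECT span_id, parent_span_id, name, kind, start_ns, end_ns "
        "FROM tel_spans WHERE run_id = ? ORDER BY start_ns",
        (run_id,),
    ).fetchall()
    # The root span is the only one without a parent.
    root = next(r for r in rows if r[1] is None)
    children = [r for r in rows if r[1] == root[0]]
    return root, children


conn = sqlite3.connect(":memory:")
conn.executescript(DDL)
conn.executemany(
    "INSERT INTO tel_spans VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
    [
        ("root", "t1", "r1", None, "run:cli", "run", 0, 900, "completed"),
        ("m1", "t1", "r1", "root", "claude-opus-4", "model", 100, 400, "ok"),
        ("w1", "t1", "r1", "root", "web_search", "tool", 450, 800, "ok"),
    ],
)
root, children = span_tree(conn, "r1")
child_names = [c[2] for c in children]
```

Because every call span is parented directly to the run's root, the tree is two levels deep and a flat filter on `parent_span_id` is enough; a recursive query would only be needed if nested spans were added later.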

View File

@@ -0,0 +1,382 @@
"""Telemetry plugin — wires Hermes lifecycle hooks to the local telemetry emitter.
This is the *only* instrumentation seam. It registers observational hooks (which core
already invokes fail-open) and translates each into a typed local telemetry event
handed to ``agent.telemetry.emitter``. There are zero edits to core call sites:
the hooks already carry model/provider/usage/duration/tool data.
Everything here is best-effort and fail-open — a raised exception in a hook callback is
swallowed by core, and we additionally guard each callback so a telemetry bug can never
disturb a session. No content, no network: local telemetry only.
Hooks consumed:
on_session_start -> begin a run context (trace_id/run_id + root span id)
post_api_request -> one model_call event + its timing span (tokens, latency)
api_request_error -> one error event
post_tool_call -> one tool_call event + its timing span (duration, result class)
on_session_finalize -> finalize the run row + emit the run's root span
subagent_start/stop -> (reserved) lineage markers
Each model/tool call emits a SpanEvent (timing + parent = the run's root span) keyed by
the same span_id as its detail row, so tel_spans reconstructs a run -> calls trace tree.
"""
from __future__ import annotations
import logging
import threading
import time
from typing import Any, Dict, Optional
logger = logging.getLogger(__name__)
# Per-run accumulators keyed by run_id, so on_session_finalize can roll up counts.
_runs: Dict[str, Dict[str, Any]] = {}
_runs_lock = threading.Lock()
def _safe(fn):
"""Decorator: never let a telemetry hook raise into core."""
def wrapper(*args, **kwargs):
try:
return fn(*args, **kwargs)
except Exception:
logger.debug("telemetry hook %s failed", getattr(fn, "__name__", "?"), exc_info=True)
return None
wrapper.__name__ = getattr(fn, "__name__", "wrapper")
return wrapper
def _run_key(session_id: Optional[str], task_id: Optional[str]) -> str:
return session_id or task_id or "default"
# ── on_session_start ────────────────────────────────────────────────────────
@_safe
def _on_session_start(**kw: Any) -> None:
from agent.telemetry import spans
session_id = kw.get("session_id") or ""
platform = kw.get("platform") or kw.get("source") or ""
ctx = spans.start_run()
now = time.time_ns()
root_span_id = spans.new_span_id()
key = _run_key(session_id, kw.get("task_id"))
with _runs_lock:
_runs[key] = {
"run_id": ctx.run_id,
"trace_id": ctx.trace_id,
"root_span_id": root_span_id,
"session_id": session_id or None,
"entrypoint": _entrypoint_for(platform, kw.get("source")),
"platform": platform or None,
"start_ns": now,
"model_call_count": 0,
"tool_call_count": 0,
"error_count": 0,
}
def _ensure_run(session_id: Optional[str], task_id: Optional[str], platform: str = "") -> Dict[str, Any]:
"""Return the run accumulator, lazily creating one if session_start was missed."""
from agent.telemetry import spans
key = _run_key(session_id, task_id)
with _runs_lock:
run = _runs.get(key)
if run is None:
rid = spans.current_run_id() or spans.new_id()
tid = spans.current_trace_id() or spans.new_id()
run = {
"run_id": rid,
"trace_id": tid,
"root_span_id": spans.new_span_id(),
"session_id": session_id or None,
"entrypoint": _entrypoint_for(platform),
"platform": platform or None,
"start_ns": time.time_ns(),
"model_call_count": 0,
"tool_call_count": 0,
"error_count": 0,
}
_runs[key] = run
return run
def _emit_call_span(run: Dict[str, Any], span_id: str, name: str, kind: str,
duration_ms: Optional[int], status: Optional[str]) -> None:
"""Emit the timing/lineage span for a model or tool call.
The call hooks fire on completion, so end_ns is ~now and start_ns is reconstructed
from the measured duration (end - duration). The span is parented to the run's root
so a 2-level run -> calls waterfall can be reconstructed from tel_spans.
"""
from agent.telemetry import emitter
from agent.telemetry.events import SpanEvent
end_ns = time.time_ns()
dur_ns = int(duration_ms) * 1_000_000 if isinstance(duration_ms, (int, float)) else 0
start_ns = end_ns - dur_ns
emitter.emit(SpanEvent(
span_id=span_id,
trace_id=run["trace_id"],
run_id=run["run_id"],
parent_span_id=run.get("root_span_id"),
name=name,
kind=kind,
start_ns=start_ns,
end_ns=end_ns,
status=status,
))
def _entrypoint_for(platform: Optional[str], source: Optional[str] = None) -> str:
"""Coarse entrypoint label (cli / gateway / tui / api / cron …).
This is a workflow *surface* label, not model/tool anonymization — it answers
"where did this run come from", which is genuinely categorical.
"""
s = (source or platform or "").lower()
if s in ("", "chat", "interactive", "cli", "desktop"):
return "cli"
if s in ("telegram", "discord", "slack", "whatsapp", "signal", "matrix",
"email", "sms", "teams", "feishu", "wecom", "line", "google_chat"):
return "gateway"
if s == "tui":
return "tui"
if s in ("api", "api_server", "openai_api"):
return "api"
if s == "cron":
return "cron"
if s == "batch":
return "batch"
if s == "acp":
return "acp"
return "cli"
# ── post_api_request -> model_call event ────────────────────────────────────
@_safe
def _on_post_api_request(**kw: Any) -> None:
from agent.telemetry import emitter, spans
from agent.telemetry.events import ModelCallEvent
session_id = kw.get("session_id") or ""
platform = kw.get("platform") or ""
run = _ensure_run(session_id, kw.get("task_id"), platform)
usage = kw.get("usage") or {}
duration = kw.get("api_duration")
latency_ms = int(duration * 1000) if isinstance(duration, (int, float)) else None
span_id = spans.new_span_id()
evt = ModelCallEvent(
span_id=span_id,
run_id=run["run_id"],
provider=kw.get("provider"), # raw
model=kw.get("model"), # raw
base_url=kw.get("base_url"),
input_tokens=int(usage.get("input_tokens") or 0),
output_tokens=int(usage.get("output_tokens") or 0),
cache_read_tokens=int(usage.get("cache_read_tokens") or 0),
cache_write_tokens=int(usage.get("cache_write_tokens") or 0),
reasoning_tokens=int(usage.get("reasoning_tokens") or 0),
latency_ms=latency_ms,
)
with _runs_lock:
run["model_call_count"] += 1
_emit_call_span(run, span_id, name=kw.get("model") or "model_call",
kind="model", duration_ms=latency_ms, status="ok")
emitter.emit(evt)
# ── api_request_error -> error event ────────────────────────────────────────
@_safe
def _on_api_request_error(**kw: Any) -> None:
from agent.telemetry import emitter
from agent.telemetry.events import ErrorEvent
session_id = kw.get("session_id") or ""
run = _ensure_run(session_id, kw.get("task_id"), kw.get("platform") or "")
error_class = _coarse_error_class(kw.get("error_type") or kw.get("error") or "")
with _runs_lock:
run["error_count"] += 1
emitter.emit(ErrorEvent(
run_id=run["run_id"],
error_class=error_class,
subsystem="model_api",
recovery=None,
))
def _coarse_error_class(raw: Any) -> str:
s = str(raw).lower()
if "timeout" in s:
return "provider_timeout"
if "rate" in s and "limit" in s:
return "rate_limit"
if any(k in s for k in ("auth", "401", "403", "unauthorized", "forbidden")):
return "auth"
if any(k in s for k in ("connection", "network", "dns", "socket")):
return "network"
if "context" in s and ("length" in s or "overflow" in s or "token" in s):
return "context_overflow"
if any(k in s for k in ("500", "502", "503", "server error", "provider")):
return "provider_error"
return "unknown"
# ── post_tool_call -> tool_call event ───────────────────────────────────────
@_safe
def _on_post_tool_call(**kw: Any) -> None:
from agent.telemetry import emitter, spans
from agent.telemetry.events import ToolCallEvent
session_id = kw.get("session_id") or ""
run = _ensure_run(session_id, kw.get("task_id"), kw.get("platform") or "")
function_name = kw.get("function_name") or kw.get("tool_name")
duration_ms = kw.get("duration_ms")
result = kw.get("result")
result_class = _tool_result_class(result)
with _runs_lock:
run["tool_call_count"] += 1
if result_class == "error":
run["error_count"] += 1
dur_int = int(duration_ms) if isinstance(duration_ms, (int, float)) else None
span_id = spans.new_span_id()
_emit_call_span(run, span_id, name=function_name or "tool_call",
kind="tool", duration_ms=dur_int, status=result_class)
emitter.emit(ToolCallEvent(
span_id=span_id,
run_id=run["run_id"],
tool_name=function_name, # raw tool name
duration_ms=dur_int,
result_class=result_class,
))
def _tool_result_class(result: Any) -> str:
"""Classify a tool result without retaining content: ok, error, blocked, or timeout."""
try:
import json
if isinstance(result, str):
r = result.strip()
if r.startswith("{"):
obj = json.loads(r)
if isinstance(obj, dict):
if obj.get("error") or obj.get("blocked"):
return "blocked" if obj.get("blocked") else "error"
if obj.get("timeout"):
return "timeout"
return "ok"
if isinstance(result, dict):
if result.get("error"):
return "error"
return "ok"
except Exception:
return "ok"
return "ok"
# ── on_session_finalize -> finalize the run row ─────────────────────────────
@_safe
def _on_session_finalize(**kw: Any) -> None:
from agent.telemetry import emitter, spans
from agent.telemetry.events import RunEvent, SpanEvent
session_id = kw.get("session_id") or ""
key = _run_key(session_id, kw.get("task_id"))
with _runs_lock:
run = _runs.pop(key, None)
if run is None:
run = _ensure_run(session_id, kw.get("task_id"), kw.get("platform") or "")
with _runs_lock:
_runs.pop(key, None)
end_ns = time.time_ns()
start_ns = run.get("start_ns", end_ns)
end_reason = _coarse_end_reason(kw)
# Root span for the run — the trace root the call spans hang off of.
emitter.emit(SpanEvent(
span_id=run.get("root_span_id") or spans.new_span_id(),
trace_id=run["trace_id"],
run_id=run["run_id"],
parent_span_id=None,
name=f"run:{run.get('entrypoint', 'cli')}",
kind="run",
start_ns=start_ns,
end_ns=end_ns,
status=end_reason,
))
emitter.emit(RunEvent(
run_id=run["run_id"],
trace_id=run["trace_id"],
entrypoint=run.get("entrypoint", "cli"),
session_id=run.get("session_id"),
platform=run.get("platform"),
start_ns=start_ns,
end_ns=end_ns,
end_reason=end_reason,
model_call_count=run.get("model_call_count", 0),
tool_call_count=run.get("tool_call_count", 0),
error_count=run.get("error_count", 0),
))
spans.clear_run()
def _coarse_end_reason(kw: Dict[str, Any]) -> str:
"""Map a finalize payload to a coarse end reason.
Production finalize callers pass ``reason`` (e.g. "shutdown", "session_expired",
"session_reset"); the older ``turn_exit_reason``/``interrupted``/``failed`` keys are
honored when present. Defaults to "ended".
"""
if kw.get("interrupted"):
return "interrupted"
if kw.get("failed"):
return "failed"
reason = str(kw.get("turn_exit_reason") or kw.get("reason") or "").lower()
if "max_iteration" in reason:
return "max_iterations"
if "timeout" in reason:
return "timeout"
if "expired" in reason:
return "expired"
if "reset" in reason:
return "reset"
if "shutdown" in reason or "complete" in reason:
return "completed"
return reason or "ended"
# ── subagent lineage (reserved) ─────────────────────────────────────────────
# A delegated subagent runs its own ``run_conversation`` with its own session id, so
# its model/tool calls are already captured as a separate tel_runs row via the normal
# hooks — no subagent activity is lost. These hooks fire with the parent<->child bridge
# (parent_session_id, child_session_id, child_role, child_goal); they are reserved for
# recording parent->child *lineage* (linking a child run back to its parent), which
# needs a tel_runs.parent_run_id column. Deferred until a consumer needs the delegation
# tree; left registered as the attachment point.
@_safe
def _on_subagent_start(**kw: Any) -> None:
return None
@_safe
def _on_subagent_stop(**kw: Any) -> None:
return None
# ── registration ────────────────────────────────────────────────────────────
def register(ctx) -> None:
ctx.register_hook("on_session_start", _on_session_start)
ctx.register_hook("post_api_request", _on_post_api_request)
ctx.register_hook("api_request_error", _on_api_request_error)
ctx.register_hook("post_tool_call", _on_post_tool_call)
ctx.register_hook("on_session_finalize", _on_session_finalize)
ctx.register_hook("subagent_start", _on_subagent_start)
ctx.register_hook("subagent_stop", _on_subagent_stop)
logger.debug("telemetry plugin registered 7 hooks")
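
Reviewer note: the timing arithmetic in `_emit_call_span` is easy to check in isolation. The sketch below restates it as a pure function, assuming the same rule as the plugin (end is "now", start is end minus the measured duration, a missing duration gives a zero-length span). The helper name `span_window` and the injectable `end_ns` argument are hypothetical and exist only to make the example deterministic.

```python
import time
from typing import Optional, Tuple


def span_window(duration_ms: Optional[float], end_ns: Optional[int] = None) -> Tuple[int, int]:
    """Return (start_ns, end_ns) for a call that has just completed."""
    if end_ns is None:
        end_ns = time.time_ns()
    # Same conversion as the plugin: whole milliseconds to nanoseconds,
    # falling back to 0 when no numeric duration was reported.
    dur_ns = int(duration_ms) * 1_000_000 if isinstance(duration_ms, (int, float)) else 0
    return end_ns - dur_ns, end_ns


start, end = span_window(250, end_ns=1_000_000_000)
no_duration = span_window(None, end_ns=5)
```

Note that `int(duration_ms)` truncates fractional milliseconds before scaling, so a reported duration of 0.9 ms produces a zero-length span under this rule.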

View File

@@ -0,0 +1,12 @@
name: telemetry
version: 1.0.0
description: "Local-first telemetry & observability. Records runs, model calls, tool calls, and errors to a local event log + state.db index via plugin hooks — no agent action, no content, no network. Powers /usage, /insights, and local dashboards."
author: NousResearch
hooks:
- post_api_request
- api_request_error
- post_tool_call
- on_session_start
- on_session_finalize
- subagent_start
- subagent_stop

View File

@@ -154,6 +154,10 @@ edge-tts = ["edge-tts==7.2.7"]
modal = ["modal==1.3.4"]
daytona = ["daytona==0.155.0"]
hindsight = ["hindsight-client==0.6.1"]
# OTLP telemetry export (optional). Provides the OpenTelemetry SDK + OTLP/HTTP
# exporter so `hermes telemetry export --otlp` can send spans to a Collector.
# Lazy-imported by agent/telemetry/otlp_exporter.py; never a core dependency.
otlp = ["opentelemetry-sdk==1.30.0", "opentelemetry-exporter-otlp-proto-http==1.30.0"]
dev = ["debugpy==1.8.20", "pytest==9.0.2", "pytest-asyncio==1.3.0", "mcp==1.26.0", "starlette==1.0.1", "ty==0.0.21", "ruff==0.15.10", "setuptools==81.0.0"] # starlette: CVE-2026-48710; setuptools: latest <82 (torch >=2.11 caps setuptools<82)
messaging = ["python-telegram-bot[webhooks]==22.6", "discord.py[voice]==2.7.1", "aiohttp==3.13.4", "brotlicffi==1.2.0.1", "slack-bolt==1.27.0", "slack-sdk==3.40.1", "qrcode==7.4.2"] # aiohttp: CVE-2026-34513/34518/34519/34520/34525
cron = [] # croniter is now a core dependency; this extra kept for back-compat

View File

@@ -760,7 +760,11 @@ class TestPluginHooks:
mgr.discover_and_load()
assert mgr.has_hook("pre_api_request") is True
assert mgr.has_hook("post_api_request") is False
# Negative sentinel: a hook no plugin (user or bundled) registers, proving
# that registering pre_api_request doesn't conjure an unrelated hook.
# (post_api_request is now registered by the bundled telemetry plugin, so
# it is no longer a valid "nothing registered this" sentinel.)
assert mgr.has_hook("transform_terminal_output") is False
results = mgr.invoke_hook(
"pre_api_request",
session_id="s1",

View File

@@ -0,0 +1,108 @@
"""`hermes telemetry` handler smoke tests (local-only; no upload)."""
from __future__ import annotations
import sqlite3
import time
import types
import pytest
import hermes_state
@pytest.fixture
def home(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
# state.db with tel_* + seeded data
db = tmp_path / "state.db"
hermes_state.SessionDB(db_path=db)
from agent.telemetry.emitter import TelemetryEmitter
from agent.telemetry.events import RunEvent, ModelCallEvent, ToolCallEvent
em = TelemetryEmitter(events_path=tmp_path / "telemetry" / "e.jsonl", db_path=db)
now = time.time_ns()
em.emit(RunEvent(run_id="r1", trace_id="t1", entrypoint="cli", end_reason="completed",
start_ns=now - 60_000_000, end_ns=now, model_call_count=1,
tool_call_count=1))
em.emit(ModelCallEvent(span_id="m1", run_id="r1", provider="anthropic",
model="claude-opus-4",
input_tokens=20000, output_tokens=2000))
em.emit(ToolCallEvent(span_id="w1", run_id="r1", tool_name="web_search",
result_class="ok"))
em.flush()
em.close()
yield tmp_path
def _run(action, **kw):
from hermes_cli.main import cmd_telemetry
args = types.SimpleNamespace(telemetry_action=action, days=30, limit=10, json=False)
for k, v in kw.items():
setattr(args, k, v)
cmd_telemetry(args)
def test_status_runs(home, capsys):
_run("status")
out = capsys.readouterr().out
assert "Telemetry status" in out
assert "Upload:" in out and "DISABLED" in out
assert "Local data:" in out
def test_preview_shows_real_values(home, capsys):
_run("preview")
out = capsys.readouterr().out
assert "NOT uploaded" in out
assert "workflow_completed" in out
# real model + tool names ARE shown (this is the user's own local data)
assert "claude-opus-4" in out
assert "web_search" in out
def test_status_reflects_consent_set_via_config(home, capsys):
# Opting in is a plain config write now (no `enable` verb). status should
# reflect consent_state=aggregate as aggregate metrics being on.
from hermes_cli.config import load_config, save_config
cfg = load_config()
cfg.setdefault("telemetry", {})["consent_state"] = "aggregate"
save_config(cfg)
_run("status")
out = capsys.readouterr().out
assert "consent_state=aggregate" in out
assert "Aggregate metrics: on" in out
def test_status_shows_optin_hint_when_unknown(home, capsys):
_run("status")
out = capsys.readouterr().out
assert "Aggregate metrics: off" in out
assert "config set telemetry.consent_state aggregate" in out
def test_allow_aggregate_false_keeps_metrics_off_in_status(home, capsys):
# Even with consent opted in, a managed allow_aggregate:false wins.
from hermes_cli.config import load_config, save_config
cfg = load_config()
tel = cfg.setdefault("telemetry", {})
tel["consent_state"] = "aggregate"
tel["allow_aggregate"] = False
save_config(cfg)
_run("status")
out = capsys.readouterr().out
assert "Aggregate metrics: off" in out
assert "allow_aggregate is false" in out
def test_local_off_with_consent_shows_inert_in_status(home, capsys):
# local off + opted in: aggregate is off and the status explains why.
from hermes_cli.config import load_config, save_config
cfg = load_config()
tel = cfg.setdefault("telemetry", {})
tel["local"] = False
tel["consent_state"] = "aggregate"
save_config(cfg)
_run("status")
out = capsys.readouterr().out
assert "Aggregate metrics: off" in out
assert "inert: local telemetry is off" in out
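
Reviewer note: the four status tests above imply a small precedence rule for the "Aggregate metrics" line. The following is a sketch of that rule as the tests describe it, not the actual `cmd_telemetry` code. The function name `aggregate_state`, the default values for `local` and `allow_aggregate`, and the order in which the two "off" reasons are checked are all assumptions.

```python
def aggregate_state(cfg):
    """Return the status text for aggregate metrics given a config mapping."""
    tel = cfg.get("telemetry", {})
    # Assumed default: local telemetry is on unless explicitly disabled.
    if tel.get("local", True) is False:
        return "off (inert: local telemetry is off)"
    # A managed allow_aggregate: false wins over user consent.
    if tel.get("allow_aggregate", True) is False:
        return "off (allow_aggregate is false)"
    if tel.get("consent_state") == "aggregate":
        return "on"
    return "off"


default_state = aggregate_state({})
opted_in = aggregate_state({"telemetry": {"consent_state": "aggregate"}})
pinned_off = aggregate_state(
    {"telemetry": {"consent_state": "aggregate", "allow_aggregate": False}}
)
local_off = aggregate_state(
    {"telemetry": {"consent_state": "aggregate", "local": False}}
)
```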

View File

@@ -0,0 +1,107 @@
"""Emitter tests — the hot-path invariant is the one that matters most.
Invariant: emit() never blocks, never raises, and a broken writer cannot slow or
break the caller. Plus: JSONL + SQLite round-trip, and the SQLite index is rebuildable
from the JSONL source of truth.
"""
from __future__ import annotations
import sqlite3
import time
import hermes_state
from agent.telemetry.emitter import TelemetryEmitter
from agent.telemetry.events import ModelCallEvent, RunEvent, ToolCallEvent
def _fresh_db(tmp_path):
db = tmp_path / "state.db"
conn = sqlite3.connect(db)
conn.executescript(hermes_state.SCHEMA_SQL)
conn.close()
return db
def test_emit_is_fast_even_when_writer_is_broken(tmp_path, monkeypatch):
"""The core guarantee: a writer that raises AND sleeps cannot stall emit()."""
db = _fresh_db(tmp_path)
em = TelemetryEmitter(events_path=tmp_path / "telemetry" / "events.jsonl", db_path=db)
# Sabotage the row indexer to raise after a long sleep.
def broken(_conn, _ev):
time.sleep(5.0)
raise RuntimeError("writer exploded")
monkeypatch.setattr(em, "_index_one", broken)
start = time.monotonic()
for i in range(50):
em.emit(ModelCallEvent(span_id=f"s{i}", run_id="r1", input_tokens=10))
elapsed = time.monotonic() - start
# 50 emits must complete in well under the writer's single 5s sleep.
assert elapsed < 1.0, f"emit() blocked: {elapsed:.2f}s"
em.close()
def test_emit_never_raises_on_bad_event(tmp_path):
db = _fresh_db(tmp_path)
em = TelemetryEmitter(events_path=tmp_path / "telemetry" / "events.jsonl", db_path=db)
# Non-serializable / wrong-shaped inputs must not raise out of emit().
em.emit(object()) # no to_dict, not a mapping
em.emit({"event": "run"}) # minimal dict
em.close()
def test_jsonl_and_sqlite_roundtrip(tmp_path):
db = _fresh_db(tmp_path)
jsonl = tmp_path / "telemetry" / "events.jsonl"
em = TelemetryEmitter(events_path=jsonl, db_path=db)
em.emit(RunEvent(run_id="run1", trace_id="t1", entrypoint="cli", end_reason="completed"))
em.emit(ModelCallEvent(span_id="m1", run_id="run1", provider="anthropic",
model="claude-opus-4", input_tokens=100, output_tokens=20))
em.emit(ToolCallEvent(span_id="tc1", run_id="run1", tool_name="web_search",
duration_ms=120, result_class="ok"))
em.flush()
em.close()
# JSONL has all three lines
lines = [l for l in jsonl.read_text(encoding="utf-8").splitlines() if l.strip()]
assert len(lines) == 3
# SQLite index has the rows in the right tables
conn = sqlite3.connect(db)
assert conn.execute("SELECT COUNT(*) FROM tel_runs").fetchone()[0] == 1
assert conn.execute("SELECT COUNT(*) FROM tel_model_calls").fetchone()[0] == 1
assert conn.execute("SELECT COUNT(*) FROM tel_tool_calls").fetchone()[0] == 1
row = conn.execute("SELECT provider, model, input_tokens FROM tel_model_calls").fetchone()
assert row == ("anthropic", "claude-opus-4", 100)
conn.close()
def test_unknown_event_kind_is_ignored_not_fatal(tmp_path):
db = _fresh_db(tmp_path)
em = TelemetryEmitter(events_path=tmp_path / "telemetry" / "events.jsonl", db_path=db)
em.emit({"event": "totally_unknown", "foo": "bar"})
em.emit(RunEvent(run_id="r2", trace_id="t2", entrypoint="cli"))
em.flush()
em.close()
conn = sqlite3.connect(db)
# The unknown event is in JSONL but skipped by the indexer; the known one indexes.
assert conn.execute("SELECT COUNT(*) FROM tel_runs").fetchone()[0] == 1
conn.close()
def test_disabled_emitter_writes_nothing(tmp_path):
db = _fresh_db(tmp_path)
jsonl = tmp_path / "telemetry" / "events.jsonl"
em = TelemetryEmitter(events_path=jsonl, db_path=db, enabled=False)
em.emit(RunEvent(run_id="r3", trace_id="t3", entrypoint="cli"))
em.flush()
em.close()
assert not jsonl.exists()
conn = sqlite3.connect(db)
assert conn.execute("SELECT COUNT(*) FROM tel_runs").fetchone()[0] == 0
conn.close()
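
Reviewer note: the hot-path invariant these tests pin down (emit never blocks, never raises) is commonly met with a bounded queue and a background writer. The sketch below shows that pattern under stated assumptions. `SketchEmitter` is hypothetical and is not the real `TelemetryEmitter`, whose internals are not shown in this diff.

```python
import queue
import threading


class SketchEmitter:
    """Hypothetical fire-and-forget emitter (illustration only)."""

    def __init__(self, write, maxsize=1024):
        self._write = write  # sink that may be slow or may raise
        self._q = queue.Queue(maxsize=maxsize)
        self._thread = threading.Thread(target=self._drain, daemon=True)
        self._thread.start()

    def emit(self, event):
        # Hot path: enqueue without waiting; drop the event if the queue is full.
        try:
            self._q.put_nowait(event)
        except queue.Full:
            pass

    def _drain(self):
        while True:
            event = self._q.get()
            if event is None:  # sentinel posted by close()
                return
            try:
                self._write(event)
            except Exception:
                pass  # a broken writer never reaches the caller

    def close(self, timeout=1.0):
        # Ask the writer thread to stop, then wait briefly for it to drain.
        try:
            self._q.put_nowait(None)
        except queue.Full:
            pass
        self._thread.join(timeout)


written = []


def flaky(event):
    if event == "bad":
        raise RuntimeError("writer exploded")
    written.append(event)


em = SketchEmitter(flaky)
for ev in ("a", "bad", "b"):
    em.emit(ev)
em.close()
```

The trade-off in this design is that events are dropped rather than delayed when the queue is full, which matches a "telemetry must never slow the session" priority.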

View File

@@ -0,0 +1,113 @@
"""Export redaction pipeline tests — the security-critical layer.
Invariants:
* Secrets ALWAYS stripped, every mode, no flag disables it.
* Content gated by telemetry.trajectories, not a redaction mode.
* PII stripped in 'pii' mode; structure preserved (codec-aware).
"""
from __future__ import annotations
import json
from agent.telemetry import redaction as R
# ── secrets are always redacted ─────────────────────────────────────────────
def test_secrets_stripped_in_none_mode():
text = "here is sk-ant-api03-SECRETKEY123 and a token"
out = R.redact_for_export(text, content_mode=R.CONTENT_NONE)
assert "SECRETKEY123" not in out
def test_secrets_stripped_in_pii_mode():
text = "Authorization: Bearer abcdef123456789secret"
out = R.redact_for_export(text, content_mode=R.CONTENT_PII)
assert "abcdef123456789secret" not in out
def test_secret_redactor_fails_closed(monkeypatch):
# If the underlying redactor raises, we must NOT return the raw string.
import agent.redact as ar
monkeypatch.setattr(ar, "redact_sensitive_text", lambda *a, **k: (_ for _ in ()).throw(RuntimeError()))
out = R.redact_for_export("sk-secret-value", content_mode=R.CONTENT_NONE)
assert "sk-secret-value" not in out
assert out == "[redaction-unavailable]"
# ── PII ─────────────────────────────────────────────────────────────────────
def test_pii_mode_strips_email_and_phone():
text = "contact alice@example.com or +1 415 555 1234"
out = R.redact_for_export(text, content_mode=R.CONTENT_PII)
assert "alice@example.com" not in out
assert "[email]" in out
assert "555" not in out or "[phone]" in out
def test_none_mode_keeps_nonsecret_text_but_drops_via_message_path():
# redact_for_export(none) scrubs secrets but doesn't strip ordinary words;
# content *dropping* happens at the message layer (trajectories gate).
out = R.redact_for_export("just ordinary words", content_mode=R.CONTENT_NONE)
assert "ordinary" in out
# ── trajectories gate (content_export_enabled) ──────────────────────────────
def test_content_export_disabled_by_default():
assert R.content_export_enabled({}) is False
assert R.content_export_enabled({"telemetry": {}}) is False
assert R.content_export_enabled({"telemetry": {"trajectories": {"enabled": False}}}) is False
def test_content_export_enabled_when_trajectories_on():
assert R.content_export_enabled({"telemetry": {"trajectories": {"enabled": True}}}) is True
# ── codec-aware message redaction ───────────────────────────────────────────
def test_message_structural_only_when_content_excluded():
msg = {"role": "user", "content": "my email is bob@x.com and key sk-12345"}
out = R.redact_message(msg, include_content=False)
assert out["role"] == "user"
assert "content" not in out # body dropped entirely
assert out["content_chars"] == len(msg["content"]) # only the size remains
assert "bob@x.com" not in json.dumps(out)
def test_message_content_included_is_redacted():
msg = {"role": "user", "content": "email bob@x.com secret sk-ant-SECRET999"}
out = R.redact_message(msg, content_mode=R.CONTENT_PII, include_content=True)
assert "content" in out
assert "SECRET999" not in out["content"] # secret gone
assert "bob@x.com" not in out["content"] # pii gone
assert "[email]" in out["content"]
def test_tool_calls_redacted_names_kept_args_scrubbed():
msg = {
"role": "assistant",
"tool_calls": json.dumps([
{"function": {"name": "web_search", "arguments": '{"q": "email me at z@z.com"}'}}
]),
}
out = R.redact_message(msg, content_mode=R.CONTENT_PII, include_content=True)
tc = out["tool_calls"]
assert tc[0]["name"] == "web_search" # structure/name preserved
assert "z@z.com" not in json.dumps(tc) # arg pii scrubbed
def test_tool_calls_counted_when_content_excluded():
msg = {
"role": "assistant",
"tool_calls": json.dumps([
{"function": {"name": "a", "arguments": "{}"}},
{"function": {"name": "b", "arguments": "{}"}},
]),
}
out = R.redact_message(msg, include_content=False)
assert out["tool_call_count"] == 2
assert "tool_calls" not in out
def test_content_mode_for_reads_config():
assert R.content_mode_for({"telemetry": {"content_redaction": "pii"}}) == "pii"
assert R.content_mode_for({"telemetry": {"content_redaction": "bogus"}}) == "none"
assert R.content_mode_for({}) == "none"
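
Reviewer note: `test_secret_redactor_fails_closed` checks that a crashing redactor yields a placeholder rather than the raw text. A minimal sketch of that fail-closed wrapper follows. The names `fail_closed`, `toy_redactor`, and `broken_redactor` and the regular expression are illustrative assumptions; only the placeholder string is taken from the test.

```python
import re

PLACEHOLDER = "[redaction-unavailable]"


def fail_closed(redactor):
    """Wrap a redactor so any failure returns a placeholder, never the input."""
    def wrapped(text):
        try:
            return redactor(text)
        except Exception:
            return PLACEHOLDER
    return wrapped


def toy_redactor(text):
    # Illustrative pattern only; not the project's real secret detection.
    return re.sub(r"sk-[A-Za-z0-9-]+", "[secret]", text)


def broken_redactor(text):
    raise RuntimeError("redactor unavailable")


ok = fail_closed(toy_redactor)("key sk-ant-SECRET999 here")
failed = fail_closed(broken_redactor)("sk-secret-value")
```

Failing closed loses the whole string when the redactor breaks, which is the safer direction for an export path: a gap in the output is recoverable, a leaked secret is not.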

View File

@@ -0,0 +1,102 @@
"""Bulk export tests — telemetry always, content only behind the trajectories gate."""
from __future__ import annotations
import io
import json
import sqlite3
import time
import hermes_state
from agent.telemetry import exporter_bulk
from agent.telemetry.emitter import TelemetryEmitter
from agent.telemetry.events import ModelCallEvent, RunEvent, ToolCallEvent
def _seed(tmp_path, with_secret_content=True):
db = tmp_path / "state.db"
sdb = hermes_state.SessionDB(db_path=db)
# telemetry
em = TelemetryEmitter(events_path=tmp_path / "tel" / "e.jsonl", db_path=db)
now = time.time_ns()
em.emit(RunEvent(run_id="r1", trace_id="t1", entrypoint="cli", end_reason="completed",
start_ns=now - 60_000_000, end_ns=now, model_call_count=1, tool_call_count=1))
em.emit(ModelCallEvent(span_id="m1", run_id="r1", provider="anthropic",
model="claude-sonnet-4", input_tokens=1000, output_tokens=100))
em.emit(ToolCallEvent(span_id="w1", run_id="r1", tool_name="web_search", result_class="ok"))
em.flush()
em.close()
# session + message content (with an embedded secret + email)
sdb.create_session(session_id="s1", source="cli", model="anthropic/claude-sonnet-4")
if with_secret_content:
sdb.append_message("s1", role="user",
content="my key is AKIAIOSFODNN7EXAMPLE and email me at carol@corp.com")
sdb.append_message("s1", role="assistant", content="ok")
sdb.close()
return db
def test_telemetry_exported_content_excluded_by_default(tmp_path):
db = _seed(tmp_path)
buf = io.StringIO()
counts = exporter_bulk.export(buf, fmt="ndjson", include_content=False, config={}, db_path=db)
assert counts["telemetry"] >= 3
assert counts["content_included"] == 0
text = buf.getvalue()
# message bodies must NOT be present
assert "carol@corp.com" not in text
assert "AKIAIOSFODNN7EXAMPLE" not in text
# but a session record (structural) is present with message structure
lines = [json.loads(l) for l in text.splitlines() if l.strip()]
sess = [r for r in lines if r.get("_kind") == "session"]
assert sess and sess[0]["messages"]
assert "content" not in sess[0]["messages"][0] # structural only
assert "content_chars" in sess[0]["messages"][0]
def test_include_content_ignored_without_trajectories(tmp_path):
db = _seed(tmp_path)
buf = io.StringIO()
# request content but trajectories disabled -> forced off
counts = exporter_bulk.export(buf, fmt="ndjson", include_content=True,
config={"telemetry": {"trajectories": {"enabled": False}}}, db_path=db)
assert counts["content_included"] == 0
assert "carol@corp.com" not in buf.getvalue()
def test_content_included_when_trajectories_on_but_redacted(tmp_path):
db = _seed(tmp_path)
buf = io.StringIO()
cfg = {"telemetry": {"trajectories": {"enabled": True}, "content_redaction": "pii"}}
counts = exporter_bulk.export(buf, fmt="ndjson", include_content=True, config=cfg, db_path=db)
assert counts["content_included"] == 1
text = buf.getvalue()
# content present but secret + pii scrubbed
assert "AKIAIOSFODNN7EXAMPLE" not in text # secret always gone
assert "carol@corp.com" not in text # pii gone in pii mode
lines = [json.loads(l) for l in text.splitlines() if l.strip()]
sess = [r for r in lines if r.get("_kind") == "session"][0]
# the user message now has a (redacted) content field
user_msg = [m for m in sess["messages"] if m["role"] == "user"][0]
assert "content" in user_msg
assert "[email]" in user_msg["content"]
def test_json_format_roundtrips(tmp_path):
db = _seed(tmp_path)
buf = io.StringIO()
exporter_bulk.export(buf, fmt="json", include_content=False, config={}, db_path=db)
obj = json.loads(buf.getvalue())
assert "records" in obj
assert any(r["_kind"] == "tel_runs" for r in obj["records"])
def test_since_window_filters_runs(tmp_path):
db = _seed(tmp_path)
buf = io.StringIO()
# `since` cutoff one second in the future -> the run (started ~60 ms ago) is excluded
future_ns = int((time.time() + 1) * 1e9)
counts = exporter_bulk.export(buf, fmt="ndjson", since_ns=future_ns, config={}, db_path=db)
lines = [json.loads(l) for l in buf.getvalue().splitlines() if l.strip()]
runs = [r for r in lines if r.get("_kind") == "tel_runs"]
assert runs == []
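
Reviewer note: the CLI takes `--since` in days while `exporter_bulk.export` takes `since_ns`, so a conversion has to happen somewhere. The sketch below shows one way it could work. The helper names `since_ns_from_days` and `in_window`, and the choice of comparing against a run's `start_ns` with `>=`, are assumptions; the diff does not show the real conversion or filter.

```python
import time

NS_PER_DAY = 86_400 * 1_000_000_000


def since_ns_from_days(days, now_ns=None):
    """Convert a day window to an absolute nanosecond cutoff (0 means no cutoff)."""
    if days <= 0:
        return 0
    if now_ns is None:
        now_ns = time.time_ns()
    return now_ns - days * NS_PER_DAY


def in_window(start_ns, since_ns):
    """Assumed filter: keep a run if it started at or after the cutoff."""
    return start_ns >= since_ns


cutoff = since_ns_from_days(1, now_ns=2 * NS_PER_DAY)
```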

View File

@@ -0,0 +1,95 @@
"""Export configuration visibility in `hermes telemetry status`.
The status Export block reports the current export configuration. Whether a key is
locked is handled by the managed-scope layer, not repeated here; the allow_aggregate
gate is covered by a test so a managed pin can't be regressed.
"""
from __future__ import annotations

import time
import types

import pytest

import hermes_state


@pytest.fixture
def home(tmp_path, monkeypatch):
    monkeypatch.setenv("HERMES_HOME", str(tmp_path))
    hermes_state.SessionDB(db_path=tmp_path / "state.db")
    yield tmp_path


def _status(capsys):
    from hermes_cli.main import cmd_telemetry
    cmd_telemetry(types.SimpleNamespace(telemetry_action="status"))
    return capsys.readouterr().out


def test_export_block_default_shows_otlp_disabled(home, capsys):
    out = _status(capsys)
    assert "Export" in out
    assert "OTLP:" in out and "disabled" in out
    assert "Content export: off" in out
    assert "Secret redaction: on (always)" in out


def test_export_block_shows_endpoint_host_never_token(home, capsys, monkeypatch):
    from hermes_cli.config import load_config, save_config
    monkeypatch.setenv("CORP_OTLP_TOKEN", "supersecret-do-not-print")
    c = load_config()
    t = c.setdefault("telemetry", {})
    t.setdefault("export", {})["otlp"] = {
        "enabled": True,
        "endpoint": "https://collector.corp:4318/v1/traces",
        "headers_env": {"Authorization": "CORP_OTLP_TOKEN"},
    }
    save_config(c)
    out = _status(capsys)
    # full endpoint URL (host included) present
    assert "https://collector.corp:4318/v1/traces" in out
    # env var name + set-state present; the VALUE is never printed
    assert "CORP_OTLP_TOKEN" in out
    assert "(set)" in out
    assert "supersecret-do-not-print" not in out


def test_export_block_reflects_trajectories_gate(home, capsys):
    from hermes_cli.config import load_config, save_config
    c = load_config()
    c.setdefault("telemetry", {})["trajectories"] = {"enabled": True}
    save_config(c)
    out = _status(capsys)
    assert "Content export: on (trajectories enabled)" in out


def test_token_env_not_set_shows_not_set(home, capsys):
    from hermes_cli.config import load_config, save_config
    c = load_config()
    t = c.setdefault("telemetry", {})
    t.setdefault("export", {})["otlp"] = {
        "enabled": True,
        "endpoint": "https://x:4318/v1/traces",
        "headers_env": {"Authorization": "TOTALLY_UNSET_ENV_VAR_XYZ"},
    }
    save_config(c)
    out = _status(capsys)
    assert "(NOT set)" in out


def test_allow_aggregate_pin_blocks_opt_in(home):
    """A managed allow_aggregate:false pin overrides a consent_state opt-in.

    Consent is set in config (as a user or managed-scope pin would); the hard gate
    still wins, so may_upload stays false.
    """
    from hermes_cli.config import load_config, save_config
    from agent.telemetry import policy
    c = load_config()
    tel = c.setdefault("telemetry", {})
    tel["consent_state"] = "aggregate"
    tel["allow_aggregate"] = False
    save_config(c)
    assert policy.may_upload_aggregate(load_config()) is False
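
The status tests above require that the env var name and its set-state are shown while the value never is. One way this could be rendered is sketched below; `describe_headers_env` and its output format are hypothetical, not the actual `hermes telemetry status` code.

```python
from __future__ import annotations

import os
from collections.abc import Mapping


def describe_headers_env(headers_env: Mapping[str, str],
                         environ: Mapping[str, str] = os.environ) -> list[str]:
    """Describe each configured header by env var NAME and set-state only."""
    lines = []
    for header, var in sorted(headers_env.items()):
        # Only truthiness is inspected; the value itself is never formatted.
        state = "(set)" if environ.get(var) else "(NOT set)"
        lines.append(f"{header}: ${var} {state}")
    return lines
```

Because the function never interpolates `environ[var]`, a token cannot leak into status output even if a caller prints every returned line.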


@@ -0,0 +1,82 @@
"""Insights ↔ telemetry integration: the observability section in /insights output."""
from __future__ import annotations

import time

import pytest

from hermes_state import SessionDB
from agent.insights import InsightsEngine
from agent.telemetry.emitter import TelemetryEmitter
from agent.telemetry.events import ModelCallEvent, RunEvent, ToolCallEvent


@pytest.fixture()
def db(tmp_path):
    session_db = SessionDB(db_path=tmp_path / "ins_tel.db")
    yield session_db
    session_db.close()


def _seed_telemetry(db_path):
    em = TelemetryEmitter(events_path=db_path.parent / "tel" / "events.jsonl", db_path=db_path)
    now = time.time_ns()
    em.emit(RunEvent(run_id="r1", trace_id="t1", entrypoint="gateway",
                     platform="telegram", end_reason="completed",
                     start_ns=now - 90_000_000, end_ns=now))
    em.emit(RunEvent(run_id="r2", trace_id="t2", entrypoint="cli",
                     end_reason="failed", start_ns=now - 11_000_000, end_ns=now))
    em.emit(ModelCallEvent(span_id="m1", run_id="r1", provider="anthropic",
                           model="claude-opus-4", input_tokens=5000, output_tokens=800,
                           cache_read_tokens=1000, latency_ms=2200))
    em.emit(ToolCallEvent(span_id="tc1", run_id="r1", tool_name="web_search", result_class="ok"))
    em.emit(ToolCallEvent(span_id="tc2", run_id="r1", tool_name="browser_navigate", result_class="error"))
    em.flush()
    em.close()


def test_report_includes_telemetry_when_present(db):
    # A session so generate() isn't the empty branch
    db.create_session(session_id="s1", source="cli", model="anthropic/claude-sonnet-4")
    _seed_telemetry(db.db_path)
    engine = InsightsEngine(db)
    report = engine.generate(days=30)
    tel = report.get("telemetry")
    assert tel, "telemetry section missing"
    assert tel["workflows"]["total_runs"] == 2
    assert tel["workflows"]["success_rate"] == 0.5
    assert tel["tool_calls"]["total"] == 2
    assert tel["tool_calls"]["failure_rate"] == 0.5
    assert tel["model_calls"]["by_provider"]["anthropic"] == 1


def test_terminal_output_renders_observability_section(db):
    db.create_session(session_id="s1", source="cli", model="anthropic/claude-sonnet-4")
    _seed_telemetry(db.db_path)
    engine = InsightsEngine(db)
    out = engine.format_terminal(engine.generate(days=30))
    assert "Observability" in out
    assert "Workflows:" in out
    assert "Failure rate:" in out
    assert "Providers:" in out


def test_telemetry_section_absent_when_no_tel_rows(db):
    # Session present, but no telemetry events seeded.
    db.create_session(session_id="s1", source="cli", model="anthropic/claude-sonnet-4")
    engine = InsightsEngine(db)
    report = engine.generate(days=30)
    assert report.get("telemetry") == {}
    out = engine.format_terminal(report)
    assert "Observability" not in out


def test_empty_report_has_telemetry_key(db):
    # No sessions at all -> empty branch still carries the key (renderer-safe).
    engine = InsightsEngine(db)
    report = engine.generate(days=30)
    assert report.get("empty") is True
    assert report.get("telemetry") == {}
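
The rates asserted above (`success_rate`, `failure_rate`) and the empty `{}` section can be illustrated with a small standalone sketch. `summarize_telemetry` is a hypothetical helper working on plain dicts; how `InsightsEngine` actually queries the `tel_*` tables is not shown in this diff.

```python
def _rate(numerator: int, denominator: int) -> float:
    """Safe ratio: 0.0 when there is nothing to divide by."""
    return numerator / denominator if denominator else 0.0


def summarize_telemetry(runs: list, tool_calls: list) -> dict:
    """Build the telemetry section; an empty dict means 'render nothing'."""
    if not runs and not tool_calls:
        return {}
    completed = sum(1 for r in runs if r.get("end_reason") == "completed")
    failed = sum(1 for t in tool_calls if t.get("result_class") == "error")
    return {
        "workflows": {"total_runs": len(runs),
                      "success_rate": _rate(completed, len(runs))},
        "tool_calls": {"total": len(tool_calls),
                       "failure_rate": _rate(failed, len(tool_calls))},
    }
```

Returning `{}` rather than omitting the key keeps a renderer's `report.get("telemetry")` check uniform across the empty and non-empty branches.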


@@ -0,0 +1,171 @@
"""OTLP exporter tests.
Skip cleanly when the optional OTel SDK isn't installed. When it is, verify:
* event -> OTel span attribute mapping
* headers_env resolves the value from the named environment variable, not config
* a failing or slow subscriber never breaks the emitter hot path
* is_enabled / is_available gating
"""
from __future__ import annotations

import sqlite3
import time

import pytest

import hermes_state
from agent.telemetry import otlp_exporter as OE
from agent.telemetry.emitter import TelemetryEmitter
from agent.telemetry.events import ModelCallEvent, RunEvent, ToolCallEvent

otel = pytest.importorskip("opentelemetry.sdk.trace", reason="otlp extra not installed")


def _in_memory_provider():
    """A TracerProvider with an in-memory span exporter (no network)."""
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import SimpleSpanProcessor
    from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
    provider = TracerProvider()
    mem = InMemorySpanExporter()
    provider.add_span_processor(SimpleSpanProcessor(mem))
    return provider, mem


def test_event_maps_to_span_with_real_attrs():
    provider, mem = _in_memory_provider()
    batch = [
        {"event": "run", "entrypoint": "gateway", "platform": "telegram",
         "end_reason": "completed", "model_call_count": 2, "tool_call_count": 3},
        {"event": "model_call", "provider": "anthropic", "model": "claude-opus-4",
         "input_tokens": 5000, "output_tokens": 800},
        {"event": "tool_call", "tool_name": "web_search", "result_class": "ok"},
    ]
    n = OE.export_batch(provider, batch)
    assert n == 3
    spans = mem.get_finished_spans()
    names = {s.name for s in spans}
    assert names == {"hermes.run", "hermes.model_call", "hermes.tool_call"}
    # real values present as span attributes
    run = [s for s in spans if s.name == "hermes.run"][0]
    assert run.attributes["hermes.entrypoint"] == "gateway"
    assert run.attributes["hermes.platform"] == "telegram"
    model = [s for s in spans if s.name == "hermes.model_call"][0]
    assert model.attributes["hermes.model"] == "claude-opus-4"
    assert model.attributes["hermes.provider"] == "anthropic"
    tool = [s for s in spans if s.name == "hermes.tool_call"][0]
    assert tool.attributes["hermes.tool_name"] == "web_search"


def test_headers_resolve_from_env_not_value(monkeypatch):
    monkeypatch.setenv("MY_OTLP_TOKEN", "supersecretvalue")
    resolved = OE._resolve_headers({"Authorization": "MY_OTLP_TOKEN"})
    assert resolved == {"Authorization": "supersecretvalue"}
    # missing env var -> skipped, not crashed
    assert OE._resolve_headers({"X": "NOPE_NOT_SET"}) == {}


def test_is_enabled_requires_endpoint_and_flag():
    assert OE.is_enabled({"telemetry": {"export": {"otlp": {"enabled": True, "endpoint": "http://x"}}}}) is True
    assert OE.is_enabled({"telemetry": {"export": {"otlp": {"enabled": True}}}}) is False
    assert OE.is_enabled({"telemetry": {"export": {"otlp": {"enabled": False, "endpoint": "http://x"}}}}) is False
    assert OE.is_enabled({}) is False


def test_require_sdk_routes_through_lazy_install(monkeypatch):
    # _require_sdk(auto_install=True) should call lazy_deps.ensure('export.otlp').
    import tools.lazy_deps as ld
    calls = []
    monkeypatch.setattr(ld, "ensure", lambda feature, **kw: calls.append((feature, kw)))
    OE._require_sdk(auto_install=True, prompt=False)
    assert calls == [("export.otlp", {"prompt": False})]


def test_is_available_does_not_install(monkeypatch):
    # A pure availability check must NEVER trigger an install.
    import tools.lazy_deps as ld
    calls = []
    monkeypatch.setattr(ld, "ensure", lambda *a, **k: calls.append(a))
    OE.is_available()
    assert calls == []


def test_export_otlp_feature_specs_match_pyproject():
    # The LAZY_DEPS entry must track the [otlp] extra in pyproject.toml.
    from tools.lazy_deps import feature_specs
    import pathlib, re
    specs = set(feature_specs("export.otlp"))
    pyproject = pathlib.Path(__file__).resolve().parents[2] / "pyproject.toml"
    text = pyproject.read_text(encoding="utf-8")
    m = re.search(r"^otlp\s*=\s*\[([^\]]*)\]", text, re.MULTILINE)
    assert m, "otlp extra not found in pyproject.toml"
    extra = set(re.findall(r'"([^"]+)"', m.group(1)))
    assert specs == extra, f"LAZY_DEPS {specs} != pyproject extra {extra}"


def test_streamer_subscription_receives_events(tmp_path, monkeypatch):
    # Wire an OTLPStreamer-like subscriber via the in-memory provider.
    provider, mem = _in_memory_provider()
    db = tmp_path / "state.db"
    conn = sqlite3.connect(db); conn.executescript(hermes_state.SCHEMA_SQL); conn.close()
    em = TelemetryEmitter(events_path=tmp_path / "t" / "e.jsonl", db_path=db)

    def subscriber(batch):
        OE.export_batch(provider, batch)

    em.subscribe(subscriber)
    em.emit(RunEvent(run_id="r1", trace_id="t1", entrypoint="cli", end_reason="completed"))
    em.emit(ModelCallEvent(span_id="m1", run_id="r1", provider="anthropic", model="claude-opus-4"))
    em.flush()
    em.close()
    spans = mem.get_finished_spans()
    assert {s.name for s in spans} == {"hermes.run", "hermes.model_call"}


def test_failing_subscriber_never_breaks_hot_path(tmp_path):
    db = tmp_path / "state.db"
    conn = sqlite3.connect(db); conn.executescript(hermes_state.SCHEMA_SQL); conn.close()
    em = TelemetryEmitter(events_path=tmp_path / "t" / "e.jsonl", db_path=db)

    def bad_subscriber(batch):
        time.sleep(0.2)
        raise RuntimeError("OTLP collector down")

    em.subscribe(bad_subscriber)
    start = time.monotonic()
    for i in range(30):
        em.emit(ModelCallEvent(span_id=f"s{i}", run_id="r1", input_tokens=1))
    elapsed = time.monotonic() - start
    # emit() returns immediately regardless of the broken subscriber
    assert elapsed < 1.0
    em.flush()
    # durable writes still happened despite the subscriber raising
    conn = sqlite3.connect(db)
    assert conn.execute("SELECT COUNT(*) FROM tel_model_calls").fetchone()[0] == 30
    conn.close()
    em.close()


def test_export_once_reads_db_and_returns_count(tmp_path, monkeypatch):
    db = tmp_path / "state.db"
    conn = sqlite3.connect(db); conn.executescript(hermes_state.SCHEMA_SQL); conn.close()
    em = TelemetryEmitter(events_path=tmp_path / "t" / "e.jsonl", db_path=db)
    em.emit(RunEvent(run_id="r1", trace_id="t1", entrypoint="cli", end_reason="completed",
                     start_ns=time.time_ns(), end_ns=time.time_ns()))
    em.emit(ToolCallEvent(span_id="w1", run_id="r1", tool_name="web_search", result_class="ok"))
    em.flush(); em.close()
    # Patch the provider builder to an in-memory one (no network). The processor
    # stand-in only needs force_flush(); provider.shutdown() works on the real one.
    provider, mem = _in_memory_provider()

    class _Proc:
        def force_flush(self, *a, **k):
            return True

    monkeypatch.setattr(OE, "_make_provider", lambda config: (provider, _Proc()))
    n = OE.export_once({"telemetry": {"export": {"otlp": {"enabled": True, "endpoint": "http://x"}}}}, db_path=db)
    assert n == 2
    assert len(mem.get_finished_spans()) == 2
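
The hot-path guarantee tested above (a slow or raising subscriber never delays `emit()` or blocks durable writes) is commonly achieved with a queue and a background worker. The sketch below assumes that design; `TinyEmitter` is a hypothetical stand-in, not the real `TelemetryEmitter`, and its "durable write" is just a list append.

```python
import queue
import threading


class TinyEmitter:
    """Toy emitter: emit() only enqueues; a worker thread does everything else."""

    def __init__(self):
        self._q = queue.Queue()
        self._subscribers = []
        self.written = []  # stands in for the durable store
        self._worker = threading.Thread(target=self._drain, daemon=True)
        self._worker.start()

    def subscribe(self, fn):
        self._subscribers.append(fn)

    def emit(self, event):
        self._q.put(event)  # never calls subscribers on the caller's thread

    def _drain(self):
        while True:
            event = self._q.get()
            try:
                if event is None:  # shutdown sentinel
                    return
                self.written.append(event)  # durable write happens first
                for fn in list(self._subscribers):
                    try:
                        fn([event])
                    except Exception:
                        pass  # a broken subscriber must not stop the worker
            finally:
                self._q.task_done()

    def flush(self):
        self._q.join()  # block until every queued event has been processed

    def close(self):
        self._q.put(None)
        self._worker.join()
```

Writing before notifying means a subscriber failure can cost an export, but never a stored row.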


@@ -0,0 +1,53 @@
"""Telemetry plugin auto-load gating: on by default, off when telemetry.local=false."""
from __future__ import annotations

import hermes_cli.plugins as plugins_mod


def test_local_enabled_defaults_true(monkeypatch):
    monkeypatch.setattr(plugins_mod, "load_config", lambda: {}, raising=False)
    # With no telemetry section, the default is on.
    monkeypatch.setattr(
        "hermes_cli.config.load_config", lambda: {}, raising=False
    )
    assert plugins_mod._telemetry_local_enabled() is True


def test_local_disabled_when_config_false(monkeypatch):
    monkeypatch.setattr(
        "hermes_cli.config.load_config",
        lambda: {"telemetry": {"local": False}},
        raising=False,
    )
    assert plugins_mod._telemetry_local_enabled() is False


def test_local_enabled_when_config_true(monkeypatch):
    monkeypatch.setattr(
        "hermes_cli.config.load_config",
        lambda: {"telemetry": {"local": True}},
        raising=False,
    )
    assert plugins_mod._telemetry_local_enabled() is True


def test_malformed_config_defaults_on(monkeypatch):
    monkeypatch.setattr(
        "hermes_cli.config.load_config",
        lambda: {"telemetry": "not a dict"},
        raising=False,
    )
    assert plugins_mod._telemetry_local_enabled() is True


def test_plugin_manifest_is_discoverable():
    """The bundled telemetry plugin.yaml exists and declares the lifecycle hooks."""
    from pathlib import Path
    import hermes_cli.plugins as p
    bundled = p.get_bundled_plugins_dir()
    manifest = bundled / "telemetry" / "plugin.yaml"
    assert manifest.exists(), f"missing {manifest}"
    text = manifest.read_text(encoding="utf-8")
    for hook in ("post_api_request", "post_tool_call", "on_session_finalize"):
        assert hook in text
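
The four gating tests above amount to a "default on, off only on an explicit `false`" rule. A minimal sketch consistent with them is below; `telemetry_local_enabled` takes the config as an argument for illustration, whereas the real `_telemetry_local_enabled()` loads it itself, and its actual body is not shown in this diff.

```python
def telemetry_local_enabled(config) -> bool:
    """Default on; only an explicit `local: false` turns the plugin off."""
    tel = config.get("telemetry") if isinstance(config, dict) else None
    if not isinstance(tel, dict):
        return True  # missing or malformed section -> keep the default
    return tel.get("local", True) is not False
```

Failing open on malformed config keeps a typo in the telemetry block from silently changing behaviour elsewhere; whether that is the right default is a policy choice the tests encode.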


@@ -0,0 +1,127 @@
"""End-to-end telemetry wiring test.
Unlike test_plugin_hooks.py (which calls the plugin's ``_on_*`` callbacks directly),
this drives the REAL dispatch chain that core uses at runtime:
discover_plugins() -> plugin registers hooks -> invoke_hook(name, **kwargs)
-> registered callback -> emitter -> tel_* tables
If the bundled plugin stops auto-loading, stops registering a hook, or the hook name
drifts from what core fires, the hand-written hook tests still pass but real runs go
dark. This test is the guard against that — it only touches public entry points
(``discover_plugins`` / ``invoke_hook``), exactly as core does.
"""
from __future__ import annotations

import sqlite3
import time

import pytest

import hermes_state


@pytest.fixture
def runtime(tmp_path, monkeypatch):
    """A clean HERMES_HOME with state.db, a fresh plugin manager, and a fresh emitter."""
    monkeypatch.setenv("HERMES_HOME", str(tmp_path))
    db = tmp_path / "state.db"
    hermes_state.SessionDB(db_path=db)
    # Reset the global plugin-manager singleton so discovery re-runs in this HERMES_HOME.
    import hermes_cli.plugins as plugins_mod
    monkeypatch.setattr(plugins_mod, "_plugin_manager", None, raising=False)
    # Reset the emitter singleton so it binds to this state.db (and tear it down after).
    from agent.telemetry import emitter as emitter_mod
    emitter_mod.reset_emitter_for_tests(None)
    # Clear the plugin's per-run accumulators between tests.
    import plugins.telemetry as plug
    plug._runs.clear()
    yield db, plugins_mod, emitter_mod, plug
    try:
        emitter_mod.get_emitter().flush()
    except Exception:
        pass
    emitter_mod.reset_emitter_for_tests(None)
    monkeypatch.setattr(plugins_mod, "_plugin_manager", None, raising=False)


def _fire_one_turn(invoke_hook):
    """Fire the hook sequence of a single completed turn, as core does."""
    invoke_hook("on_session_start", session_id="s1",
                model="anthropic/claude-opus-4", platform="cli")
    invoke_hook("post_api_request", session_id="s1", platform="cli",
                provider="anthropic", base_url=None, model="claude-opus-4",
                api_duration=0.9,
                usage={"input_tokens": 1000, "output_tokens": 120,
                       "cache_read_tokens": 0, "cache_write_tokens": 0,
                       "reasoning_tokens": 0})
    invoke_hook("post_tool_call", session_id="s1", platform="cli",
                function_name="web_search", duration_ms=210, result='{"data": "ok"}')
    invoke_hook("on_session_finalize", session_id="s1", platform="cli",
                reason="shutdown")


def test_real_dispatch_writes_tel_rows(runtime):
    """The bundled plugin, loaded via discover_plugins, captures a turn end to end."""
    db, plugins_mod, emitter_mod, _plug = runtime
    plugins_mod.discover_plugins(force=True)
    # The plugin must have registered the lifecycle hooks core fires.
    mgr = plugins_mod.get_plugin_manager()
    registered = {k for k, v in getattr(mgr, "_hooks", {}).items() if v}
    for hook in ("on_session_start", "post_api_request", "post_tool_call",
                 "on_session_finalize"):
        assert hook in registered, f"core hook {hook!r} not registered by the plugin"
    _fire_one_turn(plugins_mod.invoke_hook)
    time.sleep(0.5)  # let the background writer drain
    emitter_mod.get_emitter().flush()
    conn = sqlite3.connect(db)
    conn.row_factory = sqlite3.Row
    assert conn.execute("SELECT COUNT(*) c FROM tel_runs").fetchone()["c"] == 1
    assert conn.execute("SELECT COUNT(*) c FROM tel_model_calls").fetchone()["c"] == 1
    assert conn.execute("SELECT COUNT(*) c FROM tel_tool_calls").fetchone()["c"] == 1
    # Real values, not buckets.
    mc = conn.execute("SELECT provider, model FROM tel_model_calls").fetchone()
    assert mc["provider"] == "anthropic"
    assert mc["model"] == "claude-opus-4"
    tc = conn.execute("SELECT tool_name FROM tel_tool_calls").fetchone()
    assert tc["tool_name"] == "web_search"
    run = conn.execute("SELECT end_reason, model_call_count, tool_call_count "
                       "FROM tel_runs").fetchone()
    assert run["end_reason"] == "completed"
    assert run["model_call_count"] == 1
    assert run["tool_call_count"] == 1
    conn.close()


def test_local_disabled_writes_nothing(runtime, monkeypatch):
    """telemetry.local=false: the plugin does not auto-load, so no rows are written."""
    db, plugins_mod, emitter_mod, _plug = runtime
    monkeypatch.setattr(
        "hermes_cli.config.load_config",
        lambda: {"telemetry": {"local": False}},
        raising=False,
    )
    plugins_mod.discover_plugins(force=True)
    _fire_one_turn(plugins_mod.invoke_hook)
    time.sleep(0.3)
    try:
        emitter_mod.get_emitter().flush()
    except Exception:
        pass
    conn = sqlite3.connect(db)
    assert conn.execute("SELECT COUNT(*) FROM tel_runs").fetchone()[0] == 0
    assert conn.execute("SELECT COUNT(*) FROM tel_model_calls").fetchone()[0] == 0
    conn.close()


@@ -0,0 +1,134 @@
"""Telemetry plugin hook tests — feed realistic kwargs, assert tel_* rows + no leak."""
from __future__ import annotations

import sqlite3

import pytest

import hermes_state
from agent.telemetry import emitter as emitter_mod
from agent.telemetry.emitter import TelemetryEmitter


@pytest.fixture
def wired(tmp_path, monkeypatch):
    db = tmp_path / "state.db"
    conn = sqlite3.connect(db)
    conn.executescript(hermes_state.SCHEMA_SQL)
    conn.close()
    em = TelemetryEmitter(events_path=tmp_path / "telemetry" / "events.jsonl", db_path=db)
    emitter_mod.reset_emitter_for_tests(em)
    # reset the plugin's per-run accumulators between tests
    import plugins.telemetry as plug
    plug._runs.clear()
    yield db, em, plug
    em.flush()
    em.close()
    emitter_mod.reset_emitter_for_tests(None)


def test_full_session_lifecycle_produces_rows(wired):
    db, em, plug = wired
    plug._on_session_start(session_id="sess1", platform="telegram")
    plug._on_post_api_request(
        session_id="sess1", platform="telegram",
        provider="anthropic", base_url=None, model="claude-opus-4",
        api_duration=2.5,
        usage={"input_tokens": 5000, "output_tokens": 800, "cache_read_tokens": 1000,
               "cache_write_tokens": 0, "reasoning_tokens": 0},
    )
    plug._on_post_tool_call(
        session_id="sess1", platform="telegram",
        function_name="web_search", duration_ms=812, result="{\"data\": \"...\"}",
    )
    # Production finalize callers pass `reason` (e.g. "shutdown"), not cost.
    plug._on_session_finalize(
        session_id="sess1", platform="telegram", reason="shutdown",
    )
    em.flush()
    conn = sqlite3.connect(db)
    conn.row_factory = sqlite3.Row
    run = conn.execute("SELECT * FROM tel_runs").fetchone()
    assert run is not None
    assert run["entrypoint"] == "gateway"
    assert run["platform"] == "telegram"
    assert run["end_reason"] == "completed"
    assert run["model_call_count"] == 1
    assert run["tool_call_count"] == 1
    mc = conn.execute("SELECT * FROM tel_model_calls").fetchone()
    assert mc["provider"] == "anthropic"
    assert mc["model"] == "claude-opus-4"
    assert mc["input_tokens"] == 5000
    tc = conn.execute("SELECT * FROM tel_tool_calls").fetchone()
    assert tc["tool_name"] == "web_search"
    assert tc["result_class"] == "ok"
    conn.close()


def test_tool_error_result_classified_and_counted(wired):
    db, em, plug = wired
    plug._on_session_start(session_id="s2", platform="cli")
    plug._on_post_tool_call(
        session_id="s2", function_name="terminal", duration_ms=10,
        result="{\"error\": \"command failed\"}",
    )
    plug._on_session_finalize(session_id="s2", reason="shutdown")
    em.flush()
    conn = sqlite3.connect(db)
    conn.row_factory = sqlite3.Row
    tc = conn.execute("SELECT result_class, tool_name FROM tel_tool_calls").fetchone()
    assert tc["result_class"] == "error"
    assert tc["tool_name"] == "terminal"
    run = conn.execute("SELECT error_count FROM tel_runs").fetchone()
    assert run["error_count"] == 1
    conn.close()


def test_api_error_recorded(wired):
    db, em, plug = wired
    plug._on_session_start(session_id="s3", platform="cli")
    plug._on_api_request_error(session_id="s3", error_type="provider timeout after 60s")
    plug._on_session_finalize(session_id="s3", failed=True)
    em.flush()
    conn = sqlite3.connect(db)
    conn.row_factory = sqlite3.Row
    err = conn.execute("SELECT error_class, subsystem FROM tel_error_events").fetchone()
    assert err["error_class"] == "provider_timeout"
    assert err["subsystem"] == "model_api"
    run = conn.execute("SELECT end_reason FROM tel_runs").fetchone()
    assert run["end_reason"] == "failed"
    conn.close()


def test_hooks_never_raise_on_garbage_kwargs(wired):
    _, em, plug = wired
    # Missing everything — must be swallowed by the _safe wrapper.
    plug._on_post_api_request()
    plug._on_post_tool_call()
    plug._on_session_finalize()
    plug._on_api_request_error()
    em.flush()


def test_no_message_content_in_tool_rows(wired):
    """The tool hook receives a result blob; only the classification persists, not content."""
    db, em, plug = wired
    plug._on_session_start(session_id="s4", platform="cli")
    secret = "{\"data\": \"USER SECRET sk-ABCDEF and /Users/alice/file.txt\"}"
    plug._on_post_tool_call(session_id="s4", function_name="web_search",
                            duration_ms=5, result=secret)
    plug._on_session_finalize(session_id="s4")
    em.flush()
    # The whole tel_tool_calls row must contain none of the result content.
    conn = sqlite3.connect(db)
    row = conn.execute("SELECT * FROM tel_tool_calls").fetchone()
    conn.close()
    blob = " ".join(str(x) for x in row)
    assert "sk-ABCDEF" not in blob
    assert "/Users/alice" not in blob
    assert "SECRET" not in blob
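
Two behaviours exercised by the hook tests above lend themselves to a short illustration: classifying a tool result without keeping its content, and a wrapper that swallows hook errors. The sketch below is an assumption about how that might look; `classify_result`, `safe`, and `on_post_tool_call` are hypothetical names, and treating non-JSON results as "ok" is a guess, not something the tests confirm.

```python
import functools
import json


def classify_result(result) -> str:
    """Return only a class label; the result content itself is discarded."""
    try:
        parsed = json.loads(result) if isinstance(result, str) else result
    except (TypeError, ValueError):
        return "ok"  # assumption: unparseable output is not treated as an error
    if isinstance(parsed, dict) and "error" in parsed:
        return "error"
    return "ok"


def safe(fn):
    """Wrap a hook so that bad kwargs or internal bugs never reach the caller."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        try:
            return fn(*args, **kwargs)
        except Exception:
            return None
    return wrapper


@safe
def on_post_tool_call(*, session_id, function_name, result):
    # Persist the tool name and the classification, never the result blob.
    return {"session_id": session_id, "tool_name": function_name,
            "result_class": classify_result(result)}
```

Calling `on_post_tool_call()` with no arguments raises `TypeError` inside the wrapper and returns `None`, which mirrors the "hooks never raise" property.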


@@ -0,0 +1,53 @@
"""Consent gate tests.
Consent is a single config field (``telemetry.consent_state``); the aggregate opt-in
is expressed by setting it to ``"aggregate"`` (via ``hermes config set`` or a
managed-scope pin). ``allow_aggregate`` is the hard gate. ``policy.may_upload_aggregate``
is the gate a future uploader must consult.
"""
from __future__ import annotations

from agent.telemetry import policy


def _cfg(**telemetry):
    return {"telemetry": telemetry}


def test_default_posture_never_uploads():
    # No consent recorded → unknown → never uploads.
    assert policy.may_upload_aggregate(_cfg(local=True, consent_state="unknown")) is False


def test_missing_telemetry_block_never_uploads():
    assert policy.may_upload_aggregate({}) is False


def test_opted_in_uploads():
    assert policy.may_upload_aggregate(_cfg(consent_state="aggregate")) is True


def test_declined_does_not_upload():
    assert policy.may_upload_aggregate(_cfg(consent_state="local")) is False


def test_allow_aggregate_false_overrides_opt_in():
    # An admin pins telemetry.allow_aggregate: false via managed scope.
    cfg = _cfg(consent_state="aggregate", allow_aggregate=False)
    assert policy.may_upload_aggregate(cfg) is False  # the hard gate wins


def test_local_off_makes_aggregate_inert():
    # Aggregate metrics derive from the local tables; with local off there is
    # nothing to aggregate, so opting in cannot upload.
    cfg = _cfg(local=False, consent_state="aggregate", allow_aggregate=True)
    assert policy.may_upload_aggregate(cfg) is False


def test_install_id_minted_when_empty_and_stable_when_set():
    cfg = _cfg(install_id="")
    minted = policy.ensure_install_id(cfg)
    assert minted and len(minted) >= 32  # uuid4
    cfg2 = _cfg(install_id="fixed-id")
    assert policy.ensure_install_id(cfg2) == "fixed-id"
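
Read together, the consent tests above describe three checks that must all pass before anything may be uploaded. The following is a sketch of one gate that satisfies them, not the actual `policy.may_upload_aggregate` source; the defaults for `local` and `allow_aggregate` (both on) are inferred from the tests.

```python
def may_upload_aggregate(config: dict) -> bool:
    """All three conditions must hold; any single 'no' wins."""
    tel = config.get("telemetry")
    if not isinstance(tel, dict):
        return False  # no telemetry block -> no recorded consent
    if tel.get("local", True) is False:
        return False  # nothing is collected locally, so nothing to aggregate
    if tel.get("allow_aggregate", True) is False:
        return False  # hard gate, e.g. pinned by an admin
    return tel.get("consent_state", "unknown") == "aggregate"
```

Ordering the checks from hardest gate to softest makes it obvious that an opt-in can never override a managed pin.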


@@ -0,0 +1,88 @@
"""rollup tests: tel_* -> per-run summary events with REAL values (local only)."""
from __future__ import annotations

import sqlite3
import time

import hermes_state
from agent.telemetry import rollup
from agent.telemetry.emitter import TelemetryEmitter
from agent.telemetry.events import ModelCallEvent, RunEvent, ToolCallEvent


def _seed(tmp_path):
    db = tmp_path / "state.db"
    conn = sqlite3.connect(db)
    conn.executescript(hermes_state.SCHEMA_SQL)
    conn.close()
    em = TelemetryEmitter(events_path=tmp_path / "tel" / "e.jsonl", db_path=db)
    now = time.time_ns()
    em.emit(RunEvent(run_id="r1", trace_id="t1", entrypoint="gateway",
                     platform="telegram", end_reason="completed",
                     start_ns=now - 90_000_000, end_ns=now,
                     model_call_count=2, tool_call_count=2))
    em.emit(ModelCallEvent(span_id="m1", run_id="r1", provider="anthropic",
                           model="claude-opus-4", input_tokens=60000, output_tokens=8000))
    em.emit(ModelCallEvent(span_id="m2", run_id="r1", provider="anthropic",
                           model="claude-opus-4", input_tokens=5000, output_tokens=500))
    em.emit(ToolCallEvent(span_id="tc1", run_id="r1", tool_name="web_search",
                          result_class="ok"))
    em.emit(ToolCallEvent(span_id="tc2", run_id="r1", tool_name="browser_navigate",
                          result_class="ok"))
    # an in-progress run (no end_ns) must be excluded
    em.emit(RunEvent(run_id="r2", trace_id="t2", entrypoint="cli", start_ns=now))
    em.flush()
    em.close()
    return db


def test_builds_one_event_per_completed_run_with_real_values(tmp_path):
    db = _seed(tmp_path)
    events = rollup.build_aggregate_events(install_id="fixed-id", db_path=db,
                                           include_heartbeat=False)
    wf = [e for e in events if e["event_name"] == "workflow_completed"]
    assert len(wf) == 1  # r2 (no end_ns) excluded
    e = wf[0]
    assert e["entrypoint"] == "gateway"
    assert e["platform"] == "telegram"
    # REAL model id + provider, not a bucket/class
    models = {m["model"] for m in e["models_used"]}
    assert models == {"claude-opus-4"}
    assert e["models_used"][0]["provider"] == "anthropic"
    assert sorted(e["tools_used"]) == ["browser_navigate", "web_search"]
    # real token totals, not buckets
    assert e["input_tokens"] == 65000
    assert e["output_tokens"] == 8500


def test_real_model_and_tool_names_present(tmp_path):
    db = _seed(tmp_path)
    events = rollup.build_aggregate_events(install_id="fixed-id", db_path=db)
    blob = " ".join(str(v) for e in events for v in e.values())
    assert "claude-opus-4" in blob
    assert "web_search" in blob


def test_heartbeat_included_by_default(tmp_path):
    db = _seed(tmp_path)
    events = rollup.build_aggregate_events(install_id="fixed-id", db_path=db)
    assert any(e["event_name"] == "heartbeat" for e in events)


def test_summarize_counts_by_event_name(tmp_path):
    db = _seed(tmp_path)
    events = rollup.build_aggregate_events(install_id="fixed-id", db_path=db)
    s = rollup.summarize(events)
    assert s["total"] == len(events)
    assert s["by_event_name"]["workflow_completed"] == 1
    assert s["by_event_name"]["heartbeat"] == 1


def test_empty_db_yields_only_heartbeat(tmp_path):
    db = tmp_path / "state.db"
    conn = sqlite3.connect(db)
    conn.executescript(hermes_state.SCHEMA_SQL)
    conn.close()
    events = rollup.build_aggregate_events(install_id="x", db_path=db)
    assert [e["event_name"] for e in events] == ["heartbeat"]
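
The rollup rules tested above (one event per completed run, in-progress runs skipped, token totals summed across calls) can be shown on plain dicts. This is a simplified sketch under those assumptions; `build_workflow_events` is a hypothetical helper, and the real `rollup.build_aggregate_events` reads from the database and emits more fields than shown here.

```python
from collections import Counter


def build_workflow_events(runs: list, model_calls: list) -> list:
    """One summary event per completed run; runs without end_ns are skipped."""
    events = []
    for run in runs:
        if run.get("end_ns") is None:
            continue  # still in progress
        calls = [c for c in model_calls if c["run_id"] == run["run_id"]]
        events.append({
            "event_name": "workflow_completed",
            "entrypoint": run.get("entrypoint"),
            "input_tokens": sum(c.get("input_tokens", 0) for c in calls),
            "output_tokens": sum(c.get("output_tokens", 0) for c in calls),
        })
    return events


def summarize(events: list) -> dict:
    """Count events by name, in the shape the summarize test expects."""
    by_name = Counter(e["event_name"] for e in events)
    return {"total": len(events), "by_event_name": dict(by_name)}
```

Summing raw token counts, rather than bucketing them, matches the "real values" wording in the test names.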


@@ -0,0 +1,45 @@
"""Schema tests: tel_* tables exist after init; SCHEMA_VERSION bumped."""
from __future__ import annotations

import sqlite3

import hermes_state

TEL_TABLES = {
    "tel_runs", "tel_spans", "tel_model_calls", "tel_tool_calls",
    "tel_error_events",
}


def test_schema_version_is_17_or_higher():
    assert hermes_state.SCHEMA_VERSION >= 17


def test_tel_tables_present_in_schema_sql():
    for tbl in TEL_TABLES:
        assert f"CREATE TABLE IF NOT EXISTS {tbl}" in hermes_state.SCHEMA_SQL


def test_tel_tables_created_on_executescript(tmp_path):
    db = tmp_path / "state.db"
    conn = sqlite3.connect(db)
    conn.executescript(hermes_state.SCHEMA_SQL)
    rows = {
        r[0]
        for r in conn.execute(
            "SELECT name FROM sqlite_master WHERE type='table'"
        ).fetchall()
    }
    conn.close()
    assert TEL_TABLES.issubset(rows), f"missing: {TEL_TABLES - rows}"


def test_executescript_is_idempotent(tmp_path):
    # IF NOT EXISTS means re-running on an existing DB is a no-op, not an error.
    db = tmp_path / "state.db"
    conn = sqlite3.connect(db)
    conn.executescript(hermes_state.SCHEMA_SQL)
    conn.executescript(hermes_state.SCHEMA_SQL)  # second run must not raise
    conn.close()
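
The idempotency property above follows directly from `CREATE TABLE IF NOT EXISTS`. A toy demonstration with an in-memory database is below; the two-table `SCHEMA_SQL` and its columns are invented for illustration and are not the real `hermes_state.SCHEMA_SQL`.

```python
import sqlite3

# Toy schema; columns are hypothetical and far smaller than the real tables.
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS tel_runs (run_id TEXT PRIMARY KEY, end_reason TEXT);
CREATE TABLE IF NOT EXISTS tel_spans (span_id TEXT PRIMARY KEY, run_id TEXT);
"""


def init_schema(conn: sqlite3.Connection) -> None:
    """Safe to call on every startup: existing tables are left untouched."""
    conn.executescript(SCHEMA_SQL)


def table_names(conn: sqlite3.Connection) -> set:
    rows = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
    return {r[0] for r in rows}
```

Note that `IF NOT EXISTS` only guards creation; it does not alter a table whose columns changed, which is presumably why the schema test also checks `SCHEMA_VERSION`.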


@@ -0,0 +1,109 @@
"""Trace/span layer: tel_spans is populated as a connected run -> calls tree.
Drives the real dispatch chain (discover_plugins -> invoke_hook) and asserts the
timing/lineage backbone in tel_spans:
- one root span per run (kind="run", parent_span_id NULL),
- one child span per model/tool call parented to the root,
- a single trace_id across the run,
- call detail rows (tel_model_calls / tel_tool_calls) JOIN to their span by span_id,
- reconstructed durations match the reported latency/duration.
This is the regression guard for the waterfall a desktop trace viewer renders.
"""
from __future__ import annotations

import sqlite3
import time

import pytest

import hermes_state


@pytest.fixture
def runtime(tmp_path, monkeypatch):
    monkeypatch.setenv("HERMES_HOME", str(tmp_path))
    db = tmp_path / "state.db"
    hermes_state.SessionDB(db_path=db)
    import hermes_cli.plugins as plugins_mod
    monkeypatch.setattr(plugins_mod, "_plugin_manager", None, raising=False)
    from agent.telemetry import emitter as emitter_mod
    emitter_mod.reset_emitter_for_tests(None)
    import plugins.telemetry as plug
    plug._runs.clear()
    yield db, plugins_mod, emitter_mod
    try:
        emitter_mod.get_emitter().flush()
    except Exception:
        pass
    emitter_mod.reset_emitter_for_tests(None)
    monkeypatch.setattr(plugins_mod, "_plugin_manager", None, raising=False)


def _one_turn(invoke_hook):
    invoke_hook("on_session_start", session_id="s1",
                model="anthropic/claude-opus-4", platform="cli")
    invoke_hook("post_api_request", session_id="s1", platform="cli",
                provider="anthropic", model="claude-opus-4", api_duration=0.9,
                usage={"input_tokens": 1000, "output_tokens": 120})
    invoke_hook("post_tool_call", session_id="s1", platform="cli",
                function_name="web_search", duration_ms=210, result='{"data": "ok"}')
    invoke_hook("on_session_finalize", session_id="s1", platform="cli",
                reason="shutdown")
def test_tel_spans_forms_connected_trace(runtime):
db, plugins_mod, emitter_mod = runtime
plugins_mod.discover_plugins(force=True)
_one_turn(plugins_mod.invoke_hook)
time.sleep(0.5)
emitter_mod.get_emitter().flush()
conn = sqlite3.connect(db)
conn.row_factory = sqlite3.Row
spans = conn.execute(
"SELECT span_id, parent_span_id, kind, name, start_ns, end_ns, status, trace_id "
"FROM tel_spans"
).fetchall()
# root + model + tool
assert len(spans) == 3
roots = [s for s in spans if s["parent_span_id"] is None]
children = [s for s in spans if s["parent_span_id"] is not None]
assert len(roots) == 1
assert roots[0]["kind"] == "run"
assert len(children) == 2
# single trace, all children parented to the root
assert len({s["trace_id"] for s in spans}) == 1
assert all(c["parent_span_id"] == roots[0]["span_id"] for c in children)
# spans are time-ordered and carry real durations
by_kind = {s["kind"]: s for s in spans}
assert (by_kind["model"]["end_ns"] - by_kind["model"]["start_ns"]) == 900 * 1_000_000
assert (by_kind["tool"]["end_ns"] - by_kind["tool"]["start_ns"]) == 210 * 1_000_000
assert by_kind["run"]["end_ns"] >= by_kind["run"]["start_ns"]
def test_detail_rows_join_to_spans(runtime):
db, plugins_mod, emitter_mod = runtime
plugins_mod.discover_plugins(force=True)
_one_turn(plugins_mod.invoke_hook)
time.sleep(0.5)
emitter_mod.get_emitter().flush()
conn = sqlite3.connect(db)
conn.row_factory = sqlite3.Row
mc = conn.execute(
"SELECT m.model, s.kind, s.trace_id FROM tel_model_calls m "
"JOIN tel_spans s ON m.span_id = s.span_id"
).fetchone()
assert mc is not None and mc["model"] == "claude-opus-4" and mc["kind"] == "model"
tc = conn.execute(
"SELECT t.tool_name, s.kind FROM tel_tool_calls t "
"JOIN tel_spans s ON t.span_id = s.span_id"
).fetchone()
assert tc is not None and tc["tool_name"] == "web_search" and tc["kind"] == "tool"
conn.close()
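
The two tests above check that `tel_spans` rows link into a single run -> calls tree through `span_id` / `parent_span_id`. A minimal sketch of that reconstruction follows. The table definition is an assumption that mirrors only the columns selected in the tests, and `build_span_tree` and `make_demo_db` are hypothetical helpers, not part of the telemetry plugin.

```python
import sqlite3
from collections import defaultdict


def build_span_tree(conn):
    """Return (roots, children); children maps a parent span_id to its child spans."""
    rows = conn.execute(
        "SELECT span_id, parent_span_id, kind FROM tel_spans ORDER BY start_ns"
    ).fetchall()
    roots, children = [], defaultdict(list)
    for span_id, parent_span_id, kind in rows:
        if parent_span_id is None:
            roots.append((span_id, kind))  # a span without a parent is a run root
        else:
            children[parent_span_id].append((span_id, kind))
    return roots, dict(children)


def make_demo_db():
    # Hypothetical minimal table: only the columns this sketch needs.
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE tel_spans ("
        "span_id TEXT PRIMARY KEY, parent_span_id TEXT, kind TEXT, "
        "start_ns INTEGER, end_ns INTEGER)"
    )
    ms = 1_000_000  # nanoseconds per millisecond
    conn.executemany(
        "INSERT INTO tel_spans VALUES (?, ?, ?, ?, ?)",
        [
            ("run-1", None, "run", 0, 2000 * ms),
            ("model-1", "run-1", "model", 10 * ms, 910 * ms),
            ("tool-1", "run-1", "tool", 920 * ms, 1130 * ms),
        ],
    )
    return conn


roots, children = build_span_tree(make_demo_db())
```

With the demo rows, there is one root of kind `run` and both call spans hang off it, which is the shape `test_tel_spans_forms_connected_trace` asserts.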


@@ -133,9 +133,9 @@ class TestValidateToolset:
def test_mcp_alias_uses_live_registry(self, monkeypatch):
reg = ToolRegistry()
reg.register(
name="mcp__dynserver__ping",
name="mcp_dynserver_ping",
toolset="mcp-dynserver",
schema=_make_schema("mcp__dynserver__ping", "Ping"),
schema=_make_schema("mcp_dynserver_ping", "Ping"),
handler=_dummy_handler,
)
reg.register_toolset_alias("dynserver", "mcp-dynserver")
@@ -144,7 +144,7 @@ class TestValidateToolset:
assert validate_toolset("dynserver") is True
assert validate_toolset("mcp-dynserver") is True
assert "mcp__dynserver__ping" in resolve_toolset("dynserver")
assert "mcp_dynserver_ping" in resolve_toolset("dynserver")
class TestGetToolsetInfo:


@@ -738,6 +738,64 @@ class TestSensitiveInPlaceEditPattern:
assert key is None
class TestWindowsAbsolutePathFolding:
"""Windows absolute home / Hermes-home prefixes must fold to ~/ and
~/.hermes/ in dangerous-command detection.
Regression: on native Windows the home prefix uses backslash separators
(``C:\\Users\\alice\\.ssh\\authorized_keys``). Detection stripped backslash
escapes *before* folding, dissolving those separators, so writes to startup,
SSH, and Hermes config/env files returned "safe" without an approval prompt.
The OS-specific ``Path.home()`` / ``get_hermes_home()`` tests above only
exercise this branch on a Windows host; these monkeypatch a Windows-style
HOME/HERMES_HOME so the fold is verified on the POSIX CI runner too."""
def test_windows_home_bashrc_folds(self, monkeypatch):
monkeypatch.setenv("HOME", r"C:\Users\tester")
dangerous, key, _ = detect_dangerous_command(
r"echo 'pwned' > C:\Users\tester\.bashrc"
)
assert dangerous is True
assert key is not None
def test_windows_home_ssh_authorized_keys_multiseg_folds(self, monkeypatch):
# The multi-segment suffix (\.ssh\authorized_keys) must also have its
# separators normalized, not just the home prefix.
monkeypatch.setenv("HOME", r"C:\Users\tester")
dangerous, key, _ = detect_dangerous_command(
r"cat key >> C:\Users\tester\.ssh\authorized_keys"
)
assert dangerous is True
assert key is not None
def test_windows_home_forward_slash_folds(self, monkeypatch):
monkeypatch.setenv("HOME", r"C:\Users\tester")
dangerous, key, _ = detect_dangerous_command(
"cat key >> C:/Users/tester/.ssh/authorized_keys"
)
assert dangerous is True
assert key is not None
def test_windows_hermes_home_config_folds(self, monkeypatch):
# Hermes home nests under the user home on Windows; it must fold before
# the user-home rewrite eats its prefix.
monkeypatch.setenv("HOME", r"C:\Users\tester")
monkeypatch.setenv("HERMES_HOME", r"C:\Users\tester\.hermes")
dangerous, key, _ = detect_dangerous_command(
r"sed -i 's/manual/off/' C:\Users\tester\.hermes\config.yaml"
)
assert dangerous is True
assert key is not None
def test_windows_unrelated_path_not_flagged(self, monkeypatch):
monkeypatch.setenv("HOME", r"C:\Users\tester")
dangerous, key, _ = detect_dangerous_command(
r"cp report.txt C:\Users\tester\notes.txt"
)
assert dangerous is False
assert key is None
class TestProjectSensitiveTeePattern:
def test_tee_to_local_dotenv_requires_approval(self):
dangerous, key, desc = detect_dangerous_command("printenv | tee .env.local")
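
The regression described in the `TestWindowsAbsolutePathFolding` docstring can be reproduced in isolation. The sketch below reuses the backslash-escape substitution and the path-tail pattern shown in this diff. `fold_home` is a hypothetical, simplified stand-in for `_rewrite_resolved_user_home`, whose real implementation is not shown here.

```python
import re

# Same terminator set and tail pattern as in the diff.
_PATH_TOKEN_STOP = r"""\s'"`;|&<>()"""
_PATH_TAIL = r"(?P<tail>(?:[/\\][^/\\" + _PATH_TOKEN_STOP + r"]*)+)"


def strip_backslash_escapes(command: str) -> str:
    # Shell-escape strip used by the normalizer: r\m -> rm.
    return re.sub(r'\\([^\n])', r'\1', command)


def fold_home(command: str, home: str) -> str:
    # Hypothetical fold: match the home prefix with either separator,
    # replace it with "~", and normalize the tail to forward slashes.
    lead = r"[/\\]" if home[:1] in ("/", "\\") else ""
    parts = [re.escape(p) for p in re.split(r"[/\\]", home) if p]
    pattern = re.compile(lead + r"[/\\]".join(parts) + _PATH_TAIL)
    return pattern.sub(
        lambda m: "~" + m.group("tail").replace("\\", "/"), command
    )


HOME = r"C:\Users\tester"
CMD = r"cat key >> C:\Users\tester\.ssh\authorized_keys"

# Old order: the strip dissolves the separators, so the fold finds nothing.
strip_then_fold = fold_home(strip_backslash_escapes(CMD), HOME)
# New order: fold first, then strip (which has nothing left to remove).
fold_then_strip = strip_backslash_escapes(fold_home(CMD, HOME))
```

Only the fold-first order leaves a `~/.ssh/authorized_keys` path for the static patterns to match.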


@@ -30,10 +30,10 @@ class TestRegisterServerTools:
with patch("tools.registry.registry", mock_registry):
registered = _register_server_tools("my_srv", server, {})
assert "mcp__my_srv__my_tool" in registered
assert "mcp__my_srv__my_tool" in mock_registry.get_all_tool_names()
assert "mcp_my_srv_my_tool" in registered
assert "mcp_my_srv_my_tool" in mock_registry.get_all_tool_names()
assert validate_toolset("my_srv") is True
assert "mcp__my_srv__my_tool" in resolve_toolset("my_srv")
assert "mcp_my_srv_my_tool" in resolve_toolset("my_srv")
class TestRefreshTools:
@@ -53,11 +53,11 @@ class TestRefreshTools:
# Seed initial state: one old tool registered
mock_registry.register(
name="mcp__live_srv__old_tool", toolset="mcp-live_srv", schema={},
name="mcp_live_srv_old_tool", toolset="mcp-live_srv", schema={},
handler=lambda x: x, check_fn=lambda: True, is_async=False,
description="", emoji="",
)
server._registered_tool_names = ["mcp__live_srv__old_tool"]
server._registered_tool_names = ["mcp_live_srv_old_tool"]
# New tool list from server
new_tool = _make_mcp_tool("new_tool", "new behavior")
@@ -69,11 +69,11 @@ class TestRefreshTools:
with patch("tools.registry.registry", mock_registry):
await server._refresh_tools()
assert "mcp__live_srv__old_tool" not in mock_registry.get_all_tool_names()
assert "mcp__live_srv__old_tool" not in resolve_toolset("live_srv")
assert "mcp__live_srv__new_tool" in mock_registry.get_all_tool_names()
assert "mcp__live_srv__new_tool" in resolve_toolset("live_srv")
assert server._registered_tool_names == ["mcp__live_srv__new_tool"]
assert "mcp_live_srv_old_tool" not in mock_registry.get_all_tool_names()
assert "mcp_live_srv_old_tool" not in resolve_toolset("live_srv")
assert "mcp_live_srv_new_tool" in mock_registry.get_all_tool_names()
assert "mcp_live_srv_new_tool" in resolve_toolset("live_srv")
assert server._registered_tool_names == ["mcp_live_srv_new_tool"]
class TestMessageHandler:


@@ -143,7 +143,7 @@ class TestSchemaConversion:
mcp_tool = _make_mcp_tool(name="read_file", description="Read a file")
schema = _convert_mcp_schema("filesystem", mcp_tool)
assert schema["name"] == "mcp__filesystem__read_file"
assert schema["name"] == "mcp_filesystem_read_file"
assert schema["description"] == "Read a file"
assert "properties" in schema["parameters"]
@@ -376,7 +376,7 @@ class TestSchemaConversion:
bare_tool = types.SimpleNamespace(name="probe", description="Probe")
schema = _convert_mcp_schema("srv", bare_tool)
assert schema["name"] == "mcp__srv__probe"
assert schema["name"] == "mcp_srv_probe"
assert schema["parameters"] == {"type": "object", "properties": {}}
def test_convert_mcp_schema_with_none_inputschema(self):
@@ -398,7 +398,7 @@ class TestSchemaConversion:
mcp_tool = _make_mcp_tool(name="list_dir")
schema = _convert_mcp_schema("my_server", mcp_tool)
assert schema["name"] == "mcp__my_server__list_dir"
assert schema["name"] == "mcp_my_server_list_dir"
def test_hyphens_sanitized_to_underscores(self):
"""Hyphens in tool/server names are replaced with underscores for LLM compat."""
@@ -407,7 +407,7 @@ class TestSchemaConversion:
mcp_tool = _make_mcp_tool(name="get-sum")
schema = _convert_mcp_schema("my-server", mcp_tool)
assert schema["name"] == "mcp__my_server__get_sum"
assert schema["name"] == "mcp_my_server_get_sum"
assert "-" not in schema["name"]
@@ -736,10 +736,10 @@ class TestDiscoverAndRegister:
_discover_and_register_server("fs", {"command": "npx", "args": []})
)
assert "mcp__fs__read_file" in registered
assert "mcp__fs__write_file" in registered
assert "mcp__fs__read_file" in mock_registry.get_all_tool_names()
assert "mcp__fs__write_file" in mock_registry.get_all_tool_names()
assert "mcp_fs_read_file" in registered
assert "mcp_fs_write_file" in registered
assert "mcp_fs_read_file" in mock_registry.get_all_tool_names()
assert "mcp_fs_write_file" in mock_registry.get_all_tool_names()
_servers.pop("fs", None)
@@ -767,8 +767,8 @@ class TestDiscoverAndRegister:
assert validate_toolset("myserver") is True
assert validate_toolset("mcp-myserver") is True
assert "mcp__myserver__ping" in resolve_toolset("myserver")
assert "mcp__myserver__ping" in resolve_toolset("mcp-myserver")
assert "mcp_myserver_ping" in resolve_toolset("myserver")
assert "mcp_myserver_ping" in resolve_toolset("mcp-myserver")
_servers.pop("myserver", None)
@@ -793,9 +793,9 @@ class TestDiscoverAndRegister:
_discover_and_register_server("srv", {"command": "test"})
)
entry = mock_registry._tools.get("mcp__srv__do_thing")
entry = mock_registry._tools.get("mcp_srv_do_thing")
assert entry is not None
assert entry.schema["name"] == "mcp__srv__do_thing"
assert entry.schema["name"] == "mcp_srv_do_thing"
assert "parameters" in entry.schema
assert entry.is_async is False
assert entry.toolset == "mcp-srv"
@@ -876,7 +876,7 @@ class TestMCPServerTask:
server = MCPServerTask("srv")
server._config = {"command": "test"}
server._tools = [_make_mcp_tool("old"), _make_mcp_tool("keep")]
server._registered_tool_names = ["mcp__srv__old", "mcp__srv__keep"]
server._registered_tool_names = ["mcp_srv_old", "mcp_srv_keep"]
server.session = MagicMock()
server.session.list_tools = AsyncMock(
return_value=SimpleNamespace(tools=[_make_mcp_tool("keep"), _make_mcp_tool("new")])
@@ -884,31 +884,31 @@ class TestMCPServerTask:
with patch("tools.registry.registry", mock_registry):
mock_registry.register(
name="mcp__srv__old",
name="mcp_srv_old",
toolset="mcp-srv",
schema={"name": "mcp__srv__old", "description": "Old"},
schema={"name": "mcp_srv_old", "description": "Old"},
handler=lambda *_args, **_kwargs: "{}",
)
mock_registry.register(
name="mcp__srv__keep",
name="mcp_srv_keep",
toolset="mcp-srv",
schema={"name": "mcp__srv__keep", "description": "Keep"},
schema={"name": "mcp_srv_keep", "description": "Keep"},
handler=lambda *_args, **_kwargs: "{}",
)
asyncio.run(server._refresh_tools())
names = mock_registry.get_all_tool_names()
assert "mcp__srv__old" not in names
assert "mcp__srv__keep" in names
assert "mcp__srv__new" in names
assert "mcp_srv_old" not in names
assert "mcp_srv_keep" in names
assert "mcp_srv_new" in names
assert set(server._registered_tool_names) == {
"mcp__srv__keep",
"mcp__srv__new",
"mcp__srv__list_resources",
"mcp__srv__read_resource",
"mcp__srv__list_prompts",
"mcp__srv__get_prompt",
"mcp_srv_keep",
"mcp_srv_new",
"mcp_srv_list_resources",
"mcp_srv_read_resource",
"mcp_srv_list_prompts",
"mcp_srv_get_prompt",
}
def test_schedule_tools_refresh_keeps_task_until_done(self):
@@ -1059,11 +1059,11 @@ class TestToolsetInjection:
from tools.mcp_tool import discover_mcp_tools
result = discover_mcp_tools()
assert "mcp__fs__list_files" in result
assert "mcp_fs_list_files" in result
assert validate_toolset("fs") is True
assert validate_toolset("mcp-fs") is True
assert "mcp__fs__list_files" in resolve_toolset("fs")
assert "mcp__fs__list_files" in resolve_toolset("mcp-fs")
assert "mcp_fs_list_files" in resolve_toolset("fs")
assert "mcp_fs_list_files" in resolve_toolset("mcp-fs")
def test_server_toolset_skips_builtin_collision(self):
"""MCP raw aliases never overwrite a built-in toolset name."""
@@ -1099,9 +1099,9 @@ class TestToolsetInjection:
discover_mcp_tools()
assert fake_toolsets["terminal"]["description"] == "Terminal tools"
assert "mcp__terminal__run" not in resolve_toolset("terminal")
assert "mcp_terminal_run" not in resolve_toolset("terminal")
assert validate_toolset("mcp-terminal") is True
assert "mcp__terminal__run" in resolve_toolset("mcp-terminal")
assert "mcp_terminal_run" in resolve_toolset("mcp-terminal")
def test_server_connection_failure_skipped(self):
"""If one server fails to connect, others still proceed."""
@@ -1139,8 +1139,8 @@ class TestToolsetInjection:
from tools.mcp_tool import discover_mcp_tools
result = discover_mcp_tools()
assert "mcp__good__ping" in result
assert "mcp__broken__ping" not in result
assert "mcp_good_ping" in result
assert "mcp_broken_ping" not in result
assert call_count == 2
def test_partial_failure_retry_on_second_call(self):
@@ -1182,8 +1182,8 @@ class TestToolsetInjection:
# First call: good connects, broken fails
result1 = discover_mcp_tools()
assert "mcp__good__ping" in result1
assert "mcp__broken__ping" not in result1
assert "mcp_good_ping" in result1
assert "mcp_broken_ping" not in result1
first_attempts = call_count
# "Fix" the broken server
@@ -1192,8 +1192,8 @@ class TestToolsetInjection:
# Second call: should retry broken, skip good
result2 = discover_mcp_tools()
assert "mcp__good__ping" in result2
assert "mcp__broken__ping" in result2
assert "mcp_good_ping" in result2
assert "mcp_broken_ping" in result2
assert call_count == 1 # Only broken retried
@@ -1261,10 +1261,10 @@ class TestShutdown:
_servers.clear()
registry.register(
name="mcp__test__ping",
name="mcp_test_ping",
toolset="mcp-test",
schema={
"name": "mcp__test__ping",
"name": "mcp_test_ping",
"description": "Ping",
"parameters": {"type": "object", "properties": {}},
},
@@ -1273,19 +1273,19 @@ class TestShutdown:
registry.register_toolset_alias("test", "mcp-test")
server = MCPServerTask("test")
server._registered_tool_names = ["mcp__test__ping"]
server._registered_tool_names = ["mcp_test_ping"]
_servers["test"] = server
mcp_mod._ensure_mcp_loop()
try:
assert validate_toolset("test") is True
assert "mcp__test__ping" in resolve_toolset("test")
assert "mcp_test_ping" in resolve_toolset("test")
shutdown_mcp_servers()
finally:
mcp_mod._mcp_loop = None
mcp_mod._mcp_thread = None
assert "mcp__test__ping" not in registry.get_all_tool_names()
assert "mcp_test_ping" not in registry.get_all_tool_names()
assert validate_toolset("test") is False
def test_shutdown_handles_errors(self):
@@ -1961,10 +1961,10 @@ class TestUtilitySchemas:
schemas = _build_utility_schemas("myserver")
assert len(schemas) == 4
names = [s["schema"]["name"] for s in schemas]
assert "mcp__myserver__list_resources" in names
assert "mcp__myserver__read_resource" in names
assert "mcp__myserver__list_prompts" in names
assert "mcp__myserver__get_prompt" in names
assert "mcp_myserver_list_resources" in names
assert "mcp_myserver_read_resource" in names
assert "mcp_myserver_list_prompts" in names
assert "mcp_myserver_get_prompt" in names
def test_hyphens_sanitized_in_utility_names(self):
from tools.mcp_tool import _build_utility_schemas
@@ -1973,7 +1973,7 @@ class TestUtilitySchemas:
names = [s["schema"]["name"] for s in schemas]
for name in names:
assert "-" not in name
assert "mcp__my_server__list_resources" in names
assert "mcp_my_server_list_resources" in names
def test_list_resources_schema_no_required_params(self):
from tools.mcp_tool import _build_utility_schemas
@@ -2296,11 +2296,11 @@ class TestUtilityToolRegistration:
)
# Regular tool + 4 utility tools
assert "mcp__fs__read_file" in registered
assert "mcp__fs__list_resources" in registered
assert "mcp__fs__read_resource" in registered
assert "mcp__fs__list_prompts" in registered
assert "mcp__fs__get_prompt" in registered
assert "mcp_fs_read_file" in registered
assert "mcp_fs_list_resources" in registered
assert "mcp_fs_read_resource" in registered
assert "mcp_fs_list_prompts" in registered
assert "mcp_fs_get_prompt" in registered
assert len(registered) == 5
# All in the registry
@@ -2331,8 +2331,8 @@ class TestUtilityToolRegistration:
)
# Check that utility tools are in the right toolset
for tool_name in ["mcp__myserv__list_resources", "mcp__myserv__read_resource",
"mcp__myserv__list_prompts", "mcp__myserv__get_prompt"]:
for tool_name in ["mcp_myserv_list_resources", "mcp_myserv_read_resource",
"mcp_myserv_list_prompts", "mcp_myserv_get_prompt"]:
entry = mock_registry._tools.get(tool_name)
assert entry is not None, f"{tool_name} not found in registry"
assert entry.toolset == "mcp-myserv"
@@ -2359,7 +2359,7 @@ class TestUtilityToolRegistration:
_discover_and_register_server("chk", {"command": "test"})
)
entry = mock_registry._tools.get("mcp__chk__list_resources")
entry = mock_registry._tools.get("mcp_chk_list_resources")
assert entry is not None
# Server is connected, check_fn should return True
assert entry.check_fn() is True
@@ -3284,12 +3284,12 @@ class TestDiscoveryFailedCount:
server.session = MagicMock()
server._tools = [_make_mcp_tool("tool_a")]
_servers[name] = server
return [f"mcp__{name}__tool_a"]
return [f"mcp_{name}_tool_a"]
with patch("tools.mcp_tool._load_mcp_config", return_value=fake_config), \
patch("tools.mcp_tool._discover_and_register_server", side_effect=fake_register), \
patch("tools.mcp_tool._MCP_AVAILABLE", True), \
patch("tools.mcp_tool._existing_tool_names", return_value=["mcp__good_server__tool_a"]):
patch("tools.mcp_tool._existing_tool_names", return_value=["mcp_good_server_tool_a"]):
_ensure_mcp_loop()
# Capture the logger to verify failed_count in summary
@@ -3358,12 +3358,12 @@ class TestDiscoveryFailedCount:
server.session = MagicMock()
server._tools = [_make_mcp_tool("t")]
_servers[name] = server
return [f"mcp__{name}__t"]
return [f"mcp_{name}_t"]
with patch("tools.mcp_tool._load_mcp_config", return_value=fake_config), \
patch("tools.mcp_tool._discover_and_register_server", side_effect=selective_register), \
patch("tools.mcp_tool._MCP_AVAILABLE", True), \
patch("tools.mcp_tool._existing_tool_names", return_value=["mcp__ok1__t", "mcp__ok2__t"]):
patch("tools.mcp_tool._existing_tool_names", return_value=["mcp_ok1_t", "mcp_ok2_t"]):
_ensure_mcp_loop()
with patch("tools.mcp_tool.logger") as mock_logger:
@@ -3430,7 +3430,7 @@ class TestMCPSelectiveToolLoading:
config,
session=SimpleNamespace(),
)
assert registered == ["mcp__ink__create_service"]
assert registered == ["mcp_ink_create_service"]
def test_exclude_filter_registers_all_except_listed_tools(self):
config = {
@@ -3444,8 +3444,8 @@ class TestMCPSelectiveToolLoading:
session=SimpleNamespace(),
)
assert registered == [
"mcp__ink_exclude__create_service",
"mcp__ink_exclude__list_services",
"mcp_ink_exclude_create_service",
"mcp_ink_exclude_list_services",
]
def test_include_filter_skips_utility_tools_without_capabilities(self):
@@ -3459,8 +3459,8 @@ class TestMCPSelectiveToolLoading:
config,
session=SimpleNamespace(),
)
assert registered == ["mcp__ink_no_caps__create_service"]
assert set(mock_registry.get_all_tool_names()) == {"mcp__ink_no_caps__create_service"}
assert registered == ["mcp_ink_no_caps_create_service"]
assert set(mock_registry.get_all_tool_names()) == {"mcp_ink_no_caps_create_service"}
def test_no_filter_registers_all_server_tools_when_no_utilities_supported(self):
registered, _ = self._run_discover(
@@ -3470,9 +3470,9 @@ class TestMCPSelectiveToolLoading:
session=SimpleNamespace(),
)
assert registered == [
"mcp__ink_no_filter__create_service",
"mcp__ink_no_filter__delete_service",
"mcp__ink_no_filter__list_services",
"mcp_ink_no_filter_create_service",
"mcp_ink_no_filter_delete_service",
"mcp_ink_no_filter_list_services",
]
def test_resources_and_prompts_can_be_disabled_explicitly(self):
@@ -3495,7 +3495,7 @@ class TestMCPSelectiveToolLoading:
config,
session=session,
)
assert registered == ["mcp__ink_disabled_utils__create_service"]
assert registered == ["mcp_ink_disabled_utils_create_service"]
def test_registers_only_utility_tools_supported_by_server_capabilities(self):
session = SimpleNamespace(
@@ -3508,11 +3508,11 @@ class TestMCPSelectiveToolLoading:
{"url": "https://mcp.example.com"},
session=session,
)
assert "mcp__ink_resources_only__create_service" in registered
assert "mcp__ink_resources_only__list_resources" in registered
assert "mcp__ink_resources_only__read_resource" in registered
assert "mcp__ink_resources_only__list_prompts" not in registered
assert "mcp__ink_resources_only__get_prompt" not in registered
assert "mcp_ink_resources_only_create_service" in registered
assert "mcp_ink_resources_only_list_resources" in registered
assert "mcp_ink_resources_only_read_resource" in registered
assert "mcp_ink_resources_only_list_prompts" not in registered
assert "mcp_ink_resources_only_get_prompt" not in registered
def test_existing_tool_names_reflect_registered_subset(self):
from tools.mcp_tool import _existing_tool_names, _servers, _discover_and_register_server
@@ -3541,8 +3541,8 @@ class TestMCPSelectiveToolLoading:
try:
registered, existing = asyncio.run(run())
assert registered == ["mcp__ink_existing__create_service"]
assert existing == ["mcp__ink_existing__create_service"]
assert registered == ["mcp_ink_existing_create_service"]
assert existing == ["mcp_ink_existing_create_service"]
finally:
_servers.pop("ink_existing", None)
@@ -3667,12 +3667,12 @@ class TestMCPBuiltinCollisionGuard:
# Pre-register a "built-in" tool with the name that the MCP tool would produce.
# Server "abc", tool "search" → mcp_abc_search
builtin_schema = {
"name": "mcp__abc__search",
"name": "mcp_abc_search",
"description": "A hypothetical built-in",
"parameters": {"type": "object", "properties": {}},
}
mock_registry.register(
name="mcp__abc__search", toolset="web",
name="mcp_abc_search", toolset="web",
schema=builtin_schema, handler=lambda a, **k: "{}",
)
@@ -3692,8 +3692,8 @@ class TestMCPBuiltinCollisionGuard:
)
# The MCP tool should have been skipped — built-in preserved.
assert "mcp__abc__search" not in registered
assert mock_registry.get_toolset_for_tool("mcp__abc__search") == "web"
assert "mcp_abc_search" not in registered
assert mock_registry.get_toolset_for_tool("mcp_abc_search") == "web"
_servers.pop("abc", None)
@@ -3718,8 +3718,8 @@ class TestMCPBuiltinCollisionGuard:
_discover_and_register_server("minimax", {"command": "test", "args": []})
)
assert "mcp__minimax__web_search" in registered
assert mock_registry.get_toolset_for_tool("mcp__minimax__web_search") == "mcp-minimax"
assert "mcp_minimax_web_search" in registered
assert mock_registry.get_toolset_for_tool("mcp_minimax_web_search") == "mcp-minimax"
_servers.pop("minimax", None)
@@ -3732,12 +3732,12 @@ class TestMCPBuiltinCollisionGuard:
# Pre-register an MCP tool from a different server.
mcp_schema = {
"name": "mcp__srv__do_thing",
"name": "mcp_srv_do_thing",
"description": "From another MCP server",
"parameters": {"type": "object", "properties": {}},
}
mock_registry.register(
name="mcp__srv__do_thing", toolset="mcp-old",
name="mcp_srv_do_thing", toolset="mcp-old",
schema=mcp_schema, handler=lambda a, **k: "{}",
)
@@ -3757,8 +3757,8 @@ class TestMCPBuiltinCollisionGuard:
)
# MCP-to-MCP collision is allowed — the new server wins.
assert "mcp__srv__do_thing" in registered
assert mock_registry.get_toolset_for_tool("mcp__srv__do_thing") == "mcp-srv"
assert "mcp_srv_do_thing" in registered
assert mock_registry.get_toolset_for_tool("mcp_srv_do_thing") == "mcp-srv"
_servers.pop("srv", None)
@@ -3805,7 +3805,7 @@ class TestSanitizeMcpNameComponent:
mcp_tool = _make_mcp_tool(name="search")
schema = _convert_mcp_schema("ai.exa/exa", mcp_tool)
assert schema["name"] == "mcp__ai_exa_exa__search"
assert schema["name"] == "mcp_ai_exa_exa_search"
# Must match Anthropic's pattern: ^[a-zA-Z0-9_-]{1,128}$
import re
assert re.match(r"^[a-zA-Z0-9_-]{1,128}$", schema["name"])
@@ -3827,16 +3827,16 @@ class TestSanitizeMcpNameComponent:
reg = ToolRegistry()
reg.register(
name="mcp__ai_exa_exa__search",
name="mcp_ai_exa_exa_search",
toolset="mcp-ai.exa/exa",
schema={"name": "mcp__ai_exa_exa__search", "description": "Search", "parameters": {"type": "object", "properties": {}}},
schema={"name": "mcp_ai_exa_exa_search", "description": "Search", "parameters": {"type": "object", "properties": {}}},
handler=lambda *_args, **_kwargs: "{}",
)
reg.register_toolset_alias("ai.exa/exa", "mcp-ai.exa/exa")
with patch("tools.registry.registry", reg):
assert validate_toolset("ai.exa/exa") is True
assert "mcp__ai_exa_exa__search" in resolve_toolset("ai.exa/exa")
assert "mcp_ai_exa_exa_search" in resolve_toolset("ai.exa/exa")
# ---------------------------------------------------------------------------
@@ -3869,9 +3869,9 @@ class TestRegisterMcpServers:
try:
with patch("tools.mcp_tool._MCP_AVAILABLE", True), \
patch("tools.mcp_tool._existing_tool_names", return_value=["mcp__existing__tool"]):
patch("tools.mcp_tool._existing_tool_names", return_value=["mcp_existing_tool"]):
result = register_mcp_servers({"existing": {"command": "test"}})
assert result == ["mcp__existing__tool"]
assert result == ["mcp_existing_tool"]
finally:
_servers.pop("existing", None)
@@ -3893,17 +3893,17 @@ class TestRegisterMcpServers:
async def fake_register(name, cfg):
server = _make_mock_server(name)
server._registered_tool_names = ["mcp__my_server__tool1"]
server._registered_tool_names = ["mcp_my_server_tool1"]
_servers[name] = server
return ["mcp__my_server__tool1"]
return ["mcp_my_server_tool1"]
with patch("tools.mcp_tool._MCP_AVAILABLE", True), \
patch("tools.mcp_tool._discover_and_register_server", side_effect=fake_register), \
patch("tools.mcp_tool._existing_tool_names", return_value=["mcp__my_server__tool1"]):
patch("tools.mcp_tool._existing_tool_names", return_value=["mcp_my_server_tool1"]):
_ensure_mcp_loop()
result = register_mcp_servers(fake_config)
assert "mcp__my_server__tool1" in result
assert "mcp_my_server_tool1" in result
_servers.pop("my_server", None)
def test_logs_summary_on_success(self):
@@ -3913,13 +3913,13 @@ class TestRegisterMcpServers:
async def fake_register(name, cfg):
server = _make_mock_server(name)
server._registered_tool_names = ["mcp__srv__t1", "mcp__srv__t2"]
server._registered_tool_names = ["mcp_srv_t1", "mcp_srv_t2"]
_servers[name] = server
return ["mcp__srv__t1", "mcp__srv__t2"]
return ["mcp_srv_t1", "mcp_srv_t2"]
with patch("tools.mcp_tool._MCP_AVAILABLE", True), \
patch("tools.mcp_tool._discover_and_register_server", side_effect=fake_register), \
patch("tools.mcp_tool._existing_tool_names", return_value=["mcp__srv__t1", "mcp__srv__t2"]):
patch("tools.mcp_tool._existing_tool_names", return_value=["mcp_srv_t1", "mcp_srv_t2"]):
_ensure_mcp_loop()
with patch("tools.mcp_tool.logger") as mock_logger:
@@ -3957,7 +3957,7 @@ class TestMcpParallelToolCalls:
with _lock:
_parallel_safe_servers.clear()
_mcp_tool_server_names.clear()
assert is_mcp_tool_parallel_safe("mcp__docs__search") is False
assert is_mcp_tool_parallel_safe("mcp_docs_search") is False
def test_is_mcp_tool_parallel_safe_with_flag(self):
"""MCP tool from a parallel-safe server returns True."""
@@ -3967,20 +3967,20 @@ class TestMcpParallelToolCalls:
)
with _lock:
_parallel_safe_servers.add("docs")
_mcp_tool_server_names["mcp__docs__search"] = "docs"
_mcp_tool_server_names["mcp__docs__read_file"] = "docs"
_mcp_tool_server_names["mcp__github__list_repos"] = "github"
_mcp_tool_server_names["mcp_docs_search"] = "docs"
_mcp_tool_server_names["mcp_docs_read_file"] = "docs"
_mcp_tool_server_names["mcp_github_list_repos"] = "github"
try:
assert is_mcp_tool_parallel_safe("mcp__docs__search") is True
assert is_mcp_tool_parallel_safe("mcp__docs__read_file") is True
assert is_mcp_tool_parallel_safe("mcp_docs_search") is True
assert is_mcp_tool_parallel_safe("mcp_docs_read_file") is True
# Different server should be False
assert is_mcp_tool_parallel_safe("mcp__github__list_repos") is False
assert is_mcp_tool_parallel_safe("mcp_github_list_repos") is False
finally:
with _lock:
_parallel_safe_servers.discard("docs")
_mcp_tool_server_names.pop("mcp__docs__search", None)
_mcp_tool_server_names.pop("mcp__docs__read_file", None)
_mcp_tool_server_names.pop("mcp__github__list_repos", None)
_mcp_tool_server_names.pop("mcp_docs_search", None)
_mcp_tool_server_names.pop("mcp_docs_read_file", None)
_mcp_tool_server_names.pop("mcp_github_list_repos", None)
def test_is_mcp_tool_parallel_safe_server_with_underscores(self):
"""Server names containing underscores are correctly matched."""
@@ -3990,13 +3990,13 @@ class TestMcpParallelToolCalls:
)
with _lock:
_parallel_safe_servers.add("my_server")
_mcp_tool_server_names["mcp__my_server__query"] = "my_server"
_mcp_tool_server_names["mcp_my_server_query"] = "my_server"
try:
assert is_mcp_tool_parallel_safe("mcp__my_server__query") is True
assert is_mcp_tool_parallel_safe("mcp_my_server_query") is True
finally:
with _lock:
_parallel_safe_servers.discard("my_server")
_mcp_tool_server_names.pop("mcp__my_server__query", None)
_mcp_tool_server_names.pop("mcp_my_server_query", None)
def test_is_mcp_tool_parallel_safe_uses_exact_registered_server(self):
"""Ambiguous MCP names must not match a shorter parallel-safe prefix."""
@@ -4006,16 +4006,16 @@ class TestMcpParallelToolCalls:
)
with _lock:
_parallel_safe_servers.add("a")
_mcp_tool_server_names["mcp__a__search"] = "a"
_mcp_tool_server_names["mcp__a_b__tool"] = "a_b"
_mcp_tool_server_names["mcp_a_search"] = "a"
_mcp_tool_server_names["mcp_a_b_tool"] = "a_b"
try:
assert is_mcp_tool_parallel_safe("mcp__a__search") is True
assert is_mcp_tool_parallel_safe("mcp__a_b__tool") is False
assert is_mcp_tool_parallel_safe("mcp_a_search") is True
assert is_mcp_tool_parallel_safe("mcp_a_b_tool") is False
finally:
with _lock:
_parallel_safe_servers.discard("a")
_mcp_tool_server_names.pop("mcp__a__search", None)
_mcp_tool_server_names.pop("mcp__a_b__tool", None)
_mcp_tool_server_names.pop("mcp_a_search", None)
_mcp_tool_server_names.pop("mcp_a_b_tool", None)
def test_registered_tool_provenance_prevents_prefix_collision(self):
"""Registration records exact server ownership for ambiguous names."""
@@ -4031,22 +4031,22 @@ class TestMcpParallelToolCalls:
)
registered = _register_server_tools("a_b", server, {})
try:
assert registered == ["mcp__a_b__tool"]
assert registered == ["mcp_a_b_tool"]
with _lock:
assert _mcp_tool_server_names["mcp__a_b__tool"] == "a_b"
assert _mcp_tool_server_names["mcp_a_b_tool"] == "a_b"
_parallel_safe_servers.add("a")
assert is_mcp_tool_parallel_safe("mcp__a_b__tool") is False
assert is_mcp_tool_parallel_safe("mcp_a_b_tool") is False
with _lock:
_parallel_safe_servers.add("a_b")
assert is_mcp_tool_parallel_safe("mcp__a_b__tool") is True
assert is_mcp_tool_parallel_safe("mcp_a_b_tool") is True
finally:
for tool_name in registered:
registry.deregister(tool_name)
with _lock:
_parallel_safe_servers.discard("a")
_parallel_safe_servers.discard("a_b")
_mcp_tool_server_names.pop("mcp__a_b__tool", None)
_mcp_tool_server_names.pop("mcp_a_b_tool", None)
def test_is_mcp_tool_parallel_safe_no_tool_suffix(self):
"""Tool name that is just 'mcp_{server}' without a tool part returns False."""
@@ -4057,12 +4057,12 @@ class TestMcpParallelToolCalls:
with _lock:
_parallel_safe_servers.add("docs")
_mcp_tool_server_names.pop("mcp_docs", None)
_mcp_tool_server_names.pop("mcp__docs__", None)
_mcp_tool_server_names.pop("mcp_docs_", None)
try:
# "mcp_docs" has no tool part after the server name
assert is_mcp_tool_parallel_safe("mcp_docs") is False
# "mcp_docs_" has empty tool part
assert is_mcp_tool_parallel_safe("mcp__docs__") is False
assert is_mcp_tool_parallel_safe("mcp_docs_") is False
finally:
with _lock:
_parallel_safe_servers.discard("docs")
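
The prefix-collision tests above exist because a name such as `mcp_a_b_tool` cannot be split back into server and tool unambiguously. A minimal sketch of the difference between prefix matching and an exact ownership lookup follows. The dictionaries and both functions are illustrative assumptions, not the actual `is_mcp_tool_parallel_safe` implementation.

```python
# Hypothetical state mirroring what the tests seed.
parallel_safe_servers = {"a"}
tool_owner = {
    "mcp_a_search": "a",
    "mcp_a_b_tool": "a_b",  # owned by server "a_b", not by "a"
}


def safe_by_prefix(tool_name: str) -> bool:
    # Naive approach: treat any "mcp_{server}_<something>" as that server's tool.
    for server in parallel_safe_servers:
        prefix = f"mcp_{server}_"
        if tool_name.startswith(prefix) and len(tool_name) > len(prefix):
            return True
    return False


def safe_by_owner(tool_name: str) -> bool:
    # Exact lookup: only names recorded at registration time can be safe.
    owner = tool_owner.get(tool_name)
    return owner is not None and owner in parallel_safe_servers


naive = safe_by_prefix("mcp_a_b_tool")
exact = safe_by_owner("mcp_a_b_tool")
```

Prefix matching wrongly attributes `mcp_a_b_tool` to the parallel-safe server `a`, while the ownership lookup rejects it and also rejects unregistered names such as `mcp_docs_`.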


@@ -10,6 +10,7 @@ This module is the single source of truth for the dangerous command system:
import contextvars
import fnmatch
import functools
import logging
import os
import re
@@ -571,87 +572,136 @@ def _normalize_command_for_detection(command: str) -> str:
     command = command.replace('\x00', '')
     # Normalize Unicode (fullwidth Latin, halfwidth Katakana, etc.)
     command = unicodedata.normalize('NFKC', command)
+    # Fold absolute home / active-profile-home prefixes into their canonical
+    # ~/ and ~/.hermes/ forms so static user-sensitive patterns catch
+    # /home/alice/.bashrc and C:\Users\alice\.bashrc the same way they catch
+    # ~/.bashrc. Resolve at detection time (not via an import-time snapshot) so
+    # it tracks HOME / HERMES_HOME even when those are set after this module is
+    # imported — as the hermetic test conftest and profile/session launchers do.
+    #
+    # This MUST run before the backslash-escape strip below: on Windows the home
+    # prefix is separated by backslashes (C:\Users\alice\...), which that strip
+    # would otherwise dissolve (-> C:Usersalice) and make the fold impossible.
+    # The fold matches either separator, so POSIX paths are unaffected by order.
+    #
+    # Fold the (more specific) Hermes home first: on Windows it nests under the
+    # user home (C:\Users\alice\AppData\...\hermes), so folding the user home
+    # first would eat the prefix the Hermes-home fold needs.
+    command = _rewrite_resolved_hermes_home(command)
+    command = _rewrite_resolved_user_home(command)
     # Strip shell backslash-escapes: r\m → rm. Prevents \-injection bypass.
     command = re.sub(r'\\([^\n])', r'\1', command)
     # Strip empty-string literals that split tokens: r''m → rm, r"\"m → rm.
     command = re.sub(r"''|\"\"", '', command)
-    # Fold the current user's resolved absolute home path into ~/ at detection
-    # time so static user-sensitive patterns catch /home/alice/.bashrc the same
-    # way they catch ~/.bashrc. Do not snapshot this at import time: tests and
-    # profile/session launchers can set HOME after this module is imported.
-    command = _rewrite_resolved_user_home(command)
-    # Fold the resolved absolute active-profile home path into the canonical
-    # ~/.hermes/ form so the Hermes config/env patterns catch it. In Docker and
-    # gateway deployments the agent often references the resolved absolute path
-    # directly (e.g. `sed -i ... /home/hermes/.hermes/config.yaml`) rather than
-    # ~, $HOME, or $HERMES_HOME. Done at detection time (not via an import-time
-    # pattern snapshot) so it tracks the live HERMES_HOME even when that is set
-    # after this module is imported — as the hermetic test conftest does.
-    command = _rewrite_resolved_hermes_home(command)
     return command
# Shell metacharacters, quotes, and whitespace that terminate a filesystem
# path token on a command line. Used to bound the path tail we normalize.
_PATH_TOKEN_STOP = r"""\s'"`;|&<>()"""
# One or more path segments (no separators, no terminators), each preceded by
# a separator.
_PATH_TAIL = r"(?P<tail>(?:[/\\][^/\\" + _PATH_TOKEN_STOP + r"]*)+)"
@functools.lru_cache(maxsize=64)
def _home_prefix_fold_regex(path: str):
"""Compile a regex matching *path* used as an absolute directory prefix.
The home components are matched with either separator (``/`` or ``\\``)
between them, followed by the rest of the path token (the ``tail`` group),
so a Windows native path (``C:\\Users\\alice\\.ssh\\authorized_keys``), its
forward-slash form, and mixed-separator forms all fold — and the tail's
backslashes get normalized to ``/`` by the caller so multi-segment static
patterns (``~/.ssh/authorized_keys``) still match. The trailing tail is
required (``+``), so a bare home with no path under it is not folded.
Returns ``None`` for an unset or degenerate path — one with fewer than two
components below the root — so a stray HOME / HERMES_HOME such as ``/``,
``C:\\`` or ``""`` cannot rewrite unrelated filesystem prefixes. Cached
because the resolved home is stable across calls on this hot path.
"""
if not path:
return None
components = [c for c in re.split(r"[/\\]+", path) if c]
# Require at least two non-empty components below the root. For POSIX this
# mirrors the historical ``count("/") >= 2`` guard (``/home/alice`` folds,
# ``/home`` does not); for Windows it rejects a bare drive root (``C:\\``)
# while accepting a real home (``C:\\Users\\alice``).
if len(components) < 2:
return None
body = r"[/\\]+".join(re.escape(c) for c in components)
# Optional leading root separator (POSIX ``/`` or UNC ``\\``); a Windows
# drive letter is captured as the first component.
return re.compile(r"[/\\]*" + body + _PATH_TAIL)
def _fold_home_prefixes(command: str, paths, replacement: str) -> str:
"""Fold each resolved home *path* prefix in *command* to *replacement*.
*replacement* has no trailing separator (``~`` / ``~/.hermes``); the matched
path tail (with its backslashes normalized to ``/``) supplies it. Longest
candidate first so a deeper home (e.g. an explicit HOME under USERPROFILE)
folds before a shorter overlapping one that would otherwise clobber it.
"""
seen: set[str] = set()
for path in sorted((p for p in paths if p), key=len, reverse=True):
if path in seen:
continue
seen.add(path)
pattern = _home_prefix_fold_regex(path)
if pattern is not None:
command = pattern.sub(
lambda m: replacement + m.group("tail").replace("\\", "/"),
command,
)
return command
def _rewrite_resolved_user_home(command: str) -> str:
"""Rewrite the current user's absolute home prefix to ``~/``.
Resolves HOME at detection time, including its symlink-resolved form, so
terminal commands targeting absolute home paths are checked by the same
static patterns as tilde and $HOME forms. No-op when HOME is unset or
degenerate.
Resolves the home at detection time — its expanduser form, symlink-resolved
form, and an explicitly set ``HOME`` — so absolute home paths are checked by
the same static patterns as tilde and ``$HOME`` forms. ``HOME`` is consulted
directly because Windows' ``os.path.expanduser`` resolves ``~`` from
``USERPROFILE`` and ignores ``HOME``, unlike POSIX. Matches both POSIX
(``/home/alice``) and Windows (``C:\\Users\\alice`` or ``C:/Users/alice``)
separators. No-op when the home is unset or degenerate.
"""
try:
home = os.path.expanduser("~")
candidates = [
home.rstrip("/"),
os.path.realpath(home).rstrip("/"),
home,
os.path.realpath(home),
os.environ.get("HOME", ""),
]
except Exception:
return command
seen: set[str] = set()
for path in candidates:
if not path or path in seen:
continue
seen.add(path)
# Require an absolute path below root so a bad HOME cannot rewrite the
# whole filesystem namespace.
normalized = path.rstrip("/")
if not normalized.startswith("/") or normalized.count("/") < 2:
continue
command = command.replace(normalized + "/", "~/")
return command
return _fold_home_prefixes(command, candidates, "~")
def _rewrite_resolved_hermes_home(command: str) -> str:
"""Rewrite the resolved absolute Hermes home prefix to ``~/.hermes/``.
Resolves the active ``HERMES_HOME`` at call time (and its symlink-resolved
form) and replaces an occurrence of ``<home>/`` in *command* with
form) and folds an occurrence of ``<home>/`` in *command* into
``~/.hermes/`` so the static ``_HERMES_CONFIG_PATH`` / ``_HERMES_ENV_PATH``
patterns match. No-op when the path can't be resolved or doesn't appear.
patterns match. In Docker and gateway deployments the agent often references
the resolved absolute path directly (e.g. ``sed -i ...
/home/hermes/.hermes/config.yaml``) rather than ``~``, ``$HOME``, or
``$HERMES_HOME``. Matches both POSIX and Windows separators. No-op when the
path can't be resolved or doesn't appear.
"""
try:
from hermes_constants import get_hermes_home
home = get_hermes_home().expanduser()
candidates = [
str(home).rstrip("/"),
str(home.resolve(strict=False)).rstrip("/"),
str(home),
str(home.resolve(strict=False)),
]
except Exception:
return command
seen: set[str] = set()
for path in candidates:
if not path or path in seen:
continue
seen.add(path)
# Guard against a degenerate HERMES_HOME (e.g. "/" or "") rewriting
# unrelated paths: require an absolute path with at least one non-root
# component. The active profile home is always a real directory like
# /home/hermes/.hermes or a per-test tempdir, never a bare root.
normalized = path.rstrip("/")
if not normalized.startswith("/") or normalized.count("/") < 2:
continue
command = command.replace(normalized + "/", "~/.hermes/")
return command
return _fold_home_prefixes(command, candidates, "~/.hermes")
def detect_dangerous_command(command: str) -> tuple:
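
Reviewer note: the separator-agnostic fold introduced in this file can be exercised in isolation. The following is a minimal sketch that copies the regex construction from the diff into standalone helpers. The names `fold_regex`, `fold`, and `strip_escapes` are hypothetical, and the caching, candidate ordering, and home resolution of the real functions are left out.

```python
import re

# Terminators and tail pattern as they appear in the diff.
PATH_TOKEN_STOP = r"""\s'"`;|&<>()"""
PATH_TAIL = r"(?P<tail>(?:[/\\][^/\\" + PATH_TOKEN_STOP + r"]*)+)"


def fold_regex(path: str):
    # Split on either separator; refuse degenerate homes ("/", "C:\\", "").
    components = [c for c in re.split(r"[/\\]+", path) if c]
    if len(components) < 2:
        return None
    body = r"[/\\]+".join(re.escape(c) for c in components)
    return re.compile(r"[/\\]*" + body + PATH_TAIL)


def fold(command: str, path: str, replacement: str) -> str:
    pattern = fold_regex(path)
    if pattern is None:
        return command
    # The tail keeps its leading separator; backslashes become "/".
    return pattern.sub(
        lambda m: replacement + m.group("tail").replace("\\", "/"),
        command,
    )


def strip_escapes(command: str) -> str:
    # The backslash-escape strip that must run AFTER the fold.
    return re.sub(r"\\([^\n])", r"\1", command)


posix = fold("cat /home/alice/.bashrc", "/home/alice", "~")
windows = fold(r"type C:\Users\alice\.ssh\authorized_keys", r"C:\Users\alice", "~")
bare = fold("ls /home/alice", "/home/alice", "~")        # no tail: unchanged
root = fold("cat /etc/passwd", "/", "~")                  # degenerate: unchanged
dissolved = strip_escapes(r"C:\Users\alice\.bashrc")     # why order matters
```

In this sketch `posix` is `cat ~/.bashrc` and `windows` is `type ~/.ssh/authorized_keys`, while `bare` and `root` come back unchanged. `dissolved` is `C:Usersalice.bashrc`, which shows why the fold has to run before the escape strip: once the separators are gone, the home prefix can no longer be recognized.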


@@ -134,6 +134,15 @@ LAZY_DEPS: dict[str, tuple[str, ...]] = {
# ─── Image generation backends ─────────────────────────────────────────
"image.fal": ("fal-client==0.13.1",),
# ─── Observability ─────────────────────────────────────────────────────
# OTLP telemetry export. Lazily installed on first use of
# `hermes telemetry export --otlp`. Tracks the `otlp` extra in
# pyproject.toml — bump both together.
"export.otlp": (
"opentelemetry-sdk==1.30.0",
"opentelemetry-exporter-otlp-proto-http==1.30.0",
),
# ─── Memory providers ──────────────────────────────────────────────────
"memory.honcho": ("honcho-ai==2.0.1",),
"memory.hindsight": ("hindsight-client==0.6.1",),


@@ -1602,7 +1602,8 @@ class MCPServerTask:
# notifications. Tools absent from the fresh list are no longer
# callable, so remove only those stale registry entries first.
stale_tool_names = old_tool_names - {
mcp_prefixed_tool_name(self.name, tool.name)
f"mcp_{sanitize_mcp_name_component(self.name)}_"
f"{sanitize_mcp_name_component(tool.name)}"
for tool in new_mcp_tools
}
for tool_name in stale_tool_names:
@@ -3677,27 +3678,6 @@ def sanitize_mcp_name_component(value: str) -> str:
return re.sub(r"[^A-Za-z0-9_]", "_", str(value or ""))
# Native MCP tool-name prefix. Hermes uses the ``mcp__<server>__<tool>``
# convention shared by Claude Code, Codex, and OpenCode (anomalyco/opencode
# #33533). The double-underscore delimiter disambiguates the server/tool
# boundary even when either component contains underscores, and matches the
# naming models are trained on. It also aligns native registration with the
# Anthropic-OAuth wire form (``_MCP_TOOL_PREFIX`` in anthropic_adapter.py),
# removing the single->double rewrite that path previously had to perform.
MCP_TOOL_NAME_PREFIX = "mcp__"
_MCP_NAME_DELIM = "__"
def mcp_prefixed_tool_name(server_name: str, tool_name: str) -> str:
"""Build the registry/wire name for an MCP tool.
Produces ``mcp__<sanitizedServer>__<sanitizedTool>``.
"""
safe_server = sanitize_mcp_name_component(server_name)
safe_tool = sanitize_mcp_name_component(tool_name)
return f"{MCP_TOOL_NAME_PREFIX}{safe_server}{_MCP_NAME_DELIM}{safe_tool}"
def _convert_mcp_schema(server_name: str, mcp_tool) -> dict:
"""Convert an MCP tool listing to the Hermes registry schema format.
@@ -3709,7 +3689,9 @@ def _convert_mcp_schema(server_name: str, mcp_tool) -> dict:
Returns:
A dict suitable for ``registry.register(schema=...)``.
"""
prefixed_name = mcp_prefixed_tool_name(server_name, mcp_tool.name)
safe_tool_name = sanitize_mcp_name_component(mcp_tool.name)
safe_server_name = sanitize_mcp_name_component(server_name)
prefixed_name = f"mcp_{safe_server_name}_{safe_tool_name}"
return {
"name": prefixed_name,
"description": mcp_tool.description or f"MCP tool {mcp_tool.name} from {server_name}",
@@ -3723,10 +3705,11 @@ def _build_utility_schemas(server_name: str) -> List[dict]:
Returns a list of (schema, handler_factory_name) tuples encoded as dicts
with keys: schema, handler_key.
"""
safe_name = sanitize_mcp_name_component(server_name)
return [
{
"schema": {
"name": mcp_prefixed_tool_name(server_name, "list_resources"),
"name": f"mcp_{safe_name}_list_resources",
"description": f"List available resources from MCP server '{server_name}'",
"parameters": {
"type": "object",
@@ -3737,7 +3720,7 @@ def _build_utility_schemas(server_name: str) -> List[dict]:
},
{
"schema": {
"name": mcp_prefixed_tool_name(server_name, "read_resource"),
"name": f"mcp_{safe_name}_read_resource",
"description": f"Read a resource by URI from MCP server '{server_name}'",
"parameters": {
"type": "object",
@@ -3754,7 +3737,7 @@ def _build_utility_schemas(server_name: str) -> List[dict]:
},
{
"schema": {
"name": mcp_prefixed_tool_name(server_name, "list_prompts"),
"name": f"mcp_{safe_name}_list_prompts",
"description": f"List available prompts from MCP server '{server_name}'",
"parameters": {
"type": "object",
@@ -3765,7 +3748,7 @@ def _build_utility_schemas(server_name: str) -> List[dict]:
},
{
"schema": {
"name": mcp_prefixed_tool_name(server_name, "get_prompt"),
"name": f"mcp_{safe_name}_get_prompt",
"description": f"Get a prompt by name from MCP server '{server_name}'",
"parameters": {
"type": "object",
@@ -4225,15 +4208,15 @@ def discover_mcp_tools() -> List[str]:
def is_mcp_tool_parallel_safe(tool_name: str) -> bool:
"""Check if an MCP tool belongs to a server that supports parallel tool calls.
MCP tool names follow the pattern ``mcp__{server}__{tool}``, but that
string shape is ambiguous when server names contain underscores. Use the
exact server provenance captured at registration time rather than prefix
MCP tool names follow the pattern ``mcp_{server}_{tool}``, but that string
shape is ambiguous when server names contain underscores. Use the exact
server provenance captured at registration time rather than prefix
matching, then check whether that server's config includes
``supports_parallel_tool_calls: true``.
Returns False for non-MCP tools or tools from servers without the flag.
"""
if not tool_name.startswith(MCP_TOOL_NAME_PREFIX):
if not tool_name.startswith("mcp_"):
return False
with _lock:
server_name = _mcp_tool_server_names.get(tool_name)