Compare commits

...

1 Commits

Author SHA1 Message Date
emozilla
ccfa079252 feat(telemetry): local-first telemetry & observability
Add a built-in telemetry system that records what the agent does — workflows,
model calls, tool calls, errors — to the local machine, powers `/insights`, and
can export to an operator-chosen destination. Default-on locally; nothing leaves
the machine unless the user exports it or opts into the aggregate plane.

Three planes with a hard wall between them:
  - local: full-fidelity observability (real model/provider/tool names), on by
    default, never leaves the machine.
  - aggregate: opt-in metadata, default off. No uploader ships — consent is
    recorded via telemetry.consent_state, and `preview` shows what would be
    produced, computed locally.
  - trajectories: full message content, opt-in, exported only to the operator's
    own destination.

Mechanism:
  - Bundled `telemetry` plugin registers observational lifecycle hooks
    (on_session_start / post_api_request / post_tool_call / on_session_finalize).
    No core call sites are edited; hooks already carry the data.
  - Fire-and-forget emitter: emit() returns in microseconds, never blocks or
    raises into a model/tool call. A daemon thread writes events to an
    append-only JSONL log and the tel_* tables in state.db (its own sqlite
    connection, separate from SessionDB).
  - tel_runs / tel_model_calls / tel_tool_calls live in the declarative
    SCHEMA_SQL and are reconciled automatically; SCHEMA_VERSION 16 -> 17.
  - metrics derives rollups for /usage and /insights; rollup builds per-run
    summaries for `hermes telemetry preview`.

Consent is config, not a parallel command surface. The config file is the root
of trust: set telemetry.consent_state with `hermes config set`, or pin any
telemetry.* key (including allow_aggregate) via managed scope, which overrides
the user's value per key. `hermes telemetry` exposes only what config cannot:
status (report), preview (query), and export.

Export:
  - exporter_bulk writes telemetry (and, when the trajectories plane is enabled,
    session content) to ndjson/json.
  - otlp_exporter streams spans to a configured OpenTelemetry Collector over
    OTLP/HTTP. The SDK is an optional extra (hermes-agent[otlp]), lazily
    installed via tools.lazy_deps on first use.
  - Secrets are always redacted on every export path
    (redact_sensitive_text(force=True)); content export is gated by the
    trajectories plane, and PII scrubbing follows telemetry.content_redaction.
    OTLP auth headers reference environment variable names, never inline values.

No outbound emission to Nous. The aggregate uploader is intentionally not built.
2026-06-24 02:14:02 -04:00
37 changed files with 3920 additions and 4 deletions

View File

@@ -81,6 +81,19 @@ def _bar_chart(values: List[int], max_width: int = 20) -> List[str]:
return ["" * max(1, int(v / peak * max_width)) if v > 0 else "" for v in values]
def _fmt_ms(ms: float) -> str:
"""Compact human duration from milliseconds (e.g. 850ms, 2.4s, 1.5m)."""
try:
ms = float(ms or 0)
except (TypeError, ValueError):
return "0ms"
if ms < 1000:
return f"{int(ms)}ms"
if ms < 60_000:
return f"{ms / 1000:.1f}s"
return f"{ms / 60_000:.1f}m"
class InsightsEngine:
"""
Analyzes session history and produces usage insights.
@@ -138,6 +151,7 @@ class InsightsEngine:
},
"activity": {},
"top_sessions": [],
"telemetry": {},
}
# Compute insights
@@ -148,6 +162,7 @@ class InsightsEngine:
skills = self._compute_skill_breakdown(skill_usage)
activity = self._compute_activity_patterns(sessions)
top_sessions = self._compute_top_sessions(sessions)
telemetry = self._compute_telemetry(cutoff)
return {
"days": days,
@@ -161,8 +176,37 @@ class InsightsEngine:
"skills": skills,
"activity": activity,
"top_sessions": top_sessions,
"telemetry": telemetry,
}
# =========================================================================
# Telemetry (observability) — from the tel_* tables (local plane)
# =========================================================================
def _compute_telemetry(self, cutoff: float) -> Dict[str, Any]:
"""Roll up the local telemetry tables for the same window.
Reuses the engine's existing connection. Fully fail-soft: if the tel_*
tables are empty or absent (telemetry.local disabled, fresh install), this
returns an empty dict and the renderer skips the section.
"""
try:
from agent.telemetry import metrics
except Exception:
return {}
try:
since_ns = int(cutoff * 1e9)
if not metrics.has_data(conn=self._conn):
return {}
return {
"workflows": metrics.workflow_summary(since_ns=since_ns, conn=self._conn),
"model_calls": metrics.model_call_summary(since_ns=since_ns, conn=self._conn),
"tool_calls": metrics.tool_call_summary(conn=self._conn),
"errors": metrics.error_summary(conn=self._conn),
}
except Exception:
return {}
# =========================================================================
# Data gathering (SQL queries)
# =========================================================================
@@ -852,8 +896,80 @@ class InsightsEngine:
lines.append(f" {ts['label']:<20} {ts['value']:<18} ({ts['date']}, {ts['session_id']})")
lines.append("")
# Telemetry / observability (local plane) — only when data exists
tel = report.get("telemetry") or {}
if tel:
self._append_telemetry_section(lines, tel)
return "\n".join(lines)
def _append_telemetry_section(self, lines: List[str], tel: Dict[str, Any]) -> None:
"""Render the observability rollups (workflows, tools, providers, errors).

Appends formatted text rows to ``lines`` in place and returns nothing. ``tel``
is read through the keys "workflows", "model_calls", "tool_calls" and "errors";
each sub-section is emitted only when its data is present/truthy.
"""
# Missing sub-dicts default to {} so every lookup below is safe.
wf = tel.get("workflows", {})
mc = tel.get("model_calls", {})
tc = tel.get("tool_calls", {})
errs = tel.get("errors", {}).get("by_class", {})
lines.append(" 📡 Observability (local telemetry)")
# NOTE(review): the repeated string below is empty in this view — presumably a
# divider character was lost in extraction; confirm against the real file.
lines.append(" " + "" * 56)
# Workflow headline: run count, success percentage, p50/p95 duration.
total_runs = wf.get("total_runs", 0)
if total_runs:
# success_rate is read as a fraction and shown as a percentage.
sr = wf.get("success_rate", 0.0) * 100
p50 = wf.get("duration_ms_p50", 0)
p95 = wf.get("duration_ms_p95", 0)
lines.append(
f" Workflows: {total_runs:,} Success: {sr:.1f}% "
f"Duration p50/p95: {_fmt_ms(p50)} / {_fmt_ms(p95)}"
)
# Entrypoint counts, highest count first.
by_entry = wf.get("by_entrypoint", {})
if by_entry:
entry_str = ", ".join(
f"{k}: {v}" for k, v in sorted(by_entry.items(), key=lambda x: -x[1])
)
lines.append(f" Entrypoints: {entry_str}")
# Tool reliability
if tc.get("total"):
fail_pct = tc.get("failure_rate", 0.0) * 100
lines.append(
f" Tool calls: {tc['total']:,} Failure rate: {fail_pct:.1f}%"
)
tools = tc.get("by_tool", {})
fails = tc.get("failures_by_tool", {})
# Only the six most-called tools are listed.
top = sorted(tools.items(), key=lambda x: -x[1])[:6]
if top:
parts = []
for name, n in top:
f = fails.get(name, 0)
# Failure count is appended only when it is non-zero.
parts.append(f"{name}: {n}" + (f" ({f} failed)" if f else ""))
lines.append(" " + " ".join(parts))
# Provider / model mix + cache (real names)
by_provider = mc.get("by_provider", {})
if by_provider:
prov_str = ", ".join(
f"{k}: {v}" for k, v in sorted(by_provider.items(), key=lambda x: -x[1])
)
lines.append(f" Providers: {prov_str}")
by_model = mc.get("by_model", {})
if by_model:
# At most eight models, highest call count first.
model_str = ", ".join(
f"{k}: {v}" for k, v in sorted(by_model.items(), key=lambda x: -x[1])[:8]
)
cache = mc.get("cache_hit_rate", 0.0) * 100
# The cache-hit suffix is omitted when the rate is zero.
suffix = f" Cache hit: {cache:.1f}%" if cache else ""
lines.append(f" Models: {model_str}{suffix}")
# Error classes
if errs:
# At most six error classes, most frequent first.
err_str = ", ".join(
f"{k}: {v}" for k, v in sorted(errs.items(), key=lambda x: -x[1])[:6]
)
lines.append(f" Errors: {err_str}")
# Trailing blank row after the section.
lines.append("")
def format_gateway(self, report: Dict) -> str:
"""Format the insights report for gateway/messaging (shorter)."""
if report.get("empty"):

View File

@@ -0,0 +1,30 @@
"""Hermes telemetry & observability.
Local-first observability, on by default. The ``telemetry`` plugin registers Hermes
lifecycle hooks and hands typed events to the fire-and-forget ``emitter`` (queue ->
background writer -> JSONL + state.db ``tel_*`` index). The emitter never blocks or
raises into a model/tool call (the hot-path invariant).
Events record the observed model ids, provider names, and tool names. ``metrics``
derives rollups for /usage and /insights; ``rollup`` builds the per-run summaries shown
by ``hermes telemetry preview``. ``redaction`` + ``exporter_bulk`` + ``otlp_exporter``
handle export to an operator-chosen destination. ``policy`` holds the consent state
machine for the opt-in aggregate plane (no uploader ships).
"""
from __future__ import annotations
from . import emitter, events, metrics, policy, spans
# Package-level aliases for the emitter module's entry points, so callers can
# use ``emit`` / ``get_emitter`` without importing the ``emitter`` submodule.
emit = emitter.emit
get_emitter = emitter.get_emitter
__all__ = [
"emitter",
"events",
"metrics",
"policy",
"spans",
"emit",
"get_emitter",
]

317
agent/telemetry/emitter.py Normal file
View File

@@ -0,0 +1,317 @@
"""Local-plane telemetry emitter: fire-and-forget queue + background writer.
The emitter is the single seam between instrumentation (the telemetry plugin's hook
callbacks) and durable storage. Its contract is the hot-path invariant:
``emit()`` MUST return in O(microseconds), MUST NOT block on disk/network, and
MUST NEVER raise into the caller. A telemetry failure is logged locally and
dropped — it can never affect a model call, a tool call, or a session.
Mechanism:
* ``emit(event)`` does a non-blocking ``queue.put_nowait`` wrapped in a bare except.
On a full queue it drops the *oldest* event and counts the drop.
* A daemon thread drains the queue and writes each event to two places:
1. the append-only JSONL log (source of truth)
2. the ``tel_*`` SQLite tables in state.db (rebuildable index)
* The writer uses its own sqlite connection to state.db, separate from SessionDB,
so telemetry writes never contend with or corrupt session writes.
Local plane only. Nothing here uploads anywhere.
"""
from __future__ import annotations
import json
import logging
import queue
import sqlite3
import threading
import time
from pathlib import Path
from typing import Any, Dict, Optional
logger = logging.getLogger(__name__)
# Capacity of the emitter's in-memory queue (passed as ``queue.Queue(maxsize=...)``).
_MAX_QUEUE = 10_000 # ring-buffer depth; oldest dropped when full
# Upper bound on how many queued events the writer collects into one batch.
_DRAIN_BATCH = 256
def _default_dir() -> Path:
    """Return the ``telemetry`` directory beneath the active HERMES_HOME.

    The home directory is looked up on every call (via ``get_hermes_home``)
    rather than cached at import time.
    """
    from hermes_constants import get_hermes_home

    home = get_hermes_home()
    return home / "telemetry"
def _default_db_path() -> Path:
    """Return the path of ``state.db`` beneath the active HERMES_HOME.

    The home directory is looked up on every call (via ``get_hermes_home``)
    rather than cached at import time.
    """
    from hermes_constants import get_hermes_home

    home = get_hermes_home()
    return home / "state.db"
# Map a telemetry event dict (its "event" tag) to (table, column-ordered insert).
# Only the columns the indexer knows about are written; unknown keys are ignored,
# so an event carrying extra fields never breaks the insert.
# Shape: {event_tag: (table_name, (column, ...))}. The indexer reads each listed
# column from the event with ``ev.get(column)``, so a missing key is stored as NULL.
_TABLE_COLUMNS: Dict[str, tuple] = {
# One row per run ("run" events).
"run": (
"tel_runs",
("run_id", "trace_id", "session_id", "profile_id", "entrypoint",
"platform", "start_ns", "end_ns", "end_reason",
"model_call_count", "tool_call_count", "error_count",
"estimated_cost_usd", "cost_status"),
),
# One row per model call ("model_call" events).
"model_call": (
"tel_model_calls",
("span_id", "run_id", "provider", "model", "base_url",
"input_tokens", "output_tokens", "cache_read_tokens",
"cache_write_tokens", "reasoning_tokens", "latency_ms", "ttft_ms",
"estimated_cost_usd", "cost_status", "cost_source", "end_reason",
"retry_count"),
),
# One row per tool call ("tool_call" events).
"tool_call": (
"tel_tool_calls",
("span_id", "run_id", "tool_name", "backend",
"duration_ms", "result_class", "retry_count", "approval"),
),
# One row per error ("error" events); the only table here that stores ts_ns.
"error": (
"tel_error_events",
("run_id", "error_class", "subsystem", "recovery", "ts_ns"),
),
}
class TelemetryEmitter:
    """Owns the queue, the writer thread, and the telemetry sqlite connection.

    ``emit()`` is the hot path: it only enqueues and never blocks or raises.
    A daemon writer thread drains the queue in batches and, for each batch,
    appends to the JSONL log, indexes rows into the ``tel_*`` tables, and then
    fans the batch out to live subscribers.

    The sqlite connection is opened, used and closed on the writer thread only,
    because sqlite3 connections are bound to their creating thread by default.
    """

    def __init__(
        self,
        *,
        events_path: Optional[Path] = None,
        db_path: Optional[Path] = None,
        enabled: bool = True,
    ) -> None:
        """Configure paths and state; no thread or file is created yet.

        Args:
            events_path: JSONL log file. Defaults to ``events.jsonl`` inside
                the default telemetry directory.
            db_path: sqlite database holding the ``tel_*`` tables. Defaults to
                the default state database path.
            enabled: when False, ``emit()`` is a no-op.
        """
        self._dir = (events_path.parent if events_path else _default_dir())
        self._events_path = events_path or (self._dir / "events.jsonl")
        self._db_path = db_path or _default_db_path()
        self._enabled = enabled
        self._q: "queue.Queue[Dict[str, Any]]" = queue.Queue(maxsize=_MAX_QUEUE)
        self._dropped = 0
        self._written = 0
        self._stop = threading.Event()
        self._started = False
        self._lock = threading.Lock()
        self._conn: Optional[sqlite3.Connection] = None
        self._thread: Optional[threading.Thread] = None
        # True while the writer is persisting a batch; lets flush() wait for
        # work that has already left the queue but is not yet written.
        self._inflight = False
        # Optional live subscribers (e.g. OTLP exporter). Called from the writer
        # thread AFTER durable writes, fully fail-isolated — a subscriber that
        # raises can never affect the JSONL/SQLite source of truth or the hot
        # path. Each subscriber is callable(batch: list[dict]).
        self._subscribers: list = []

    # ── public API (hot path) ───────────────────────────────────────────────
    def emit(self, event: Any) -> None:
        """Enqueue an event. Never blocks, never raises.

        ``event`` may be an object with ``to_dict()`` or anything ``dict()``
        accepts. A ``ts_ns`` timestamp is added when the payload lacks one.
        When the queue is full the oldest queued event is discarded to make
        room, and the drop is counted.
        """
        if not self._enabled:
            return
        try:
            payload = event.to_dict() if hasattr(event, "to_dict") else dict(event)
            payload.setdefault("ts_ns", time.time_ns())
            self._ensure_started()
            try:
                self._q.put_nowait(payload)
            except queue.Full:
                # Drop oldest to make room — bounded memory, newest-wins.
                try:
                    self._q.get_nowait()
                    self._dropped += 1
                    self._q.put_nowait(payload)
                except Exception:
                    self._dropped += 1
        except Exception:  # the hot-path invariant: never propagate
            logger.debug("telemetry emit failed", exc_info=True)

    # ── lifecycle ───────────────────────────────────────────────────────────
    def _ensure_started(self) -> None:
        """Start the writer thread on first use (or first use after close())."""
        if self._started:
            return
        with self._lock:
            if self._started:
                return
            try:
                self._dir.mkdir(parents=True, exist_ok=True)
            except Exception:
                logger.debug("telemetry dir create failed", exc_info=True)
            # close() leaves the stop flag set; without clearing it a restarted
            # writer would exit immediately and queued events would never be
            # written.
            self._stop.clear()
            self._thread = threading.Thread(
                target=self._run, name="hermes-telemetry-writer", daemon=True
            )
            self._thread.start()
            self._started = True

    def _open_conn(self) -> Optional[sqlite3.Connection]:
        """Return the writer's sqlite connection, opening it on first use.

        Returns None when the database cannot be opened or configured; the
        next batch tries again.
        """
        if self._conn is not None:
            return self._conn
        conn: Optional[sqlite3.Connection] = None
        try:
            conn = sqlite3.connect(str(self._db_path), isolation_level=None, timeout=5.0)
            conn.execute("PRAGMA journal_mode=WAL")
            conn.execute("PRAGMA busy_timeout=5000")
        except Exception:
            logger.debug("telemetry db open failed", exc_info=True)
            # Don't leak a connection that opened but failed its PRAGMAs.
            if conn is not None:
                try:
                    conn.close()
                except Exception:
                    pass
            self._conn = None
            return None
        self._conn = conn
        return conn

    def _close_conn(self) -> None:
        """Close and forget the sqlite connection, swallowing any error."""
        conn, self._conn = self._conn, None
        if conn is None:
            return
        try:
            conn.close()
        except Exception:
            logger.debug("telemetry db close failed", exc_info=True)

    def _fill_batch(self, batch: list, limit: Optional[int] = None) -> list:
        """Top ``batch`` up from the queue without blocking; return it."""
        cap = _DRAIN_BATCH if limit is None else limit
        while len(batch) < cap:
            try:
                batch.append(self._q.get_nowait())
            except queue.Empty:
                break
        return batch

    def _safe_write(self, batch: list) -> None:
        """Persist one batch; an unexpected error must not kill the writer."""
        self._inflight = True
        try:
            self._write_batch(batch)
        except Exception:
            logger.debug("telemetry batch write failed", exc_info=True)
        finally:
            self._inflight = False

    def _run(self) -> None:
        """Writer-thread main loop: drain batches until stopped, then clean up."""
        try:
            while not self._stop.is_set():
                try:
                    first = self._q.get(timeout=0.5)
                except queue.Empty:
                    continue
                self._safe_write(self._fill_batch([first]))
            # Persist what was already queued when stop was requested, bounded
            # by the queue size at that moment so late producers can't keep
            # the thread alive.
            pending = self._q.qsize()
            while pending > 0:
                batch = self._fill_batch([], limit=min(pending, _DRAIN_BATCH))
                if not batch:
                    break
                pending -= len(batch)
                self._safe_write(batch)
        finally:
            # Must happen here: the connection was created on this thread and
            # sqlite3 refuses to close it from any other.
            self._close_conn()

    def _write_batch(self, batch) -> None:
        """Write a batch to JSONL, index it in sqlite, then notify subscribers."""
        # JSONL append (source of truth) — best effort. Serialize per event so
        # one non-serializable payload doesn't lose the rest of the batch.
        rows = []
        for ev in batch:
            try:
                rows.append(json.dumps(ev, ensure_ascii=False))
            except Exception:
                logger.debug("telemetry event not serializable", exc_info=True)
        if rows:
            try:
                with open(self._events_path, "a", encoding="utf-8") as fh:
                    fh.write("\n".join(rows) + "\n")
            except Exception:
                logger.debug("telemetry jsonl append failed", exc_info=True)
        # SQLite index — best effort, per-event so one bad row can't lose the batch.
        conn = self._open_conn()
        if conn is not None:
            for ev in batch:
                try:
                    self._index_one(conn, ev)
                    self._written += 1
                except Exception:
                    logger.debug("telemetry index row failed", exc_info=True)
        # Live fan-out (e.g. OTLP) — AFTER durable writes, fully fail-isolated.
        # Runs even when the index is unavailable. Iterate a snapshot so a
        # concurrent subscribe()/unsubscribe() can't disturb the loop.
        for sub in tuple(self._subscribers):
            try:
                sub(batch)
            except Exception:
                logger.debug("telemetry subscriber failed", exc_info=True)

    def subscribe(self, callback) -> None:
        """Register a live batch subscriber (callable(batch: list[dict])).

        Called from the writer thread after durable writes. Used by the OTLP
        exporter for continuous streaming. Fail-isolated; never on the hot path.
        Registering the same callback twice has no effect.
        """
        if callback not in self._subscribers:
            self._subscribers.append(callback)

    def unsubscribe(self, callback) -> None:
        """Remove a subscriber; unknown callbacks are ignored."""
        try:
            self._subscribers.remove(callback)
        except ValueError:
            pass

    def _index_one(self, conn: sqlite3.Connection, ev: Dict[str, Any]) -> None:
        """Insert one event into its ``tel_*`` table.

        Events whose "event" tag is not in ``_TABLE_COLUMNS`` are skipped.
        Table and column names come from that constant, never from the event,
        so only the values are bound as parameters.
        """
        kind = ev.get("event")
        spec = _TABLE_COLUMNS.get(kind)
        if spec is None:
            return
        table, cols = spec
        values = [ev.get(c) for c in cols]
        placeholders = ", ".join("?" for _ in cols)
        collist = ", ".join(cols)
        conn.execute(
            f"INSERT OR REPLACE INTO {table} ({collist}) VALUES ({placeholders})",
            values,
        )

    # ── introspection / shutdown (tests, CLI) ───────────────────────────────
    def flush(self, timeout: float = 2.0) -> None:
        """Block until the queue drains (test/CLI helper, NOT the hot path).

        Returns once the queue is empty and no batch is being written, or when
        ``timeout`` seconds have elapsed, whichever comes first.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            if self._q.empty() and not self._inflight:
                # give the writer a tick to pick up / finish an in-flight batch
                time.sleep(0.05)
                if self._q.empty() and not self._inflight:
                    return
            time.sleep(0.02)

    def stats(self) -> Dict[str, int]:
        """Return queue depth plus the written and dropped counters."""
        return {
            "queued": self._q.qsize(),
            "written": self._written,
            "dropped": self._dropped,
        }

    def close(self) -> None:
        """Stop the writer, waiting up to two seconds for it to finish.

        The writer persists already-queued events and closes its own sqlite
        connection before exiting. A later ``emit()`` starts a new writer.
        """
        self._stop.set()
        thread = self._thread
        # Joining the current thread would raise; that case arises only if
        # close() is invoked from a subscriber callback.
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout=2.0)
        # Normally a no-op: the writer already closed the connection.
        self._close_conn()
        self._started = False
# ── process-wide singleton ──────────────────────────────────────────────────
# The shared emitter instance; None until get_emitter() creates it or a test
# installs one via reset_emitter_for_tests().
_EMITTER: Optional[TelemetryEmitter] = None
# Guards creation and replacement of _EMITTER.
_EMITTER_LOCK = threading.Lock()
def get_emitter() -> TelemetryEmitter:
    """Return the shared emitter, creating it on first call.

    The first creation reads the ``telemetry.local`` setting (via
    ``_local_enabled``) to decide whether the emitter is enabled. Creation is
    serialized by ``_EMITTER_LOCK``; the common already-created case takes no
    lock.
    """
    global _EMITTER
    if _EMITTER is None:
        with _EMITTER_LOCK:
            # Re-check under the lock: another thread may have won the race.
            if _EMITTER is None:
                _EMITTER = TelemetryEmitter(enabled=_local_enabled())
    return _EMITTER
def _local_enabled() -> bool:
try:
from hermes_cli.config import load_config
cfg = load_config()
tel = cfg.get("telemetry") if isinstance(cfg, dict) else {}
return bool((tel or {}).get("local", True))
except Exception:
return True
def emit(event: Any) -> None:
    """Hand ``event`` to the shared emitter returned by ``get_emitter()``."""
    shared = get_emitter()
    shared.emit(event)
def reset_emitter_for_tests(emitter: Optional[TelemetryEmitter] = None) -> None:
    """Replace the shared emitter (tests only).

    The previously installed emitter is closed first, unless it is the very
    object being installed. Errors raised while closing are ignored. Passing
    None clears the shared emitter.
    """
    global _EMITTER
    with _EMITTER_LOCK:
        previous = _EMITTER
        being_replaced = previous is not None and emitter is not previous
        if being_replaced:
            try:
                previous.close()
            except Exception:
                pass
        _EMITTER = emitter
__all__ = [
"TelemetryEmitter",
"get_emitter",
"emit",
"reset_emitter_for_tests",
]

99
agent/telemetry/events.py Normal file
View File

@@ -0,0 +1,99 @@
"""Typed local-plane telemetry events.
These dataclasses are the rows written to the local JSONL log and the ``tel_*``
SQLite tables. They record the values observed for each run — model id, provider, tool
name, token counts, durations — and stay on the machine unless explicitly exported.
"""
from __future__ import annotations
import time
from dataclasses import dataclass, field, asdict
from typing import Any, Dict, Optional
# ── local-plane events (real values) ────────────────────────────────────────
def _now_ns() -> int:
return time.time_ns()
@dataclass(slots=True)
class RunEvent:
    """A single top-level workflow execution — the root of a trace.

    ``start_ns`` defaults to the time the instance is created.
    """

    # Required identifiers.
    run_id: str
    trace_id: str
    entrypoint: str
    # Optional context.
    session_id: Optional[str] = None
    profile_id: Optional[str] = None
    platform: Optional[str] = None
    # Timing and outcome.
    start_ns: int = field(default_factory=_now_ns)
    end_ns: Optional[int] = None
    end_reason: Optional[str] = None
    # Per-run counters.
    model_call_count: int = 0
    tool_call_count: int = 0
    error_count: int = 0
    # Cost estimate.
    estimated_cost_usd: Optional[float] = None
    cost_status: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return the fields as a dict, led by the tag ``"event": "run"``."""
        record: Dict[str, Any] = {"event": "run"}
        record.update(asdict(self))
        return record
@dataclass(slots=True)
class ModelCallEvent:
span_id: str
run_id: str
provider: Optional[str] = None # raw provider, e.g. "anthropic"
model: Optional[str] = None # raw model id, e.g. "claude-opus-4"
base_url: Optional[str] = None
input_tokens: int = 0
output_tokens: int = 0
cache_read_tokens: int = 0
cache_write_tokens: int = 0
reasoning_tokens: int = 0
latency_ms: Optional[int] = None
ttft_ms: Optional[int] = None
estimated_cost_usd: Optional[float] = None
cost_status: Optional[str] = None
cost_source: Optional[str] = None
end_reason: Optional[str] = None
retry_count: int = 0
def to_dict(self) -> Dict[str, Any]:
return {"event": "model_call", **asdict(self)}
@dataclass(slots=True)
class ToolCallEvent:
span_id: str
run_id: str
tool_name: Optional[str] = None # raw tool name, e.g. "web_search"
backend: Optional[str] = None
duration_ms: Optional[int] = None
result_class: Optional[str] = None
retry_count: int = 0
approval: Optional[str] = None
def to_dict(self) -> Dict[str, Any]:
return {"event": "tool_call", **asdict(self)}
@dataclass(slots=True)
class ErrorEvent:
    """One recorded error; ``run_id`` may be None.

    ``ts_ns`` defaults to the time the instance is created.
    """

    run_id: Optional[str]
    error_class: str
    subsystem: str
    recovery: Optional[str] = None
    ts_ns: int = field(default_factory=_now_ns)

    def to_dict(self) -> Dict[str, Any]:
        """Return the fields as a dict, led by the tag ``"event": "error"``."""
        record: Dict[str, Any] = {"event": "error"}
        record.update(asdict(self))
        return record
__all__ = [
"RunEvent",
"ModelCallEvent",
"ToolCallEvent",
"ErrorEvent",
]

View File

@@ -0,0 +1,139 @@
"""Export telemetry (and optionally session content) to a file or stream.
Two data domains, both written to an operator-chosen destination:
* Telemetry: the tel_* rows + events.jsonl (structural observability).
* Content (opt-in via the trajectories plane): sessions + messages, with every
content field (message body, reasoning, raw tool-call args) passed through the
redaction pipeline (secrets always stripped; PII per content_redaction).
Formats: ndjson (default) and json. OTLP streaming export lives in otlp_exporter.py.
Content export is gated by ``redaction.content_export_enabled``.
"""
from __future__ import annotations
import json
import sqlite3
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, TextIO
from . import redaction
_TEL_TABLES = (
"tel_runs", "tel_model_calls", "tel_tool_calls", "tel_error_events",
)
def _open(db_path: Optional[Path]) -> sqlite3.Connection:
if db_path is None:
from hermes_constants import get_hermes_home
db_path = get_hermes_home() / "state.db"
c = sqlite3.connect(str(db_path), timeout=5.0)
c.row_factory = sqlite3.Row
return c
def _iter_telemetry(conn: sqlite3.Connection, since_ns: Optional[int]) -> Iterator[Dict[str, Any]]:
for table in _TEL_TABLES:
# only tel_runs has start_ns; window the rest by run join when needed.
if table == "tel_runs" and since_ns:
rows = conn.execute(
f"SELECT * FROM {table} WHERE start_ns >= ?", (int(since_ns),)
).fetchall()
else:
rows = conn.execute(f"SELECT * FROM {table}").fetchall()
for r in rows:
d = dict(r)
d["_kind"] = table
yield d
def _iter_content(
    db_path: Optional[Path],
    *,
    config: Optional[Dict[str, Any]],
    include_content: bool,
) -> Iterator[Dict[str, Any]]:
    """Yield one export record per stored session.

    Every message is passed through ``redaction.redact_message`` with the
    content mode derived from ``config`` and the ``include_content`` flag.
    Structural session metadata is always present; the title is added (after
    redaction) only when ``include_content`` is true and the session has one.
    The session database is closed when iteration finishes or is abandoned.
    """
    from hermes_state import SessionDB

    content_mode = redaction.content_mode_for(config)
    db = SessionDB(db_path=db_path) if db_path else SessionDB()
    try:
        for session in db.export_all():
            redacted = []
            for message in session.get("messages", []) or []:
                redacted.append(
                    redaction.redact_message(
                        message,
                        content_mode=content_mode,
                        include_content=include_content,
                    )
                )
            # Structural fields only; free text (the title) is handled below.
            record = {"_kind": "session"}
            for key in (
                "id", "source", "model", "started_at", "ended_at",
                "message_count", "tool_call_count",
            ):
                record[key] = session.get(key)
            record["messages"] = redacted
            if include_content and session.get("title"):
                record["title"] = redaction.redact_for_export(
                    session["title"], content_mode=content_mode
                )
            yield record
    finally:
        db.close()
def export(
    out: TextIO,
    *,
    fmt: str = "ndjson",
    since_ns: Optional[int] = None,
    include_content: bool = False,
    config: Optional[Dict[str, Any]] = None,
    db_path: Optional[Path] = None,
) -> Dict[str, int]:
    """Write telemetry rows, then session records, to ``out``.

    With ``fmt == "ndjson"`` each record is written as its own JSON line as it
    is produced. Any other ``fmt`` value buffers all records and writes a
    single indented JSON document ``{"records": [...]}`` at the end.

    ``include_content`` takes effect only when
    ``redaction.content_export_enabled(config)`` also allows it.

    Returns:
        Counts of telemetry rows and sessions written, plus
        ``content_included`` (1 when content was allowed, else 0).
    """
    # Trajectories gate: a flag cannot override the consent plane.
    content_allowed = include_content and redaction.content_export_enabled(config)
    counts = {"telemetry": 0, "sessions": 0, "content_included": int(content_allowed)}
    streaming = fmt == "ndjson"
    buffered: List[Dict[str, Any]] = []

    def _emit(record: Dict[str, Any]) -> None:
        # Stream immediately for ndjson; otherwise hold for the final dump.
        if streaming:
            out.write(json.dumps(record, ensure_ascii=False) + "\n")
        else:
            buffered.append(record)

    conn = _open(db_path)
    try:
        for row in _iter_telemetry(conn, since_ns):
            counts["telemetry"] += 1
            _emit(row)
    finally:
        conn.close()

    # Content/session domain (separate connection via SessionDB).
    for session in _iter_content(db_path, config=config, include_content=content_allowed):
        counts["sessions"] += 1
        _emit(session)

    if not streaming:
        json.dump({"records": buffered}, out, ensure_ascii=False, indent=2)
    return counts
__all__ = ["export"]

219
agent/telemetry/metrics.py Normal file
View File

@@ -0,0 +1,219 @@
"""Derive metric rollups from the local telemetry tables.
Reads the ``tel_*`` tables in state.db and returns aggregates for /usage, /insights,
and local dashboards. Metrics are computed by querying the event log rather than being
emitted on the hot path.
Each function accepts either an open caller-owned ``conn`` (reused, not closed) or a
``db_path`` (opened and closed internally). InsightsEngine passes its existing
connection; a standalone dashboard passes a path.
"""
from __future__ import annotations
import sqlite3
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional
@contextmanager
def _cursor(
conn: Optional[sqlite3.Connection], db_path: Optional[Path]
) -> Iterator[sqlite3.Connection]:
"""Yield a Row-factory connection. Closes it only if we opened it."""
if conn is not None:
prev_factory = conn.row_factory
conn.row_factory = sqlite3.Row
try:
yield conn
finally:
conn.row_factory = prev_factory
return
if db_path is None:
from hermes_constants import get_hermes_home
db_path = get_hermes_home() / "state.db"
c = sqlite3.connect(str(db_path), timeout=5.0)
c.row_factory = sqlite3.Row
try:
yield c
finally:
c.close()
def _since_clause(since_ns: Optional[int], col: str = "start_ns") -> str:
return f" WHERE {col} >= {int(since_ns)}" if since_ns else ""
def workflow_summary(
    db_path: Optional[Path] = None,
    since_ns: Optional[int] = None,
    *,
    conn: Optional[sqlite3.Connection] = None,
) -> Dict[str, Any]:
    """Summarize ``tel_runs``: totals, groupings, durations and success rate.

    When ``since_ns`` is given, only runs with ``start_ns`` at or after it are
    counted. Durations use only runs that have an ``end_ns``. The success rate
    is the share of runs whose ``end_reason`` is "completed".
    """
    with _cursor(conn, db_path) as c:
        window = _since_clause(since_ns)

        def _counts_by(column: str) -> Dict[str, int]:
            # NULL group keys are reported under "unknown".
            grouped = c.execute(
                f"SELECT {column}, COUNT(*) n FROM tel_runs{window} GROUP BY {column}"
            ).fetchall()
            return {row[column] or "unknown": row["n"] for row in grouped}

        total = c.execute(f"SELECT COUNT(*) n FROM tel_runs{window}").fetchone()["n"]
        by_reason = _counts_by("end_reason")
        by_entry = _counts_by("entrypoint")

        if window:
            finished = window + " AND end_ns IS NOT NULL"
        else:
            finished = " WHERE end_ns IS NOT NULL"
        durations = []
        for row in c.execute(f"SELECT start_ns, end_ns FROM tel_runs{finished}").fetchall():
            # Nanoseconds to milliseconds.
            durations.append((row["end_ns"] - row["start_ns"]) / 1e6)

        if total:
            success_rate = round(by_reason.get("completed", 0) / total, 4)
        else:
            success_rate = 0.0
        return {
            "total_runs": total,
            "by_end_reason": by_reason,
            "by_entrypoint": by_entry,
            "duration_ms_p50": _pct(durations, 50),
            "duration_ms_p95": _pct(durations, 95),
            "success_rate": success_rate,
        }
def model_call_summary(
    db_path: Optional[Path] = None,
    since_ns: Optional[int] = None,
    *,
    conn: Optional[sqlite3.Connection] = None,
) -> Dict[str, Any]:
    """Summarize ``tel_model_calls`` by provider and model.

    Args:
        db_path: database to open when ``conn`` is not supplied.
        since_ns: when truthy, only calls belonging to a run whose ``start_ns``
            is at or after this value are counted. ``tel_model_calls`` has no
            start timestamp of its own, so the window is applied through
            ``tel_runs``; calls without a matching run row are excluded.
        conn: caller-owned connection to reuse.

    Returns:
        Call counts by provider and by model, token totals, the cache hit rate
        (``cache_read / (cache_read + input)``), and a per (provider, model)
        breakdown with average latency.
    """
    window = ""
    params: tuple = ()
    if since_ns:
        window = " WHERE run_id IN (SELECT run_id FROM tel_runs WHERE start_ns >= ?)"
        params = (int(since_ns),)

    with _cursor(conn, db_path) as c:
        rows = c.execute(
            "SELECT provider, model, COUNT(*) n, "
            "SUM(input_tokens) inp, SUM(output_tokens) outp, "
            "SUM(cache_read_tokens) cache, AVG(latency_ms) avg_latency "
            "FROM tel_model_calls" + window + " GROUP BY provider, model",
            params,
        ).fetchall()

    by_provider: Dict[str, int] = {}
    by_model: Dict[str, int] = {}
    tokens = {"input": 0, "output": 0, "cache_read": 0}
    breakdown: List[Dict[str, Any]] = []
    for r in rows:
        prov = r["provider"] or "unknown"
        mdl = r["model"] or "unknown"
        by_provider[prov] = by_provider.get(prov, 0) + r["n"]
        by_model[mdl] = by_model.get(mdl, 0) + r["n"]
        # SUM() yields NULL when every value in the group is NULL.
        tokens["input"] += r["inp"] or 0
        tokens["output"] += r["outp"] or 0
        tokens["cache_read"] += r["cache"] or 0
        breakdown.append({
            "provider": r["provider"],
            "model": r["model"],
            "calls": r["n"],
            "avg_latency_ms": round(r["avg_latency"] or 0, 1),
        })
    cache_total = tokens["cache_read"] + tokens["input"]
    return {
        "by_provider": by_provider,
        "by_model": by_model,
        "tokens": tokens,
        "cache_hit_rate": round(tokens["cache_read"] / cache_total, 4) if cache_total else 0.0,
        "breakdown": breakdown,
    }
def tool_call_summary(
    db_path: Optional[Path] = None,
    *,
    conn: Optional[sqlite3.Connection] = None,
    since_ns: Optional[int] = None,
) -> Dict[str, Any]:
    """Summarize ``tel_tool_calls``: calls and failures per tool.

    A call counts as a failure when its ``result_class`` is "error", "timeout"
    or "blocked".

    Args:
        db_path: database to open when ``conn`` is not supplied.
        conn: caller-owned connection to reuse.
        since_ns: optional window. When truthy, only calls belonging to a run
            whose ``start_ns`` is at or after this value are counted. Omitted
            or falsy, every row is counted (the previous behavior).

    Returns:
        ``by_tool``, ``failures_by_tool``, ``total`` and ``failure_rate``.
    """
    window = ""
    params: tuple = ()
    if since_ns:
        # The table has no start timestamp of its own; window through the run.
        window = "run_id IN (SELECT run_id FROM tel_runs WHERE start_ns >= ?)"
        params = (int(since_ns),)

    with _cursor(conn, db_path) as c:
        by_tool = {
            r["tool_name"] or "unknown": r["n"]
            for r in c.execute(
                "SELECT tool_name, COUNT(*) n FROM tel_tool_calls"
                + (f" WHERE {window}" if window else "")
                + " GROUP BY tool_name",
                params,
            ).fetchall()
        }
        fails = {
            r["tool_name"] or "unknown": r["n"]
            for r in c.execute(
                "SELECT tool_name, COUNT(*) n FROM tel_tool_calls "
                "WHERE result_class IN ('error','timeout','blocked')"
                + (f" AND {window}" if window else "")
                + " GROUP BY tool_name",
                params,
            ).fetchall()
        }
    total = sum(by_tool.values())
    total_fail = sum(fails.values())
    return {
        "by_tool": by_tool,
        "failures_by_tool": fails,
        "total": total,
        "failure_rate": round(total_fail / total, 4) if total else 0.0,
    }
def error_summary(
    db_path: Optional[Path] = None,
    *,
    conn: Optional[sqlite3.Connection] = None,
    since_ns: Optional[int] = None,
) -> Dict[str, Any]:
    """Count ``tel_error_events`` rows per error class.

    Args:
        db_path: database to open when ``conn`` is not supplied.
        conn: caller-owned connection to reuse.
        since_ns: optional window. When truthy, only errors whose ``ts_ns`` is
            at or after this value are counted. Omitted or falsy, every row is
            counted (the previous behavior).

    Returns:
        ``{"by_class": {error_class: count}}``; a NULL class is reported as
        "unknown".
    """
    window = ""
    params: tuple = ()
    if since_ns:
        window = " WHERE ts_ns >= ?"
        params = (int(since_ns),)

    with _cursor(conn, db_path) as c:
        rows = c.execute(
            "SELECT error_class, COUNT(*) n FROM tel_error_events"
            + window
            + " GROUP BY error_class",
            params,
        ).fetchall()
        return {
            "by_class": {r["error_class"] or "unknown": r["n"] for r in rows},
        }
def _pct(values: List[float], p: int) -> float:
if not values:
return 0.0
s = sorted(values)
k = (len(s) - 1) * (p / 100)
lo = int(k)
hi = min(lo + 1, len(s) - 1)
frac = k - lo
return round(s[lo] + (s[hi] - s[lo]) * frac, 2)
def overview(
    db_path: Optional[Path] = None,
    since_ns: Optional[int] = None,
    *,
    conn: Optional[sqlite3.Connection] = None,
) -> Dict[str, Any]:
    """Collect all four rollups in one call, for a dashboard.

    ``since_ns`` is forwarded to the workflow and model-call summaries; the
    tool-call and error summaries are called without it.
    """
    rollups: Dict[str, Any] = {}
    rollups["workflows"] = workflow_summary(db_path, since_ns, conn=conn)
    rollups["model_calls"] = model_call_summary(db_path, since_ns, conn=conn)
    rollups["tool_calls"] = tool_call_summary(db_path, conn=conn)
    rollups["errors"] = error_summary(db_path, conn=conn)
    return rollups
def has_data(
    db_path: Optional[Path] = None,
    *,
    conn: Optional[sqlite3.Connection] = None,
) -> bool:
    """Report whether ``tel_runs`` holds at least one row.

    Any failure (for example the table not existing) is treated as "no data"
    and returns False.
    """
    try:
        with _cursor(conn, db_path) as c:
            first_row = c.execute("SELECT 1 FROM tel_runs LIMIT 1").fetchone()
            return first_row is not None
    except Exception:
        return False
__all__ = [
"workflow_summary",
"model_call_summary",
"tool_call_summary",
"error_summary",
"overview",
"has_data",
]

View File

@@ -0,0 +1,282 @@
"""Export telemetry to an OpenTelemetry Collector over OTLP/HTTP.
Maps telemetry events (which carry trace_id/run_id/span_id/parent_span_id) to OTel
spans and sends them to the endpoint configured under ``telemetry.export.otlp``. Lets
an operator stream Hermes telemetry into their own observability stack.
Notes:
* The destination is operator-configured; this module only sends to that endpoint.
It does not import or interact with any aggregate-metrics path.
* ``opentelemetry-sdk`` + ``opentelemetry-exporter-otlp-proto-http`` are an optional
extra (``pip install hermes-agent[otlp]``), imported lazily so the dependency is
only required when OTLP export is actually used.
* ``headers_env`` maps a header name to an environment variable name; values are read
from the environment at export time and never logged or stored.
* The continuous subscriber runs in the emitter's writer thread after durable writes
and is fail-isolated, so an export error cannot affect a run.
Spans carry structural telemetry by default. Message content is included only when the
trajectories plane is enabled, and always passes through the export redaction pipeline.
"""
from __future__ import annotations
import logging
import os
import sqlite3
from pathlib import Path
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
class OTLPUnavailable(RuntimeError):
"""Raised when the optional OpenTelemetry SDK isn't installed."""
def _require_sdk(*, auto_install: bool = True, prompt: bool = True):
    """Import the OTel SDK, attempting a lazy install first when allowed.

    When ``auto_install`` is true, ``tools.lazy_deps.ensure`` is called for
    the ``export.otlp`` feature before the import is attempted. Any failure
    there (the helper is missing, or the install was disabled, declined, or
    failed) is logged at debug level and ignored, because the import attempt
    below is the real check.

    Args:
        auto_install: Try the lazy install before importing (default True).
        prompt: Forwarded to the lazy installer; pass False from
            non-interactive contexts such as the continuous streamer.

    Returns:
        A dict mapping ``"TracerProvider"``, ``"BatchSpanProcessor"``,
        ``"Resource"``, ``"OTLPSpanExporter"`` and ``"SpanKind"`` to the
        corresponding SDK objects.

    Raises:
        OTLPUnavailable: The SDK could not be imported. The original import
            error is attached as ``__cause__`` and quoted in the message.
    """
    if auto_install:
        try:
            from tools.lazy_deps import ensure as _lazy_ensure

            _lazy_ensure("export.otlp", prompt=prompt)
        except Exception:
            # Covers a missing tools.lazy_deps (ImportError) as well as
            # FeatureUnavailable (lazy installs disabled / declined / failed).
            # Not fatal: the import below raises OTLPUnavailable with the hint.
            logger.debug("lazy install for export.otlp did not complete", exc_info=True)
    try:
        from opentelemetry.sdk.trace import TracerProvider
        from opentelemetry.sdk.trace.export import BatchSpanProcessor
        from opentelemetry.sdk.resources import Resource
        from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
            OTLPSpanExporter,
        )
        from opentelemetry.trace import SpanKind
    except Exception as e:  # ImportError or partial install
        raise OTLPUnavailable(
            "OTLP export requires the optional dependency. Install with:\n"
            "  pip install 'hermes-agent[otlp]'\n"
            f"(import error: {e})"
        ) from e
    return {
        "TracerProvider": TracerProvider,
        "BatchSpanProcessor": BatchSpanProcessor,
        "Resource": Resource,
        "OTLPSpanExporter": OTLPSpanExporter,
        "SpanKind": SpanKind,
    }
def _resolve_headers(headers_env: Optional[Dict[str, str]]) -> Dict[str, str]:
    """Turn ``{header_name: ENV_VAR_NAME}`` into ``{header_name: value}``.

    Values are looked up in ``os.environ``. A variable that is unset or empty
    is left out of the result; the debug message names the header and the
    variable but never the value. ``None`` or an empty mapping yields ``{}``.
    """
    resolved: Dict[str, str] = {}
    mapping = headers_env or {}
    for header_name, env_name in mapping.items():
        value = os.environ.get(str(env_name))
        if not value:
            logger.debug("OTLP header %s: env var %s not set; skipping",
                         header_name, env_name)
            continue
        resolved[str(header_name)] = value
    return resolved
def _otlp_config(config: Dict[str, Any]) -> Dict[str, Any]:
    """Return the ``telemetry.export.otlp`` mapping, or ``{}`` when absent.

    Every level that is missing or falsy is replaced by an empty dict before
    descending further.
    """
    node = config or {}
    for key in ("telemetry", "export", "otlp"):
        node = node.get(key) or {}
    return node
def build_exporter(config: Dict[str, Any]):
    """Create the OTLP span exporter described by ``telemetry.export.otlp``.

    The SDK is resolved first, so a missing SDK surfaces as OTLPUnavailable
    before the endpoint is validated. A missing or empty endpoint raises
    ValueError. Header values are resolved from the environment; when none
    resolve, ``headers=None`` is passed to the exporter.
    """
    exporter_cls = _require_sdk()["OTLPSpanExporter"]
    settings = _otlp_config(config)
    target = settings.get("endpoint")
    if not target:
        raise ValueError("telemetry.export.otlp.endpoint is not set")
    resolved = _resolve_headers(settings.get("headers_env"))
    return exporter_cls(endpoint=target, headers=resolved or None)
def _make_provider(config: Dict[str, Any]):
    """Build a tracer provider wired to a batch processor and OTLP exporter.

    Returns ``(provider, processor)`` so callers can flush the processor and
    shut the provider down. The resource is labelled ``hermes-agent`` on the
    ``local`` telemetry plane.
    """
    sdk = _require_sdk()
    resource_attrs = {
        "service.name": "hermes-agent",
        "telemetry.plane": "local",  # never aggregate
    }
    provider = sdk["TracerProvider"](resource=sdk["Resource"].create(resource_attrs))
    processor_cls = sdk["BatchSpanProcessor"]
    processor = processor_cls(build_exporter(config))
    provider.add_span_processor(processor)
    return provider, processor
# ── event -> span attribute mapping (real values) ───────────────────────────
def _scrub_url(value: Any) -> Any:
    """Remove the credential-bearing parts of a URL before it is exported.

    Userinfo (``user:pass@``), the query string, and the fragment are dropped;
    scheme, host[:port] and path are kept. Non-string values are returned
    unchanged, and a string that cannot be parsed is replaced by a placeholder
    so the raw text is never emitted.
    """
    if not isinstance(value, str):
        return value
    from urllib.parse import urlsplit, urlunsplit

    try:
        parts = urlsplit(value)
    except ValueError:
        return "[redacted-url]"
    host = parts.netloc.rpartition("@")[2]  # strip any "user:pass@" prefix
    return urlunsplit((parts.scheme, host, parts.path, "", ""))


def _span_attrs(ev: Dict[str, Any]) -> Dict[str, Any]:
    """Build the span attributes for one telemetry event.

    Only an allow-list of columns per event kind is copied, each under a
    ``hermes.`` prefix, and ``None`` values are skipped. ``hermes.event`` is
    always present ("unknown" when the event has no kind). ``base_url`` is
    passed through ``_scrub_url`` so credentials embedded in a provider URL
    are not sent to the collector.
    """
    kind = ev.get("event")
    attrs: Dict[str, Any] = {"hermes.event": kind or "unknown"}
    keep_by_kind = {
        "run": ("entrypoint", "platform", "end_reason",
                "model_call_count", "tool_call_count", "error_count",
                "estimated_cost_usd", "cost_status"),
        "model_call": ("provider", "model", "base_url",
                       "input_tokens", "output_tokens", "cache_read_tokens",
                       "cache_write_tokens", "reasoning_tokens", "latency_ms",
                       "ttft_ms", "end_reason"),
        "tool_call": ("tool_name", "backend", "duration_ms", "result_class"),
        "error": ("error_class", "subsystem", "recovery"),
    }
    for col in keep_by_kind.get(kind, ()):  # type: ignore[arg-type]
        v = ev.get(col)
        if v is None:
            continue
        if col == "base_url":
            v = _scrub_url(v)
        attrs[f"hermes.{col}"] = v
    return attrs
def export_batch(provider, batch: List[Dict[str, Any]]) -> int:
    """Create one OTel span per event and return how many were created.

    Each span is named ``hermes.<event>``, carries the attributes produced by
    ``_span_attrs``, and is ended immediately. An event that fails to map is
    logged at debug level and skipped, so one bad event does not stop the
    rest of the batch.
    """
    tracer = provider.get_tracer("hermes.telemetry")
    created = 0
    for event in batch:
        try:
            span_name = f"hermes.{event.get('event', 'event')}"
            tracer.start_span(span_name, attributes=_span_attrs(event)).end()
        except Exception:
            logger.debug("OTLP span map failed", exc_info=True)
        else:
            created += 1
    return created
# ── one-shot drain (export current local rows) ──────────────────────────────
def export_once(
    config: Dict[str, Any],
    *,
    db_path: Optional[Path] = None,
    since_ns: Optional[int] = None,
) -> int:
    """Send the locally stored telemetry rows to the OTLP endpoint one time.

    Builds a provider, maps every row read by ``_read_events`` to a span,
    flushes the batch processor, and returns the number of spans created.
    The provider is always shut down afterwards; an error during that
    shutdown is ignored.
    """
    provider, processor = _make_provider(config)
    try:
        events = _read_events(db_path, since_ns)
        exported = export_batch(provider, events)
        processor.force_flush()
        return exported
    finally:
        # Best-effort teardown: never mask the outcome of the export itself.
        try:
            provider.shutdown()
        except Exception:
            pass
def _read_events(db_path: Optional[Path], since_ns: Optional[int]) -> List[Dict[str, Any]]:
    """Read the local ``tel_*`` tables as a flat list of event dicts.

    Each row becomes a dict of its columns plus an ``"event"`` key naming the
    kind (``run``, ``model_call``, ``tool_call``, ``error``).

    Args:
        db_path: SQLite file to read; when None, ``state.db`` under the Hermes
            home directory is used.
        since_ns: When truthy, only runs with ``start_ns >= since_ns`` are
            returned, and rows of the other tables are limited to those runs
            (matched on ``run_id``). A table without a ``run_id`` column is
            returned unfiltered. Rows whose ``run_id`` is NULL are therefore
            excluded whenever a cutoff is given.

    Returns:
        The event dicts, grouped by table in the order runs, model calls,
        tool calls, errors.
    """
    if db_path is None:
        from hermes_constants import get_hermes_home
        db_path = get_hermes_home() / "state.db"
    # Table names come from this fixed mapping only, so interpolating them
    # into SQL below is safe; the cutoff itself is always a bound parameter.
    table_event = {
        "tel_runs": "run", "tel_model_calls": "model_call",
        "tel_tool_calls": "tool_call", "tel_error_events": "error",
    }
    run_filter = " WHERE start_ns >= ?"
    child_filter = (
        " WHERE run_id IN (SELECT run_id FROM tel_runs WHERE start_ns >= ?)"
    )
    out: List[Dict[str, Any]] = []
    c = sqlite3.connect(str(db_path), timeout=5.0)
    c.row_factory = sqlite3.Row
    try:
        for table, evkind in table_event.items():
            where = ""
            params: tuple = ()
            if since_ns:
                if table == "tel_runs":
                    where = run_filter
                elif any(
                    col["name"] == "run_id"
                    for col in c.execute(f"PRAGMA table_info({table})")
                ):
                    where = child_filter
                if where:
                    params = (int(since_ns),)
            for r in c.execute(f"SELECT * FROM {table}{where}", params):
                d = dict(r)
                d["event"] = evkind
                out.append(d)
    finally:
        c.close()
    return out
# ── continuous streaming subscriber ─────────────────────────────────────────
class OTLPStreamer:
    """A live subscriber that pushes each emitter batch to OTLP as it lands.

    Register with ``emitter.subscribe(streamer)``. Fail-isolated by the emitter.
    ``exported`` counts the spans created so far.
    """

    def __init__(self, config: Dict[str, Any]):
        """Build the provider and processor for ``config``; may raise if that fails."""
        self._provider, self._processor = _make_provider(config)
        self.exported = 0

    def __call__(self, batch: List[Dict[str, Any]]) -> None:
        """Map one emitter batch to spans and add the count to ``exported``."""
        self.exported += export_batch(self._provider, batch)

    def shutdown(self) -> None:
        """Flush pending spans, then shut the provider down. Never raises.

        The two steps are isolated from each other so that a failed flush
        (for example an unreachable collector) still lets the provider shut
        down instead of being skipped.
        """
        try:
            self._processor.force_flush()
        except Exception:
            logger.debug("OTLP streamer flush failed during shutdown", exc_info=True)
        try:
            self._provider.shutdown()
        except Exception:
            logger.debug("OTLP provider shutdown failed", exc_info=True)
def is_available() -> bool:
    """Report whether the OTel SDK can be imported right now.

    This is a pure check (e.g. for status display): it passes
    ``auto_install=False`` so no install is attempted.
    """
    try:
        _require_sdk(auto_install=False)
    except OTLPUnavailable:
        return False
    else:
        return True
def is_enabled(config: Dict[str, Any]) -> bool:
    """True when ``telemetry.export.otlp`` has both ``enabled`` and ``endpoint`` set."""
    settings = _otlp_config(config)
    if not settings.get("enabled"):
        return False
    return bool(settings.get("endpoint"))
def start_streaming(config: Dict[str, Any]) -> Optional[OTLPStreamer]:
    """If OTLP is enabled, attach a streamer to the singleton emitter.

    Non-interactive context (startup): attempts a lazy install with prompt=False
    so a configured-but-missing SDK is installed once (gated by
    security.allow_lazy_installs), then streams. If the SDK still can't load,
    or the streamer cannot be built or subscribed, this logs a warning and
    returns None — it never raises into startup.

    Returns:
        The subscribed streamer, or None when OTLP export is disabled or
        could not be started.
    """
    if not is_enabled(config):
        return None
    try:
        _require_sdk(prompt=False)
    except OTLPUnavailable:
        logger.warning("telemetry.export.otlp.enabled but the OTel SDK could not "
                       "be installed/imported; install 'hermes-agent[otlp]'")
        return None
    streamer: Optional[OTLPStreamer] = None
    try:
        from agent.telemetry.emitter import get_emitter
        streamer = OTLPStreamer(config)
        get_emitter().subscribe(streamer)
    except Exception as exc:
        # Honour the "never raises into startup" contract. Only the exception
        # type is logged, so nothing taken from the exporter configuration
        # can end up in the log.
        logger.warning("telemetry OTLP streaming could not be started (%s); "
                       "continuing without it", type(exc).__name__)
        if streamer is not None:
            # Built but not subscribed: release its processor and provider.
            streamer.shutdown()
        return None
    return streamer
# Names exported by ``from <this module> import *``; underscore helpers stay private.
__all__ = [
"OTLPUnavailable",
"OTLPStreamer",
"build_exporter",
"export_once",
"export_batch",
"is_available",
"is_enabled",
"start_streaming",
]

107
agent/telemetry/policy.py Normal file
View File

@@ -0,0 +1,107 @@
"""Telemetry consent posture and the aggregate-plane gate.
Consent is a single field, ``telemetry.consent_state``:
* "unknown" — no choice recorded; never uploads (the default).
* "local" — declined the aggregate plane; local plane only.
* "aggregate" — opted in to the aggregate plane.
The config file is the source of truth: set ``telemetry.consent_state`` with
``hermes config set`` (or a managed-scope pin). There is no separate boolean mirror —
a single field cannot drift out of sync with itself, so a stray value can't
accidentally imply consent.
``allow_aggregate`` is the hard gate. An administrator pins
``telemetry.allow_aggregate: false`` through the managed-scope layer
(``/etc/hermes/config.yaml``), which takes precedence over the user's config; when it
is false, the aggregate plane is off regardless of ``consent_state``.
This module makes the decisions; it performs no I/O and contains no uploader. A future
uploader must call :func:`may_upload_aggregate` at its boundary.
"""
from __future__ import annotations
import uuid
from dataclasses import dataclass
from typing import Any, Dict
# Allowed values of ``telemetry.consent_state``.
CONSENT_UNKNOWN = "unknown"  # no choice recorded; the default used by resolve()
CONSENT_LOCAL = "local"  # declined the aggregate plane; local plane only
CONSENT_AGGREGATE = "aggregate"  # opted in to the aggregate plane
# resolve() replaces any consent_state outside this set with CONSENT_UNKNOWN.
_VALID_STATES = {CONSENT_UNKNOWN, CONSENT_LOCAL, CONSENT_AGGREGATE}
@dataclass(slots=True)
class TelemetryDecision:
"""The resolved telemetry posture for the current process."""
local_enabled: bool
aggregate_enabled: bool
consent_state: str
install_id: str
allow_aggregate: bool
def may_upload_aggregate(self) -> bool:
"""The single gate the uploader must consult before any network send."""
return self.allow_aggregate and self.consent_state == CONSENT_AGGREGATE
def _telemetry_cfg(config: Dict[str, Any]) -> Dict[str, Any]:
    """Return the ``telemetry`` section of ``config``.

    Yields ``{}`` when ``config`` is not a dict or the section is not a dict.
    """
    if not isinstance(config, dict):
        return {}
    section = config.get("telemetry")
    if isinstance(section, dict):
        return section
    return {}
def ensure_install_id(config: Dict[str, Any]) -> str:
    """Return the configured install id, or mint a new one.

    A non-blank string in ``telemetry.install_id`` is returned unchanged.
    Otherwise a fresh ``uuid4`` string is returned. Nothing is persisted
    here: the caller writes the value back to config.yaml. Clearing the key
    (e.g. ``hermes config set telemetry.install_id ""``) makes the next call
    mint a new id.
    """
    current = _telemetry_cfg(config).get("install_id")
    if not isinstance(current, str) or not current.strip():
        return str(uuid.uuid4())
    return current
def resolve(config: Dict[str, Any]) -> TelemetryDecision:
    """Resolve the effective telemetry posture from config.

    ``consent_state`` is the single source of truth for the aggregate opt-in;
    a value that is not one of the known strings (including a non-string such
    as a list) is treated as ``"unknown"``. ``allow_aggregate`` (admin-pinnable
    via managed scope) hard-disables the aggregate plane regardless of consent.

    ``local`` and ``allow_aggregate`` default to on. A string spelling of
    "off" (``"false"``, ``"0"``, ``"no"``, ``"off"``, or blank) turns the flag
    off, so a value stored as text cannot silently leave a gate open.
    """
    tel = _telemetry_cfg(config)

    def _flag(key: str) -> bool:
        # bool("false") is True, so string values are interpreted explicitly.
        raw = tel.get(key, True)
        if isinstance(raw, str):
            return raw.strip().lower() not in {"", "0", "false", "no", "off"}
        return bool(raw)

    local_enabled = _flag("local")
    allow_aggregate = _flag("allow_aggregate")
    state = tel.get("consent_state", CONSENT_UNKNOWN)
    # The isinstance check comes first: an unhashable value (e.g. a YAML list)
    # would raise TypeError on the set-membership test.
    if not isinstance(state, str) or state not in _VALID_STATES:
        state = CONSENT_UNKNOWN
    aggregate_enabled = allow_aggregate and state == CONSENT_AGGREGATE
    return TelemetryDecision(
        local_enabled=local_enabled,
        aggregate_enabled=aggregate_enabled,
        consent_state=state,
        install_id=ensure_install_id(config),
        allow_aggregate=allow_aggregate,
    )
def may_upload_aggregate(config: Dict[str, Any]) -> bool:
    """Resolve ``config`` and report whether an aggregate upload is permitted."""
    decision = resolve(config)
    return decision.may_upload_aggregate()
# Names exported by ``from <this module> import *``.
__all__ = [
"CONSENT_UNKNOWN",
"CONSENT_LOCAL",
"CONSENT_AGGREGATE",
"TelemetryDecision",
"resolve",
"may_upload_aggregate",
"ensure_install_id",
]

View File

@@ -0,0 +1,187 @@
"""Redaction applied to telemetry data on export.
Two independent controls:
* Secrets are always redacted, on every export and in every mode; no setting
disables this. Wraps ``agent/redact.py::redact_sensitive_text(force=True)``.
* Whether message bodies, reasoning, and raw tool arguments are exportable at all is
governed by the trajectories plane (``telemetry.trajectories.enabled``, default
off, admin-pinnable), not by a redaction mode. With trajectories off, content is
dropped. With it on, content is exportable and ``content_redaction`` (none|pii)
controls how much is scrubbed; secrets are still always stripped.
This applies to the local and trajectory export paths. It is unrelated to any
aggregate-metrics path.
"""
from __future__ import annotations
import re
from typing import Any, Dict, List, Optional
# Content-redaction strengths for any content that IS exported.
# Whether content is exported at all is decided by the trajectories gate
# (content_export_enabled), not by these modes.
CONTENT_NONE = "none"  # no PII scrubbing; secrets are still stripped
CONTENT_PII = "pii"  # additionally replace emails, UUIDs and phone-like numbers
CONTENT_MODES = {CONTENT_NONE, CONTENT_PII}
# ── PII patterns (applied only in CONTENT_PII mode, on content that is exported) ──
_EMAIL_RE = re.compile(r"[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}")
# E.164-ish and common separators; conservative to avoid nuking code/IDs.
# NOTE(review): every prefix group is optional, so a standalone run of six or
# more digits (e.g. "123456") also matches — confirm that is acceptable.
_PHONE_RE = re.compile(
r"(?<!\w)(?:\+?\d{1,3}[\s.\-]?)?(?:\(\d{2,4}\)[\s.\-]?)?\d{3}[\s.\-]?\d{3,4}(?:[\s.\-]?\d{2,4})?(?!\w)"
)
# Long opaque hex/uuid-ish user identifiers.
_UUID_RE = re.compile(r"\b[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}\b")
def _secret_redact(text: Optional[str]) -> Optional[str]:
    """Strip secrets from ``text``; this step cannot be turned off.

    ``None`` passes through as ``None``. The redactor is called with
    ``force=True`` so user config can't disable it. If the redactor cannot be
    imported or raises, a fixed placeholder is returned instead of the input.
    """
    if text is not None:
        try:
            from agent.redact import redact_sensitive_text
            return redact_sensitive_text(str(text), force=True)
        except Exception:
            # Fail CLOSED: never emit the raw string when redaction can't run.
            return "[redaction-unavailable]"
    return None
def _pii_redact(text: str) -> str:
    """Replace emails, UUIDs and phone-like numbers with placeholders.

    The patterns are applied in that order: emails, then UUIDs, then phones.
    """
    substitutions = (
        (_EMAIL_RE, "[email]"),
        (_UUID_RE, "[id]"),
        (_PHONE_RE, "[phone]"),
    )
    for pattern, placeholder in substitutions:
        text = pattern.sub(placeholder, text)
    return text
def redact_for_export(
    text: Optional[str],
    *,
    content_mode: str = CONTENT_NONE,
) -> Optional[str]:
    """Scrub one content string that the caller has decided to export.

    Secrets are always removed first. PII is removed as well only when
    ``content_mode`` is ``'pii'``. ``None`` in gives ``None`` out. Deciding
    *whether* content is exported at all is the caller's job (see
    ``content_export_enabled``).
    """
    scrubbed = _secret_redact(text)
    if scrubbed is not None and content_mode == CONTENT_PII:
        scrubbed = _pii_redact(scrubbed)
    return scrubbed
def content_export_enabled(config: Optional[Dict[str, Any]]) -> bool:
    """True only when the trajectories plane is explicitly enabled.

    This is the consent gate for exporting message bodies / reasoning / raw tool
    args. Default off. Admin-pinnable via managed scope (telemetry.trajectories.enabled).

    The gate fails closed: a malformed config returns False, and a string
    value enables the plane only when it is an explicit affirmative
    (``"true"``, ``"1"``, ``"yes"``, ``"on"``), so ``"false"`` stored as text
    cannot switch content export on.
    """
    try:
        tel = (config or {}).get("telemetry") or {}
        traj = tel.get("trajectories") or {}
        enabled = traj.get("enabled", False)
    except Exception:
        return False
    if isinstance(enabled, str):
        # bool("false") is True; accept only explicit affirmatives.
        return enabled.strip().lower() in {"1", "true", "yes", "on"}
    return bool(enabled)
def content_mode_for(config: Optional[Dict[str, Any]]) -> str:
    """Return the configured ``telemetry.content_redaction`` mode.

    Falls back to ``CONTENT_NONE`` when the key is missing, holds a value
    outside ``CONTENT_MODES``, or the config cannot be read.
    """
    try:
        section = (config or {}).get("telemetry") or {}
        chosen = section.get("content_redaction", CONTENT_NONE)
        if chosen in CONTENT_MODES:
            return chosen
        return CONTENT_NONE
    except Exception:
        return CONTENT_NONE
# ── Codec-aware message redaction (NeMo pattern) ─────────────────────────────
# Redact the right fields of a provider message shape rather than regex-blasting
# the whole blob. Structure (roles, names, counts) is preserved; only the
# free-text content fields are scrubbed.
def redact_message(
    msg: Dict[str, Any],
    *,
    content_mode: str = CONTENT_NONE,
    include_content: bool = False,
) -> Dict[str, Any]:
    """Produce the export-safe form of one chat message dict.

    ``role`` is always kept, as are ``tool_name`` and ``name`` when present.

    With ``include_content`` False (trajectories off) no free text is kept:
    only ``content_chars``, ``reasoning_chars`` and ``tool_call_count`` are
    recorded. With it True, ``content``, ``reasoning_content`` and
    ``tool_calls`` are kept after passing through the export redaction.
    """
    redacted: Dict[str, Any] = {"role": msg.get("role")}
    # Structural identifiers survive in both modes.
    for key in ("tool_name", "name"):
        value = msg.get(key)
        if value is not None:
            redacted[key] = value

    body = msg.get("content")
    reasoning = msg.get("reasoning_content")
    calls = msg.get("tool_calls")

    if include_content:
        # Trajectories enabled: keep the text, scrubbed.
        if body is not None:
            redacted["content"] = redact_for_export(body, content_mode=content_mode)
        if reasoning:
            redacted["reasoning_content"] = redact_for_export(
                reasoning, content_mode=content_mode
            )
        if calls:
            redacted["tool_calls"] = _redact_tool_calls(calls, content_mode=content_mode)
        return redacted

    # Structural only: record presence/size, never the bytes themselves.
    if body is not None:
        redacted["content_chars"] = len(str(body))
    if reasoning:
        redacted["reasoning_chars"] = len(str(reasoning))
    if calls:
        redacted["tool_call_count"] = _count_tool_calls(calls)
    return redacted
def _count_tool_calls(tool_calls: Any) -> int:
    """Count the tool calls in a list, a JSON string, or a single object.

    A JSON string is decoded first. A list counts its items; any other truthy
    value counts as one call and a falsy one as zero. Undecodable input, or
    any other error, yields 0.
    """
    try:
        import json
        parsed = tool_calls
        if isinstance(tool_calls, str):
            parsed = json.loads(tool_calls)
        if isinstance(parsed, list):
            return len(parsed)
        return 1 if parsed else 0
    except Exception:
        return 0
def _redact_tool_calls(tool_calls: Any, *, content_mode: str) -> Any:
    """Redact raw tool-call arguments (free text) while keeping function names.

    Accepts a list of call dicts, a JSON string encoding one, or a single call
    dict (treated as a one-item list, matching how ``_count_tool_calls`` counts
    it). For each call the name and arguments are taken from the nested
    ``function`` object, falling back to the call's own ``name`` /
    ``arguments`` keys. Arguments are passed through ``redact_for_export``.

    Returns:
        A list of ``{"name", "arguments"}`` dicts (non-dict items are
        skipped); ``[]`` when the input is neither a list nor a non-empty
        dict; or the string ``"[unparseable-tool-calls]"`` when a JSON string
        cannot be decoded.
    """
    import json
    try:
        tc = json.loads(tool_calls) if isinstance(tool_calls, str) else tool_calls
    except Exception:
        return "[unparseable-tool-calls]"
    if isinstance(tc, dict):
        # A lone call object: keep it rather than silently dropping it.
        tc = [tc] if tc else []
    if not isinstance(tc, list):
        return []
    out: List[Dict[str, Any]] = []
    for call in tc:
        if not isinstance(call, dict):
            continue
        fn = call.get("function")
        if not isinstance(fn, dict):
            fn = {}
        name = fn.get("name") or call.get("name")
        args = fn.get("arguments")
        if args is None:
            # Flat call shape: arguments sit next to the name.
            args = call.get("arguments")
        red_args = redact_for_export(args, content_mode=content_mode) if args is not None else None
        out.append({"name": name, "arguments": red_args})
    return out
# Names exported by ``from <this module> import *``.
__all__ = [
"CONTENT_NONE",
"CONTENT_PII",
"CONTENT_MODES",
"redact_for_export",
"content_export_enabled",
"content_mode_for",
"redact_message",
]

145
agent/telemetry/rollup.py Normal file
View File

@@ -0,0 +1,145 @@
"""Build per-run summary events from the local telemetry tables.
Reads the ``tel_*`` tables and projects each completed run into a summary dict holding
the recorded values: provider, models used, tool names, token totals, duration, and
cost. Powers ``hermes telemetry preview``. No aggregation or bucketing is applied here.
"""
from __future__ import annotations
import platform
import sqlite3
from pathlib import Path
from typing import Any, Dict, List, Optional
def _os_family() -> str:
    """Classify ``platform.system()`` as linux, macos, windows, or other."""
    system = platform.system().lower()
    if system == "darwin":
        return "macos"
    for prefix, family in (("lin", "linux"), ("win", "windows")):
        if system.startswith(prefix):
            return family
    return "other"
def _hermes_version() -> str:
    """Return ``hermes_cli.__version__`` as a string, or "0.0.0" if unavailable."""
    try:
        from hermes_cli import __version__ as installed
        return str(installed)
    except Exception:
        return "0.0.0"
def _open(db_path: Optional[Path], conn: Optional[sqlite3.Connection]):
    """Return ``(connection, previous_row_factory, owned)`` for reading.

    A caller-supplied ``conn`` is switched to ``sqlite3.Row`` and returned
    with its previous row factory and ``owned=False``. Otherwise a new
    connection is opened on ``db_path`` (``state.db`` under the Hermes home
    when None) and returned with ``None`` and ``owned=True``.
    """
    if conn is None:
        if db_path is None:
            from hermes_constants import get_hermes_home
            db_path = get_hermes_home() / "state.db"
        fresh = sqlite3.connect(str(db_path), timeout=5.0)
        fresh.row_factory = sqlite3.Row
        return fresh, None, True
    previous = conn.row_factory
    conn.row_factory = sqlite3.Row
    return conn, previous, False
def _run_events(c: sqlite3.Connection, since_ns: Optional[int]) -> List[Dict[str, Any]]:
    """Project completed runs into per-run summary dicts.

    Args:
        c: Connection whose rows support access by column name (its
            ``row_factory`` must be ``sqlite3.Row`` or equivalent).
        since_ns: When truthy, only runs with ``start_ns >= since_ns`` are
            included.

    Returns:
        One ``workflow_completed`` dict per run that has an ``end_ns``, with
        the models and tools used, counters, token totals, cost, and
        ``duration_ms`` (None when the run has no recorded start).
    """
    sql = (
        "SELECT run_id, entrypoint, platform, end_reason, start_ns, end_ns, "
        "model_call_count, tool_call_count, error_count, estimated_cost_usd "
        "FROM tel_runs WHERE end_ns IS NOT NULL"
    )
    params: tuple = ()
    if since_ns:
        sql += " AND start_ns >= ?"
        params = (int(since_ns),)

    events: List[Dict[str, Any]] = []
    for r in c.execute(sql, params).fetchall():
        run_id = r["run_id"]
        # Models actually used in this run (real ids), with token totals.
        models = [
            {"provider": m["provider"], "model": m["model"],
             "calls": m["n"], "input_tokens": int(m["inp"] or 0),
             "output_tokens": int(m["outp"] or 0)}
            for m in c.execute(
                "SELECT provider, model, COUNT(*) n, SUM(input_tokens) inp, "
                "SUM(output_tokens) outp FROM tel_model_calls WHERE run_id = ? "
                "GROUP BY provider, model ORDER BY n DESC",
                (run_id,),
            ).fetchall()
        ]
        tools = [
            row["tool_name"]
            for row in c.execute(
                "SELECT DISTINCT tool_name FROM tel_tool_calls WHERE run_id = ?",
                (run_id,),
            ).fetchall()
            if row["tool_name"]
        ]
        # A run row can have an end but no start; report no duration then
        # instead of failing on ``int - None``.
        start_ns, end_ns = r["start_ns"], r["end_ns"]
        duration_ms = (
            round((end_ns - start_ns) / 1e6, 1)
            if end_ns and start_ns is not None
            else None
        )
        events.append({
            "event_name": "workflow_completed",
            "run_id": run_id,
            "entrypoint": r["entrypoint"] or "cli",
            "platform": r["platform"],
            "end_reason": r["end_reason"] or "completed",
            "models_used": models,
            "tools_used": tools,
            "model_call_count": r["model_call_count"] or 0,
            "tool_call_count": r["tool_call_count"] or 0,
            "error_count": r["error_count"] or 0,
            "duration_ms": duration_ms,
            # The per-model groups partition the run's model calls, so their
            # sums equal the run totals; no separate totals query is needed.
            "input_tokens": sum(m["input_tokens"] for m in models),
            "output_tokens": sum(m["output_tokens"] for m in models),
            "estimated_cost_usd": r["estimated_cost_usd"],
        })
    return events
def build_aggregate_events(
    *,
    install_id: str,
    db_path: Optional[Path] = None,
    since_ns: Optional[int] = None,
    conn: Optional[sqlite3.Connection] = None,
    include_heartbeat: bool = True,
) -> List[Dict[str, Any]]:
    """Return per-run summary events plus an optional heartbeat.

    Args:
        install_id: Identifier placed on the heartbeat event.
        db_path: Database to open when ``conn`` is not given.
        since_ns: Optional lower bound on run start time, in nanoseconds.
        conn: Existing connection to read from. It is borrowed, not closed,
            and its ``row_factory`` is put back to its previous value
            (including ``None``, the sqlite3 default) before returning.
        include_heartbeat: Append a ``heartbeat`` event after the run events.

    Returns:
        The list of event dicts.
    """
    c, prev_factory, owned = _open(db_path, conn)
    try:
        events = _run_events(c, since_ns)
        if include_heartbeat:
            events.append({
                "event_name": "heartbeat",
                "install_id": install_id,
                "hermes_version": _hermes_version(),
                "os_family": _os_family(),
                "entrypoint": "cli",
            })
        return events
    finally:
        if owned:
            c.close()
        else:
            # Always undo the row_factory change on a borrowed connection.
            # Restoring only non-None values would leave the default (None)
            # permanently replaced by sqlite3.Row for the caller.
            c.row_factory = prev_factory
def summarize(events: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Summarize events for status/preview output.

    Returns the total number of events, a count per ``event_name`` (events
    without one are counted under "?"), and the sorted union of all keys seen.
    """
    counts: Dict[str, int] = {}
    seen_fields = set()
    for event in events:
        label = event.get("event_name", "?")
        counts[label] = counts.setdefault(label, 0) + 1
        seen_fields.update(event.keys())
    return {
        "total": len(events),
        "by_event_name": counts,
        "fields_present": sorted(seen_fields),
    }
# Names exported by ``from <this module> import *``.
__all__ = ["build_aggregate_events", "summarize"]

83
agent/telemetry/spans.py Normal file
View File

@@ -0,0 +1,83 @@
"""Trace / run / span id propagation via contextvars.
Telemetry events share IDs so a workflow can be reconstructed: one ``trace_id`` per
workflow, one ``run_id`` per top-level execution, ``span_id`` per timed operation, and
``parent_span_id`` for nesting. These live in contextvars so async tool calls and
spawned subagents inherit the lineage automatically.
Provides helpers to start/clear a run context and mint child span ids. The telemetry
plugin sets the run context on session start and reads it in each hook callback.
Nothing here writes to storage — it only carries ids.
"""
from __future__ import annotations
import contextvars
import uuid
from dataclasses import dataclass
from typing import Optional
# Context-local telemetry ids. Each is None until start_run() sets it, and
# clear_run() sets all three back to None.
_trace_id: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar(
"hermes_tel_trace_id", default=None
)
_run_id: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar(
"hermes_tel_run_id", default=None
)
# start_run() sets this to None when a run context begins.
_parent_span_id: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar(
"hermes_tel_parent_span_id", default=None
)
def new_id() -> str:
    """Return a fresh random id: a UUID4 rendered as 32 hex characters."""
    fresh = uuid.uuid4()
    return fresh.hex
@dataclass(slots=True)
class RunContext:
trace_id: str
run_id: str
def start_run(trace_id: Optional[str] = None, run_id: Optional[str] = None) -> RunContext:
    """Open a run context and publish its ids through the contextvars.

    Ids that are missing (or empty) are minted with ``new_id``. The parent
    span pointer is set to None.
    """
    context = RunContext(
        trace_id=trace_id or new_id(),
        run_id=run_id or new_id(),
    )
    _trace_id.set(context.trace_id)
    _run_id.set(context.run_id)
    _parent_span_id.set(None)
    return context
def current_trace_id() -> Optional[str]:
    """Return the trace id set in the current context, or None if there is none."""
    return _trace_id.get()
def current_run_id() -> Optional[str]:
    """Return the run id set in the current context, or None if there is none."""
    return _run_id.get()
def current_parent_span_id() -> Optional[str]:
    """Return the parent span id set in the current context, or None if there is none."""
    return _parent_span_id.get()
def new_span_id() -> str:
    """Return a newly minted span id; the parent span pointer is left untouched."""
    span_id = new_id()
    return span_id
def clear_run() -> None:
    """Set the trace, run, and parent-span ids of the current context to None."""
    for var in (_trace_id, _run_id, _parent_span_id):
        var.set(None)
# Names exported by ``from <this module> import *``; the contextvars themselves stay private.
__all__ = [
"RunContext",
"new_id",
"start_run",
"current_trace_id",
"current_run_id",
"current_parent_span_id",
"new_span_id",
"clear_run",
]

View File

@@ -1159,6 +1159,49 @@ display:
# # Routing/delivery still uses the original values internally.
# redact_pii: false
# =============================================================================
# Telemetry & Observability
# =============================================================================
# Three planes with a hard wall between them:
# local — full-fidelity observability you own. Default ON.
# aggregate — opt-in metadata to Nous (no uploader ships yet). Default OFF.
# trajectories — full message content, exported only to your own destination. Separate opt-in. Default OFF.
#
# The local plane records real values (actual models, providers, tool names) —
# your own data, on your machine. The aggregate plane is opt-in and has no
# uploader today; if one ships it would summarize at that egress boundary.
#
# Enterprise locking: any telemetry.* key can be pinned by an administrator via
# the managed-scope layer (/etc/hermes/config.yaml), which wins over the user's
# value. To hard-forbid egress on a locked-down deployment, pin
# `telemetry.allow_aggregate: false` there.
# telemetry:
# # Local plane: event log + SQLite index in state.db. Never leaves your
# # machine unless you export it or opt into the aggregate plane.
# local: true
# # Hard gate for the aggregate plane. When false, the aggregate plane is off
# # regardless of consent_state. Pin false via managed scope to forbid egress.
# allow_aggregate: true
# # Aggregate-plane consent (the opt-in). No uploader ships yet.
# # unknown (no choice — never uploads) | local (declined) | aggregate (opted in)
# consent_state: unknown
# # Stable install id (aggregate plane only). Empty = mint on first use;
# # clear it to rotate.
# install_id: ""
# # Local event-log retention before rotation (days).
# retention_days: 90
# # Keep secret redaction on even at full local capture.
# redact_secrets: true
# # Content redaction for exports / support bundles: none | pii.
# content_redaction: none
# # Exporters. The OTLP exporter sends spans to a configured Collector endpoint;
# # header values reference environment variable names, not inline secrets.
# export:
# otlp:
# enabled: false
# endpoint: null
# headers_env: {} # e.g. {Authorization: MY_OTLP_TOKEN_ENVVAR}
# =============================================================================
# Shell-script hooks
# =============================================================================

View File

@@ -307,6 +307,13 @@ nested agent work or security lifecycle events.
## Existing Consumers
The bundled **`telemetry`** plugin is the built-in local-first telemetry plane: it
records runs, model calls, and tool calls to a local event log + `state.db`, powers
`hermes insights`, and supports export to your own file or OpenTelemetry Collector. It
is on by default and never sends data to Nous unless you opt in. See
[`telemetry.md`](./telemetry.md) for the full feature, the `hermes telemetry` commands,
and enterprise export.
The bundled Langfuse plugin demonstrates direct hook-based observability for
turns, provider requests, and tool calls.

View File

@@ -0,0 +1,209 @@
# Telemetry & Observability
Hermes ships with a built-in, local-first telemetry system. It records what your
agent does — workflows, model calls, tool calls, errors — on your own machine, powers
`/insights`, and (when *you* enable it) exports to your own observability stack. It is
private by default: nothing is sent to Nous. The only plane that could ever send data
to Nous — anonymous aggregate metrics — is opt-in, off by default, and has no uploader today.
This page explains the whole feature: the three planes, what's captured, the
`hermes telemetry` commands, and how an enterprise streams or exports all of its data
to its own infrastructure.
> Looking for the **plugin hook contract** (how to write your own observer plugin)?
> That's in [`README.md`](./README.md). This page is about the built-in telemetry
> plane and its CLI.
## The three planes
Telemetry is organized into three planes with a hard wall between them:
| Plane | What it holds | Default | Destination |
| --- | --- | --- | --- |
| **local** | Full-fidelity observability — runs, model/tool calls (real model & provider names), durations, errors | **on** | your machine only |
| **aggregate** | Opt-in metadata to Nous (no uploader ships yet) | **off** (opt-in) | Nous, only if you enable it |
| **trajectories** | Full message content / reasoning / raw tool args | **off** (opt-in) | your own export destinations only |
The local plane is the one you'll use day to day — it records the real values that
happened (actual model ids, providers, tool names). The aggregate plane is the only
thing that could ever leave for Nous; it is opt-in, default-off, and has no uploader
today. The trajectories plane unlocks full-content export to *your own* destinations —
it is never wired to Nous.
## Local plane — default-on observability
The local plane is implemented as a bundled `telemetry` plugin that listens to Hermes
lifecycle hooks (model calls, tool calls, session start/finalize) and writes events to:
- an append-only JSONL log at `~/.hermes/telemetry/events.jsonl` (the source of truth)
- indexed `tel_*` tables in `state.db` (for fast queries and rollups)
Writes are fire-and-forget on a background thread: telemetry can never block, slow, or
fail a model call or tool call. If the local plane is disabled (`telemetry.local: false`)
the plugin does not load at all.
### Seeing your local data
```bash
hermes insights # usage report — now includes an "Observability" section
hermes telemetry status # planes, consent, export posture, local data volume
```
The `insights` Observability section shows workflow counts and success rate, duration
p50/p95, tool failure rates by category, provider/model-class mix, and cache hit rate —
all computed locally with exact values.
## `hermes telemetry` commands
```text
hermes telemetry status Show planes, consent state, export posture, local volume
hermes telemetry preview Show the aggregate events that would be produced (local)
hermes telemetry export Export local telemetry to a file/stream or OTLP endpoint
```
Consent and the install id are plain config, not separate verbs — set them with
`hermes config set` (or a managed-scope pin):
```bash
hermes config set telemetry.consent_state aggregate # opt in to the aggregate plane
hermes config set telemetry.consent_state local # opt out (local plane stays on)
hermes config set telemetry.install_id "" # reset the install id (mints a new one)
```
### Aggregate plane (opt-in)
The aggregate plane is **off by default** and has **no uploader today** — nothing is
sent to Nous. Consent lives in `telemetry.consent_state` (`unknown` / `local` /
`aggregate`); setting it to `aggregate` records the opt-in for if/when an uploader ships.
If one is built, it would summarize at that egress boundary.
`hermes telemetry preview` shows your recent runs as they'd be summarized — computed and
shown **locally only**, with the real model and tool names from your own telemetry. It's
a local inspection surface, not an upload.
## Enterprise: getting all of your data
Everything below sends data to **your own** destination — a file, your SIEM, or your own
OpenTelemetry Collector. None of it goes to Nous.
### Bulk export to a file
```bash
# Structural telemetry only (default — no message content)
hermes telemetry export --out telemetry.ndjson
# JSON instead of NDJSON, last 7 days only
hermes telemetry export --out dump.json --format json --since 7
```
By default the export is **structural** — runs, model/tool-call metadata, session shells
with message *counts* but no message bodies.
### Including content (trajectories plane)
To export full message content, enable the trajectories plane. This is a deliberate,
separate consent — it's how an enterprise opts into exporting work-product content to its
own store:
```yaml
# config.yaml
telemetry:
trajectories:
enabled: true # unlocks content export to YOUR destination
content_redaction: pii # "none" | "pii"
```
```bash
hermes telemetry export --out full.ndjson --include-content
```
`--include-content` is a no-op unless the trajectories plane is enabled — the consent
plane governs, not the flag.
### Live streaming to your OpenTelemetry Collector / SIEM (OTLP)
Hermes can stream telemetry to your own OTLP endpoint. This requires the optional `otlp`
extra:
```bash
pip install 'hermes-agent[otlp]'
```
```yaml
# config.yaml
telemetry:
export:
otlp:
enabled: true
endpoint: "https://collector.your-corp.internal:4318/v1/traces"
headers_env: # secrets by reference — env var NAMES, not values
Authorization: CORP_OTLP_TOKEN
```
```bash
export CORP_OTLP_TOKEN="..." # the actual token lives in the environment
hermes telemetry export --otlp # drain current telemetry to your collector
```
Span attributes are structural by default. For authentication, the config holds the
*name* of an environment variable rather than the secret itself; the value is read at
export time and is never written to config or logged.
## Redaction
Two independent controls govern what content looks like on export:
| Control | Values | Effect |
| --- | --- | --- |
| Secret redaction | always on | API keys, tokens, auth headers, connection strings are **always** stripped on every export path. Cannot be disabled. |
| `content_redaction` | `none` \| `pii` | When content is exported, `pii` additionally redacts emails, phone numbers, and id-shaped strings. |
Secret redaction is always on — even at full content fidelity — because a SIEM or
warehouse full of live credentials is a bigger attack target than the data it holds. It
fails closed: if the redactor can't run, the raw string is not emitted.
## Configuration reference
```yaml
telemetry:
  local: true                # local plane (default on)
  allow_aggregate: true      # hard gate; pin false to forbid the aggregate plane entirely
  consent_state: unknown     # aggregate opt-in: unknown | local | aggregate
  install_id: ""             # stable anon id; "" mints one; clear to rotate
  retention_days: 90         # local event-log retention
  redact_secrets: true       # always-on secret redaction (kept on by design)
  content_redaction: none    # none | pii
  trajectories:
    enabled: false           # unlocks full-content export to your destination
  export:
    otlp:
      enabled: false
      endpoint: null
      headers_env: {}        # {HeaderName: ENV_VAR_NAME}
```
### Enterprise policy via managed scope
Any `telemetry.*` key can be pinned by an administrator through Hermes' managed-scope
layer (`/etc/hermes/config.yaml`), which wins over the user's value on a per-key basis.
There is no telemetry-specific policy block — to lock down a fleet, pin the keys you care
about. Common examples:
- `telemetry.allow_aggregate: false` — the aggregate plane stays off even if
`consent_state` is set to `aggregate`.
- `telemetry.export.otlp.endpoint` — point every install at the corporate collector.
- `telemetry.trajectories.enabled` — centrally decide whether content export is allowed.
When a key is managed, attempts to change it are rejected by managed scope with a message
naming the source. `hermes telemetry status` shows the current export posture (endpoint
host, whether the auth env var is set, content gate, redaction modes) — it never prints
secret values.
## Privacy summary
- Local telemetry never leaves your machine.
- The aggregate plane (the only thing that could go to Nous) is opt-in, default-off,
and has no uploader today — nothing is sent.
- All export surfaces (file, OTLP) point at *your* destinations.
- Secrets are always redacted on export; content export is off until you enable the
trajectories plane; PII redaction is a knob.

View File

@@ -1803,7 +1803,61 @@ DEFAULT_CONFIG = {
"privacy": {
"redact_pii": False, # When True, hash user IDs and strip phone numbers from LLM context
},
# Telemetry & observability. Three planes with a hard wall between them:
# local — full-fidelity local observability the user owns (real
# models, providers, tool names). Default ON.
# aggregate — opt-in metadata to Nous (no uploader ships yet). Default OFF.
# trajectories — full message content, exportable only to the operator's own destination. Separate consent (telemetry.trajectories.enabled). Default OFF.
#
# Enterprise locking: any telemetry.* key can be pinned by an administrator via
# the managed-scope layer (/etc/hermes/config.yaml), which wins over the user's
# value on a per-key basis. There is no telemetry-specific policy block — pin
# e.g. `telemetry.allow_aggregate: false` in managed scope to hard-forbid egress.
"telemetry": {
# Local plane: event log + SQLite index in state.db. The user's own data;
# never leaves the machine unless they export it or opt into aggregate.
"local": True,
# Hard gate for the aggregate plane. When False, the aggregate plane is off
# regardless of consent_state. Intended to be pinned False by an administrator
# via managed scope on locked-down or air-gapped deployments. Default True
# (consent_state still governs the opt-in).
"allow_aggregate": True,
# Aggregate-plane consent — the single source of truth for the opt-in:
# "unknown" (no choice made — never uploads), "local" (declined),
# "aggregate" (opted in). Set with `hermes config set telemetry.consent_state`
# or a managed-scope pin. Non-interactive installs sit at "unknown".
"consent_state": "unknown",
# Stable install identifier (aggregate plane only). Empty string means "mint a
# fresh UUID on first use"; clear it to rotate. Never sent on the local plane.
"install_id": "",
# Local event-log retention before rotation (days). Local plane only.
"retention_days": 90,
# Keep secret redaction on even at full local capture (a SIEM full of live
# credentials is a bigger attack target). Admin may override via managed scope.
"redact_secrets": True,
# Content redaction for exports / support bundles: "none" | "pii".
"content_redaction": "none",
# Trajectories plane: full message content / reasoning / raw tool args.
# Off by default. When enabled, full content becomes exportable to the
# configured destination — always secret-redacted, and PII-redacted per
# content_redaction. This is the consent gate for content export and is
# admin-pinnable via managed scope. It does not enable any upload.
"trajectories": {
"enabled": False,
},
# Exporters. The OTLP exporter sends to a configured Collector endpoint;
# endpoint headers reference environment variable names rather than inline
# secrets.
"export": {
"otlp": {
"enabled": False,
"endpoint": None,
"headers_env": {}, # {"Authorization": "MY_OTLP_TOKEN_ENVVAR"}
},
},
},
# Text-to-speech configuration
# Each provider supports an optional `max_text_length:` override for the
# per-request input-character cap. Omit it to use the provider's documented

View File

@@ -295,6 +295,7 @@ from hermes_cli.subcommands.memory import build_memory_parser
from hermes_cli.subcommands.acp import build_acp_parser
from hermes_cli.subcommands.tools import build_tools_parser
from hermes_cli.subcommands.insights import build_insights_parser
from hermes_cli.subcommands.telemetry import build_telemetry_parser
from hermes_cli.subcommands.skills import build_skills_parser
from hermes_cli.subcommands.pairing import build_pairing_parser
from hermes_cli.subcommands.plugins import build_plugins_parser
@@ -11531,7 +11532,7 @@ _BUILTIN_SUBCOMMANDS = frozenset(
"model", "pairing", "plugins", "portal", "postinstall", "profile", "proxy",
"prompt-size",
"send", "sessions", "setup",
"skills", "slack", "status", "tools", "uninstall", "update",
"skills", "slack", "status", "telemetry", "tools", "uninstall", "update",
"version", "webhook", "whatsapp", "whatsapp-cloud", "chat", "secrets", "security",
# Help-ish invocations — plugin commands not being listed in
# top-level --help is an acceptable trade-off for skipping an
@@ -11963,6 +11964,186 @@ def cmd_insights(args):
print(f"Error generating insights: {e}")
def cmd_telemetry(args):
"""Local-only telemetry control + inspection. No uploader exists yet."""
import json as _json
import time
from hermes_cli.config import load_config, save_config
from agent.telemetry import policy, rollup, metrics
action = getattr(args, "telemetry_action", None) or "status"
config = load_config()
decision = policy.resolve(config)
def _persist_install_id():
# Make sure a minted id is written back so it stays stable.
config.setdefault("telemetry", {})["install_id"] = decision.install_id
save_config(config)
if action == "status":
print("Telemetry status")
print("" * 56)
print(f" Local plane: {'on' if decision.local_enabled else 'off'} "
f"(telemetry.local)")
print(f" Aggregate plane: {'on' if decision.aggregate_enabled else 'off'} "
f"(opt-in; consent_state={decision.consent_state})")
if decision.consent_state != policy.CONSENT_AGGREGATE and decision.allow_aggregate:
print(" opt in: hermes config set telemetry.consent_state aggregate")
if not decision.allow_aggregate:
print(" ⚠ allow_aggregate is false (egress hard-disabled)")
print(f" Install id: {decision.install_id}")
print(" Upload: DISABLED — no server yet. Aggregate is computed "
"locally only.")
print()
# Export posture — where YOUR data can flow (never Nous). Values only;
# lock-state is managed scope's concern, surfaced by its own tooling.
try:
from agent.telemetry import otlp_exporter, redaction
otlp = otlp_exporter._otlp_config(config)
print(" Export")
if otlp.get("enabled") and otlp.get("endpoint"):
_host = str(otlp.get("endpoint"))
print(f" OTLP: enabled → {_host}")
# Show the header name + env var + whether it's set, never the value.
hdrs = otlp.get("headers_env") or {}
if hdrs:
import os as _os
for _h, _env in hdrs.items():
_state = "set" if _os.environ.get(str(_env)) else "NOT set"
print(f" auth: header '{_h}' ← ${_env} ({_state})")
_sdk = "installed" if otlp_exporter.is_available() else "MISSING — pip install 'hermes-agent[otlp]'"
print(f" SDK: {_sdk}")
else:
print(" OTLP: disabled (telemetry.export.otlp.enabled)")
# Content gate (trajectories plane) + redaction posture.
if redaction.content_export_enabled(config):
print(f" Content export: on (trajectories plane) — message "
f"content exportable")
else:
print(" Content export: off (trajectories plane) — structural "
"telemetry only")
print(" Secret redaction: on (always)")
print(f" PII redaction: {redaction.content_mode_for(config)}")
print(" Bulk export: hermes telemetry export --out FILE [--otlp]")
except Exception:
pass
print()
# Local data volume
try:
ov = metrics.overview()
runs = ov["workflows"]["total_runs"]
mcs = sum(ov["model_calls"]["by_provider"].values())
tcs = ov["tool_calls"]["total"]
print(f" Local data: {runs:,} workflows · {mcs:,} model calls · {tcs:,} tool calls")
except Exception:
print(" Local data: (none yet)")
print("\n Inspect what would be shared: hermes telemetry preview")
return
if action == "preview":
_persist_install_id()
since_ns = int((time.time() - args.days * 86400) * 1e9)
events = rollup.build_aggregate_events(
install_id=decision.install_id, since_ns=since_ns
)
summary = rollup.summarize(events)
print("Telemetry preview — computed locally, NOT uploaded")
print("" * 56)
print(f" Window: last {args.days} days Events: {summary['total']}")
for name, n in sorted(summary["by_event_name"].items()):
print(f" {name}: {n}")
print(" Shows the actual models, tools, and counts from local telemetry.")
print()
shown = events[: args.limit]
if getattr(args, "json", False):
print(_json.dumps(shown, indent=2))
else:
for e in shown:
if e.get("event_name") == "heartbeat":
print(f" • heartbeat hermes={e.get('hermes_version')} os={e.get('os_family')}")
continue
bits = [f"{e.get('event_name')}"]
for k in ("entrypoint", "platform", "end_reason", "duration_ms",
"estimated_cost_usd"):
if e.get(k) is not None:
bits.append(f"{k}={e[k]}")
models = e.get("models_used") or []
if models:
bits.append("models=" + ",".join(
f"{m.get('model') or m.get('provider') or '?'}" for m in models))
if e.get("tools_used"):
bits.append("tools=" + ",".join(e["tools_used"]))
print("" + " ".join(bits))
if len(events) > len(shown):
print(f" ... and {len(events) - len(shown)} more (use --limit)")
return
if action == "export":
import sys as _sys
from agent.telemetry import exporter_bulk, redaction
since_ns = int((time.time() - args.since * 86400) * 1e9) if getattr(args, "since", 0) else None
want_content = getattr(args, "include_content", False)
# OTLP path: stream spans to the configured Collector endpoint.
if getattr(args, "otlp", False):
from agent.telemetry import otlp_exporter
if not otlp_exporter.is_enabled(config):
print("telemetry.export.otlp is not enabled/endpoint not set. "
"Set telemetry.export.otlp.enabled + .endpoint.", file=_sys.stderr)
return
# The OTel SDK is an optional dep; export_once() lazily installs it on
# first use (gated by security.allow_lazy_installs, TTY-prompted),
# same as every other optional backend. OTLPUnavailable is the
# fallback when it can't be installed.
try:
n = otlp_exporter.export_once(config, since_ns=since_ns)
except otlp_exporter.OTLPUnavailable as e:
print(str(e), file=_sys.stderr)
return
except Exception as e:
print(f"OTLP export failed: {e}", file=_sys.stderr)
return
print(f"Exported {n} spans to the OTLP endpoint "
f"({(otlp_exporter._otlp_config(config) or {}).get('endpoint')}).",
file=_sys.stderr)
return
content_ok = redaction.content_export_enabled(config)
if want_content and not content_ok:
print("⚠ --include-content ignored: telemetry.trajectories.enabled is false.")
print(" Enable the trajectories plane (admin/config) to export message content.")
print(" Exporting structural telemetry only.")
if not getattr(args, "out", None):
print("--out is required (or use --otlp).", file=_sys.stderr)
return
out_path = args.out
if out_path == "-":
counts = exporter_bulk.export(
_sys.stdout, fmt=args.fmt, since_ns=since_ns,
include_content=want_content, config=config,
)
else:
with open(out_path, "w", encoding="utf-8") as fh:
counts = exporter_bulk.export(
fh, fmt=args.fmt, since_ns=since_ns,
include_content=want_content, config=config,
)
# status to stderr so stdout stays clean for `--out -`
msg = (f"Exported {counts['telemetry']} telemetry records"
+ (f" + {counts['sessions']} sessions" if counts["sessions"] else "")
+ (" (content INCLUDED, redacted)" if counts["content_included"]
else " (structural only)")
+ (f" -> {out_path}" if out_path != "-" else ""))
print(msg, file=_sys.stderr)
return
print(f"Unknown telemetry action: {action}")
print("Use: status | preview | export")
def cmd_skills(args):
# Route 'config' action to skills_config module
if getattr(args, "skills_action", None) == "config":
@@ -12979,6 +13160,9 @@ def main():
# =========================================================================
build_insights_parser(subparsers, cmd_insights=cmd_insights)
# telemetry command (parser in hermes_cli/subcommands/telemetry.py)
build_telemetry_parser(subparsers, cmd_telemetry=cmd_telemetry)
# =========================================================================
# claw command (parser built in hermes_cli/subcommands/claw.py)
# =========================================================================

View File

@@ -204,6 +204,23 @@ def _env_enabled(name: str) -> bool:
return env_var_enabled(name)
def _telemetry_local_enabled() -> bool:
"""True when the local telemetry plane is enabled (telemetry.local, default true).
Gates auto-loading of the bundled ``telemetry`` plugin. Reads config defensively
so a malformed/missing telemetry section defaults to on (the design default).
"""
try:
from hermes_cli.config import load_config
config = load_config()
tel = cfg_get(config, "telemetry", default={})
if not isinstance(tel, dict):
return True
return bool(tel.get("local", True))
except Exception:
return True
def _get_disabled_plugins() -> set:
"""Read the disabled plugins list from config.yaml.
@@ -1326,6 +1343,24 @@ class PluginManager:
self._load_plugin(manifest)
continue
# The bundled telemetry plugin auto-loads when the local plane is
# enabled (telemetry.local, default true). It's observational
# (lifecycle hooks -> local event log), writes nothing to the
# network, and powers /usage and /insights, so it should be on out
# of the box without an opt-in. A user who sets telemetry.local:
# false opts out and it is skipped like any disabled plugin.
if (
manifest.source == "bundled"
and (manifest.key or manifest.name) == "telemetry"
):
if _telemetry_local_enabled():
self._load_plugin(manifest)
else:
loaded = LoadedPlugin(manifest=manifest, enabled=False)
loaded.error = "disabled (telemetry.local is false)"
self._plugins[lookup_key] = loaded
continue
# Everything else (standalone, user-installed backends,
# entry-point plugins) is opt-in via plugins.enabled.
# Accept both the path-derived key and the legacy bare name

View File

@@ -0,0 +1,54 @@
"""``hermes telemetry`` subcommand parser.
Telemetry control and inspection. ``preview`` shows the per-run summary events that
would be produced for the aggregate plane; there is no uploader, so it terminates as a
local view.
The handler is injected to avoid importing ``main`` (mirrors the insights subcommand).
"""
from __future__ import annotations
from typing import Callable
def _non_negative_int(text):
    """argparse ``type=`` converter that accepts only integers >= 0."""
    import argparse

    try:
        value = int(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"expected an integer, got {text!r}") from None
    if value < 0:
        raise argparse.ArgumentTypeError(f"must be >= 0, got {value}")
    return value


def build_telemetry_parser(subparsers, *, cmd_telemetry: Callable) -> None:
    """Attach the ``telemetry`` subcommand (with actions) to ``subparsers``.

    Registers the ``status``, ``preview`` and ``export`` actions (stored on the namespace
    as ``telemetry_action``) and sets ``cmd_telemetry`` as the ``func`` default.

    ``--days``, ``--limit`` and ``--since`` must be non-negative: a negative window would
    point into the future and a negative limit would silently slice events off the end
    of the list instead of capping it.
    """
    p = subparsers.add_parser(
        "telemetry",
        help="Inspect local telemetry and export it",
        description=(
            "Local-first telemetry. The local plane records observability on this "
            "machine. The aggregate plane is opt-in (set telemetry.consent_state via "
            "`hermes config set`); it has no uploader and is shown only via `preview`."
        ),
    )
    sub = p.add_subparsers(dest="telemetry_action")

    sub.add_parser("status", help="Show telemetry planes, consent state, and local data volume")

    prev = sub.add_parser(
        "preview",
        help="Show the aggregate events that would be produced (computed locally, not uploaded)",
    )
    prev.add_argument("--days", type=_non_negative_int, default=30,
                      help="Window to roll up (default: 30)")
    prev.add_argument("--limit", type=_non_negative_int, default=10,
                      help="Max events to print (default: 10)")
    prev.add_argument("--json", action="store_true", help="Print raw JSON events")

    exp = sub.add_parser(
        "export",
        help="Export local telemetry (and optional content) to a file, stream, or OTLP endpoint",
    )
    exp.add_argument("--out", help="Output file path (use - for stdout). Not needed with --otlp.")
    exp.add_argument("--format", dest="fmt", choices=["ndjson", "json"], default="ndjson",
                     help="Output format (default: ndjson)")
    exp.add_argument("--since", type=_non_negative_int, default=0,
                     help="Only telemetry from the last N days (0 = all)")
    exp.add_argument("--include-content", action="store_true",
                     help="Include session/message content (requires telemetry.trajectories.enabled). "
                          "Secrets always redacted; PII per telemetry.content_redaction.")
    exp.add_argument("--otlp", action="store_true",
                     help="Export to the configured OTLP endpoint (telemetry.export.otlp.*) "
                          "instead of a file. Requires the optional 'otlp' extra.")

    p.set_defaults(func=cmd_telemetry)

View File

@@ -116,7 +116,7 @@ T = TypeVar("T")
DEFAULT_DB_PATH = get_hermes_home() / "state.db"
SCHEMA_VERSION = 16
SCHEMA_VERSION = 17 # v17: tel_* telemetry tables (local-plane observability, raw values)
# ---------------------------------------------------------------------------
# WAL-compatibility fallback
@@ -597,6 +597,114 @@ CREATE INDEX IF NOT EXISTS idx_sessions_parent ON sessions(parent_session_id);
CREATE INDEX IF NOT EXISTS idx_sessions_started ON sessions(started_at DESC);
CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, timestamp);
CREATE INDEX IF NOT EXISTS idx_compression_locks_expires ON compression_locks(expires_at);
-- ── Telemetry (local plane: local observability) ───────────────────────────
-- Event/span layer co-located in state.db so runs JOIN to sessions/messages.
-- The append-only JSONL log at ~/.hermes/telemetry/events.jsonl is the source
-- of truth; these tables are a rebuildable index. They hold the user's own data
-- (real model ids, provider/tool names) and never leave the machine unless the
-- user exports them.
CREATE TABLE IF NOT EXISTS tel_runs (
run_id TEXT PRIMARY KEY,
trace_id TEXT NOT NULL,
session_id TEXT,
profile_id TEXT,
entrypoint TEXT NOT NULL,
platform TEXT,
start_ns INTEGER NOT NULL,
end_ns INTEGER,
end_reason TEXT,
model_call_count INTEGER DEFAULT 0,
tool_call_count INTEGER DEFAULT 0,
error_count INTEGER DEFAULT 0,
estimated_cost_usd REAL,
cost_status TEXT,
schema_v INTEGER NOT NULL DEFAULT 1
);
CREATE TABLE IF NOT EXISTS tel_spans (
span_id TEXT PRIMARY KEY,
trace_id TEXT NOT NULL,
run_id TEXT NOT NULL,
parent_span_id TEXT,
name TEXT NOT NULL,
kind TEXT,
start_ns INTEGER NOT NULL,
end_ns INTEGER,
status TEXT,
attrs_json TEXT
);
CREATE TABLE IF NOT EXISTS tel_model_calls (
span_id TEXT PRIMARY KEY,
run_id TEXT NOT NULL,
provider TEXT, -- raw provider, e.g. "anthropic"
model TEXT, -- raw model id, e.g. "claude-opus-4"
base_url TEXT,
input_tokens INTEGER DEFAULT 0,
output_tokens INTEGER DEFAULT 0,
cache_read_tokens INTEGER DEFAULT 0,
cache_write_tokens INTEGER DEFAULT 0,
reasoning_tokens INTEGER DEFAULT 0,
latency_ms INTEGER,
ttft_ms INTEGER,
estimated_cost_usd REAL,
cost_status TEXT,
cost_source TEXT,
end_reason TEXT,
retry_count INTEGER DEFAULT 0
);
CREATE TABLE IF NOT EXISTS tel_tool_calls (
span_id TEXT PRIMARY KEY,
run_id TEXT NOT NULL,
tool_name TEXT, -- raw tool name (e.g. "web_search")
backend TEXT,
duration_ms INTEGER,
result_class TEXT,
retry_count INTEGER DEFAULT 0,
approval TEXT
);
CREATE TABLE IF NOT EXISTS tel_gateway_events (
id INTEGER PRIMARY KEY AUTOINCREMENT, run_id TEXT, ts_ns INTEGER NOT NULL,
platform TEXT, direction TEXT, result TEXT,
voice INTEGER DEFAULT 0, attachments INTEGER DEFAULT 0
);
CREATE TABLE IF NOT EXISTS tel_cron_events (
id INTEGER PRIMARY KEY AUTOINCREMENT, run_id TEXT, ts_ns INTEGER NOT NULL,
kind TEXT, result TEXT
);
CREATE TABLE IF NOT EXISTS tel_skill_events (
id INTEGER PRIMARY KEY AUTOINCREMENT, run_id TEXT, ts_ns INTEGER NOT NULL,
action TEXT, skill_name TEXT -- skill_name the local plane only
);
CREATE TABLE IF NOT EXISTS tel_memory_events (
id INTEGER PRIMARY KEY AUTOINCREMENT, run_id TEXT, ts_ns INTEGER NOT NULL,
action TEXT, result TEXT
);
CREATE TABLE IF NOT EXISTS tel_feedback_events (
id INTEGER PRIMARY KEY AUTOINCREMENT, run_id TEXT, ts_ns INTEGER NOT NULL,
kind TEXT
);
CREATE TABLE IF NOT EXISTS tel_error_events (
id INTEGER PRIMARY KEY AUTOINCREMENT, run_id TEXT, ts_ns INTEGER NOT NULL,
error_class TEXT, subsystem TEXT, recovery TEXT -- enums only; NO raw message/stack
);
CREATE INDEX IF NOT EXISTS ix_tel_runs_trace ON tel_runs(trace_id);
CREATE INDEX IF NOT EXISTS ix_tel_runs_session ON tel_runs(session_id);
CREATE INDEX IF NOT EXISTS ix_tel_runs_start ON tel_runs(start_ns);
CREATE INDEX IF NOT EXISTS ix_tel_spans_run ON tel_spans(run_id);
CREATE INDEX IF NOT EXISTS ix_tel_spans_trace ON tel_spans(trace_id);
CREATE INDEX IF NOT EXISTS ix_tel_model_run ON tel_model_calls(run_id);
CREATE INDEX IF NOT EXISTS ix_tel_tool_run ON tel_tool_calls(run_id);
"""
# Indexes that reference columns added in later schema versions must be

View File

@@ -0,0 +1,319 @@
"""Telemetry plugin — wires Hermes lifecycle hooks to the local-plane emitter.
This is the *only* instrumentation seam. It registers observational hooks (which core
already invokes fail-open) and translates each into a typed local-plane telemetry
event handed to ``agent.telemetry.emitter``. There are zero edits to core call sites:
the hooks already carry model/provider/usage/duration/tool data.
Everything here is best-effort and fail-open — a raised exception in a hook callback is
swallowed by core, and we additionally guard each callback so a telemetry bug can never
disturb a session. No content, no network: local plane only.
Hooks consumed:
on_session_start -> begin a run context (trace_id/run_id), buffer a run row
post_api_request -> one model_call event (tokens, latency, raw provider/model)
api_request_error -> one error event
post_tool_call -> one tool_call event (raw tool name, duration, result class)
on_session_finalize -> finalize the run row (end_reason, counts, cost)
subagent_start/stop -> (reserved) lineage markers
"""
from __future__ import annotations
import logging
import threading
import time
from typing import Any, Dict, Optional
logger = logging.getLogger(__name__)
# Per-run accumulators keyed by run_id, so on_session_finalize can roll up counts.
_runs: Dict[str, Dict[str, Any]] = {}
_runs_lock = threading.Lock()
def _safe(fn):
"""Decorator: never let a telemetry hook raise into core."""
def wrapper(*args, **kwargs):
try:
return fn(*args, **kwargs)
except Exception:
logger.debug("telemetry hook %s failed", getattr(fn, "__name__", "?"), exc_info=True)
return None
wrapper.__name__ = getattr(fn, "__name__", "wrapper")
return wrapper
def _run_key(session_id: Optional[str], task_id: Optional[str]) -> str:
return session_id or task_id or "default"
# ── on_session_start ────────────────────────────────────────────────────────
@_safe
def _on_session_start(**kw: Any) -> None:
    """Open a run context and store a fresh accumulator for this session.

    Reads ``session_id``, ``platform``, ``source`` and ``task_id`` from the hook kwargs.
    Any accumulator already stored under the same key is replaced.
    """
    from agent.telemetry import spans

    sid = kw.get("session_id") or ""
    origin = kw.get("source")
    surface = kw.get("platform") or origin or ""
    run_ctx = spans.start_run()
    slot = _run_key(sid, kw.get("task_id"))
    record = {
        "run_id": run_ctx.run_id,
        "trace_id": run_ctx.trace_id,
        "session_id": sid or None,
        "entrypoint": _entrypoint_for(surface, origin),
        "platform": surface or None,
        "start_ns": time.time_ns(),
        "model_call_count": 0,
        "tool_call_count": 0,
        "error_count": 0,
    }
    with _runs_lock:
        _runs[slot] = record
def _ensure_run(session_id: Optional[str], task_id: Optional[str], platform: str = "") -> Dict[str, Any]:
    """Fetch the accumulator for this session/task, creating it when absent.

    Covers the case where ``on_session_start`` was never seen: the new accumulator
    reuses the current run/trace ids from ``spans`` when they exist and otherwise gets
    fresh ids, with all counters at zero.
    """
    from agent.telemetry import spans

    slot = _run_key(session_id, task_id)
    with _runs_lock:
        existing = _runs.get(slot)
        if existing is not None:
            return existing
        run_id = spans.current_run_id() or spans.new_id()
        trace_id = spans.current_trace_id() or spans.new_id()
        created = {
            "run_id": run_id,
            "trace_id": trace_id,
            "session_id": session_id or None,
            "entrypoint": _entrypoint_for(platform),
            "platform": platform or None,
            "start_ns": time.time_ns(),
            "model_call_count": 0,
            "tool_call_count": 0,
            "error_count": 0,
        }
        _runs[slot] = created
        return created
def _entrypoint_for(platform: Optional[str], source: Optional[str] = None) -> str:
"""Coarse entrypoint label (cli / gateway / tui / api / cron …).
This is a workflow *surface* label, not model/tool anonymization — it answers
"where did this run come from", which is genuinely categorical.
"""
s = (source or platform or "").lower()
if s in ("", "chat", "interactive", "cli", "desktop"):
return "cli"
if s in ("telegram", "discord", "slack", "whatsapp", "signal", "matrix",
"email", "sms", "teams", "feishu", "wecom", "line", "google_chat"):
return "gateway"
if s == "tui":
return "tui"
if s in ("api", "api_server", "openai_api"):
return "api"
if s == "cron":
return "cron"
if s == "batch":
return "batch"
if s == "acp":
return "acp"
return "cli"
# ── post_api_request -> model_call event ────────────────────────────────────
@_safe
def _on_post_api_request(**kw: Any) -> None:
    """Emit one ``ModelCallEvent`` for a finished API request.

    Token counts come from the ``usage`` mapping (missing or falsy entries count as 0),
    latency from ``api_duration`` multiplied by 1000 when it is numeric. The run's
    ``model_call_count`` is incremented before the event is handed to the emitter.
    """
    from agent.telemetry import emitter, spans
    from agent.telemetry.events import ModelCallEvent

    sid = kw.get("session_id") or ""
    surface = kw.get("platform") or ""
    run = _ensure_run(sid, kw.get("task_id"), surface)
    usage = kw.get("usage") or {}

    def _tokens(field: str) -> int:
        return int(usage.get(field) or 0)

    elapsed = kw.get("api_duration")
    latency_ms = None
    if isinstance(elapsed, (int, float)):
        latency_ms = int(elapsed * 1000)

    event = ModelCallEvent(
        span_id=spans.new_span_id(),
        run_id=run["run_id"],
        provider=kw.get("provider"),  # raw
        model=kw.get("model"),  # raw
        base_url=kw.get("base_url"),
        input_tokens=_tokens("input_tokens"),
        output_tokens=_tokens("output_tokens"),
        cache_read_tokens=_tokens("cache_read_tokens"),
        cache_write_tokens=_tokens("cache_write_tokens"),
        reasoning_tokens=_tokens("reasoning_tokens"),
        latency_ms=latency_ms,
        end_reason="completed",
    )
    with _runs_lock:
        run["model_call_count"] += 1
    emitter.emit(event)
# ── api_request_error -> error event ────────────────────────────────────────
@_safe
def _on_api_request_error(**kw: Any) -> None:
    """Count a failed API request against the run and emit an ``ErrorEvent``.

    The error is reduced to a coarse class taken from ``error_type`` (or ``error`` when
    that is missing); the subsystem is always ``"model_api"``.
    """
    from agent.telemetry import emitter
    from agent.telemetry.events import ErrorEvent

    sid = kw.get("session_id") or ""
    surface = kw.get("platform") or ""
    run = _ensure_run(sid, kw.get("task_id"), surface)
    raw_error = kw.get("error_type") or kw.get("error") or ""
    coarse = _coarse_error_class(raw_error)
    with _runs_lock:
        run["error_count"] += 1
    event = ErrorEvent(
        run_id=run["run_id"],
        error_class=coarse,
        subsystem="model_api",
        recovery=None,
    )
    emitter.emit(event)
def _coarse_error_class(raw: Any) -> str:
s = str(raw).lower()
if "timeout" in s:
return "provider_timeout"
if "rate" in s and "limit" in s:
return "rate_limit"
if any(k in s for k in ("auth", "401", "403", "unauthorized", "forbidden")):
return "auth"
if any(k in s for k in ("connection", "network", "dns", "socket")):
return "network"
if "context" in s and ("length" in s or "overflow" in s or "token" in s):
return "context_overflow"
if any(k in s for k in ("500", "502", "503", "server error", "provider")):
return "provider_error"
return "unknown"
# ── post_tool_call -> tool_call event ───────────────────────────────────────
@_safe
def _on_post_tool_call(**kw: Any) -> None:
    """Emit one ``ToolCallEvent`` for a finished tool call.

    The tool name is taken from ``function_name`` (falling back to ``tool_name``) and
    the result is reduced to a class label. The run's ``tool_call_count`` is always
    incremented, and ``error_count`` too when the result class is ``"error"``.
    """
    from agent.telemetry import emitter, spans
    from agent.telemetry.events import ToolCallEvent

    sid = kw.get("session_id") or ""
    surface = kw.get("platform") or ""
    run = _ensure_run(sid, kw.get("task_id"), surface)
    tool = kw.get("function_name") or kw.get("tool_name")
    elapsed_ms = kw.get("duration_ms")
    outcome = _tool_result_class(kw.get("result"))

    with _runs_lock:
        run["tool_call_count"] += 1
        if outcome == "error":
            run["error_count"] += 1

    duration = None
    if isinstance(elapsed_ms, (int, float)):
        duration = int(elapsed_ms)
    event = ToolCallEvent(
        span_id=spans.new_span_id(),
        run_id=run["run_id"],
        tool_name=tool,  # raw tool name
        duration_ms=duration,
        result_class=outcome,
    )
    emitter.emit(event)
def _tool_result_class(result: Any) -> str:
"""Classify a tool result without retaining content — error vs ok vs blocked."""
try:
import json
if isinstance(result, str):
r = result.strip()
if r.startswith("{"):
obj = json.loads(r)
if isinstance(obj, dict):
if obj.get("error") or obj.get("blocked"):
return "blocked" if obj.get("blocked") else "error"
if obj.get("timeout"):
return "timeout"
return "ok"
if isinstance(result, dict):
if result.get("error"):
return "error"
return "ok"
except Exception:
return "ok"
return "ok"
# ── on_session_finalize -> finalize the run row ─────────────────────────────
@_safe
def _on_session_finalize(**kw: Any) -> None:
    """Close out the run for this session and emit its ``RunEvent``.

    The accumulator is removed from the registry; when none was stored, one is created
    via ``_ensure_run`` and removed again immediately. Cost fields come from the hook
    kwargs, counters from the accumulator. The run context is cleared afterwards.
    """
    from agent.telemetry import emitter, spans
    from agent.telemetry.events import RunEvent

    sid = kw.get("session_id") or ""
    task = kw.get("task_id")
    slot = _run_key(sid, task)
    with _runs_lock:
        run = _runs.pop(slot, None)
    if run is None:
        # on_session_start was missed: build an accumulator, then drop it from the registry.
        run = _ensure_run(sid, task, kw.get("platform") or "")
        with _runs_lock:
            _runs.pop(slot, None)

    reason = _coarse_end_reason(kw)
    event = RunEvent(
        run_id=run["run_id"],
        trace_id=run["trace_id"],
        entrypoint=run.get("entrypoint", "cli"),
        session_id=run.get("session_id"),
        platform=run.get("platform"),
        start_ns=run.get("start_ns", time.time_ns()),
        end_ns=time.time_ns(),
        end_reason=reason,
        model_call_count=run.get("model_call_count", 0),
        tool_call_count=run.get("tool_call_count", 0),
        error_count=run.get("error_count", 0),
        estimated_cost_usd=_as_float(kw.get("estimated_cost_usd")),
        cost_status=kw.get("cost_status"),
    )
    emitter.emit(event)
    spans.clear_run()
def _coarse_end_reason(kw: Dict[str, Any]) -> str:
if kw.get("interrupted"):
return "interrupted"
if kw.get("failed"):
return "failed"
reason = str(kw.get("turn_exit_reason") or "").lower()
if "max_iteration" in reason:
return "max_iterations"
if "timeout" in reason:
return "timeout"
return "completed"
def _as_float(v: Any) -> Optional[float]:
try:
return float(v) if v is not None else None
except (TypeError, ValueError):
return None
# ── subagent lineage (reserved) ─────────────────────────────────────────────
@_safe
def _on_subagent_start(**kw: Any) -> None:
    """Reserved lineage hook; currently does nothing.

    Subagents inherit the run context via contextvars; no explicit handling needed.
    """
@_safe
def _on_subagent_stop(**kw: Any) -> None:
    """Reserved lineage hook; currently does nothing."""
# ── registration ────────────────────────────────────────────────────────────
def register(ctx) -> None:
    """Plugin entry point: register every telemetry hook callback on ``ctx``."""
    hook_table = (
        ("on_session_start", _on_session_start),
        ("post_api_request", _on_post_api_request),
        ("api_request_error", _on_api_request_error),
        ("post_tool_call", _on_post_tool_call),
        ("on_session_finalize", _on_session_finalize),
        ("subagent_start", _on_subagent_start),
        ("subagent_stop", _on_subagent_stop),
    )
    for hook_name, callback in hook_table:
        ctx.register_hook(hook_name, callback)
    logger.debug("telemetry plugin registered 7 hooks")

View File

@@ -0,0 +1,12 @@
name: telemetry
version: 1.0.0
description: "Local-first telemetry & observability. Records runs, model calls, tool calls, and errors to a local event log + state.db index via plugin hooks — no agent action, no content, no network. Powers /usage, /insights, and local dashboards."
author: NousResearch
hooks:
- post_api_request
- api_request_error
- post_tool_call
- on_session_start
- on_session_finalize
- subagent_start
- subagent_stop

View File

@@ -154,6 +154,10 @@ edge-tts = ["edge-tts==7.2.7"]
modal = ["modal==1.3.4"]
daytona = ["daytona==0.155.0"]
hindsight = ["hindsight-client==0.6.1"]
# OTLP telemetry export (optional). Provides the OpenTelemetry SDK + OTLP/HTTP
# exporter so `hermes telemetry export --otlp` can send spans to a Collector.
# Lazy-imported by agent/telemetry/otlp_exporter.py; never a core dependency.
otlp = ["opentelemetry-sdk==1.30.0", "opentelemetry-exporter-otlp-proto-http==1.30.0"]
dev = ["debugpy==1.8.20", "pytest==9.0.2", "pytest-asyncio==1.3.0", "mcp==1.26.0", "starlette==1.0.1", "ty==0.0.21", "ruff==0.15.10", "setuptools==81.0.0"] # starlette: CVE-2026-48710; setuptools: latest <82 (torch >=2.11 caps setuptools<82)
messaging = ["python-telegram-bot[webhooks]==22.6", "discord.py[voice]==2.7.1", "aiohttp==3.13.4", "brotlicffi==1.2.0.1", "slack-bolt==1.27.0", "slack-sdk==3.40.1", "qrcode==7.4.2"] # aiohttp: CVE-2026-34513/34518/34519/34520/34525
cron = [] # croniter is now a core dependency; this extra kept for back-compat

View File

@@ -760,7 +760,11 @@ class TestPluginHooks:
mgr.discover_and_load()
assert mgr.has_hook("pre_api_request") is True
assert mgr.has_hook("post_api_request") is False
# Negative sentinel: a hook no plugin (user or bundled) registers, proving
# that registering pre_api_request doesn't conjure an unrelated hook.
# (post_api_request is now registered by the bundled telemetry plugin, so
# it is no longer a valid "nothing registered this" sentinel.)
assert mgr.has_hook("transform_terminal_output") is False
results = mgr.invoke_hook(
"pre_api_request",
session_id="s1",

View File

View File

@@ -0,0 +1,94 @@
"""`hermes telemetry` handler smoke tests (local-only; no upload)."""
from __future__ import annotations
import sqlite3
import time
import types
import pytest
import hermes_state
@pytest.fixture
def home(tmp_path, monkeypatch):
    """Point HERMES_HOME at ``tmp_path`` and seed state.db with one run's telemetry.

    Emits one run, one model call and one tool call through a
    ``TelemetryEmitter`` (flushed and closed before the test body runs), then
    yields ``tmp_path``.
    """
    monkeypatch.setenv("HERMES_HOME", str(tmp_path))
    # state.db with tel_* + seeded data. The SessionDB is constructed only to
    # initialise the database file, so close it explicitly rather than leaving
    # the handle for the garbage collector.
    db = tmp_path / "state.db"
    hermes_state.SessionDB(db_path=db).close()
    from agent.telemetry.emitter import TelemetryEmitter
    from agent.telemetry.events import RunEvent, ModelCallEvent, ToolCallEvent
    em = TelemetryEmitter(events_path=tmp_path / "telemetry" / "e.jsonl", db_path=db)
    now = time.time_ns()
    em.emit(RunEvent(run_id="r1", trace_id="t1", entrypoint="cli", end_reason="completed",
                     start_ns=now - 60_000_000, end_ns=now, model_call_count=1,
                     tool_call_count=1, estimated_cost_usd=0.3))
    em.emit(ModelCallEvent(span_id="m1", run_id="r1", provider="anthropic",
                           model="claude-opus-4",
                           input_tokens=20000, output_tokens=2000))
    em.emit(ToolCallEvent(span_id="w1", run_id="r1", tool_name="web_search",
                          result_class="ok"))
    em.flush()
    em.close()
    yield tmp_path
def _run(action, **kw):
    """Invoke ``cmd_telemetry`` with default args for ``action``, overridden by ``kw``."""
    from hermes_cli.main import cmd_telemetry

    fields = {"telemetry_action": action, "days": 30, "limit": 10, "json": False}
    fields.update(kw)
    cmd_telemetry(types.SimpleNamespace(**fields))
def test_status_runs(home, capsys):
    """``status`` prints its header, the disabled-upload line and the local-data line."""
    _run("status")
    printed = capsys.readouterr().out
    for fragment in ("Telemetry status", "Upload:", "DISABLED", "Local data:"):
        assert fragment in printed
def test_preview_shows_real_values(home, capsys):
    """``preview`` output is marked as not uploaded and shows the seeded values."""
    _run("preview")
    printed = capsys.readouterr().out
    expected_fragments = (
        "NOT uploaded",
        "workflow_completed",
        # real model + tool names ARE shown (this is the user's own local data)
        "claude-opus-4",
        "web_search",
    )
    for fragment in expected_fragments:
        assert fragment in printed
def test_status_reflects_consent_set_via_config(home, capsys):
    """Writing consent_state=aggregate to config makes ``status`` report the plane on."""
    # Opting in is a plain config write now (no `enable` verb). status should
    # reflect consent_state=aggregate as the aggregate plane being on.
    from hermes_cli.config import load_config, save_config

    config = load_config()
    telemetry_cfg = config.setdefault("telemetry", {})
    telemetry_cfg["consent_state"] = "aggregate"
    save_config(config)

    _run("status")
    printed = capsys.readouterr().out
    for fragment in ("consent_state=aggregate", "Aggregate plane: on"):
        assert fragment in printed
def test_status_shows_optin_hint_when_unknown(home, capsys):
    """With no consent recorded, ``status`` shows the plane off plus the opt-in command."""
    _run("status")
    printed = capsys.readouterr().out
    for fragment in (
        "Aggregate plane: off",
        "config set telemetry.consent_state aggregate",
    ):
        assert fragment in printed
def test_allow_aggregate_false_keeps_plane_off_in_status(home, capsys):
    """allow_aggregate=False keeps the aggregate plane off despite an opt-in."""
    # Even with consent opted in, a managed allow_aggregate:false wins.
    from hermes_cli.config import load_config, save_config

    config = load_config()
    telemetry_cfg = config.setdefault("telemetry", {})
    telemetry_cfg["consent_state"] = "aggregate"
    telemetry_cfg["allow_aggregate"] = False
    save_config(config)

    _run("status")
    printed = capsys.readouterr().out
    for fragment in ("Aggregate plane: off", "allow_aggregate is false"):
        assert fragment in printed

View File

@@ -0,0 +1,107 @@
"""Emitter tests — the hot-path invariant is the one that matters most.
Invariant: emit() never blocks, never raises, and a broken writer cannot slow or
break the caller. Plus: JSONL + SQLite round-trip, and the SQLite index is rebuildable
from the JSONL source of truth.
"""
from __future__ import annotations
import sqlite3
import time
import hermes_state
from agent.telemetry.emitter import TelemetryEmitter
from agent.telemetry.events import ModelCallEvent, RunEvent, ToolCallEvent
def _fresh_db(tmp_path):
    """Create ``tmp_path/state.db`` from ``hermes_state.SCHEMA_SQL`` and return its path."""
    db = tmp_path / "state.db"
    conn = sqlite3.connect(db)
    try:
        conn.executescript(hermes_state.SCHEMA_SQL)
    finally:
        # Close even when the schema script fails so the handle is not leaked.
        conn.close()
    return db
def test_emit_is_fast_even_when_writer_is_broken(tmp_path, monkeypatch):
    """The core guarantee: a writer that raises AND sleeps cannot stall emit()."""
    events_file = tmp_path / "telemetry" / "events.jsonl"
    em = TelemetryEmitter(events_path=events_file, db_path=_fresh_db(tmp_path))

    def broken(_conn, _ev):
        # Sabotaged row indexer: a long sleep followed by an exception.
        time.sleep(5.0)
        raise RuntimeError("writer exploded")

    monkeypatch.setattr(em, "_index_one", broken)

    start = time.monotonic()
    for i in range(50):
        event = ModelCallEvent(span_id=f"s{i}", run_id="r1", input_tokens=10)
        em.emit(event)
    elapsed = time.monotonic() - start

    # 50 emits must complete in well under the writer's single 5s sleep.
    assert elapsed < 1.0, f"emit() blocked: {elapsed:.2f}s"
    em.close()
def test_emit_never_raises_on_bad_event(tmp_path):
    """Non-serializable or wrong-shaped inputs must not raise out of emit()."""
    em = TelemetryEmitter(
        events_path=tmp_path / "telemetry" / "events.jsonl",
        db_path=_fresh_db(tmp_path),
    )
    bad_payloads = (
        object(),            # no to_dict, not a mapping
        {"event": "run"},    # minimal dict
    )
    for payload in bad_payloads:
        em.emit(payload)
    em.close()
def test_jsonl_and_sqlite_roundtrip(tmp_path):
    """Three emitted events land as three JSONL lines and one row per tel_* table."""
    db = _fresh_db(tmp_path)
    jsonl = tmp_path / "telemetry" / "events.jsonl"
    em = TelemetryEmitter(events_path=jsonl, db_path=db)
    events = (
        RunEvent(run_id="run1", trace_id="t1", entrypoint="cli", end_reason="completed"),
        ModelCallEvent(span_id="m1", run_id="run1", provider="anthropic",
                       model="claude-opus-4", input_tokens=100, output_tokens=20),
        ToolCallEvent(span_id="tc1", run_id="run1", tool_name="web_search",
                      duration_ms=120, result_class="ok"),
    )
    for event in events:
        em.emit(event)
    em.flush()
    em.close()

    # JSONL has all three lines
    written = jsonl.read_text(encoding="utf-8").splitlines()
    non_blank = [line for line in written if line.strip()]
    assert len(non_blank) == 3

    # SQLite index has the rows in the right tables
    conn = sqlite3.connect(db)
    count_queries = (
        "SELECT COUNT(*) FROM tel_runs",
        "SELECT COUNT(*) FROM tel_model_calls",
        "SELECT COUNT(*) FROM tel_tool_calls",
    )
    for query in count_queries:
        assert conn.execute(query).fetchone()[0] == 1
    model_row = conn.execute(
        "SELECT provider, model, input_tokens FROM tel_model_calls"
    ).fetchone()
    assert model_row == ("anthropic", "claude-opus-4", 100)
    conn.close()
def test_unknown_event_kind_is_ignored_not_fatal(tmp_path):
    """An unrecognised event kind does not stop a later known event from indexing."""
    db = _fresh_db(tmp_path)
    em = TelemetryEmitter(events_path=tmp_path / "telemetry" / "events.jsonl", db_path=db)
    for payload in (
        {"event": "totally_unknown", "foo": "bar"},
        RunEvent(run_id="r2", trace_id="t2", entrypoint="cli"),
    ):
        em.emit(payload)
    em.flush()
    em.close()

    conn = sqlite3.connect(db)
    # The unknown event is in JSONL but skipped by the indexer; the known one indexes.
    run_count = conn.execute("SELECT COUNT(*) FROM tel_runs").fetchone()[0]
    assert run_count == 1
    conn.close()
def test_disabled_emitter_writes_nothing(tmp_path):
    """With ``enabled=False`` neither the JSONL file nor a tel_runs row is produced."""
    db = _fresh_db(tmp_path)
    jsonl = tmp_path / "telemetry" / "events.jsonl"
    em = TelemetryEmitter(events_path=jsonl, db_path=db, enabled=False)
    em.emit(RunEvent(run_id="r3", trace_id="t3", entrypoint="cli"))
    em.flush()
    em.close()

    assert not jsonl.exists()

    conn = sqlite3.connect(db)
    run_count = conn.execute("SELECT COUNT(*) FROM tel_runs").fetchone()[0]
    assert run_count == 0
    conn.close()

View File

@@ -0,0 +1,113 @@
"""Export redaction pipeline tests — the security-critical layer.
Invariants:
* Secrets ALWAYS stripped, every mode, no flag disables it.
* Content gated by the trajectories plane, not a redaction mode.
* PII stripped in 'pii' mode; structure preserved (codec-aware).
"""
from __future__ import annotations
import json
from agent.telemetry import redaction as R
# ── secrets are always redacted ─────────────────────────────────────────────
def test_secrets_stripped_in_none_mode():
    """An API-key-shaped token is removed even with content_mode 'none'."""
    raw = "here is sk-ant-api03-SECRETKEY123 and a token"
    scrubbed = R.redact_for_export(raw, content_mode=R.CONTENT_NONE)
    assert "SECRETKEY123" not in scrubbed
def test_secrets_stripped_in_pii_mode():
    """A bearer token is removed with content_mode 'pii'."""
    raw = "Authorization: Bearer abcdef123456789secret"
    scrubbed = R.redact_for_export(raw, content_mode=R.CONTENT_PII)
    assert "abcdef123456789secret" not in scrubbed
def test_secret_redactor_fails_closed(monkeypatch):
    """If the underlying redactor raises, the raw string must NOT be returned."""
    import agent.redact as ar

    def _explode(*args, **kwargs):
        raise RuntimeError()

    monkeypatch.setattr(ar, "redact_sensitive_text", _explode)
    scrubbed = R.redact_for_export("sk-secret-value", content_mode=R.CONTENT_NONE)
    assert "sk-secret-value" not in scrubbed
    assert scrubbed == "[redaction-unavailable]"
# ── PII ─────────────────────────────────────────────────────────────────────
def test_pii_mode_strips_email_and_phone():
    """'pii' mode replaces the email with ``[email]`` and handles the phone number."""
    raw = "contact alice@example.com or +1 415 555 1234"
    scrubbed = R.redact_for_export(raw, content_mode=R.CONTENT_PII)
    assert "alice@example.com" not in scrubbed
    assert "[email]" in scrubbed
    # Either the digits are gone or a phone placeholder is present.
    phone_digits_gone = "555" not in scrubbed
    phone_placeholder_present = "[phone]" in scrubbed
    assert phone_digits_gone or phone_placeholder_present
def test_none_mode_keeps_nonsecret_text_but_drops_via_message_path():
    """'none' mode leaves ordinary, non-secret words in place."""
    # redact_for_export(none) scrubs secrets but doesn't strip ordinary words;
    # content *dropping* happens at the message layer (trajectories gate).
    scrubbed = R.redact_for_export("just ordinary words", content_mode=R.CONTENT_NONE)
    assert "ordinary" in scrubbed
# ── trajectories gate (content_export_enabled) ──────────────────────────────
def test_content_export_disabled_by_default():
    """Content export is off for empty, partial and explicitly-disabled configs."""
    off_configs = (
        {},
        {"telemetry": {}},
        {"telemetry": {"trajectories": {"enabled": False}}},
    )
    for cfg in off_configs:
        assert R.content_export_enabled(cfg) is False
def test_content_export_enabled_when_trajectories_on():
    """Content export is on when telemetry.trajectories.enabled is True."""
    cfg = {"telemetry": {"trajectories": {"enabled": True}}}
    assert R.content_export_enabled(cfg) is True
# ── codec-aware message redaction ───────────────────────────────────────────
def test_message_structural_only_when_content_excluded():
    """With content excluded, only the role and the body's length remain."""
    body = "my email is bob@x.com and key sk-12345"
    msg = {"role": "user", "content": body}
    redacted = R.redact_message(msg, include_content=False)
    assert redacted["role"] == "user"
    assert "content" not in redacted                 # body dropped entirely
    assert redacted["content_chars"] == len(body)    # only the size remains
    assert "bob@x.com" not in json.dumps(redacted)
def test_message_content_included_is_redacted():
    """Included content keeps its field but loses the secret and the email."""
    msg = {"role": "user", "content": "email bob@x.com secret sk-ant-SECRET999"}
    redacted = R.redact_message(msg, content_mode=R.CONTENT_PII, include_content=True)
    assert "content" in redacted
    body = redacted["content"]
    assert "SECRET999" not in body   # secret gone
    assert "bob@x.com" not in body   # pii gone
    assert "[email]" in body
def test_tool_calls_redacted_names_kept_args_scrubbed():
    """Tool-call names survive redaction while PII in the arguments is scrubbed."""
    call = {"function": {"name": "web_search", "arguments": '{"q": "email me at z@z.com"}'}}
    msg = {"role": "assistant", "tool_calls": json.dumps([call])}
    redacted = R.redact_message(msg, content_mode=R.CONTENT_PII, include_content=True)
    calls = redacted["tool_calls"]
    assert calls[0]["name"] == "web_search"        # structure/name preserved
    assert "z@z.com" not in json.dumps(calls)      # arg pii scrubbed
def test_tool_calls_counted_when_content_excluded():
    """With content excluded, tool calls are reduced to a count."""
    calls = [
        {"function": {"name": "a", "arguments": "{}"}},
        {"function": {"name": "b", "arguments": "{}"}},
    ]
    msg = {"role": "assistant", "tool_calls": json.dumps(calls)}
    redacted = R.redact_message(msg, include_content=False)
    assert redacted["tool_call_count"] == 2
    assert "tool_calls" not in redacted
def test_content_mode_for_reads_config():
    """content_mode_for honours 'pii' and falls back to 'none' otherwise."""
    cases = (
        ({"telemetry": {"content_redaction": "pii"}}, "pii"),
        ({"telemetry": {"content_redaction": "bogus"}}, "none"),
        ({}, "none"),
    )
    for cfg, expected in cases:
        assert R.content_mode_for(cfg) == expected

View File

@@ -0,0 +1,102 @@
"""Bulk export tests — telemetry always, content only behind the trajectories gate."""
from __future__ import annotations
import io
import json
import sqlite3
import time
import hermes_state
from agent.telemetry import exporter_bulk
from agent.telemetry.emitter import TelemetryEmitter
from agent.telemetry.events import ModelCallEvent, RunEvent, ToolCallEvent
def _seed(tmp_path, with_secret_content=True):
    """Build ``tmp_path/state.db`` with one run's telemetry plus session ``s1``.

    When ``with_secret_content`` is true the session also gets a user message
    containing a key-shaped string and an email, and an assistant reply.
    Returns the database path.
    """
    db = tmp_path / "state.db"
    sdb = hermes_state.SessionDB(db_path=db)

    # telemetry
    em = TelemetryEmitter(events_path=tmp_path / "tel" / "e.jsonl", db_path=db)
    now = time.time_ns()
    telemetry_events = (
        RunEvent(run_id="r1", trace_id="t1", entrypoint="cli", end_reason="completed",
                 start_ns=now - 60_000_000, end_ns=now, model_call_count=1, tool_call_count=1),
        ModelCallEvent(span_id="m1", run_id="r1", provider="anthropic",
                       model="claude-sonnet-4", input_tokens=1000, output_tokens=100),
        ToolCallEvent(span_id="w1", run_id="r1", tool_name="web_search", result_class="ok"),
    )
    for event in telemetry_events:
        em.emit(event)
    em.flush()
    em.close()

    # session + message content (with an embedded secret + email)
    sdb.create_session(session_id="s1", source="cli", model="anthropic/claude-sonnet-4")
    if with_secret_content:
        user_text = "my key is AKIAIOSFODNN7EXAMPLE and email me at carol@corp.com"
        sdb.append_message("s1", role="user", content=user_text)
        sdb.append_message("s1", role="assistant", content="ok")
    sdb.close()
    return db
def test_telemetry_exported_content_excluded_by_default(tmp_path):
    """Default export carries telemetry and structural session records, no bodies."""
    db = _seed(tmp_path)
    sink = io.StringIO()
    counts = exporter_bulk.export(sink, fmt="ndjson", include_content=False, config={}, db_path=db)
    assert counts["telemetry"] >= 3
    assert counts["content_included"] == 0

    exported = sink.getvalue()
    # message bodies must NOT be present
    for leaked in ("carol@corp.com", "AKIAIOSFODNN7EXAMPLE"):
        assert leaked not in exported

    # but a session record (structural) is present with message structure
    records = [json.loads(line) for line in exported.splitlines() if line.strip()]
    sessions = [rec for rec in records if rec.get("_kind") == "session"]
    assert sessions and sessions[0]["messages"]
    first_message = sessions[0]["messages"][0]
    assert "content" not in first_message  # structural only
    assert "content_chars" in first_message
def test_include_content_ignored_without_trajectories(tmp_path):
    """Requesting content while trajectories are disabled still exports none."""
    db = _seed(tmp_path)
    sink = io.StringIO()
    # request content but trajectories disabled -> forced off
    cfg = {"telemetry": {"trajectories": {"enabled": False}}}
    counts = exporter_bulk.export(
        sink, fmt="ndjson", include_content=True, config=cfg, db_path=db
    )
    assert counts["content_included"] == 0
    assert "carol@corp.com" not in sink.getvalue()
def test_content_included_when_trajectories_on_but_redacted(tmp_path):
    """With trajectories on and 'pii' redaction, content is exported but scrubbed."""
    db = _seed(tmp_path)
    sink = io.StringIO()
    cfg = {"telemetry": {"trajectories": {"enabled": True}, "content_redaction": "pii"}}
    counts = exporter_bulk.export(sink, fmt="ndjson", include_content=True, config=cfg, db_path=db)
    assert counts["content_included"] == 1

    exported = sink.getvalue()
    # content present but secret + pii scrubbed
    assert "AKIAIOSFODNN7EXAMPLE" not in exported   # secret always gone
    assert "carol@corp.com" not in exported         # pii gone in pii mode

    records = [json.loads(line) for line in exported.splitlines() if line.strip()]
    session = [rec for rec in records if rec.get("_kind") == "session"][0]
    # the user message now has a (redacted) content field
    user_msg = [msg for msg in session["messages"] if msg["role"] == "user"][0]
    assert "content" in user_msg
    assert "[email]" in user_msg["content"]
def test_json_format_roundtrips(tmp_path):
    """'json' output parses as one object whose records include a tel_runs entry."""
    db = _seed(tmp_path)
    sink = io.StringIO()
    exporter_bulk.export(sink, fmt="json", include_content=False, config={}, db_path=db)
    document = json.loads(sink.getvalue())
    assert "records" in document
    kinds = [record["_kind"] for record in document["records"]]
    assert "tel_runs" in kinds
def test_since_window_filters_runs(tmp_path):
    """A ``since_ns`` cutoff later than the seeded run excludes it from the export."""
    db = _seed(tmp_path)
    buf = io.StringIO()
    # Cutoff one second in the future -> the seeded run (started ~60ms ago) is
    # excluded. Integer nanoseconds avoid float rounding and match the
    # time.time_ns() values the seed helper writes.
    future_ns = time.time_ns() + 1_000_000_000
    exporter_bulk.export(buf, fmt="ndjson", since_ns=future_ns, config={}, db_path=db)
    records = [json.loads(line) for line in buf.getvalue().splitlines() if line.strip()]
    runs = [r for r in records if r.get("_kind") == "tel_runs"]
    assert runs == []

View File

@@ -0,0 +1,98 @@
"""Export configuration visibility in `hermes telemetry status`.
The status Export block reports the current export configuration. Whether a key is
locked is handled by the managed-scope layer, not repeated here; the allow_aggregate
gate is covered by a test so a managed pin can't be regressed.
"""
from __future__ import annotations
import time
import types
import pytest
import hermes_state
@pytest.fixture
def home(tmp_path, monkeypatch):
    """Point HERMES_HOME at ``tmp_path`` and create its state.db; yields ``tmp_path``."""
    monkeypatch.setenv("HERMES_HOME", str(tmp_path))
    # The SessionDB is constructed only to initialise state.db, so close the
    # handle explicitly instead of leaving it for the garbage collector.
    hermes_state.SessionDB(db_path=tmp_path / "state.db").close()
    yield tmp_path
def _status(capsys):
    """Run ``hermes telemetry status`` and return what it printed to stdout."""
    from hermes_cli.main import cmd_telemetry

    args = types.SimpleNamespace(telemetry_action="status")
    cmd_telemetry(args)
    captured = capsys.readouterr()
    return captured.out
def test_export_block_default_shows_otlp_disabled(home, capsys):
    """By default the Export block shows OTLP disabled, content off, redaction on."""
    printed = _status(capsys)
    expected_fragments = (
        "Export",
        "OTLP:",
        "disabled",
        "Content export: off",
        "Secret redaction: on (always)",
    )
    for fragment in expected_fragments:
        assert fragment in printed
def test_export_block_shows_endpoint_host_never_token(home, capsys, monkeypatch):
    """Status shows the endpoint and the header env-var name, never its value."""
    from hermes_cli.config import load_config, save_config

    monkeypatch.setenv("CORP_OTLP_TOKEN", "supersecret-do-not-print")
    config = load_config()
    export_cfg = config.setdefault("telemetry", {}).setdefault("export", {})
    export_cfg["otlp"] = {
        "enabled": True,
        "endpoint": "https://collector.corp:4318/v1/traces",
        "headers_env": {"Authorization": "CORP_OTLP_TOKEN"},
    }
    save_config(config)

    printed = _status(capsys)
    # endpoint host present
    assert "https://collector.corp:4318/v1/traces" in printed
    # env var name + set-state present; the VALUE never printed
    assert "CORP_OTLP_TOKEN" in printed
    assert "(set)" in printed
    assert "supersecret-do-not-print" not in printed
def test_export_block_reflects_trajectories_gate(home, capsys):
    """Enabling trajectories flips the Content export line to on."""
    from hermes_cli.config import load_config, save_config

    config = load_config()
    telemetry_cfg = config.setdefault("telemetry", {})
    telemetry_cfg["trajectories"] = {"enabled": True}
    save_config(config)

    printed = _status(capsys)
    assert "Content export: on (trajectories plane)" in printed
def test_token_env_not_set_shows_not_set(home, capsys):
    """A header env var that is not set is reported as ``(NOT set)``."""
    from hermes_cli.config import load_config, save_config

    config = load_config()
    export_cfg = config.setdefault("telemetry", {}).setdefault("export", {})
    export_cfg["otlp"] = {
        "enabled": True,
        "endpoint": "https://x:4318/v1/traces",
        "headers_env": {"Authorization": "TOTALLY_UNSET_ENV_VAR_XYZ"},
    }
    save_config(config)

    printed = _status(capsys)
    assert "(NOT set)" in printed
def test_allow_aggregate_pin_blocks_opt_in(home):
    """An allow_aggregate:false setting overrides a consent_state opt-in.

    Consent is written to config (as a user or managed-scope pin would); the
    hard gate still wins, so the resolved policy has the aggregate plane off
    and may_upload_aggregate() returns False.
    """
    from hermes_cli.config import load_config, save_config
    from agent.telemetry import policy

    config = load_config()
    telemetry_cfg = config.setdefault("telemetry", {})
    telemetry_cfg["consent_state"] = "aggregate"
    telemetry_cfg["allow_aggregate"] = False
    save_config(config)

    decision = policy.resolve(load_config())
    assert decision.allow_aggregate is False
    assert decision.aggregate_enabled is False
    assert decision.may_upload_aggregate() is False

View File

@@ -0,0 +1,82 @@
"""Insights ↔ telemetry integration: the observability section in /insights output."""
from __future__ import annotations
import time
import pytest
from hermes_state import SessionDB
from agent.insights import InsightsEngine
from agent.telemetry.emitter import TelemetryEmitter
from agent.telemetry.events import ModelCallEvent, RunEvent, ToolCallEvent
@pytest.fixture()
def db(tmp_path):
    """Yield a ``SessionDB`` backed by ``tmp_path/ins_tel.db``; close it afterwards."""
    db_file = tmp_path / "ins_tel.db"
    store = SessionDB(db_path=db_file)
    yield store
    store.close()
def _seed_telemetry(db_path):
    """Emit two runs, one model call and two tool calls into the DB at ``db_path``."""
    events_file = db_path.parent / "tel" / "events.jsonl"
    em = TelemetryEmitter(events_path=events_file, db_path=db_path)
    now = time.time_ns()
    seeded = (
        RunEvent(run_id="r1", trace_id="t1", entrypoint="gateway",
                 platform="telegram", end_reason="completed",
                 start_ns=now - 90_000_000, end_ns=now),
        RunEvent(run_id="r2", trace_id="t2", entrypoint="cli",
                 end_reason="failed", start_ns=now - 11_000_000, end_ns=now),
        ModelCallEvent(span_id="m1", run_id="r1", provider="anthropic",
                       model="claude-opus-4", input_tokens=5000, output_tokens=800,
                       cache_read_tokens=1000, latency_ms=2200),
        ToolCallEvent(span_id="tc1", run_id="r1", tool_name="web_search", result_class="ok"),
        ToolCallEvent(span_id="tc2", run_id="r1", tool_name="browser_navigate",
                      result_class="error"),
    )
    for event in seeded:
        em.emit(event)
    em.flush()
    em.close()
def test_report_includes_telemetry_when_present(db):
    """The generated report carries workflow, tool-call and provider rollups."""
    # A session so generate() isn't the empty branch
    db.create_session(session_id="s1", source="cli", model="anthropic/claude-sonnet-4")
    _seed_telemetry(db.db_path)

    report = InsightsEngine(db).generate(days=30)
    tel = report.get("telemetry")
    assert tel, "telemetry section missing"

    workflows = tel["workflows"]
    assert workflows["total_runs"] == 2
    assert workflows["success_rate"] == 0.5

    tool_calls = tel["tool_calls"]
    assert tool_calls["total"] == 2
    assert tool_calls["failure_rate"] == 0.5

    assert tel["model_calls"]["by_provider"]["anthropic"] == 1
def test_terminal_output_renders_observability_section(db):
    """Terminal rendering includes the Observability section and its key lines."""
    db.create_session(session_id="s1", source="cli", model="anthropic/claude-sonnet-4")
    _seed_telemetry(db.db_path)
    engine = InsightsEngine(db)
    report = engine.generate(days=30)
    rendered = engine.format_terminal(report)
    for heading in ("Observability", "Workflows:", "Failure rate:", "Providers:"):
        assert heading in rendered
def test_telemetry_section_absent_when_no_tel_rows(db):
    """Without telemetry rows the report's telemetry dict is empty and not rendered."""
    # Session present, but no telemetry events seeded.
    db.create_session(session_id="s1", source="cli", model="anthropic/claude-sonnet-4")
    engine = InsightsEngine(db)
    report = engine.generate(days=30)
    assert report.get("telemetry") == {}
    rendered = engine.format_terminal(report)
    assert "Observability" not in rendered
def test_empty_report_has_telemetry_key(db):
    """With no sessions, the empty report still carries an empty telemetry dict."""
    # No sessions at all -> empty branch still carries the key (renderer-safe).
    report = InsightsEngine(db).generate(days=30)
    assert report.get("empty") is True
    assert report.get("telemetry") == {}

View File

@@ -0,0 +1,171 @@
"""OTLP exporter tests.
Skip cleanly when the optional OTel SDK isn't installed. When it is, verify:
* event -> OTel span attribute mapping
* headers_env resolves the value from the named environment variable, not config
* a failing or slow subscriber never breaks the emitter hot path
* is_enabled / is_available gating
"""
from __future__ import annotations
import sqlite3
import time
import pytest
import hermes_state
from agent.telemetry import otlp_exporter as OE
from agent.telemetry.emitter import TelemetryEmitter
from agent.telemetry.events import ModelCallEvent, RunEvent, ToolCallEvent
otel = pytest.importorskip("opentelemetry.sdk.trace", reason="otlp extra not installed")
def _in_memory_provider():
    """Return ``(provider, exporter)``: a TracerProvider wired to an in-memory exporter."""
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import SimpleSpanProcessor
    from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter

    tracer_provider = TracerProvider()
    memory_exporter = InMemorySpanExporter()
    # Spans go to the in-memory exporter, so nothing touches the network.
    tracer_provider.add_span_processor(SimpleSpanProcessor(memory_exporter))
    return tracer_provider, memory_exporter
def test_event_maps_to_span_with_real_attrs():
    """Each event kind becomes a named span whose attributes carry the real values."""
    provider, mem = _in_memory_provider()
    run_event = {
        "event": "run", "entrypoint": "gateway", "platform": "telegram",
        "end_reason": "completed", "model_call_count": 2, "tool_call_count": 3,
    }
    model_event = {
        "event": "model_call", "provider": "anthropic", "model": "claude-opus-4",
        "input_tokens": 5000, "output_tokens": 800,
    }
    tool_event = {"event": "tool_call", "tool_name": "web_search", "result_class": "ok"}

    exported = OE.export_batch(provider, [run_event, model_event, tool_event])
    assert exported == 3

    spans = mem.get_finished_spans()
    assert {span.name for span in spans} == {
        "hermes.run", "hermes.model_call", "hermes.tool_call",
    }

    def first_named(span_name):
        # First finished span carrying the given name.
        return next(span for span in spans if span.name == span_name)

    # real values present as span attributes
    run_attrs = first_named("hermes.run").attributes
    assert run_attrs["hermes.entrypoint"] == "gateway"
    assert run_attrs["hermes.platform"] == "telegram"

    model_attrs = first_named("hermes.model_call").attributes
    assert model_attrs["hermes.model"] == "claude-opus-4"
    assert model_attrs["hermes.provider"] == "anthropic"

    tool_attrs = first_named("hermes.tool_call").attributes
    assert tool_attrs["hermes.tool_name"] == "web_search"
def test_headers_resolve_from_env_not_value(monkeypatch):
    """Header values come from the named environment variable, not from config."""
    monkeypatch.setenv("MY_OTLP_TOKEN", "supersecretvalue")
    resolved = OE._resolve_headers({"Authorization": "MY_OTLP_TOKEN"})
    assert resolved == {"Authorization": "supersecretvalue"}
    # missing env var -> skipped, not crashed. Clear the name first so the
    # assertion does not depend on the ambient environment of the test run.
    monkeypatch.delenv("NOPE_NOT_SET", raising=False)
    assert OE._resolve_headers({"X": "NOPE_NOT_SET"}) == {}
def test_is_enabled_requires_endpoint_and_flag():
    """is_enabled is True only when both the flag and an endpoint are configured."""
    def otlp_config(**otlp):
        return {"telemetry": {"export": {"otlp": otlp}}}

    cases = (
        (otlp_config(enabled=True, endpoint="http://x"), True),
        (otlp_config(enabled=True), False),
        (otlp_config(enabled=False, endpoint="http://x"), False),
        ({}, False),
    )
    for cfg, expected in cases:
        assert OE.is_enabled(cfg) is expected
def test_require_sdk_routes_through_lazy_install(monkeypatch):
    """_require_sdk(auto_install=True) should call lazy_deps.ensure('export.otlp')."""
    import tools.lazy_deps as ld

    recorded = []

    def fake_ensure(feature, **kw):
        recorded.append((feature, kw))

    monkeypatch.setattr(ld, "ensure", fake_ensure)
    OE._require_sdk(auto_install=True, prompt=False)
    assert recorded == [("export.otlp", {"prompt": False})]
def test_is_available_does_not_install(monkeypatch):
    """A pure availability check must NEVER trigger an install."""
    import tools.lazy_deps as ld

    recorded = []

    def fake_ensure(*a, **k):
        recorded.append(a)

    monkeypatch.setattr(ld, "ensure", fake_ensure)
    OE.is_available()
    assert recorded == []
def test_export_otlp_feature_specs_match_pyproject():
    """The LAZY_DEPS entry must track the [otlp] extra in pyproject.toml."""
    import pathlib
    import re

    from tools.lazy_deps import feature_specs

    specs = set(feature_specs("export.otlp"))

    repo_root = pathlib.Path(__file__).resolve().parents[2]
    text = (repo_root / "pyproject.toml").read_text(encoding="utf-8")
    m = re.search(r"^otlp\s*=\s*\[([^\]]*)\]", text, re.MULTILINE)
    assert m, "otlp extra not found in pyproject.toml"

    # Every double-quoted entry inside the extra's list is one requirement spec.
    extra = set(re.findall(r'"([^"]+)"', m.group(1)))
    assert specs == extra, f"LAZY_DEPS {specs} != pyproject extra {extra}"
def test_streamer_subscription_receives_events(tmp_path, monkeypatch):
    """A subscriber forwarding batches to export_batch sees every emitted event."""
    # Wire an OTLPStreamer-like subscriber via the in-memory provider.
    provider, mem = _in_memory_provider()

    db = tmp_path / "state.db"
    conn = sqlite3.connect(db)
    conn.executescript(hermes_state.SCHEMA_SQL)
    conn.close()

    em = TelemetryEmitter(events_path=tmp_path / "t" / "e.jsonl", db_path=db)

    def subscriber(batch):
        OE.export_batch(provider, batch)

    em.subscribe(subscriber)
    em.emit(RunEvent(run_id="r1", trace_id="t1", entrypoint="cli", end_reason="completed"))
    em.emit(ModelCallEvent(span_id="m1", run_id="r1", provider="anthropic", model="claude-opus-4"))
    em.flush()
    em.close()

    span_names = {span.name for span in mem.get_finished_spans()}
    assert span_names == {"hermes.run", "hermes.model_call"}
def test_failing_subscriber_never_breaks_hot_path(tmp_path):
    """A slow, raising subscriber neither stalls emit() nor loses indexed rows."""
    db = tmp_path / "state.db"
    schema_conn = sqlite3.connect(db)
    schema_conn.executescript(hermes_state.SCHEMA_SQL)
    schema_conn.close()

    em = TelemetryEmitter(events_path=tmp_path / "t" / "e.jsonl", db_path=db)

    def bad_subscriber(batch):
        time.sleep(0.2)
        raise RuntimeError("OTLP collector down")

    em.subscribe(bad_subscriber)

    start = time.monotonic()
    for i in range(30):
        em.emit(ModelCallEvent(span_id=f"s{i}", run_id="r1", input_tokens=1))
    elapsed = time.monotonic() - start
    # emit() returns immediately regardless of the broken subscriber
    assert elapsed < 1.0

    em.flush()
    # durable writes still happened despite the subscriber raising
    check_conn = sqlite3.connect(db)
    indexed = check_conn.execute("SELECT COUNT(*) FROM tel_model_calls").fetchone()[0]
    assert indexed == 30
    check_conn.close()
    em.close()
def test_export_once_reads_db_and_returns_count(tmp_path, monkeypatch):
    """export_once exports the two seeded rows and returns their count."""
    db = tmp_path / "state.db"
    schema_conn = sqlite3.connect(db)
    schema_conn.executescript(hermes_state.SCHEMA_SQL)
    schema_conn.close()

    em = TelemetryEmitter(events_path=tmp_path / "t" / "e.jsonl", db_path=db)
    em.emit(RunEvent(run_id="r1", trace_id="t1", entrypoint="cli", end_reason="completed",
                     start_ns=time.time_ns(), end_ns=time.time_ns()))
    em.emit(ToolCallEvent(span_id="w1", run_id="r1", tool_name="web_search", result_class="ok"))
    em.flush()
    em.close()

    # Patch the provider builder to an in-memory one (no network). The processor
    # stand-in only needs force_flush(); provider.shutdown() works on the real one.
    provider, mem = _in_memory_provider()

    class _Proc:
        def force_flush(self, *a, **k):
            return True

    def fake_make_provider(config):
        return provider, _Proc()

    monkeypatch.setattr(OE, "_make_provider", fake_make_provider)

    cfg = {"telemetry": {"export": {"otlp": {"enabled": True, "endpoint": "http://x"}}}}
    exported = OE.export_once(cfg, db_path=db)
    assert exported == 2
    assert len(mem.get_finished_spans()) == 2

View File

@@ -0,0 +1,53 @@
"""Telemetry plugin auto-load gating: on by default, off when telemetry.local=false."""
from __future__ import annotations
import hermes_cli.plugins as plugins_mod
def test_local_enabled_defaults_true(monkeypatch):
    """With no telemetry section in config, local telemetry defaults to on."""
    def empty_config():
        return {}

    monkeypatch.setattr(plugins_mod, "load_config", empty_config, raising=False)
    # With no telemetry section, the default is on.
    monkeypatch.setattr("hermes_cli.config.load_config", empty_config, raising=False)
    assert plugins_mod._telemetry_local_enabled() is True
def test_local_disabled_when_config_false(monkeypatch):
    """telemetry.local=False turns local telemetry off."""
    def fake_load_config():
        return {"telemetry": {"local": False}}

    monkeypatch.setattr("hermes_cli.config.load_config", fake_load_config, raising=False)
    assert plugins_mod._telemetry_local_enabled() is False
def test_local_enabled_when_config_true(monkeypatch):
    """telemetry.local=True keeps local telemetry on."""
    def fake_load_config():
        return {"telemetry": {"local": True}}

    monkeypatch.setattr("hermes_cli.config.load_config", fake_load_config, raising=False)
    assert plugins_mod._telemetry_local_enabled() is True
def test_malformed_config_defaults_on(monkeypatch):
    """A non-dict ``telemetry`` value falls back to the default of on."""
    def fake_load_config():
        return {"telemetry": "not a dict"}

    monkeypatch.setattr("hermes_cli.config.load_config", fake_load_config, raising=False)
    assert plugins_mod._telemetry_local_enabled() is True
def test_plugin_manifest_is_discoverable():
    """The bundled telemetry plugin.yaml exists and declares the lifecycle hooks."""
    # Use the module-level ``plugins_mod`` alias; the previous local re-import
    # of the same module and the unused ``Path`` import were dead code.
    bundled = plugins_mod.get_bundled_plugins_dir()
    manifest = bundled / "telemetry" / "plugin.yaml"
    assert manifest.exists(), f"missing {manifest}"
    text = manifest.read_text(encoding="utf-8")
    for hook in ("post_api_request", "post_tool_call", "on_session_finalize"):
        assert hook in text

View File

@@ -0,0 +1,135 @@
"""Telemetry plugin hook tests — feed realistic kwargs, assert tel_* rows + no leak."""
from __future__ import annotations
import sqlite3
import pytest
import hermes_state
from agent.telemetry import emitter as emitter_mod
from agent.telemetry.emitter import TelemetryEmitter
@pytest.fixture
def wired(tmp_path, monkeypatch):
    """Install a fresh emitter as the module emitter; yield ``(db, emitter, plugin)``.

    On teardown the emitter is flushed and closed and the module-level emitter
    is reset to ``None``.
    """
    db_path = tmp_path / "state.db"
    schema_conn = sqlite3.connect(db_path)
    schema_conn.executescript(hermes_state.SCHEMA_SQL)
    schema_conn.close()

    events_file = tmp_path / "telemetry" / "events.jsonl"
    emitter = TelemetryEmitter(events_path=events_file, db_path=db_path)
    emitter_mod.reset_emitter_for_tests(emitter)

    # reset the plugin's per-run accumulators between tests
    import plugins.telemetry as plug
    plug._runs.clear()

    yield db_path, emitter, plug

    emitter.flush()
    emitter.close()
    emitter_mod.reset_emitter_for_tests(None)
def test_full_session_lifecycle_produces_rows(wired):
    """Drive start -> model call -> tool call -> finalize and check the tel_* rows.

    After flushing the emitter, one row each is read from ``tel_runs``,
    ``tel_model_calls`` and ``tel_tool_calls`` and compared with the kwargs
    that were fed to the hooks.
    """
    db, em, plug = wired
    plug._on_session_start(session_id="sess1", platform="telegram")
    plug._on_post_api_request(
        session_id="sess1",
        platform="telegram",
        provider="anthropic",
        base_url=None,
        model="claude-opus-4",
        api_duration=2.5,
        usage={
            "input_tokens": 5000,
            "output_tokens": 800,
            "cache_read_tokens": 1000,
            "cache_write_tokens": 0,
            "reasoning_tokens": 0,
        },
    )
    plug._on_post_tool_call(
        session_id="sess1",
        platform="telegram",
        function_name="web_search",
        duration_ms=812,
        result="{\"data\": \"...\"}",
    )
    plug._on_session_finalize(
        session_id="sess1",
        platform="telegram",
        turn_exit_reason="completed",
        estimated_cost_usd=0.042,
        cost_status="known",
    )
    em.flush()

    # Read every row first and close in ``finally`` so a failing assertion
    # below cannot leave the connection open.
    conn = sqlite3.connect(db)
    conn.row_factory = sqlite3.Row
    try:
        run = conn.execute("SELECT * FROM tel_runs").fetchone()
        mc = conn.execute("SELECT * FROM tel_model_calls").fetchone()
        tc = conn.execute("SELECT * FROM tel_tool_calls").fetchone()
    finally:
        conn.close()

    assert run is not None
    assert run["entrypoint"] == "gateway"
    assert run["platform"] == "telegram"
    assert run["end_reason"] == "completed"
    assert run["model_call_count"] == 1
    assert run["tool_call_count"] == 1
    assert abs(run["estimated_cost_usd"] - 0.042) < 1e-9

    assert mc is not None, "no tel_model_calls row was written"
    assert mc["provider"] == "anthropic"
    assert mc["model"] == "claude-opus-4"
    assert mc["input_tokens"] == 5000

    assert tc is not None, "no tel_tool_calls row was written"
    assert tc["tool_name"] == "web_search"
    assert tc["result_class"] == "ok"
def test_tool_error_result_classified_and_counted(wired):
    """A tool result carrying an ``error`` key is stored as ``error`` and counted on the run."""
    db, em, plug = wired
    plug._on_session_start(session_id="s2", platform="cli")
    plug._on_post_tool_call(
        session_id="s2",
        function_name="terminal",
        duration_ms=10,
        result="{\"error\": \"command failed\"}",
    )
    plug._on_session_finalize(session_id="s2", turn_exit_reason="completed")
    em.flush()

    # Fetch first, close in ``finally``, assert afterwards: a failed
    # assertion must not leak the connection.
    conn = sqlite3.connect(db)
    conn.row_factory = sqlite3.Row
    try:
        tc = conn.execute("SELECT result_class, tool_name FROM tel_tool_calls").fetchone()
        run = conn.execute("SELECT error_count FROM tel_runs").fetchone()
    finally:
        conn.close()

    assert tc is not None, "no tel_tool_calls row was written"
    assert tc["result_class"] == "error"
    assert tc["tool_name"] == "terminal"

    assert run is not None, "no tel_runs row was written"
    assert run["error_count"] == 1
def test_api_error_recorded(wired):
    """An API request error lands in ``tel_error_events`` and the run ends as ``failed``."""
    db, em, plug = wired
    plug._on_session_start(session_id="s3", platform="cli")
    plug._on_api_request_error(session_id="s3", error_type="provider timeout after 60s")
    plug._on_session_finalize(session_id="s3", failed=True)
    em.flush()

    # Fetch first, close in ``finally``, assert afterwards: a failed
    # assertion must not leak the connection.
    conn = sqlite3.connect(db)
    conn.row_factory = sqlite3.Row
    try:
        err = conn.execute("SELECT error_class, subsystem FROM tel_error_events").fetchone()
        run = conn.execute("SELECT end_reason FROM tel_runs").fetchone()
    finally:
        conn.close()

    assert err is not None, "no tel_error_events row was written"
    # The free-text error_type is expected to be stored as a class label.
    assert err["error_class"] == "provider_timeout"
    assert err["subsystem"] == "model_api"

    assert run is not None, "no tel_runs row was written"
    assert run["end_reason"] == "failed"
def test_hooks_never_raise_on_garbage_kwargs(wired):
    """Calling each hook with no kwargs at all must not raise."""
    _db, emitter, plugin = wired
    bare_hooks = (
        plugin._on_post_api_request,
        plugin._on_post_tool_call,
        plugin._on_session_finalize,
        plugin._on_api_request_error,
    )
    # Missing everything — must be swallowed by the _safe wrapper.
    for hook in bare_hooks:
        hook()
    emitter.flush()
def test_no_message_content_in_tool_rows(wired):
    """The tool hook receives a result blob; only the classification persists, not content."""
    db, em, plug = wired
    plug._on_session_start(session_id="s4", platform="cli")
    secret = "{\"data\": \"USER SECRET sk-ABCDEF and /Users/alice/file.txt\"}"
    plug._on_post_tool_call(
        session_id="s4",
        function_name="web_search",
        duration_ms=5,
        result=secret,
    )
    plug._on_session_finalize(session_id="s4")
    em.flush()

    # The whole tel_tool_calls row must contain none of the result content.
    conn = sqlite3.connect(db)
    try:
        row = conn.execute("SELECT * FROM tel_tool_calls").fetchone()
    finally:
        conn.close()

    # Without this guard a missing row would surface as a TypeError in the
    # join below rather than as a readable test failure.
    assert row is not None, "no tel_tool_calls row was written"
    blob = " ".join(str(x) for x in row)
    assert "sk-ABCDEF" not in blob
    assert "/Users/alice" not in blob
    assert "SECRET" not in blob

View File

@@ -0,0 +1,61 @@
"""Consent posture + org-policy enforcement tests.
Consent is a single field (``telemetry.consent_state``); the aggregate opt-in is
expressed by setting it to ``"aggregate"`` (via ``hermes config set`` or a managed-scope
pin). ``allow_aggregate`` is the hard gate.
"""
from __future__ import annotations
from agent.telemetry import policy
def _cfg(**telemetry):
return {"telemetry": telemetry}
def test_default_posture_is_local_only():
    """With local on and consent ``unknown``, only the local plane is enabled."""
    config = _cfg(local=True, consent_state="unknown")
    decision = policy.resolve(config)
    assert decision.local_enabled is True
    assert decision.aggregate_enabled is False
    assert decision.may_upload_aggregate() is False
def test_unknown_consent_never_uploads():
    """Consent left at ``unknown`` never permits an aggregate upload."""
    # A headless box with no choice recorded: stays unknown, never uploads.
    config = _cfg(local=True, consent_state="unknown")
    decision = policy.resolve(config)
    assert decision.may_upload_aggregate() is False
def test_opted_in_uploads():
    """Consent state ``aggregate`` enables the aggregate plane and allows upload."""
    config = _cfg(local=True, consent_state="aggregate")
    decision = policy.resolve(config)
    assert decision.aggregate_enabled is True
    assert decision.may_upload_aggregate() is True
def test_declined_does_not_upload():
    """Consent state ``local`` does not permit an aggregate upload."""
    config = _cfg(local=True, consent_state="local")
    decision = policy.resolve(config)
    assert decision.may_upload_aggregate() is False
def test_allow_aggregate_false_overrides_opt_in():
    """``allow_aggregate: False`` beats a consent state of ``aggregate``."""
    # An admin pins telemetry.allow_aggregate: false via managed scope.
    decision = policy.resolve(
        _cfg(local=True, consent_state="aggregate", allow_aggregate=False)
    )
    assert decision.allow_aggregate is False
    assert decision.aggregate_enabled is False
    # the hard gate wins
    assert decision.may_upload_aggregate() is False
def test_invalid_consent_state_treated_as_unknown():
    """An unrecognised consent value resolves to ``unknown`` and blocks upload."""
    config = _cfg(local=True, consent_state="bogus")
    decision = policy.resolve(config)
    assert decision.consent_state == "unknown"
    assert decision.may_upload_aggregate() is False
def test_install_id_minted_when_empty_and_stable_when_set():
    """An empty install_id gets a fresh value; an existing one is returned unchanged."""
    empty_cfg = _cfg(install_id="")
    minted = policy.ensure_install_id(empty_cfg)
    assert minted
    assert len(minted) >= 32  # uuid4

    pinned_cfg = _cfg(install_id="fixed-id")
    returned = policy.ensure_install_id(pinned_cfg)
    assert returned == "fixed-id"

View File

@@ -0,0 +1,88 @@
"""rollup tests: tel_* -> per-run summary events with REAL values (local only)."""
from __future__ import annotations
import sqlite3
import time
import hermes_state
from agent.telemetry import rollup
from agent.telemetry.emitter import TelemetryEmitter
from agent.telemetry.events import ModelCallEvent, RunEvent, ToolCallEvent
def _seed(tmp_path):
    """Create a state.db under ``tmp_path`` holding one finished run and one open run.

    Run ``r1`` has an ``end_ns``, two model calls and two tool calls; run
    ``r2`` has only a ``start_ns``. Returns the path to the database.
    """
    db_path = tmp_path / "state.db"
    schema_conn = sqlite3.connect(db_path)
    schema_conn.executescript(hermes_state.SCHEMA_SQL)
    schema_conn.close()

    emitter = TelemetryEmitter(events_path=tmp_path / "tel" / "e.jsonl", db_path=db_path)
    now = time.time_ns()
    seeded_events = (
        RunEvent(
            run_id="r1",
            trace_id="t1",
            entrypoint="gateway",
            platform="telegram",
            end_reason="completed",
            start_ns=now - 90_000_000,
            end_ns=now,
            model_call_count=2,
            tool_call_count=2,
            estimated_cost_usd=2.1,
        ),
        ModelCallEvent(
            span_id="m1",
            run_id="r1",
            provider="anthropic",
            model="claude-opus-4",
            input_tokens=60000,
            output_tokens=8000,
        ),
        ModelCallEvent(
            span_id="m2",
            run_id="r1",
            provider="anthropic",
            model="claude-opus-4",
            input_tokens=5000,
            output_tokens=500,
        ),
        ToolCallEvent(
            span_id="tc1", run_id="r1", tool_name="web_search", result_class="ok"
        ),
        ToolCallEvent(
            span_id="tc2", run_id="r1", tool_name="browser_navigate", result_class="ok"
        ),
        # an in-progress run (no end_ns) must be excluded
        RunEvent(run_id="r2", trace_id="t2", entrypoint="cli", start_ns=now),
    )
    for event in seeded_events:
        emitter.emit(event)
    emitter.flush()
    emitter.close()
    return db_path
def test_builds_one_event_per_completed_run_with_real_values(tmp_path):
    """Only the finished run yields a ``workflow_completed`` event, carrying real values."""
    db = _seed(tmp_path)
    events = rollup.build_aggregate_events(
        install_id="fixed-id", db_path=db, include_heartbeat=False
    )

    completed = [ev for ev in events if ev["event_name"] == "workflow_completed"]
    assert len(completed) == 1  # r2 (no end_ns) excluded
    summary = completed[0]

    assert summary["entrypoint"] == "gateway"
    assert summary["platform"] == "telegram"

    # REAL model id + provider, not a bucket/class
    model_ids = {entry["model"] for entry in summary["models_used"]}
    assert model_ids == {"claude-opus-4"}
    assert summary["models_used"][0]["provider"] == "anthropic"

    assert sorted(summary["tools_used"]) == ["browser_navigate", "web_search"]

    # real token totals, not buckets
    assert summary["input_tokens"] == 65000
    assert summary["output_tokens"] == 8500
def test_real_model_and_tool_names_present(tmp_path):
    """The literal model and tool names appear somewhere in the built events."""
    db = _seed(tmp_path)
    events = rollup.build_aggregate_events(install_id="fixed-id", db_path=db)

    # Flatten every value of every event into one searchable string.
    rendered = []
    for event in events:
        for value in event.values():
            rendered.append(str(value))
    blob = " ".join(rendered)

    assert "claude-opus-4" in blob
    assert "web_search" in blob
def test_heartbeat_included_by_default(tmp_path):
    """Without ``include_heartbeat`` being passed, a ``heartbeat`` event is present."""
    db = _seed(tmp_path)
    events = rollup.build_aggregate_events(install_id="fixed-id", db_path=db)
    event_names = (event["event_name"] for event in events)
    assert "heartbeat" in event_names
def test_summarize_counts_by_event_name(tmp_path):
    """``summarize`` reports the total and a per-event-name count."""
    db = _seed(tmp_path)
    events = rollup.build_aggregate_events(install_id="fixed-id", db_path=db)
    summary = rollup.summarize(events)

    assert summary["total"] == len(events)
    per_name = summary["by_event_name"]
    assert per_name["workflow_completed"] == 1
    assert per_name["heartbeat"] == 1
def test_empty_db_yields_only_heartbeat(tmp_path):
    """A database with the schema but no rows produces exactly one ``heartbeat`` event."""
    db_path = tmp_path / "state.db"
    schema_conn = sqlite3.connect(db_path)
    schema_conn.executescript(hermes_state.SCHEMA_SQL)
    schema_conn.close()

    events = rollup.build_aggregate_events(install_id="x", db_path=db_path)
    names = [event["event_name"] for event in events]
    assert names == ["heartbeat"]

View File

@@ -0,0 +1,46 @@
"""Schema tests: tel_* tables exist after init; SCHEMA_VERSION bumped."""
from __future__ import annotations
import sqlite3
import hermes_state
# Names of the telemetry tables the schema tests look for.
TEL_TABLES = {
    "tel_runs",
    "tel_spans",
    "tel_model_calls",
    "tel_tool_calls",
    "tel_gateway_events",
    "tel_cron_events",
    "tel_skill_events",
    "tel_memory_events",
    "tel_feedback_events",
    "tel_error_events",
}
def test_schema_version_is_17_or_higher():
    """``hermes_state.SCHEMA_VERSION`` must be at least 17."""
    version = hermes_state.SCHEMA_VERSION
    assert version >= 17
def test_tel_tables_present_in_schema_sql():
    """Every expected tel_* table has a ``CREATE TABLE IF NOT EXISTS`` in SCHEMA_SQL."""
    # Collect all offenders so one failure lists every missing table rather
    # than stopping at whichever the set happens to yield first.
    missing = sorted(
        tbl
        for tbl in TEL_TABLES
        if f"CREATE TABLE IF NOT EXISTS {tbl}" not in hermes_state.SCHEMA_SQL
    )
    assert not missing, f"missing: {missing}"
def test_tel_tables_created_on_executescript(tmp_path):
    """Running SCHEMA_SQL against a fresh database creates every expected tel_* table."""
    db = tmp_path / "state.db"
    conn = sqlite3.connect(db)
    try:
        conn.executescript(hermes_state.SCHEMA_SQL)
        rows = {
            r[0]
            for r in conn.execute(
                "SELECT name FROM sqlite_master WHERE type='table'"
            ).fetchall()
        }
    finally:
        # Close even when the script or the query raises.
        conn.close()
    assert TEL_TABLES.issubset(rows), f"missing: {TEL_TABLES - rows}"
def test_executescript_is_idempotent(tmp_path):
    """Applying SCHEMA_SQL twice to the same database must not raise."""
    # IF NOT EXISTS means re-running on an existing DB is a no-op, not an error.
    db = tmp_path / "state.db"
    conn = sqlite3.connect(db)
    try:
        conn.executescript(hermes_state.SCHEMA_SQL)
        conn.executescript(hermes_state.SCHEMA_SQL)  # second run must not raise
    finally:
        # The failure this test guards against is an exception from the second
        # run; close the connection in that case too.
        conn.close()

View File

@@ -116,6 +116,15 @@ LAZY_DEPS: dict[str, tuple[str, ...]] = {
# ─── Image generation backends ─────────────────────────────────────────
"image.fal": ("fal-client==0.13.1",),
# ─── Observability ─────────────────────────────────────────────────────
# OTLP telemetry export. Lazily installed on first use of
# `hermes telemetry export --otlp`. Tracks the `otlp` extra in
# pyproject.toml — bump both together.
"export.otlp": (
"opentelemetry-sdk==1.30.0",
"opentelemetry-exporter-otlp-proto-http==1.30.0",
),
# ─── Memory providers ──────────────────────────────────────────────────
"memory.honcho": ("honcho-ai==2.0.1",),
"memory.hindsight": ("hindsight-client==0.6.1",),