mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-25 03:13:46 +08:00
Compare commits
1 Commits
ethie/fast
...
feat/telem
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ccfa079252 |
@@ -81,6 +81,19 @@ def _bar_chart(values: List[int], max_width: int = 20) -> List[str]:
|
||||
return ["█" * max(1, int(v / peak * max_width)) if v > 0 else "" for v in values]
|
||||
|
||||
|
||||
def _fmt_ms(ms: float) -> str:
|
||||
"""Compact human duration from milliseconds (e.g. 850ms, 2.4s, 1.5m)."""
|
||||
try:
|
||||
ms = float(ms or 0)
|
||||
except (TypeError, ValueError):
|
||||
return "0ms"
|
||||
if ms < 1000:
|
||||
return f"{int(ms)}ms"
|
||||
if ms < 60_000:
|
||||
return f"{ms / 1000:.1f}s"
|
||||
return f"{ms / 60_000:.1f}m"
|
||||
|
||||
|
||||
class InsightsEngine:
|
||||
"""
|
||||
Analyzes session history and produces usage insights.
|
||||
@@ -138,6 +151,7 @@ class InsightsEngine:
|
||||
},
|
||||
"activity": {},
|
||||
"top_sessions": [],
|
||||
"telemetry": {},
|
||||
}
|
||||
|
||||
# Compute insights
|
||||
@@ -148,6 +162,7 @@ class InsightsEngine:
|
||||
skills = self._compute_skill_breakdown(skill_usage)
|
||||
activity = self._compute_activity_patterns(sessions)
|
||||
top_sessions = self._compute_top_sessions(sessions)
|
||||
telemetry = self._compute_telemetry(cutoff)
|
||||
|
||||
return {
|
||||
"days": days,
|
||||
@@ -161,8 +176,37 @@ class InsightsEngine:
|
||||
"skills": skills,
|
||||
"activity": activity,
|
||||
"top_sessions": top_sessions,
|
||||
"telemetry": telemetry,
|
||||
}
|
||||
|
||||
# =========================================================================
|
||||
# Telemetry (observability) — from the tel_* tables (local plane)
|
||||
# =========================================================================
|
||||
|
||||
def _compute_telemetry(self, cutoff: float) -> Dict[str, Any]:
|
||||
"""Roll up the local telemetry tables for the same window.
|
||||
|
||||
Reuses the engine's existing connection. Fully fail-soft: if the tel_*
|
||||
tables are empty or absent (telemetry.local disabled, fresh install), this
|
||||
returns an empty dict and the renderer skips the section.
|
||||
"""
|
||||
try:
|
||||
from agent.telemetry import metrics
|
||||
except Exception:
|
||||
return {}
|
||||
try:
|
||||
since_ns = int(cutoff * 1e9)
|
||||
if not metrics.has_data(conn=self._conn):
|
||||
return {}
|
||||
return {
|
||||
"workflows": metrics.workflow_summary(since_ns=since_ns, conn=self._conn),
|
||||
"model_calls": metrics.model_call_summary(since_ns=since_ns, conn=self._conn),
|
||||
"tool_calls": metrics.tool_call_summary(conn=self._conn),
|
||||
"errors": metrics.error_summary(conn=self._conn),
|
||||
}
|
||||
except Exception:
|
||||
return {}
|
||||
|
||||
# =========================================================================
|
||||
# Data gathering (SQL queries)
|
||||
# =========================================================================
|
||||
@@ -852,8 +896,80 @@ class InsightsEngine:
|
||||
lines.append(f" {ts['label']:<20} {ts['value']:<18} ({ts['date']}, {ts['session_id']})")
|
||||
lines.append("")
|
||||
|
||||
# Telemetry / observability (local plane) — only when data exists
|
||||
tel = report.get("telemetry") or {}
|
||||
if tel:
|
||||
self._append_telemetry_section(lines, tel)
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
def _append_telemetry_section(self, lines: List[str], tel: Dict[str, Any]) -> None:
|
||||
"""Render the observability rollups (workflows, tools, providers, errors)."""
|
||||
wf = tel.get("workflows", {})
|
||||
mc = tel.get("model_calls", {})
|
||||
tc = tel.get("tool_calls", {})
|
||||
errs = tel.get("errors", {}).get("by_class", {})
|
||||
|
||||
lines.append(" 📡 Observability (local telemetry)")
|
||||
lines.append(" " + "─" * 56)
|
||||
|
||||
total_runs = wf.get("total_runs", 0)
|
||||
if total_runs:
|
||||
sr = wf.get("success_rate", 0.0) * 100
|
||||
p50 = wf.get("duration_ms_p50", 0)
|
||||
p95 = wf.get("duration_ms_p95", 0)
|
||||
lines.append(
|
||||
f" Workflows: {total_runs:,} Success: {sr:.1f}% "
|
||||
f"Duration p50/p95: {_fmt_ms(p50)} / {_fmt_ms(p95)}"
|
||||
)
|
||||
by_entry = wf.get("by_entrypoint", {})
|
||||
if by_entry:
|
||||
entry_str = ", ".join(
|
||||
f"{k}: {v}" for k, v in sorted(by_entry.items(), key=lambda x: -x[1])
|
||||
)
|
||||
lines.append(f" Entrypoints: {entry_str}")
|
||||
|
||||
# Tool reliability
|
||||
if tc.get("total"):
|
||||
fail_pct = tc.get("failure_rate", 0.0) * 100
|
||||
lines.append(
|
||||
f" Tool calls: {tc['total']:,} Failure rate: {fail_pct:.1f}%"
|
||||
)
|
||||
tools = tc.get("by_tool", {})
|
||||
fails = tc.get("failures_by_tool", {})
|
||||
top = sorted(tools.items(), key=lambda x: -x[1])[:6]
|
||||
if top:
|
||||
parts = []
|
||||
for name, n in top:
|
||||
f = fails.get(name, 0)
|
||||
parts.append(f"{name}: {n}" + (f" ({f} failed)" if f else ""))
|
||||
lines.append(" " + " ".join(parts))
|
||||
|
||||
# Provider / model mix + cache (real names)
|
||||
by_provider = mc.get("by_provider", {})
|
||||
if by_provider:
|
||||
prov_str = ", ".join(
|
||||
f"{k}: {v}" for k, v in sorted(by_provider.items(), key=lambda x: -x[1])
|
||||
)
|
||||
lines.append(f" Providers: {prov_str}")
|
||||
by_model = mc.get("by_model", {})
|
||||
if by_model:
|
||||
model_str = ", ".join(
|
||||
f"{k}: {v}" for k, v in sorted(by_model.items(), key=lambda x: -x[1])[:8]
|
||||
)
|
||||
cache = mc.get("cache_hit_rate", 0.0) * 100
|
||||
suffix = f" Cache hit: {cache:.1f}%" if cache else ""
|
||||
lines.append(f" Models: {model_str}{suffix}")
|
||||
|
||||
# Error classes
|
||||
if errs:
|
||||
err_str = ", ".join(
|
||||
f"{k}: {v}" for k, v in sorted(errs.items(), key=lambda x: -x[1])[:6]
|
||||
)
|
||||
lines.append(f" Errors: {err_str}")
|
||||
|
||||
lines.append("")
|
||||
|
||||
def format_gateway(self, report: Dict) -> str:
|
||||
"""Format the insights report for gateway/messaging (shorter)."""
|
||||
if report.get("empty"):
|
||||
|
||||
30
agent/telemetry/__init__.py
Normal file
30
agent/telemetry/__init__.py
Normal file
@@ -0,0 +1,30 @@
|
||||
"""Hermes telemetry & observability.
|
||||
|
||||
Local-first observability, on by default. The ``telemetry`` plugin registers Hermes
|
||||
lifecycle hooks and hands typed events to the fire-and-forget ``emitter`` (queue ->
|
||||
background writer -> JSONL + state.db ``tel_*`` index). The emitter never blocks or
|
||||
raises into a model/tool call (the hot-path invariant).
|
||||
|
||||
Events record the observed model ids, provider names, and tool names. ``metrics``
|
||||
derives rollups for /usage and /insights; ``rollup`` builds the per-run summaries shown
|
||||
by ``hermes telemetry preview``. ``redaction`` + ``exporter_bulk`` + ``otlp_exporter``
|
||||
handle export to an operator-chosen destination. ``policy`` holds the consent state
|
||||
machine for the opt-in aggregate plane (no uploader ships).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from . import emitter, events, metrics, policy, spans
|
||||
|
||||
emit = emitter.emit
|
||||
get_emitter = emitter.get_emitter
|
||||
|
||||
__all__ = [
|
||||
"emitter",
|
||||
"events",
|
||||
"metrics",
|
||||
"policy",
|
||||
"spans",
|
||||
"emit",
|
||||
"get_emitter",
|
||||
]
|
||||
317
agent/telemetry/emitter.py
Normal file
317
agent/telemetry/emitter.py
Normal file
@@ -0,0 +1,317 @@
|
||||
"""Local-plane telemetry emitter: fire-and-forget queue + background writer.
|
||||
|
||||
The emitter is the single seam between instrumentation (the telemetry plugin's hook
|
||||
callbacks) and durable storage. Its contract is the hot-path invariant:
|
||||
|
||||
``emit()`` MUST return in O(microseconds), MUST NOT block on disk/network, and
|
||||
MUST NEVER raise into the caller. A telemetry failure is logged locally and
|
||||
dropped — it can never affect a model call, a tool call, or a session.
|
||||
|
||||
Mechanism:
|
||||
* ``emit(event)`` does a non-blocking ``queue.put_nowait`` wrapped in a bare except.
|
||||
On a full queue it drops the *oldest* event and counts the drop.
|
||||
* A daemon thread drains the queue and writes each event to two places:
|
||||
1. the append-only JSONL log (source of truth)
|
||||
2. the ``tel_*`` SQLite tables in state.db (rebuildable index)
|
||||
* The writer uses its own sqlite connection to state.db, separate from SessionDB,
|
||||
so telemetry writes never contend with or corrupt session writes.
|
||||
|
||||
Local plane only. Nothing here uploads anywhere.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import queue
|
||||
import sqlite3
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_MAX_QUEUE = 10_000 # ring-buffer depth; oldest dropped when full
|
||||
_DRAIN_BATCH = 256
|
||||
|
||||
|
||||
def _default_dir() -> Path:
|
||||
"""Resolve the telemetry dir under the active HERMES_HOME (profile-safe)."""
|
||||
from hermes_constants import get_hermes_home
|
||||
return get_hermes_home() / "telemetry"
|
||||
|
||||
|
||||
def _default_db_path() -> Path:
|
||||
"""Resolve state.db under the active HERMES_HOME (profile-safe)."""
|
||||
from hermes_constants import get_hermes_home
|
||||
return get_hermes_home() / "state.db"
|
||||
|
||||
|
||||
# Map a telemetry event dict (its "event" tag) to (table, column-ordered insert).
|
||||
# Only the columns the indexer knows about are written; unknown keys are ignored,
|
||||
# so an event carrying extra fields never breaks the insert.
|
||||
_TABLE_COLUMNS: Dict[str, tuple] = {
|
||||
"run": (
|
||||
"tel_runs",
|
||||
("run_id", "trace_id", "session_id", "profile_id", "entrypoint",
|
||||
"platform", "start_ns", "end_ns", "end_reason",
|
||||
"model_call_count", "tool_call_count", "error_count",
|
||||
"estimated_cost_usd", "cost_status"),
|
||||
),
|
||||
"model_call": (
|
||||
"tel_model_calls",
|
||||
("span_id", "run_id", "provider", "model", "base_url",
|
||||
"input_tokens", "output_tokens", "cache_read_tokens",
|
||||
"cache_write_tokens", "reasoning_tokens", "latency_ms", "ttft_ms",
|
||||
"estimated_cost_usd", "cost_status", "cost_source", "end_reason",
|
||||
"retry_count"),
|
||||
),
|
||||
"tool_call": (
|
||||
"tel_tool_calls",
|
||||
("span_id", "run_id", "tool_name", "backend",
|
||||
"duration_ms", "result_class", "retry_count", "approval"),
|
||||
),
|
||||
"error": (
|
||||
"tel_error_events",
|
||||
("run_id", "error_class", "subsystem", "recovery", "ts_ns"),
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
class TelemetryEmitter:
|
||||
"""Owns the queue, the writer thread, and the telemetry sqlite connection."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
events_path: Optional[Path] = None,
|
||||
db_path: Optional[Path] = None,
|
||||
enabled: bool = True,
|
||||
) -> None:
|
||||
self._dir = (events_path.parent if events_path else _default_dir())
|
||||
self._events_path = events_path or (self._dir / "events.jsonl")
|
||||
self._db_path = db_path or _default_db_path()
|
||||
self._enabled = enabled
|
||||
self._q: "queue.Queue[Dict[str, Any]]" = queue.Queue(maxsize=_MAX_QUEUE)
|
||||
self._dropped = 0
|
||||
self._written = 0
|
||||
self._stop = threading.Event()
|
||||
self._started = False
|
||||
self._lock = threading.Lock()
|
||||
self._conn: Optional[sqlite3.Connection] = None
|
||||
self._thread: Optional[threading.Thread] = None
|
||||
# Optional live subscribers (e.g. OTLP exporter). Called from the writer
|
||||
# thread AFTER durable writes, fully fail-isolated — a subscriber that
|
||||
# raises or blocks can never affect the JSONL/SQLite source of truth or
|
||||
# the hot path. Each subscriber is callable(batch: list[dict]).
|
||||
self._subscribers: list = []
|
||||
|
||||
# ── public API (hot path) ───────────────────────────────────────────────
|
||||
def emit(self, event: Any) -> None:
|
||||
"""Enqueue an event. Never blocks, never raises.
|
||||
|
||||
``event`` may be a dataclass with ``to_dict()`` or a plain dict.
|
||||
"""
|
||||
if not self._enabled:
|
||||
return
|
||||
try:
|
||||
payload = event.to_dict() if hasattr(event, "to_dict") else dict(event)
|
||||
payload.setdefault("ts_ns", time.time_ns())
|
||||
self._ensure_started()
|
||||
try:
|
||||
self._q.put_nowait(payload)
|
||||
except queue.Full:
|
||||
# Drop oldest to make room — bounded memory, newest-wins.
|
||||
try:
|
||||
self._q.get_nowait()
|
||||
self._dropped += 1
|
||||
self._q.put_nowait(payload)
|
||||
except Exception:
|
||||
self._dropped += 1
|
||||
except Exception: # the hot-path invariant: never propagate
|
||||
logger.debug("telemetry emit failed", exc_info=True)
|
||||
|
||||
# ── lifecycle ───────────────────────────────────────────────────────────
|
||||
def _ensure_started(self) -> None:
|
||||
if self._started:
|
||||
return
|
||||
with self._lock:
|
||||
if self._started:
|
||||
return
|
||||
try:
|
||||
self._dir.mkdir(parents=True, exist_ok=True)
|
||||
except Exception:
|
||||
logger.debug("telemetry dir create failed", exc_info=True)
|
||||
self._thread = threading.Thread(
|
||||
target=self._run, name="hermes-telemetry-writer", daemon=True
|
||||
)
|
||||
self._thread.start()
|
||||
self._started = True
|
||||
|
||||
def _open_conn(self) -> Optional[sqlite3.Connection]:
|
||||
if self._conn is not None:
|
||||
return self._conn
|
||||
try:
|
||||
conn = sqlite3.connect(str(self._db_path), isolation_level=None, timeout=5.0)
|
||||
conn.execute("PRAGMA journal_mode=WAL")
|
||||
conn.execute("PRAGMA busy_timeout=5000")
|
||||
self._conn = conn
|
||||
except Exception:
|
||||
logger.debug("telemetry db open failed", exc_info=True)
|
||||
self._conn = None
|
||||
return self._conn
|
||||
|
||||
def _run(self) -> None:
|
||||
while not self._stop.is_set():
|
||||
try:
|
||||
first = self._q.get(timeout=0.5)
|
||||
except queue.Empty:
|
||||
continue
|
||||
batch = [first]
|
||||
while len(batch) < _DRAIN_BATCH:
|
||||
try:
|
||||
batch.append(self._q.get_nowait())
|
||||
except queue.Empty:
|
||||
break
|
||||
self._write_batch(batch)
|
||||
|
||||
def _write_batch(self, batch) -> None:
|
||||
# JSONL append (source of truth) — best effort.
|
||||
try:
|
||||
with open(self._events_path, "a", encoding="utf-8") as fh:
|
||||
for ev in batch:
|
||||
fh.write(json.dumps(ev, ensure_ascii=False) + "\n")
|
||||
except Exception:
|
||||
logger.debug("telemetry jsonl append failed", exc_info=True)
|
||||
|
||||
# SQLite index — best effort, per-event so one bad row can't lose the batch.
|
||||
conn = self._open_conn()
|
||||
if conn is None:
|
||||
return
|
||||
for ev in batch:
|
||||
try:
|
||||
self._index_one(conn, ev)
|
||||
self._written += 1
|
||||
except Exception:
|
||||
logger.debug("telemetry index row failed", exc_info=True)
|
||||
|
||||
# Live fan-out (e.g. OTLP) — AFTER durable writes, fully fail-isolated.
|
||||
# A slow/raising subscriber never affects JSONL/SQLite or the hot path.
|
||||
for sub in self._subscribers:
|
||||
try:
|
||||
sub(batch)
|
||||
except Exception:
|
||||
logger.debug("telemetry subscriber failed", exc_info=True)
|
||||
|
||||
def subscribe(self, callback) -> None:
|
||||
"""Register a live batch subscriber (callable(batch: list[dict])).
|
||||
|
||||
Called from the writer thread after durable writes. Used by the OTLP
|
||||
exporter for continuous streaming. Fail-isolated; never on the hot path.
|
||||
"""
|
||||
if callback not in self._subscribers:
|
||||
self._subscribers.append(callback)
|
||||
|
||||
def unsubscribe(self, callback) -> None:
|
||||
try:
|
||||
self._subscribers.remove(callback)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
def _index_one(self, conn: sqlite3.Connection, ev: Dict[str, Any]) -> None:
|
||||
kind = ev.get("event")
|
||||
spec = _TABLE_COLUMNS.get(kind)
|
||||
if spec is None:
|
||||
return
|
||||
table, cols = spec
|
||||
values = [ev.get(c) for c in cols]
|
||||
placeholders = ", ".join("?" for _ in cols)
|
||||
collist = ", ".join(cols)
|
||||
conn.execute(
|
||||
f"INSERT OR REPLACE INTO {table} ({collist}) VALUES ({placeholders})",
|
||||
values,
|
||||
)
|
||||
|
||||
# ── introspection / shutdown (tests, CLI) ───────────────────────────────
|
||||
def flush(self, timeout: float = 2.0) -> None:
|
||||
"""Block until the queue drains (test/CLI helper, NOT the hot path)."""
|
||||
deadline = time.monotonic() + timeout
|
||||
while time.monotonic() < deadline:
|
||||
if self._q.empty():
|
||||
# give the writer a tick to finish the in-flight batch
|
||||
time.sleep(0.05)
|
||||
if self._q.empty():
|
||||
return
|
||||
time.sleep(0.02)
|
||||
|
||||
def stats(self) -> Dict[str, int]:
|
||||
return {
|
||||
"queued": self._q.qsize(),
|
||||
"written": self._written,
|
||||
"dropped": self._dropped,
|
||||
}
|
||||
|
||||
def close(self) -> None:
|
||||
self._stop.set()
|
||||
if self._thread is not None:
|
||||
self._thread.join(timeout=2.0)
|
||||
if self._conn is not None:
|
||||
try:
|
||||
self._conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
self._conn = None
|
||||
self._started = False
|
||||
|
||||
|
||||
# ── process-wide singleton ──────────────────────────────────────────────────
|
||||
_EMITTER: Optional[TelemetryEmitter] = None
|
||||
_EMITTER_LOCK = threading.Lock()
|
||||
|
||||
|
||||
def get_emitter() -> TelemetryEmitter:
|
||||
"""Return the process-wide emitter, honoring telemetry.local config."""
|
||||
global _EMITTER
|
||||
if _EMITTER is not None:
|
||||
return _EMITTER
|
||||
with _EMITTER_LOCK:
|
||||
if _EMITTER is None:
|
||||
enabled = _local_enabled()
|
||||
_EMITTER = TelemetryEmitter(enabled=enabled)
|
||||
return _EMITTER
|
||||
|
||||
|
||||
def _local_enabled() -> bool:
|
||||
try:
|
||||
from hermes_cli.config import load_config
|
||||
cfg = load_config()
|
||||
tel = cfg.get("telemetry") if isinstance(cfg, dict) else {}
|
||||
return bool((tel or {}).get("local", True))
|
||||
except Exception:
|
||||
return True
|
||||
|
||||
|
||||
def emit(event: Any) -> None:
|
||||
"""Module-level convenience: emit via the singleton."""
|
||||
get_emitter().emit(event)
|
||||
|
||||
|
||||
def reset_emitter_for_tests(emitter: Optional[TelemetryEmitter] = None) -> None:
|
||||
"""Swap the singleton (tests only)."""
|
||||
global _EMITTER
|
||||
with _EMITTER_LOCK:
|
||||
if _EMITTER is not None and emitter is not _EMITTER:
|
||||
try:
|
||||
_EMITTER.close()
|
||||
except Exception:
|
||||
pass
|
||||
_EMITTER = emitter
|
||||
|
||||
|
||||
__all__ = [
|
||||
"TelemetryEmitter",
|
||||
"get_emitter",
|
||||
"emit",
|
||||
"reset_emitter_for_tests",
|
||||
]
|
||||
99
agent/telemetry/events.py
Normal file
99
agent/telemetry/events.py
Normal file
@@ -0,0 +1,99 @@
|
||||
"""Typed local-plane telemetry events.
|
||||
|
||||
These dataclasses are the rows written to the local JSONL log and the ``tel_*``
|
||||
SQLite tables. They record the values observed for each run — model id, provider, tool
|
||||
name, token counts, durations — and stay on the machine unless explicitly exported.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from dataclasses import dataclass, field, asdict
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
# ── local-plane events (real values) ────────────────────────────────────────
|
||||
|
||||
|
||||
def _now_ns() -> int:
|
||||
return time.time_ns()
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class RunEvent:
|
||||
"""One top-level workflow execution (a trace root)."""
|
||||
run_id: str
|
||||
trace_id: str
|
||||
entrypoint: str
|
||||
session_id: Optional[str] = None
|
||||
profile_id: Optional[str] = None
|
||||
platform: Optional[str] = None
|
||||
start_ns: int = field(default_factory=_now_ns)
|
||||
end_ns: Optional[int] = None
|
||||
end_reason: Optional[str] = None
|
||||
model_call_count: int = 0
|
||||
tool_call_count: int = 0
|
||||
error_count: int = 0
|
||||
estimated_cost_usd: Optional[float] = None
|
||||
cost_status: Optional[str] = None
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {"event": "run", **asdict(self)}
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class ModelCallEvent:
|
||||
span_id: str
|
||||
run_id: str
|
||||
provider: Optional[str] = None # raw provider, e.g. "anthropic"
|
||||
model: Optional[str] = None # raw model id, e.g. "claude-opus-4"
|
||||
base_url: Optional[str] = None
|
||||
input_tokens: int = 0
|
||||
output_tokens: int = 0
|
||||
cache_read_tokens: int = 0
|
||||
cache_write_tokens: int = 0
|
||||
reasoning_tokens: int = 0
|
||||
latency_ms: Optional[int] = None
|
||||
ttft_ms: Optional[int] = None
|
||||
estimated_cost_usd: Optional[float] = None
|
||||
cost_status: Optional[str] = None
|
||||
cost_source: Optional[str] = None
|
||||
end_reason: Optional[str] = None
|
||||
retry_count: int = 0
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {"event": "model_call", **asdict(self)}
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class ToolCallEvent:
|
||||
span_id: str
|
||||
run_id: str
|
||||
tool_name: Optional[str] = None # raw tool name, e.g. "web_search"
|
||||
backend: Optional[str] = None
|
||||
duration_ms: Optional[int] = None
|
||||
result_class: Optional[str] = None
|
||||
retry_count: int = 0
|
||||
approval: Optional[str] = None
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {"event": "tool_call", **asdict(self)}
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class ErrorEvent:
|
||||
run_id: Optional[str]
|
||||
error_class: str
|
||||
subsystem: str
|
||||
recovery: Optional[str] = None
|
||||
ts_ns: int = field(default_factory=_now_ns)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {"event": "error", **asdict(self)}
|
||||
|
||||
|
||||
__all__ = [
|
||||
"RunEvent",
|
||||
"ModelCallEvent",
|
||||
"ToolCallEvent",
|
||||
"ErrorEvent",
|
||||
]
|
||||
139
agent/telemetry/exporter_bulk.py
Normal file
139
agent/telemetry/exporter_bulk.py
Normal file
@@ -0,0 +1,139 @@
|
||||
"""Export telemetry (and optionally session content) to a file or stream.
|
||||
|
||||
Two data domains, both written to an operator-chosen destination:
|
||||
|
||||
* Telemetry: the tel_* rows + events.jsonl (structural observability).
|
||||
* Content (opt-in via the trajectories plane): sessions + messages, with every
|
||||
content field (message body, reasoning, raw tool-call args) passed through the
|
||||
redaction pipeline (secrets always stripped; PII per content_redaction).
|
||||
|
||||
Formats: ndjson (default) and json. OTLP streaming export lives in otlp_exporter.py.
|
||||
|
||||
Content export is gated by ``redaction.content_export_enabled``.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Iterator, List, Optional, TextIO
|
||||
|
||||
from . import redaction
|
||||
|
||||
_TEL_TABLES = (
|
||||
"tel_runs", "tel_model_calls", "tel_tool_calls", "tel_error_events",
|
||||
)
|
||||
|
||||
|
||||
def _open(db_path: Optional[Path]) -> sqlite3.Connection:
|
||||
if db_path is None:
|
||||
from hermes_constants import get_hermes_home
|
||||
db_path = get_hermes_home() / "state.db"
|
||||
c = sqlite3.connect(str(db_path), timeout=5.0)
|
||||
c.row_factory = sqlite3.Row
|
||||
return c
|
||||
|
||||
|
||||
def _iter_telemetry(conn: sqlite3.Connection, since_ns: Optional[int]) -> Iterator[Dict[str, Any]]:
|
||||
for table in _TEL_TABLES:
|
||||
# only tel_runs has start_ns; window the rest by run join when needed.
|
||||
if table == "tel_runs" and since_ns:
|
||||
rows = conn.execute(
|
||||
f"SELECT * FROM {table} WHERE start_ns >= ?", (int(since_ns),)
|
||||
).fetchall()
|
||||
else:
|
||||
rows = conn.execute(f"SELECT * FROM {table}").fetchall()
|
||||
for r in rows:
|
||||
d = dict(r)
|
||||
d["_kind"] = table
|
||||
yield d
|
||||
|
||||
|
||||
def _iter_content(
|
||||
db_path: Optional[Path],
|
||||
*,
|
||||
config: Optional[Dict[str, Any]],
|
||||
include_content: bool,
|
||||
) -> Iterator[Dict[str, Any]]:
|
||||
"""Yield session records. Message bodies included only when trajectories on."""
|
||||
from hermes_state import SessionDB
|
||||
|
||||
content_mode = redaction.content_mode_for(config)
|
||||
db = SessionDB(db_path=db_path) if db_path else SessionDB()
|
||||
try:
|
||||
for session in db.export_all():
|
||||
msgs = session.get("messages", []) or []
|
||||
red_msgs = [
|
||||
redaction.redact_message(
|
||||
m, content_mode=content_mode, include_content=include_content
|
||||
)
|
||||
for m in msgs
|
||||
]
|
||||
# Session-level metadata is structural; keep ids/model/counts, drop
|
||||
# any free-text title only when content is excluded.
|
||||
out = {
|
||||
"_kind": "session",
|
||||
"id": session.get("id"),
|
||||
"source": session.get("source"),
|
||||
"model": session.get("model"),
|
||||
"started_at": session.get("started_at"),
|
||||
"ended_at": session.get("ended_at"),
|
||||
"message_count": session.get("message_count"),
|
||||
"tool_call_count": session.get("tool_call_count"),
|
||||
"messages": red_msgs,
|
||||
}
|
||||
if include_content and session.get("title"):
|
||||
out["title"] = redaction.redact_for_export(
|
||||
session["title"], content_mode=content_mode
|
||||
)
|
||||
yield out
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
def export(
|
||||
out: TextIO,
|
||||
*,
|
||||
fmt: str = "ndjson",
|
||||
since_ns: Optional[int] = None,
|
||||
include_content: bool = False,
|
||||
config: Optional[Dict[str, Any]] = None,
|
||||
db_path: Optional[Path] = None,
|
||||
) -> Dict[str, int]:
|
||||
"""Write telemetry (+ optional content) to ``out``. Returns counts.
|
||||
|
||||
``include_content`` is honored only when the trajectories plane is enabled in
|
||||
``config``; otherwise content is forced off and only structural data is written.
|
||||
"""
|
||||
# Trajectories gate: a flag cannot override the consent plane.
|
||||
content_allowed = include_content and redaction.content_export_enabled(config)
|
||||
counts = {"telemetry": 0, "sessions": 0, "content_included": int(content_allowed)}
|
||||
|
||||
conn = _open(db_path)
|
||||
records: List[Dict[str, Any]] = []
|
||||
try:
|
||||
for rec in _iter_telemetry(conn, since_ns):
|
||||
counts["telemetry"] += 1
|
||||
if fmt == "ndjson":
|
||||
out.write(json.dumps(rec, ensure_ascii=False) + "\n")
|
||||
else:
|
||||
records.append(rec)
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
# Content/session domain (separate connection via SessionDB).
|
||||
for rec in _iter_content(db_path, config=config, include_content=content_allowed):
|
||||
counts["sessions"] += 1
|
||||
if fmt == "ndjson":
|
||||
out.write(json.dumps(rec, ensure_ascii=False) + "\n")
|
||||
else:
|
||||
records.append(rec)
|
||||
|
||||
if fmt != "ndjson":
|
||||
json.dump({"records": records}, out, ensure_ascii=False, indent=2)
|
||||
|
||||
return counts
|
||||
|
||||
|
||||
__all__ = ["export"]
|
||||
219
agent/telemetry/metrics.py
Normal file
219
agent/telemetry/metrics.py
Normal file
@@ -0,0 +1,219 @@
|
||||
"""Derive metric rollups from the local telemetry tables.
|
||||
|
||||
Reads the ``tel_*`` tables in state.db and returns aggregates for /usage, /insights,
|
||||
and local dashboards. Metrics are computed by querying the event log rather than being
|
||||
emitted on the hot path.
|
||||
|
||||
Each function accepts either an open caller-owned ``conn`` (reused, not closed) or a
|
||||
``db_path`` (opened and closed internally). InsightsEngine passes its existing
|
||||
connection; a standalone dashboard passes a path.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sqlite3
|
||||
from contextlib import contextmanager
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Iterator, List, Optional
|
||||
|
||||
|
||||
@contextmanager
|
||||
def _cursor(
|
||||
conn: Optional[sqlite3.Connection], db_path: Optional[Path]
|
||||
) -> Iterator[sqlite3.Connection]:
|
||||
"""Yield a Row-factory connection. Closes it only if we opened it."""
|
||||
if conn is not None:
|
||||
prev_factory = conn.row_factory
|
||||
conn.row_factory = sqlite3.Row
|
||||
try:
|
||||
yield conn
|
||||
finally:
|
||||
conn.row_factory = prev_factory
|
||||
return
|
||||
if db_path is None:
|
||||
from hermes_constants import get_hermes_home
|
||||
db_path = get_hermes_home() / "state.db"
|
||||
c = sqlite3.connect(str(db_path), timeout=5.0)
|
||||
c.row_factory = sqlite3.Row
|
||||
try:
|
||||
yield c
|
||||
finally:
|
||||
c.close()
|
||||
|
||||
|
||||
def _since_clause(since_ns: Optional[int], col: str = "start_ns") -> str:
|
||||
return f" WHERE {col} >= {int(since_ns)}" if since_ns else ""
|
||||
|
||||
|
||||
def workflow_summary(
|
||||
db_path: Optional[Path] = None,
|
||||
since_ns: Optional[int] = None,
|
||||
*,
|
||||
conn: Optional[sqlite3.Connection] = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""Run-level counters + duration percentiles (local plane, exact)."""
|
||||
with _cursor(conn, db_path) as c:
|
||||
where = _since_clause(since_ns)
|
||||
total = c.execute(f"SELECT COUNT(*) n FROM tel_runs{where}").fetchone()["n"]
|
||||
by_reason = {
|
||||
r["end_reason"] or "unknown": r["n"]
|
||||
for r in c.execute(
|
||||
f"SELECT end_reason, COUNT(*) n FROM tel_runs{where} GROUP BY end_reason"
|
||||
).fetchall()
|
||||
}
|
||||
by_entry = {
|
||||
r["entrypoint"] or "unknown": r["n"]
|
||||
for r in c.execute(
|
||||
f"SELECT entrypoint, COUNT(*) n FROM tel_runs{where} GROUP BY entrypoint"
|
||||
).fetchall()
|
||||
}
|
||||
dur_where = (where + " AND end_ns IS NOT NULL") if where else " WHERE end_ns IS NOT NULL"
|
||||
durations = [
|
||||
(r["end_ns"] - r["start_ns"]) / 1e6
|
||||
for r in c.execute(
|
||||
f"SELECT start_ns, end_ns FROM tel_runs{dur_where}"
|
||||
).fetchall()
|
||||
]
|
||||
return {
|
||||
"total_runs": total,
|
||||
"by_end_reason": by_reason,
|
||||
"by_entrypoint": by_entry,
|
||||
"duration_ms_p50": _pct(durations, 50),
|
||||
"duration_ms_p95": _pct(durations, 95),
|
||||
"success_rate": round(by_reason.get("completed", 0) / total, 4) if total else 0.0,
|
||||
}
|
||||
|
||||
|
||||
def model_call_summary(
|
||||
db_path: Optional[Path] = None,
|
||||
since_ns: Optional[int] = None,
|
||||
*,
|
||||
conn: Optional[sqlite3.Connection] = None,
|
||||
) -> Dict[str, Any]:
|
||||
with _cursor(conn, db_path) as c:
|
||||
rows = c.execute(
|
||||
"SELECT provider, model, COUNT(*) n, "
|
||||
"SUM(input_tokens) inp, SUM(output_tokens) outp, "
|
||||
"SUM(cache_read_tokens) cache, AVG(latency_ms) avg_latency "
|
||||
"FROM tel_model_calls GROUP BY provider, model"
|
||||
).fetchall()
|
||||
by_provider: Dict[str, int] = {}
|
||||
by_model: Dict[str, int] = {}
|
||||
tokens = {"input": 0, "output": 0, "cache_read": 0}
|
||||
breakdown: List[Dict[str, Any]] = []
|
||||
for r in rows:
|
||||
prov = r["provider"] or "unknown"
|
||||
mdl = r["model"] or "unknown"
|
||||
by_provider[prov] = by_provider.get(prov, 0) + r["n"]
|
||||
by_model[mdl] = by_model.get(mdl, 0) + r["n"]
|
||||
tokens["input"] += r["inp"] or 0
|
||||
tokens["output"] += r["outp"] or 0
|
||||
tokens["cache_read"] += r["cache"] or 0
|
||||
breakdown.append({
|
||||
"provider": r["provider"],
|
||||
"model": r["model"],
|
||||
"calls": r["n"],
|
||||
"avg_latency_ms": round(r["avg_latency"] or 0, 1),
|
||||
})
|
||||
cache_total = tokens["cache_read"] + tokens["input"]
|
||||
return {
|
||||
"by_provider": by_provider,
|
||||
"by_model": by_model,
|
||||
"tokens": tokens,
|
||||
"cache_hit_rate": round(tokens["cache_read"] / cache_total, 4) if cache_total else 0.0,
|
||||
"breakdown": breakdown,
|
||||
}
|
||||
|
||||
|
||||
def tool_call_summary(
|
||||
db_path: Optional[Path] = None,
|
||||
*,
|
||||
conn: Optional[sqlite3.Connection] = None,
|
||||
) -> Dict[str, Any]:
|
||||
with _cursor(conn, db_path) as c:
|
||||
by_tool = {
|
||||
r["tool_name"] or "unknown": r["n"]
|
||||
for r in c.execute(
|
||||
"SELECT tool_name, COUNT(*) n FROM tel_tool_calls GROUP BY tool_name"
|
||||
).fetchall()
|
||||
}
|
||||
fails = {
|
||||
r["tool_name"] or "unknown": r["n"]
|
||||
for r in c.execute(
|
||||
"SELECT tool_name, COUNT(*) n FROM tel_tool_calls "
|
||||
"WHERE result_class IN ('error','timeout','blocked') GROUP BY tool_name"
|
||||
).fetchall()
|
||||
}
|
||||
total = sum(by_tool.values())
|
||||
total_fail = sum(fails.values())
|
||||
return {
|
||||
"by_tool": by_tool,
|
||||
"failures_by_tool": fails,
|
||||
"total": total,
|
||||
"failure_rate": round(total_fail / total, 4) if total else 0.0,
|
||||
}
|
||||
|
||||
|
||||
def error_summary(
|
||||
db_path: Optional[Path] = None,
|
||||
*,
|
||||
conn: Optional[sqlite3.Connection] = None,
|
||||
) -> Dict[str, Any]:
|
||||
with _cursor(conn, db_path) as c:
|
||||
return {
|
||||
"by_class": {
|
||||
r["error_class"] or "unknown": r["n"]
|
||||
for r in c.execute(
|
||||
"SELECT error_class, COUNT(*) n FROM tel_error_events GROUP BY error_class"
|
||||
).fetchall()
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def _pct(values: List[float], p: int) -> float:
|
||||
if not values:
|
||||
return 0.0
|
||||
s = sorted(values)
|
||||
k = (len(s) - 1) * (p / 100)
|
||||
lo = int(k)
|
||||
hi = min(lo + 1, len(s) - 1)
|
||||
frac = k - lo
|
||||
return round(s[lo] + (s[hi] - s[lo]) * frac, 2)
|
||||
|
||||
|
||||
def overview(
|
||||
db_path: Optional[Path] = None,
|
||||
since_ns: Optional[int] = None,
|
||||
*,
|
||||
conn: Optional[sqlite3.Connection] = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""One call for a dashboard: all the rollups."""
|
||||
return {
|
||||
"workflows": workflow_summary(db_path, since_ns, conn=conn),
|
||||
"model_calls": model_call_summary(db_path, since_ns, conn=conn),
|
||||
"tool_calls": tool_call_summary(db_path, conn=conn),
|
||||
"errors": error_summary(db_path, conn=conn),
|
||||
}
|
||||
|
||||
|
||||
def has_data(
|
||||
db_path: Optional[Path] = None,
|
||||
*,
|
||||
conn: Optional[sqlite3.Connection] = None,
|
||||
) -> bool:
|
||||
"""True when any telemetry runs exist (cheap guard for /insights rendering)."""
|
||||
try:
|
||||
with _cursor(conn, db_path) as c:
|
||||
return c.execute("SELECT 1 FROM tel_runs LIMIT 1").fetchone() is not None
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
__all__ = [
|
||||
"workflow_summary",
|
||||
"model_call_summary",
|
||||
"tool_call_summary",
|
||||
"error_summary",
|
||||
"overview",
|
||||
"has_data",
|
||||
]
|
||||
282
agent/telemetry/otlp_exporter.py
Normal file
282
agent/telemetry/otlp_exporter.py
Normal file
@@ -0,0 +1,282 @@
|
||||
"""Export telemetry to an OpenTelemetry Collector over OTLP/HTTP.
|
||||
|
||||
Maps telemetry events (which carry trace_id/run_id/span_id/parent_span_id) to OTel
|
||||
spans and sends them to the endpoint configured under ``telemetry.export.otlp``. Lets
|
||||
an operator stream Hermes telemetry into their own observability stack.
|
||||
|
||||
Notes:
|
||||
* The destination is operator-configured; this module only sends to that endpoint.
|
||||
It does not import or interact with any aggregate-metrics path.
|
||||
* ``opentelemetry-sdk`` + ``opentelemetry-exporter-otlp-proto-http`` are an optional
|
||||
extra (``pip install hermes-agent[otlp]``), imported lazily so the dependency is
|
||||
only required when OTLP export is actually used.
|
||||
* ``headers_env`` maps a header name to an environment variable name; values are read
|
||||
from the environment at export time and never logged or stored.
|
||||
* The continuous subscriber runs in the emitter's writer thread after durable writes
|
||||
and is fail-isolated, so an export error cannot affect a run.
|
||||
|
||||
Spans carry structural telemetry by default. Message content is included only when the
|
||||
trajectories plane is enabled, and always passes through the export redaction pipeline.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import os
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class OTLPUnavailable(RuntimeError):
|
||||
"""Raised when the optional OpenTelemetry SDK isn't installed."""
|
||||
|
||||
|
||||
def _require_sdk(*, auto_install: bool = True, prompt: bool = True):
|
||||
"""Import the OTel SDK, lazily installing it on first use if needed.
|
||||
|
||||
Routes through tools.lazy_deps (feature 'export.otlp') so a missing SDK
|
||||
triggers the standard venv install flow — same as every other optional
|
||||
backend — gated by security.allow_lazy_installs and TTY-prompted. Falls back
|
||||
to OTLPUnavailable (with a manual install hint) when the SDK can't be made
|
||||
importable (lazy installs disabled, install failed, or auto_install=False).
|
||||
|
||||
``auto_install``: attempt the lazy install when missing (default True).
|
||||
``prompt``: ask before installing when interactive (default True); pass
|
||||
False from non-interactive contexts like the continuous streamer.
|
||||
"""
|
||||
if auto_install:
|
||||
try:
|
||||
from tools.lazy_deps import ensure as _lazy_ensure
|
||||
_lazy_ensure("export.otlp", prompt=prompt)
|
||||
except ImportError:
|
||||
pass # lazy_deps unavailable — fall through to the import attempt
|
||||
except Exception:
|
||||
# FeatureUnavailable (lazy installs disabled / declined / failed) —
|
||||
# fall through; the import below raises OTLPUnavailable with the hint.
|
||||
pass
|
||||
try:
|
||||
from opentelemetry.sdk.trace import TracerProvider
|
||||
from opentelemetry.sdk.trace.export import BatchSpanProcessor
|
||||
from opentelemetry.sdk.resources import Resource
|
||||
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
|
||||
OTLPSpanExporter,
|
||||
)
|
||||
from opentelemetry.trace import SpanKind
|
||||
return {
|
||||
"TracerProvider": TracerProvider,
|
||||
"BatchSpanProcessor": BatchSpanProcessor,
|
||||
"Resource": Resource,
|
||||
"OTLPSpanExporter": OTLPSpanExporter,
|
||||
"SpanKind": SpanKind,
|
||||
}
|
||||
except Exception as e: # ImportError or partial install
|
||||
raise OTLPUnavailable(
|
||||
"OTLP export requires the optional dependency. Install with:\n"
|
||||
" pip install 'hermes-agent[otlp]'\n"
|
||||
f"(import error: {e})"
|
||||
)
|
||||
|
||||
|
||||
def _resolve_headers(headers_env: Optional[Dict[str, str]]) -> Dict[str, str]:
|
||||
"""Resolve {header_name: ENV_VAR_NAME} -> {header_name: value} from env.
|
||||
|
||||
The config stores environment variable names, not secret values; values are read
|
||||
from the environment here. Missing variables are skipped (and noted at debug level
|
||||
without the value).
|
||||
"""
|
||||
resolved: Dict[str, str] = {}
|
||||
for header_name, env_name in (headers_env or {}).items():
|
||||
val = os.environ.get(str(env_name))
|
||||
if val:
|
||||
resolved[str(header_name)] = val
|
||||
else:
|
||||
logger.debug("OTLP header %s: env var %s not set; skipping",
|
||||
header_name, env_name)
|
||||
return resolved
|
||||
|
||||
|
||||
def _otlp_config(config: Dict[str, Any]) -> Dict[str, Any]:
|
||||
tel = (config or {}).get("telemetry") or {}
|
||||
export = tel.get("export") or {}
|
||||
return export.get("otlp") or {}
|
||||
|
||||
|
||||
def build_exporter(config: Dict[str, Any]):
|
||||
"""Construct an OTLP span exporter from config. Raises OTLPUnavailable if no SDK."""
|
||||
sdk = _require_sdk()
|
||||
otlp = _otlp_config(config)
|
||||
endpoint = otlp.get("endpoint")
|
||||
if not endpoint:
|
||||
raise ValueError("telemetry.export.otlp.endpoint is not set")
|
||||
headers = _resolve_headers(otlp.get("headers_env"))
|
||||
return sdk["OTLPSpanExporter"](endpoint=endpoint, headers=headers or None)
|
||||
|
||||
|
||||
def _make_provider(config: Dict[str, Any]):
|
||||
sdk = _require_sdk()
|
||||
resource = sdk["Resource"].create({
|
||||
"service.name": "hermes-agent",
|
||||
"telemetry.plane": "local", # never aggregate
|
||||
})
|
||||
provider = sdk["TracerProvider"](resource=resource)
|
||||
processor = sdk["BatchSpanProcessor"](build_exporter(config))
|
||||
provider.add_span_processor(processor)
|
||||
return provider, processor
|
||||
|
||||
|
||||
# ── event -> span attribute mapping (real values) ───────────────────────────
|
||||
def _span_attrs(ev: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Span attributes for an event — the real recorded values (local plane)."""
|
||||
kind = ev.get("event")
|
||||
attrs: Dict[str, Any] = {"hermes.event": kind or "unknown"}
|
||||
keep_by_kind = {
|
||||
"run": ("entrypoint", "platform", "end_reason",
|
||||
"model_call_count", "tool_call_count", "error_count",
|
||||
"estimated_cost_usd", "cost_status"),
|
||||
"model_call": ("provider", "model", "base_url",
|
||||
"input_tokens", "output_tokens", "cache_read_tokens",
|
||||
"cache_write_tokens", "reasoning_tokens", "latency_ms",
|
||||
"ttft_ms", "end_reason"),
|
||||
"tool_call": ("tool_name", "backend", "duration_ms", "result_class"),
|
||||
"error": ("error_class", "subsystem", "recovery"),
|
||||
}
|
||||
for col in keep_by_kind.get(kind, ()): # type: ignore[arg-type]
|
||||
v = ev.get(col)
|
||||
if v is not None:
|
||||
attrs[f"hermes.{col}"] = v
|
||||
return attrs
|
||||
|
||||
|
||||
def export_batch(provider, batch: List[Dict[str, Any]]) -> int:
|
||||
"""Map a batch of events to OTel spans. Returns spans created."""
|
||||
tracer = provider.get_tracer("hermes.telemetry")
|
||||
n = 0
|
||||
for ev in batch:
|
||||
try:
|
||||
name = f"hermes.{ev.get('event', 'event')}"
|
||||
span = tracer.start_span(name, attributes=_span_attrs(ev))
|
||||
span.end()
|
||||
n += 1
|
||||
except Exception:
|
||||
logger.debug("OTLP span map failed", exc_info=True)
|
||||
return n
|
||||
|
||||
|
||||
# ── one-shot drain (export current local rows) ──────────────────────────────
|
||||
def export_once(
|
||||
config: Dict[str, Any],
|
||||
*,
|
||||
db_path: Optional[Path] = None,
|
||||
since_ns: Optional[int] = None,
|
||||
) -> int:
|
||||
"""Drain the local tel_* tables to the configured OTLP endpoint once."""
|
||||
provider, processor = _make_provider(config)
|
||||
try:
|
||||
rows = _read_events(db_path, since_ns)
|
||||
total = export_batch(provider, rows)
|
||||
processor.force_flush()
|
||||
return total
|
||||
finally:
|
||||
try:
|
||||
provider.shutdown()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def _read_events(db_path: Optional[Path], since_ns: Optional[int]) -> List[Dict[str, Any]]:
|
||||
if db_path is None:
|
||||
from hermes_constants import get_hermes_home
|
||||
db_path = get_hermes_home() / "state.db"
|
||||
c = sqlite3.connect(str(db_path), timeout=5.0)
|
||||
c.row_factory = sqlite3.Row
|
||||
out: List[Dict[str, Any]] = []
|
||||
try:
|
||||
table_event = {
|
||||
"tel_runs": "run", "tel_model_calls": "model_call",
|
||||
"tel_tool_calls": "tool_call", "tel_error_events": "error",
|
||||
}
|
||||
for table, evkind in table_event.items():
|
||||
where = ""
|
||||
if table == "tel_runs" and since_ns:
|
||||
where = f" WHERE start_ns >= {int(since_ns)}"
|
||||
for r in c.execute(f"SELECT * FROM {table}{where}").fetchall():
|
||||
d = dict(r)
|
||||
d["event"] = evkind
|
||||
out.append(d)
|
||||
finally:
|
||||
c.close()
|
||||
return out
|
||||
|
||||
|
||||
# ── continuous streaming subscriber ─────────────────────────────────────────
|
||||
class OTLPStreamer:
|
||||
"""A live subscriber that pushes each emitter batch to OTLP as it lands.
|
||||
|
||||
Register with ``emitter.subscribe(streamer)``. Fail-isolated by the emitter.
|
||||
"""
|
||||
|
||||
def __init__(self, config: Dict[str, Any]):
|
||||
self._provider, self._processor = _make_provider(config)
|
||||
self.exported = 0
|
||||
|
||||
def __call__(self, batch: List[Dict[str, Any]]) -> None:
|
||||
self.exported += export_batch(self._provider, batch)
|
||||
|
||||
def shutdown(self) -> None:
|
||||
try:
|
||||
self._processor.force_flush()
|
||||
self._provider.shutdown()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def is_available() -> bool:
|
||||
"""True when the OTel SDK is already importable. Does NOT auto-install —
|
||||
this is a pure check (e.g. for status display)."""
|
||||
try:
|
||||
_require_sdk(auto_install=False)
|
||||
return True
|
||||
except OTLPUnavailable:
|
||||
return False
|
||||
|
||||
|
||||
def is_enabled(config: Dict[str, Any]) -> bool:
|
||||
otlp = _otlp_config(config)
|
||||
return bool(otlp.get("enabled") and otlp.get("endpoint"))
|
||||
|
||||
|
||||
def start_streaming(config: Dict[str, Any]) -> Optional[OTLPStreamer]:
|
||||
"""If OTLP is enabled, attach a streamer to the singleton emitter.
|
||||
|
||||
Non-interactive context (startup): attempts a lazy install with prompt=False
|
||||
so a configured-but-missing SDK is installed once (gated by
|
||||
security.allow_lazy_installs), then streams. If it still can't load, logs and
|
||||
no-ops — never blocks or raises into startup.
|
||||
"""
|
||||
if not is_enabled(config):
|
||||
return None
|
||||
try:
|
||||
_require_sdk(prompt=False)
|
||||
except OTLPUnavailable:
|
||||
logger.warning("telemetry.export.otlp.enabled but the OTel SDK could not "
|
||||
"be installed/imported; install 'hermes-agent[otlp]'")
|
||||
return None
|
||||
from agent.telemetry.emitter import get_emitter
|
||||
streamer = OTLPStreamer(config)
|
||||
get_emitter().subscribe(streamer)
|
||||
return streamer
|
||||
|
||||
|
||||
__all__ = [
|
||||
"OTLPUnavailable",
|
||||
"OTLPStreamer",
|
||||
"build_exporter",
|
||||
"export_once",
|
||||
"export_batch",
|
||||
"is_available",
|
||||
"is_enabled",
|
||||
"start_streaming",
|
||||
]
|
||||
107
agent/telemetry/policy.py
Normal file
107
agent/telemetry/policy.py
Normal file
@@ -0,0 +1,107 @@
|
||||
"""Telemetry consent posture and the aggregate-plane gate.
|
||||
|
||||
Consent is a single field, ``telemetry.consent_state``:
|
||||
|
||||
* "unknown" — no choice recorded; never uploads (the default).
|
||||
* "local" — declined the aggregate plane; local plane only.
|
||||
* "aggregate" — opted in to the aggregate plane.
|
||||
|
||||
The config file is the source of truth: set ``telemetry.consent_state`` with
|
||||
``hermes config set`` (or a managed-scope pin). There is no separate boolean mirror —
|
||||
a single field cannot drift out of sync with itself, so a stray value can't
|
||||
accidentally imply consent.
|
||||
|
||||
``allow_aggregate`` is the hard gate. An administrator pins
|
||||
``telemetry.allow_aggregate: false`` through the managed-scope layer
|
||||
(``/etc/hermes/config.yaml``), which takes precedence over the user's config; when it
|
||||
is false, the aggregate plane is off regardless of ``consent_state``.
|
||||
|
||||
This module makes the decisions; it performs no I/O and contains no uploader. A future
|
||||
uploader must call :func:`may_upload_aggregate` at its boundary.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import uuid
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Dict
|
||||
|
||||
CONSENT_UNKNOWN = "unknown"
|
||||
CONSENT_LOCAL = "local"
|
||||
CONSENT_AGGREGATE = "aggregate"
|
||||
_VALID_STATES = {CONSENT_UNKNOWN, CONSENT_LOCAL, CONSENT_AGGREGATE}
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class TelemetryDecision:
|
||||
"""The resolved telemetry posture for the current process."""
|
||||
local_enabled: bool
|
||||
aggregate_enabled: bool
|
||||
consent_state: str
|
||||
install_id: str
|
||||
allow_aggregate: bool
|
||||
|
||||
def may_upload_aggregate(self) -> bool:
|
||||
"""The single gate the uploader must consult before any network send."""
|
||||
return self.allow_aggregate and self.consent_state == CONSENT_AGGREGATE
|
||||
|
||||
|
||||
def _telemetry_cfg(config: Dict[str, Any]) -> Dict[str, Any]:
|
||||
cfg = config.get("telemetry") if isinstance(config, dict) else None
|
||||
return cfg if isinstance(cfg, dict) else {}
|
||||
|
||||
|
||||
def ensure_install_id(config: Dict[str, Any]) -> str:
|
||||
"""Return a stable install id, minting one if the config slot is empty.
|
||||
|
||||
Does not persist — the caller writes the returned value back to config.yaml. A
|
||||
fresh uuid4 is used; clearing ``telemetry.install_id`` (e.g. with
|
||||
``hermes config set telemetry.install_id ""``) causes the next call to mint anew.
|
||||
"""
|
||||
tel = _telemetry_cfg(config)
|
||||
existing = tel.get("install_id")
|
||||
if isinstance(existing, str) and existing.strip():
|
||||
return existing
|
||||
return str(uuid.uuid4())
|
||||
|
||||
|
||||
def resolve(config: Dict[str, Any]) -> TelemetryDecision:
|
||||
"""Resolve the effective telemetry posture from config.
|
||||
|
||||
``consent_state`` is the single source of truth for the aggregate opt-in.
|
||||
``allow_aggregate`` (admin-pinnable via managed scope) hard-disables the aggregate
|
||||
plane regardless of consent.
|
||||
"""
|
||||
tel = _telemetry_cfg(config)
|
||||
|
||||
local_enabled = bool(tel.get("local", True))
|
||||
allow_aggregate = bool(tel.get("allow_aggregate", True))
|
||||
state = tel.get("consent_state", CONSENT_UNKNOWN)
|
||||
if state not in _VALID_STATES:
|
||||
state = CONSENT_UNKNOWN
|
||||
|
||||
aggregate_enabled = allow_aggregate and state == CONSENT_AGGREGATE
|
||||
|
||||
return TelemetryDecision(
|
||||
local_enabled=local_enabled,
|
||||
aggregate_enabled=aggregate_enabled,
|
||||
consent_state=state,
|
||||
install_id=ensure_install_id(config),
|
||||
allow_aggregate=allow_aggregate,
|
||||
)
|
||||
|
||||
|
||||
def may_upload_aggregate(config: Dict[str, Any]) -> bool:
|
||||
"""Convenience gate for the uploader boundary."""
|
||||
return resolve(config).may_upload_aggregate()
|
||||
|
||||
|
||||
__all__ = [
|
||||
"CONSENT_UNKNOWN",
|
||||
"CONSENT_LOCAL",
|
||||
"CONSENT_AGGREGATE",
|
||||
"TelemetryDecision",
|
||||
"resolve",
|
||||
"may_upload_aggregate",
|
||||
"ensure_install_id",
|
||||
]
|
||||
187
agent/telemetry/redaction.py
Normal file
187
agent/telemetry/redaction.py
Normal file
@@ -0,0 +1,187 @@
|
||||
"""Redaction applied to telemetry data on export.
|
||||
|
||||
Two independent controls:
|
||||
|
||||
* Secrets are always redacted, on every export and in every mode; no setting
|
||||
disables this. Wraps ``agent/redact.py::redact_sensitive_text(force=True)``.
|
||||
|
||||
* Whether message bodies, reasoning, and raw tool arguments are exportable at all is
|
||||
governed by the trajectories plane (``telemetry.trajectories.enabled``, default
|
||||
off, admin-pinnable), not by a redaction mode. With trajectories off, content is
|
||||
dropped. With it on, content is exportable and ``content_redaction`` (none|pii)
|
||||
controls how much is scrubbed; secrets are still always stripped.
|
||||
|
||||
This applies to the local and trajectory export paths. It is unrelated to any
|
||||
aggregate-metrics path.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
# Content-redaction strengths for any content that IS exported.
|
||||
CONTENT_NONE = "none" # drop content entirely (structural telemetry only)
|
||||
CONTENT_PII = "pii" # codec-aware PII redaction on exported content
|
||||
CONTENT_MODES = {CONTENT_NONE, CONTENT_PII}
|
||||
|
||||
# ── PII patterns (applied only in CONTENT_PII mode, on content that is exported) ──
|
||||
_EMAIL_RE = re.compile(r"[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}")
|
||||
# E.164-ish and common separators; conservative to avoid nuking code/IDs.
|
||||
_PHONE_RE = re.compile(
|
||||
r"(?<!\w)(?:\+?\d{1,3}[\s.\-]?)?(?:\(\d{2,4}\)[\s.\-]?)?\d{3}[\s.\-]?\d{3,4}(?:[\s.\-]?\d{2,4})?(?!\w)"
|
||||
)
|
||||
# Long opaque hex/uuid-ish user identifiers.
|
||||
_UUID_RE = re.compile(r"\b[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}\b")
|
||||
|
||||
|
||||
def _secret_redact(text: Optional[str]) -> Optional[str]:
|
||||
"""Always-on secret redaction. force=True so user config can't disable it."""
|
||||
if text is None:
|
||||
return None
|
||||
try:
|
||||
from agent.redact import redact_sensitive_text
|
||||
return redact_sensitive_text(str(text), force=True)
|
||||
except Exception:
|
||||
# Fail CLOSED: if the redactor can't run, do not emit the raw string.
|
||||
return "[redaction-unavailable]"
|
||||
|
||||
|
||||
def _pii_redact(text: str) -> str:
|
||||
text = _EMAIL_RE.sub("[email]", text)
|
||||
text = _UUID_RE.sub("[id]", text)
|
||||
text = _PHONE_RE.sub("[phone]", text)
|
||||
return text
|
||||
|
||||
|
||||
def redact_for_export(
|
||||
text: Optional[str],
|
||||
*,
|
||||
content_mode: str = CONTENT_NONE,
|
||||
) -> Optional[str]:
|
||||
"""Redact a single content string for export.
|
||||
|
||||
Secrets are ALWAYS stripped. Then PII is stripped when content_mode is 'pii'.
|
||||
Callers gate *whether content is exported at all* via the trajectories plane
|
||||
(see ``content_export_enabled``); this function only scrubs content that the
|
||||
caller has already decided to export.
|
||||
"""
|
||||
redacted = _secret_redact(text)
|
||||
if redacted is None:
|
||||
return None
|
||||
if content_mode == CONTENT_PII:
|
||||
redacted = _pii_redact(redacted)
|
||||
return redacted
|
||||
|
||||
|
||||
def content_export_enabled(config: Optional[Dict[str, Any]]) -> bool:
|
||||
"""True only when the trajectories plane is explicitly enabled.
|
||||
|
||||
This is the consent gate for exporting message bodies / reasoning / raw tool
|
||||
args. Default off. Admin-pinnable via managed scope (telemetry.trajectories.enabled).
|
||||
"""
|
||||
try:
|
||||
tel = (config or {}).get("telemetry") or {}
|
||||
traj = tel.get("trajectories") or {}
|
||||
return bool(traj.get("enabled", False))
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def content_mode_for(config: Optional[Dict[str, Any]]) -> str:
|
||||
try:
|
||||
tel = (config or {}).get("telemetry") or {}
|
||||
mode = tel.get("content_redaction", CONTENT_NONE)
|
||||
return mode if mode in CONTENT_MODES else CONTENT_NONE
|
||||
except Exception:
|
||||
return CONTENT_NONE
|
||||
|
||||
|
||||
# ── Codec-aware message redaction (NeMo pattern) ─────────────────────────────
|
||||
# Redact the right fields of a provider message shape rather than regex-blasting
|
||||
# the whole blob. Structure (roles, names, counts) is preserved; only the
|
||||
# free-text content fields are scrubbed.
|
||||
|
||||
def redact_message(
|
||||
msg: Dict[str, Any],
|
||||
*,
|
||||
content_mode: str = CONTENT_NONE,
|
||||
include_content: bool = False,
|
||||
) -> Dict[str, Any]:
|
||||
"""Redact one chat message dict for export.
|
||||
|
||||
When include_content is False (trajectories off), content/reasoning/tool-arg
|
||||
fields are dropped — only structural fields (role, tool name, counts) remain.
|
||||
When True, those fields are kept but passed through redact_for_export.
|
||||
"""
|
||||
role = msg.get("role")
|
||||
out: Dict[str, Any] = {"role": role}
|
||||
|
||||
# Always-structural fields.
|
||||
if msg.get("tool_name") is not None:
|
||||
out["tool_name"] = msg.get("tool_name")
|
||||
if msg.get("name") is not None:
|
||||
out["name"] = msg.get("name")
|
||||
|
||||
if not include_content:
|
||||
# Structural only: record presence/size, not bytes.
|
||||
c = msg.get("content")
|
||||
if c is not None:
|
||||
out["content_chars"] = len(str(c))
|
||||
if msg.get("reasoning_content"):
|
||||
out["reasoning_chars"] = len(str(msg["reasoning_content"]))
|
||||
if msg.get("tool_calls"):
|
||||
out["tool_call_count"] = _count_tool_calls(msg["tool_calls"])
|
||||
return out
|
||||
|
||||
# Content included (trajectories enabled): scrub then keep.
|
||||
if msg.get("content") is not None:
|
||||
out["content"] = redact_for_export(msg["content"], content_mode=content_mode)
|
||||
if msg.get("reasoning_content"):
|
||||
out["reasoning_content"] = redact_for_export(
|
||||
msg["reasoning_content"], content_mode=content_mode
|
||||
)
|
||||
if msg.get("tool_calls"):
|
||||
out["tool_calls"] = _redact_tool_calls(msg["tool_calls"], content_mode=content_mode)
|
||||
return out
|
||||
|
||||
|
||||
def _count_tool_calls(tool_calls: Any) -> int:
|
||||
try:
|
||||
import json
|
||||
tc = json.loads(tool_calls) if isinstance(tool_calls, str) else tool_calls
|
||||
return len(tc) if isinstance(tc, list) else (1 if tc else 0)
|
||||
except Exception:
|
||||
return 0
|
||||
|
||||
|
||||
def _redact_tool_calls(tool_calls: Any, *, content_mode: str) -> Any:
|
||||
"""Redact raw tool-call arguments (free text) while keeping function names."""
|
||||
import json
|
||||
try:
|
||||
tc = json.loads(tool_calls) if isinstance(tool_calls, str) else tool_calls
|
||||
except Exception:
|
||||
return "[unparseable-tool-calls]"
|
||||
if not isinstance(tc, list):
|
||||
return []
|
||||
out: List[Dict[str, Any]] = []
|
||||
for call in tc:
|
||||
if not isinstance(call, dict):
|
||||
continue
|
||||
fn = (call.get("function") or {}) if isinstance(call.get("function"), dict) else {}
|
||||
name = fn.get("name") or call.get("name")
|
||||
args = fn.get("arguments")
|
||||
red_args = redact_for_export(args, content_mode=content_mode) if args is not None else None
|
||||
out.append({"name": name, "arguments": red_args})
|
||||
return out
|
||||
|
||||
|
||||
__all__ = [
|
||||
"CONTENT_NONE",
|
||||
"CONTENT_PII",
|
||||
"CONTENT_MODES",
|
||||
"redact_for_export",
|
||||
"content_export_enabled",
|
||||
"content_mode_for",
|
||||
"redact_message",
|
||||
]
|
||||
145
agent/telemetry/rollup.py
Normal file
145
agent/telemetry/rollup.py
Normal file
@@ -0,0 +1,145 @@
|
||||
"""Build per-run summary events from the local telemetry tables.
|
||||
|
||||
Reads the ``tel_*`` tables and projects each completed run into a summary dict holding
|
||||
the recorded values: provider, models used, tool names, token totals, duration, and
|
||||
cost. Powers ``hermes telemetry preview``. No aggregation or bucketing is applied here.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import platform
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
|
||||
def _os_family() -> str:
|
||||
s = platform.system().lower()
|
||||
if s.startswith("lin"):
|
||||
return "linux"
|
||||
if s == "darwin":
|
||||
return "macos"
|
||||
if s.startswith("win"):
|
||||
return "windows"
|
||||
return "other"
|
||||
|
||||
|
||||
def _hermes_version() -> str:
|
||||
try:
|
||||
from hermes_cli import __version__
|
||||
return str(__version__)
|
||||
except Exception:
|
||||
return "0.0.0"
|
||||
|
||||
|
||||
def _open(db_path: Optional[Path], conn: Optional[sqlite3.Connection]):
|
||||
if conn is not None:
|
||||
prev = conn.row_factory
|
||||
conn.row_factory = sqlite3.Row
|
||||
return conn, prev, False
|
||||
if db_path is None:
|
||||
from hermes_constants import get_hermes_home
|
||||
db_path = get_hermes_home() / "state.db"
|
||||
c = sqlite3.connect(str(db_path), timeout=5.0)
|
||||
c.row_factory = sqlite3.Row
|
||||
return c, None, True
|
||||
|
||||
|
||||
def _run_events(c: sqlite3.Connection, since_ns: Optional[int]) -> List[Dict[str, Any]]:
|
||||
"""Project completed runs into per-run summary dicts."""
|
||||
where = " WHERE end_ns IS NOT NULL"
|
||||
if since_ns:
|
||||
where += f" AND start_ns >= {int(since_ns)}"
|
||||
rows = c.execute(
|
||||
"SELECT run_id, entrypoint, platform, end_reason, start_ns, end_ns, "
|
||||
"model_call_count, tool_call_count, error_count, estimated_cost_usd "
|
||||
"FROM tel_runs" + where
|
||||
).fetchall()
|
||||
|
||||
events: List[Dict[str, Any]] = []
|
||||
for r in rows:
|
||||
# Models actually used in this run (real ids), with token totals.
|
||||
models = [
|
||||
{"provider": m["provider"], "model": m["model"],
|
||||
"calls": m["n"], "input_tokens": int(m["inp"] or 0),
|
||||
"output_tokens": int(m["outp"] or 0)}
|
||||
for m in c.execute(
|
||||
"SELECT provider, model, COUNT(*) n, SUM(input_tokens) inp, "
|
||||
"SUM(output_tokens) outp FROM tel_model_calls WHERE run_id = ? "
|
||||
"GROUP BY provider, model ORDER BY n DESC",
|
||||
(r["run_id"],),
|
||||
).fetchall()
|
||||
]
|
||||
tools = [
|
||||
row["tool_name"]
|
||||
for row in c.execute(
|
||||
"SELECT DISTINCT tool_name FROM tel_tool_calls WHERE run_id = ?",
|
||||
(r["run_id"],),
|
||||
).fetchall()
|
||||
if row["tool_name"]
|
||||
]
|
||||
trow = c.execute(
|
||||
"SELECT SUM(input_tokens) inp, SUM(output_tokens) outp "
|
||||
"FROM tel_model_calls WHERE run_id = ?",
|
||||
(r["run_id"],),
|
||||
).fetchone()
|
||||
duration_ms = (r["end_ns"] - r["start_ns"]) / 1e6 if r["end_ns"] else None
|
||||
events.append({
|
||||
"event_name": "workflow_completed",
|
||||
"run_id": r["run_id"],
|
||||
"entrypoint": r["entrypoint"] or "cli",
|
||||
"platform": r["platform"],
|
||||
"end_reason": r["end_reason"] or "completed",
|
||||
"models_used": models,
|
||||
"tools_used": tools,
|
||||
"model_call_count": r["model_call_count"] or 0,
|
||||
"tool_call_count": r["tool_call_count"] or 0,
|
||||
"error_count": r["error_count"] or 0,
|
||||
"duration_ms": round(duration_ms, 1) if duration_ms is not None else None,
|
||||
"input_tokens": int((trow["inp"] if trow else 0) or 0),
|
||||
"output_tokens": int((trow["outp"] if trow else 0) or 0),
|
||||
"estimated_cost_usd": r["estimated_cost_usd"],
|
||||
})
|
||||
return events
|
||||
|
||||
|
||||
def build_aggregate_events(
|
||||
*,
|
||||
install_id: str,
|
||||
db_path: Optional[Path] = None,
|
||||
since_ns: Optional[int] = None,
|
||||
conn: Optional[sqlite3.Connection] = None,
|
||||
include_heartbeat: bool = True,
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""Return per-run summary events plus an optional heartbeat."""
|
||||
c, prev_factory, owned = _open(db_path, conn)
|
||||
try:
|
||||
events = _run_events(c, since_ns)
|
||||
if include_heartbeat:
|
||||
events.append({
|
||||
"event_name": "heartbeat",
|
||||
"install_id": install_id,
|
||||
"hermes_version": _hermes_version(),
|
||||
"os_family": _os_family(),
|
||||
"entrypoint": "cli",
|
||||
})
|
||||
return events
|
||||
finally:
|
||||
if owned:
|
||||
c.close()
|
||||
elif prev_factory is not None:
|
||||
c.row_factory = prev_factory
|
||||
|
||||
|
||||
def summarize(events: List[Dict[str, Any]]) -> Dict[str, Any]:
|
||||
"""Counts by event_name + field coverage, for status/preview output."""
|
||||
by_name: Dict[str, int] = {}
|
||||
fields = set()
|
||||
for e in events:
|
||||
name = e.get("event_name", "?")
|
||||
by_name[name] = by_name.get(name, 0) + 1
|
||||
fields.update(e.keys())
|
||||
return {"total": len(events), "by_event_name": by_name, "fields_present": sorted(fields)}
|
||||
|
||||
|
||||
__all__ = ["build_aggregate_events", "summarize"]
|
||||
83
agent/telemetry/spans.py
Normal file
83
agent/telemetry/spans.py
Normal file
@@ -0,0 +1,83 @@
|
||||
"""Trace / run / span id propagation via contextvars.
|
||||
|
||||
Telemetry events share IDs so a workflow can be reconstructed: one ``trace_id`` per
|
||||
workflow, one ``run_id`` per top-level execution, ``span_id`` per timed operation, and
|
||||
``parent_span_id`` for nesting. These live in contextvars so async tool calls and
|
||||
spawned subagents inherit the lineage automatically.
|
||||
|
||||
Provides helpers to start/clear a run context and mint child span ids. The telemetry
|
||||
plugin sets the run context on session start and reads it in each hook callback.
|
||||
Nothing here writes to storage — it only carries ids.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import contextvars
|
||||
import uuid
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
_trace_id: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar(
|
||||
"hermes_tel_trace_id", default=None
|
||||
)
|
||||
_run_id: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar(
|
||||
"hermes_tel_run_id", default=None
|
||||
)
|
||||
_parent_span_id: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar(
|
||||
"hermes_tel_parent_span_id", default=None
|
||||
)
|
||||
|
||||
|
||||
def new_id() -> str:
|
||||
return uuid.uuid4().hex
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class RunContext:
|
||||
trace_id: str
|
||||
run_id: str
|
||||
|
||||
|
||||
def start_run(trace_id: Optional[str] = None, run_id: Optional[str] = None) -> RunContext:
|
||||
"""Begin a run context, minting ids when not supplied. Sets contextvars."""
|
||||
tid = trace_id or new_id()
|
||||
rid = run_id or new_id()
|
||||
_trace_id.set(tid)
|
||||
_run_id.set(rid)
|
||||
_parent_span_id.set(None)
|
||||
return RunContext(trace_id=tid, run_id=rid)
|
||||
|
||||
|
||||
def current_trace_id() -> Optional[str]:
|
||||
return _trace_id.get()
|
||||
|
||||
|
||||
def current_run_id() -> Optional[str]:
|
||||
return _run_id.get()
|
||||
|
||||
|
||||
def current_parent_span_id() -> Optional[str]:
|
||||
return _parent_span_id.get()
|
||||
|
||||
|
||||
def new_span_id() -> str:
|
||||
"""Mint a span id (does not alter the parent pointer)."""
|
||||
return new_id()
|
||||
|
||||
|
||||
def clear_run() -> None:
|
||||
_trace_id.set(None)
|
||||
_run_id.set(None)
|
||||
_parent_span_id.set(None)
|
||||
|
||||
|
||||
__all__ = [
|
||||
"RunContext",
|
||||
"new_id",
|
||||
"start_run",
|
||||
"current_trace_id",
|
||||
"current_run_id",
|
||||
"current_parent_span_id",
|
||||
"new_span_id",
|
||||
"clear_run",
|
||||
]
|
||||
@@ -1159,6 +1159,49 @@ display:
|
||||
# # Routing/delivery still uses the original values internally.
|
||||
# redact_pii: false
|
||||
|
||||
# =============================================================================
|
||||
# Telemetry & Observability
|
||||
# =============================================================================
|
||||
# Three planes with a hard wall between them:
|
||||
# local — full-fidelity observability you own. Default ON.
|
||||
# aggregate — opt-in metadata to Nous (no uploader ships yet). Default OFF.
|
||||
# trajectories — content trajectories for training. Separate consent (later).
|
||||
#
|
||||
# The local plane records real values (actual models, providers, tool names) —
|
||||
# your own data, on your machine. The aggregate plane is opt-in and has no
|
||||
# uploader today; if one ships it would summarize at that egress boundary.
|
||||
#
|
||||
# Enterprise locking: any telemetry.* key can be pinned by an administrator via
|
||||
# the managed-scope layer (/etc/hermes/config.yaml), which wins over the user's
|
||||
# value. To hard-forbid egress on a locked-down deployment, pin
|
||||
# `telemetry.allow_aggregate: false` there.
|
||||
# telemetry:
|
||||
# # Local plane: event log + SQLite index in state.db. Never leaves your
|
||||
# # machine unless you export it or opt into the aggregate plane.
|
||||
# local: true
|
||||
# # Hard gate for the aggregate plane. When false, the aggregate plane is off
|
||||
# # regardless of consent_state. Pin false via managed scope to forbid egress.
|
||||
# allow_aggregate: true
|
||||
# # Aggregate-plane consent (the opt-in). No uploader ships yet.
|
||||
# # unknown (no choice — never uploads) | local (declined) | aggregate (opted in)
|
||||
# consent_state: unknown
|
||||
# # Stable install id (aggregate plane only). Empty = mint on first use;
|
||||
# # clear it to rotate.
|
||||
# install_id: ""
|
||||
# # Local event-log retention before rotation (days).
|
||||
# retention_days: 90
|
||||
# # Keep secret redaction on even at full local capture.
|
||||
# redact_secrets: true
|
||||
# # Content redaction for exports / support bundles: none | pii.
|
||||
# content_redaction: none
|
||||
# # Exporters. The OTLP exporter sends spans to a configured Collector endpoint;
|
||||
# # header values reference environment variable names, not inline secrets.
|
||||
# export:
|
||||
# otlp:
|
||||
# enabled: false
|
||||
# endpoint: null
|
||||
# headers_env: {} # e.g. {Authorization: MY_OTLP_TOKEN_ENVVAR}
|
||||
|
||||
# =============================================================================
|
||||
# Shell-script hooks
|
||||
# =============================================================================
|
||||
|
||||
@@ -307,6 +307,13 @@ nested agent work or security lifecycle events.
|
||||
|
||||
## Existing Consumers
|
||||
|
||||
The bundled **`telemetry`** plugin is the built-in local-first telemetry plane: it
|
||||
records runs, model calls, and tool calls to a local event log + `state.db`, powers
|
||||
`hermes insights`, and supports export to your own file or OpenTelemetry Collector. It
|
||||
is on by default and never sends data to Nous unless you opt in. See
|
||||
[`telemetry.md`](./telemetry.md) for the full feature, the `hermes telemetry` commands,
|
||||
and enterprise export.
|
||||
|
||||
The bundled Langfuse plugin demonstrates direct hook-based observability for
|
||||
turns, provider requests, and tool calls.
|
||||
|
||||
|
||||
209
docs/observability/telemetry.md
Normal file
209
docs/observability/telemetry.md
Normal file
@@ -0,0 +1,209 @@
|
||||
# Telemetry & Observability
|
||||
|
||||
Hermes ships with a built-in, local-first telemetry system. It records what your
|
||||
agent does — workflows, model calls, tool calls, errors — to your own machine, powers
|
||||
`/insights`, and (when *you* enable it) exports everything to your own observability
|
||||
stack. It is private by default and never sends your data to Nous unless you explicitly
|
||||
opt in to anonymous aggregate metrics.
|
||||
|
||||
This page explains the whole feature: the three planes, what's captured, the
|
||||
`hermes telemetry` commands, and how an enterprise streams or exports all of its data
|
||||
to its own infrastructure.
|
||||
|
||||
> Looking for the **plugin hook contract** (how to write your own observer plugin)?
|
||||
> That's in [`README.md`](./README.md). This page is about the built-in telemetry
|
||||
> plane and its CLI.
|
||||
|
||||
## The three planes
|
||||
|
||||
Telemetry is organized into three planes with a hard wall between them:
|
||||
|
||||
| Plane | What it holds | Default | Destination |
|
||||
| --- | --- | --- | --- |
|
||||
| **local** | Full-fidelity observability — runs, model/tool calls (real model & provider names), durations, errors | **on** | your machine only |
|
||||
| **aggregate** | Opt-in metadata to Nous (no uploader ships yet) | **off** (opt-in) | Nous, only if you enable it |
|
||||
| **trajectories** | Full message content / reasoning / raw tool args | **off** (opt-in) | your own export destinations only |
|
||||
|
||||
The local plane is the one you'll use day to day — it records the real values that
|
||||
happened (actual model ids, providers, tool names). The aggregate plane is the only
|
||||
thing that could ever leave for Nous; it is opt-in, default-off, and has no uploader
|
||||
today. The trajectories plane unlocks full-content export to *your own* destinations —
|
||||
it is never wired to Nous.
|
||||
|
||||
## Local plane — always-on observability
|
||||
|
||||
The local plane is implemented as a bundled `telemetry` plugin that listens to Hermes
|
||||
lifecycle hooks (model calls, tool calls, session start/finalize) and writes events to:
|
||||
|
||||
- an append-only JSONL log at `~/.hermes/telemetry/events.jsonl` (the source of truth)
|
||||
- indexed `tel_*` tables in `state.db` (for fast queries and rollups)
|
||||
|
||||
Writes are fire-and-forget on a background thread: telemetry can never block, slow, or
|
||||
fail a model call or tool call. If the local plane is disabled (`telemetry.local: false`)
|
||||
the plugin does not load at all.
|
||||
|
||||
### Seeing your local data
|
||||
|
||||
```bash
|
||||
hermes insights # usage report — now includes an "Observability" section
|
||||
hermes telemetry status # planes, consent, export posture, local data volume
|
||||
```
|
||||
|
||||
The `insights` Observability section shows workflow counts and success rate, duration
|
||||
p50/p95, tool failure rates by category, provider/model-class mix, and cache hit rate —
|
||||
all computed locally with exact values.
|
||||
|
||||
## `hermes telemetry` commands
|
||||
|
||||
```text
|
||||
hermes telemetry status Show planes, consent state, export posture, local volume
|
||||
hermes telemetry preview Show the aggregate events that would be produced (local)
|
||||
hermes telemetry export Export local telemetry to a file/stream or OTLP endpoint
|
||||
```
|
||||
|
||||
Consent and the install id are plain config, not separate verbs — set them with
|
||||
`hermes config set` (or a managed-scope pin):
|
||||
|
||||
```bash
|
||||
hermes config set telemetry.consent_state aggregate # opt in to the aggregate plane
|
||||
hermes config set telemetry.consent_state local # opt out (local plane stays on)
|
||||
hermes config set telemetry.install_id "" # reset the install id (mints a new one)
|
||||
```
|
||||
|
||||
### Aggregate plane (opt-in)
|
||||
|
||||
The aggregate plane is **off by default** and has **no uploader today** — nothing is
|
||||
sent to Nous. Consent lives in `telemetry.consent_state` (`unknown` / `local` /
|
||||
`aggregate`); setting it to `aggregate` records the opt-in for if/when an uploader ships.
|
||||
If one is built, it would summarize at that egress boundary.
|
||||
|
||||
`hermes telemetry preview` shows your recent runs as they'd be summarized — computed and
|
||||
shown **locally only**, with the real model and tool names from your own telemetry. It's
|
||||
a local inspection surface, not an upload.
|
||||
|
||||
## Enterprise: getting all of your data
|
||||
|
||||
Everything below sends data to **your own** destination — a file, your SIEM, or your own
|
||||
OpenTelemetry Collector. None of it goes to Nous.
|
||||
|
||||
### Bulk export to a file
|
||||
|
||||
```bash
|
||||
# Structural telemetry only (default — no message content)
|
||||
hermes telemetry export --out telemetry.ndjson
|
||||
|
||||
# JSON instead of NDJSON, last 7 days only
|
||||
hermes telemetry export --out dump.json --format json --since 7
|
||||
```
|
||||
|
||||
By default the export is **structural** — runs, model/tool-call metadata, session shells
|
||||
with message *counts* but no message bodies.
|
||||
|
||||
### Including content (trajectories plane)
|
||||
|
||||
To export full message content, enable the trajectories plane. This is a deliberate,
|
||||
separate consent — it's how an enterprise opts into exporting work-product content to its
|
||||
own store:
|
||||
|
||||
```yaml
|
||||
# config.yaml
|
||||
telemetry:
|
||||
trajectories:
|
||||
enabled: true # unlocks content export to YOUR destination
|
||||
content_redaction: pii # "none" | "pii"
|
||||
```
|
||||
|
||||
```bash
|
||||
hermes telemetry export --out full.ndjson --include-content
|
||||
```
|
||||
|
||||
`--include-content` is a no-op unless the trajectories plane is enabled — the consent
|
||||
plane governs, not the flag.
|
||||
|
||||
### Live streaming to your OpenTelemetry Collector / SIEM (OTLP)
|
||||
|
||||
Hermes can stream telemetry to your own OTLP endpoint. This requires the optional `otlp`
|
||||
extra:
|
||||
|
||||
```bash
|
||||
pip install 'hermes-agent[otlp]'
|
||||
```
|
||||
|
||||
```yaml
|
||||
# config.yaml
|
||||
telemetry:
|
||||
export:
|
||||
otlp:
|
||||
enabled: true
|
||||
endpoint: "https://collector.your-corp.internal:4318/v1/traces"
|
||||
headers_env: # secrets by reference — env var NAMES, not values
|
||||
Authorization: CORP_OTLP_TOKEN
|
||||
```
|
||||
|
||||
```bash
|
||||
export CORP_OTLP_TOKEN="..." # the actual token lives in the environment
|
||||
hermes telemetry export --otlp # drain current telemetry to your collector
|
||||
```
|
||||
|
||||
Span attributes are structural by default. For authentication, the config holds the
|
||||
*name* of an environment variable rather than the secret itself; the value is read at
|
||||
export time and is never written to config or logged.
|
||||
|
||||
## Redaction
|
||||
|
||||
Two independent controls govern what content looks like on export:
|
||||
|
||||
| Control | Values | Effect |
|
||||
| --- | --- | --- |
|
||||
| Secret redaction | always on | API keys, tokens, auth headers, connection strings are **always** stripped on every export path. Cannot be disabled. |
|
||||
| `content_redaction` | `none` \| `pii` | When content is exported, `pii` additionally redacts emails, phone numbers, and id-shaped strings. |
|
||||
|
||||
Secret redaction is always on — even at full content fidelity — because a SIEM or
|
||||
warehouse full of live credentials is a bigger attack target than the data it holds. It
|
||||
fails closed: if the redactor can't run, the raw string is not emitted.
|
||||
|
||||
## Configuration reference
|
||||
|
||||
```yaml
|
||||
telemetry:
|
||||
local: true # local plane (default on)
|
||||
allow_aggregate: true # hard gate; pin false to forbid the aggregate plane entirely
|
||||
consent_state: unknown # aggregate opt-in: unknown | local | aggregate
|
||||
install_id: "" # stable anon id; "" mints one; clear to rotate
|
||||
retention_days: 90 # local event-log retention
|
||||
redact_secrets: true # always-on secret redaction (kept on by design)
|
||||
content_redaction: none # none | pii
|
||||
trajectories:
|
||||
enabled: false # unlocks full-content export to your destination
|
||||
export:
|
||||
otlp:
|
||||
enabled: false
|
||||
endpoint: null
|
||||
headers_env: {} # {HeaderName: ENV_VAR_NAME}
|
||||
```
|
||||
|
||||
### Enterprise policy via managed scope
|
||||
|
||||
Any `telemetry.*` key can be pinned by an administrator through Hermes' managed-scope
|
||||
layer (`/etc/hermes/config.yaml`), which wins over the user's value on a per-key basis.
|
||||
There is no telemetry-specific policy block — to lock down a fleet, pin the keys you care
|
||||
about. Common examples:
|
||||
|
||||
- `telemetry.allow_aggregate: false` — the aggregate plane stays off even if
|
||||
`consent_state` is set to `aggregate`.
|
||||
- `telemetry.export.otlp.endpoint` — point every install at the corporate collector.
|
||||
- `telemetry.trajectories.enabled` — centrally decide whether content export is allowed.
|
||||
|
||||
When a key is managed, attempts to change it are rejected by managed scope with a message
|
||||
naming the source. `hermes telemetry status` shows the current export posture (endpoint
|
||||
host, whether the auth env var is set, content gate, redaction modes) — it never prints
|
||||
secret values.
|
||||
|
||||
## Privacy summary
|
||||
|
||||
- Local telemetry never leaves your machine.
|
||||
- The aggregate plane (the only thing that could go to Nous) is opt-in, default-off,
|
||||
and has no uploader today — nothing is sent.
|
||||
- All export surfaces (file, OTLP) point at *your* destinations.
|
||||
- Secrets are always redacted on export; content export is off until you enable the
|
||||
trajectories plane; PII redaction is a knob.
|
||||
@@ -1803,7 +1803,61 @@ DEFAULT_CONFIG = {
|
||||
"privacy": {
|
||||
"redact_pii": False, # When True, hash user IDs and strip phone numbers from LLM context
|
||||
},
|
||||
|
||||
|
||||
# Telemetry & observability. Three planes with a hard wall between them:
|
||||
# local — full-fidelity local observability the user owns (real
|
||||
# models, providers, tool names). Default ON.
|
||||
# aggregate — opt-in metadata to Nous (no uploader ships yet). Default OFF.
|
||||
# trajectories — content trajectories for training. Separate consent (not here yet).
|
||||
#
|
||||
# Enterprise locking: any telemetry.* key can be pinned by an administrator via
|
||||
# the managed-scope layer (/etc/hermes/config.yaml), which wins over the user's
|
||||
# value on a per-key basis. There is no telemetry-specific policy block — pin
|
||||
# e.g. `telemetry.allow_aggregate: false` in managed scope to hard-forbid egress.
|
||||
"telemetry": {
|
||||
# Local plane: event log + SQLite index in state.db. The user's own data;
|
||||
# never leaves the machine unless they export it or opt into aggregate.
|
||||
"local": True,
|
||||
# Hard gate for the aggregate plane. When False, the aggregate plane is off
|
||||
# regardless of consent_state. Intended to be pinned False by an administrator
|
||||
# via managed scope on locked-down or air-gapped deployments. Default True
|
||||
# (consent_state still governs the opt-in).
|
||||
"allow_aggregate": True,
|
||||
# Aggregate-plane consent — the single source of truth for the opt-in:
|
||||
# "unknown" (no choice made — never uploads), "local" (declined),
|
||||
# "aggregate" (opted in). Set with `hermes config set telemetry.consent_state`
|
||||
# or a managed-scope pin. Non-interactive installs sit at "unknown".
|
||||
"consent_state": "unknown",
|
||||
# Stable install identifier (aggregate plane only). Empty string means "mint a
|
||||
# fresh UUID on first use"; clear it to rotate. Never sent on the local plane.
|
||||
"install_id": "",
|
||||
# Local event-log retention before rotation (days). Local plane only.
|
||||
"retention_days": 90,
|
||||
# Keep secret redaction on even at full local capture (a SIEM full of live
|
||||
# credentials is a bigger attack target). Admin may override via managed scope.
|
||||
"redact_secrets": True,
|
||||
# Content redaction for exports / support bundles: "none" | "pii".
|
||||
"content_redaction": "none",
|
||||
# Trajectories plane: full message content / reasoning / raw tool args.
|
||||
# Off by default. When enabled, full content becomes exportable to the
|
||||
# configured destination — always secret-redacted, and PII-redacted per
|
||||
# content_redaction. This is the consent gate for content export and is
|
||||
# admin-pinnable via managed scope. It does not enable any upload.
|
||||
"trajectories": {
|
||||
"enabled": False,
|
||||
},
|
||||
# Exporters. The OTLP exporter sends to a configured Collector endpoint;
|
||||
# endpoint headers reference environment variable names rather than inline
|
||||
# secrets.
|
||||
"export": {
|
||||
"otlp": {
|
||||
"enabled": False,
|
||||
"endpoint": None,
|
||||
"headers_env": {}, # {"Authorization": "MY_OTLP_TOKEN_ENVVAR"}
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
# Text-to-speech configuration
|
||||
# Each provider supports an optional `max_text_length:` override for the
|
||||
# per-request input-character cap. Omit it to use the provider's documented
|
||||
|
||||
@@ -295,6 +295,7 @@ from hermes_cli.subcommands.memory import build_memory_parser
|
||||
from hermes_cli.subcommands.acp import build_acp_parser
|
||||
from hermes_cli.subcommands.tools import build_tools_parser
|
||||
from hermes_cli.subcommands.insights import build_insights_parser
|
||||
from hermes_cli.subcommands.telemetry import build_telemetry_parser
|
||||
from hermes_cli.subcommands.skills import build_skills_parser
|
||||
from hermes_cli.subcommands.pairing import build_pairing_parser
|
||||
from hermes_cli.subcommands.plugins import build_plugins_parser
|
||||
@@ -11531,7 +11532,7 @@ _BUILTIN_SUBCOMMANDS = frozenset(
|
||||
"model", "pairing", "plugins", "portal", "postinstall", "profile", "proxy",
|
||||
"prompt-size",
|
||||
"send", "sessions", "setup",
|
||||
"skills", "slack", "status", "tools", "uninstall", "update",
|
||||
"skills", "slack", "status", "telemetry", "tools", "uninstall", "update",
|
||||
"version", "webhook", "whatsapp", "whatsapp-cloud", "chat", "secrets", "security",
|
||||
# Help-ish invocations — plugin commands not being listed in
|
||||
# top-level --help is an acceptable trade-off for skipping an
|
||||
@@ -11963,6 +11964,186 @@ def cmd_insights(args):
|
||||
print(f"Error generating insights: {e}")
|
||||
|
||||
|
||||
def cmd_telemetry(args):
|
||||
"""Local-only telemetry control + inspection. No uploader exists yet."""
|
||||
import json as _json
|
||||
import time
|
||||
|
||||
from hermes_cli.config import load_config, save_config
|
||||
from agent.telemetry import policy, rollup, metrics
|
||||
|
||||
action = getattr(args, "telemetry_action", None) or "status"
|
||||
config = load_config()
|
||||
decision = policy.resolve(config)
|
||||
|
||||
def _persist_install_id():
|
||||
# Make sure a minted id is written back so it stays stable.
|
||||
config.setdefault("telemetry", {})["install_id"] = decision.install_id
|
||||
save_config(config)
|
||||
|
||||
if action == "status":
|
||||
print("Telemetry status")
|
||||
print("─" * 56)
|
||||
print(f" Local plane: {'on' if decision.local_enabled else 'off'} "
|
||||
f"(telemetry.local)")
|
||||
print(f" Aggregate plane: {'on' if decision.aggregate_enabled else 'off'} "
|
||||
f"(opt-in; consent_state={decision.consent_state})")
|
||||
if decision.consent_state != policy.CONSENT_AGGREGATE and decision.allow_aggregate:
|
||||
print(" opt in: hermes config set telemetry.consent_state aggregate")
|
||||
if not decision.allow_aggregate:
|
||||
print(" ⚠ allow_aggregate is false (egress hard-disabled)")
|
||||
print(f" Install id: {decision.install_id}")
|
||||
print(" Upload: DISABLED — no server yet. Aggregate is computed "
|
||||
"locally only.")
|
||||
print()
|
||||
# Export posture — where YOUR data can flow (never Nous). Values only;
|
||||
# lock-state is managed scope's concern, surfaced by its own tooling.
|
||||
try:
|
||||
from agent.telemetry import otlp_exporter, redaction
|
||||
otlp = otlp_exporter._otlp_config(config)
|
||||
print(" Export")
|
||||
if otlp.get("enabled") and otlp.get("endpoint"):
|
||||
_host = str(otlp.get("endpoint"))
|
||||
print(f" OTLP: enabled → {_host}")
|
||||
# Show the header name + env var + whether it's set, never the value.
|
||||
hdrs = otlp.get("headers_env") or {}
|
||||
if hdrs:
|
||||
import os as _os
|
||||
for _h, _env in hdrs.items():
|
||||
_state = "set" if _os.environ.get(str(_env)) else "NOT set"
|
||||
print(f" auth: header '{_h}' ← ${_env} ({_state})")
|
||||
_sdk = "installed" if otlp_exporter.is_available() else "MISSING — pip install 'hermes-agent[otlp]'"
|
||||
print(f" SDK: {_sdk}")
|
||||
else:
|
||||
print(" OTLP: disabled (telemetry.export.otlp.enabled)")
|
||||
# Content gate (trajectories plane) + redaction posture.
|
||||
if redaction.content_export_enabled(config):
|
||||
print(f" Content export: on (trajectories plane) — message "
|
||||
f"content exportable")
|
||||
else:
|
||||
print(" Content export: off (trajectories plane) — structural "
|
||||
"telemetry only")
|
||||
print(" Secret redaction: on (always)")
|
||||
print(f" PII redaction: {redaction.content_mode_for(config)}")
|
||||
print(" Bulk export: hermes telemetry export --out FILE [--otlp]")
|
||||
except Exception:
|
||||
pass
|
||||
print()
|
||||
# Local data volume
|
||||
try:
|
||||
ov = metrics.overview()
|
||||
runs = ov["workflows"]["total_runs"]
|
||||
mcs = sum(ov["model_calls"]["by_provider"].values())
|
||||
tcs = ov["tool_calls"]["total"]
|
||||
print(f" Local data: {runs:,} workflows · {mcs:,} model calls · {tcs:,} tool calls")
|
||||
except Exception:
|
||||
print(" Local data: (none yet)")
|
||||
print("\n Inspect what would be shared: hermes telemetry preview")
|
||||
return
|
||||
|
||||
if action == "preview":
|
||||
_persist_install_id()
|
||||
since_ns = int((time.time() - args.days * 86400) * 1e9)
|
||||
events = rollup.build_aggregate_events(
|
||||
install_id=decision.install_id, since_ns=since_ns
|
||||
)
|
||||
summary = rollup.summarize(events)
|
||||
print("Telemetry preview — computed locally, NOT uploaded")
|
||||
print("─" * 56)
|
||||
print(f" Window: last {args.days} days Events: {summary['total']}")
|
||||
for name, n in sorted(summary["by_event_name"].items()):
|
||||
print(f" {name}: {n}")
|
||||
print(" Shows the actual models, tools, and counts from local telemetry.")
|
||||
print()
|
||||
shown = events[: args.limit]
|
||||
if getattr(args, "json", False):
|
||||
print(_json.dumps(shown, indent=2))
|
||||
else:
|
||||
for e in shown:
|
||||
if e.get("event_name") == "heartbeat":
|
||||
print(f" • heartbeat hermes={e.get('hermes_version')} os={e.get('os_family')}")
|
||||
continue
|
||||
bits = [f"{e.get('event_name')}"]
|
||||
for k in ("entrypoint", "platform", "end_reason", "duration_ms",
|
||||
"estimated_cost_usd"):
|
||||
if e.get(k) is not None:
|
||||
bits.append(f"{k}={e[k]}")
|
||||
models = e.get("models_used") or []
|
||||
if models:
|
||||
bits.append("models=" + ",".join(
|
||||
f"{m.get('model') or m.get('provider') or '?'}" for m in models))
|
||||
if e.get("tools_used"):
|
||||
bits.append("tools=" + ",".join(e["tools_used"]))
|
||||
print(" • " + " ".join(bits))
|
||||
if len(events) > len(shown):
|
||||
print(f" ... and {len(events) - len(shown)} more (use --limit)")
|
||||
return
|
||||
|
||||
if action == "export":
|
||||
import sys as _sys
|
||||
from agent.telemetry import exporter_bulk, redaction
|
||||
|
||||
since_ns = int((time.time() - args.since * 86400) * 1e9) if getattr(args, "since", 0) else None
|
||||
want_content = getattr(args, "include_content", False)
|
||||
|
||||
# OTLP path: stream spans to the configured Collector endpoint.
|
||||
if getattr(args, "otlp", False):
|
||||
from agent.telemetry import otlp_exporter
|
||||
if not otlp_exporter.is_enabled(config):
|
||||
print("telemetry.export.otlp is not enabled/endpoint not set. "
|
||||
"Set telemetry.export.otlp.enabled + .endpoint.", file=_sys.stderr)
|
||||
return
|
||||
# The OTel SDK is an optional dep; export_once() lazily installs it on
|
||||
# first use (gated by security.allow_lazy_installs, TTY-prompted),
|
||||
# same as every other optional backend. OTLPUnavailable is the
|
||||
# fallback when it can't be installed.
|
||||
try:
|
||||
n = otlp_exporter.export_once(config, since_ns=since_ns)
|
||||
except otlp_exporter.OTLPUnavailable as e:
|
||||
print(str(e), file=_sys.stderr)
|
||||
return
|
||||
except Exception as e:
|
||||
print(f"OTLP export failed: {e}", file=_sys.stderr)
|
||||
return
|
||||
print(f"Exported {n} spans to the OTLP endpoint "
|
||||
f"({(otlp_exporter._otlp_config(config) or {}).get('endpoint')}).",
|
||||
file=_sys.stderr)
|
||||
return
|
||||
|
||||
content_ok = redaction.content_export_enabled(config)
|
||||
if want_content and not content_ok:
|
||||
print("⚠ --include-content ignored: telemetry.trajectories.enabled is false.")
|
||||
print(" Enable the trajectories plane (admin/config) to export message content.")
|
||||
print(" Exporting structural telemetry only.")
|
||||
|
||||
if not getattr(args, "out", None):
|
||||
print("--out is required (or use --otlp).", file=_sys.stderr)
|
||||
return
|
||||
out_path = args.out
|
||||
if out_path == "-":
|
||||
counts = exporter_bulk.export(
|
||||
_sys.stdout, fmt=args.fmt, since_ns=since_ns,
|
||||
include_content=want_content, config=config,
|
||||
)
|
||||
else:
|
||||
with open(out_path, "w", encoding="utf-8") as fh:
|
||||
counts = exporter_bulk.export(
|
||||
fh, fmt=args.fmt, since_ns=since_ns,
|
||||
include_content=want_content, config=config,
|
||||
)
|
||||
# status to stderr so stdout stays clean for `--out -`
|
||||
msg = (f"Exported {counts['telemetry']} telemetry records"
|
||||
+ (f" + {counts['sessions']} sessions" if counts["sessions"] else "")
|
||||
+ (" (content INCLUDED, redacted)" if counts["content_included"]
|
||||
else " (structural only)")
|
||||
+ (f" -> {out_path}" if out_path != "-" else ""))
|
||||
print(msg, file=_sys.stderr)
|
||||
return
|
||||
|
||||
print(f"Unknown telemetry action: {action}")
|
||||
print("Use: status | preview | export")
|
||||
|
||||
|
||||
def cmd_skills(args):
|
||||
# Route 'config' action to skills_config module
|
||||
if getattr(args, "skills_action", None) == "config":
|
||||
@@ -12979,6 +13160,9 @@ def main():
|
||||
# =========================================================================
|
||||
build_insights_parser(subparsers, cmd_insights=cmd_insights)
|
||||
|
||||
# telemetry command (parser in hermes_cli/subcommands/telemetry.py)
|
||||
build_telemetry_parser(subparsers, cmd_telemetry=cmd_telemetry)
|
||||
|
||||
# =========================================================================
|
||||
# claw command (parser built in hermes_cli/subcommands/claw.py)
|
||||
# =========================================================================
|
||||
|
||||
@@ -204,6 +204,23 @@ def _env_enabled(name: str) -> bool:
|
||||
return env_var_enabled(name)
|
||||
|
||||
|
||||
def _telemetry_local_enabled() -> bool:
|
||||
"""True when the local telemetry plane is enabled (telemetry.local, default true).
|
||||
|
||||
Gates auto-loading of the bundled ``telemetry`` plugin. Reads config defensively
|
||||
so a malformed/missing telemetry section defaults to on (the design default).
|
||||
"""
|
||||
try:
|
||||
from hermes_cli.config import load_config
|
||||
config = load_config()
|
||||
tel = cfg_get(config, "telemetry", default={})
|
||||
if not isinstance(tel, dict):
|
||||
return True
|
||||
return bool(tel.get("local", True))
|
||||
except Exception:
|
||||
return True
|
||||
|
||||
|
||||
def _get_disabled_plugins() -> set:
|
||||
"""Read the disabled plugins list from config.yaml.
|
||||
|
||||
@@ -1326,6 +1343,24 @@ class PluginManager:
|
||||
self._load_plugin(manifest)
|
||||
continue
|
||||
|
||||
# The bundled telemetry plugin auto-loads when the local plane is
|
||||
# enabled (telemetry.local, default true). It's observational
|
||||
# (lifecycle hooks -> local event log), writes nothing to the
|
||||
# network, and powers /usage and /insights, so it should be on out
|
||||
# of the box without an opt-in. A user who sets telemetry.local:
|
||||
# false opts out and it is skipped like any disabled plugin.
|
||||
if (
|
||||
manifest.source == "bundled"
|
||||
and (manifest.key or manifest.name) == "telemetry"
|
||||
):
|
||||
if _telemetry_local_enabled():
|
||||
self._load_plugin(manifest)
|
||||
else:
|
||||
loaded = LoadedPlugin(manifest=manifest, enabled=False)
|
||||
loaded.error = "disabled (telemetry.local is false)"
|
||||
self._plugins[lookup_key] = loaded
|
||||
continue
|
||||
|
||||
# Everything else (standalone, user-installed backends,
|
||||
# entry-point plugins) is opt-in via plugins.enabled.
|
||||
# Accept both the path-derived key and the legacy bare name
|
||||
|
||||
54
hermes_cli/subcommands/telemetry.py
Normal file
54
hermes_cli/subcommands/telemetry.py
Normal file
@@ -0,0 +1,54 @@
|
||||
"""``hermes telemetry`` subcommand parser.
|
||||
|
||||
Telemetry control and inspection. ``preview`` shows the per-run summary events that
|
||||
would be produced for the aggregate plane; there is no uploader, so it terminates as a
|
||||
local view.
|
||||
|
||||
The handler is injected to avoid importing ``main`` (mirrors the insights subcommand).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Callable
|
||||
|
||||
|
||||
def build_telemetry_parser(subparsers, *, cmd_telemetry: Callable) -> None:
|
||||
"""Attach the ``telemetry`` subcommand (with actions) to ``subparsers``."""
|
||||
p = subparsers.add_parser(
|
||||
"telemetry",
|
||||
help="Inspect local telemetry and export it",
|
||||
description=(
|
||||
"Local-first telemetry. The local plane records observability on this "
|
||||
"machine. The aggregate plane is opt-in (set telemetry.consent_state via "
|
||||
"`hermes config set`); it has no uploader and is shown only via `preview`."
|
||||
),
|
||||
)
|
||||
sub = p.add_subparsers(dest="telemetry_action")
|
||||
|
||||
sub.add_parser("status", help="Show telemetry planes, consent state, and local data volume")
|
||||
|
||||
prev = sub.add_parser(
|
||||
"preview",
|
||||
help="Show the aggregate events that would be produced (computed locally, not uploaded)",
|
||||
)
|
||||
prev.add_argument("--days", type=int, default=30, help="Window to roll up (default: 30)")
|
||||
prev.add_argument("--limit", type=int, default=10, help="Max events to print (default: 10)")
|
||||
prev.add_argument("--json", action="store_true", help="Print raw JSON events")
|
||||
|
||||
exp = sub.add_parser(
|
||||
"export",
|
||||
help="Export local telemetry (and optional content) to a file, stream, or OTLP endpoint",
|
||||
)
|
||||
exp.add_argument("--out", help="Output file path (use - for stdout). Not needed with --otlp.")
|
||||
exp.add_argument("--format", dest="fmt", choices=["ndjson", "json"], default="ndjson",
|
||||
help="Output format (default: ndjson)")
|
||||
exp.add_argument("--since", type=int, default=0,
|
||||
help="Only telemetry from the last N days (0 = all)")
|
||||
exp.add_argument("--include-content", action="store_true",
|
||||
help="Include session/message content (requires telemetry.trajectories.enabled). "
|
||||
"Secrets always redacted; PII per telemetry.content_redaction.")
|
||||
exp.add_argument("--otlp", action="store_true",
|
||||
help="Export to the configured OTLP endpoint (telemetry.export.otlp.*) "
|
||||
"instead of a file. Requires the optional 'otlp' extra.")
|
||||
|
||||
p.set_defaults(func=cmd_telemetry)
|
||||
110
hermes_state.py
110
hermes_state.py
@@ -116,7 +116,7 @@ T = TypeVar("T")
|
||||
|
||||
DEFAULT_DB_PATH = get_hermes_home() / "state.db"
|
||||
|
||||
SCHEMA_VERSION = 16
|
||||
SCHEMA_VERSION = 17 # v17: tel_* telemetry tables (local-plane observability, raw values)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# WAL-compatibility fallback
|
||||
@@ -597,6 +597,114 @@ CREATE INDEX IF NOT EXISTS idx_sessions_parent ON sessions(parent_session_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_sessions_started ON sessions(started_at DESC);
|
||||
CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, timestamp);
|
||||
CREATE INDEX IF NOT EXISTS idx_compression_locks_expires ON compression_locks(expires_at);
|
||||
|
||||
-- ── Telemetry (local plane: local observability) ───────────────────────────
|
||||
-- Event/span layer co-located in state.db so runs JOIN to sessions/messages.
|
||||
-- The append-only JSONL log at ~/.hermes/telemetry/events.jsonl is the source
|
||||
-- of truth; these tables are a rebuildable index. They hold the user's own data
|
||||
-- (real model ids, provider/tool names) and never leave the machine unless the
|
||||
-- user exports them.
|
||||
|
||||
CREATE TABLE IF NOT EXISTS tel_runs (
|
||||
run_id TEXT PRIMARY KEY,
|
||||
trace_id TEXT NOT NULL,
|
||||
session_id TEXT,
|
||||
profile_id TEXT,
|
||||
entrypoint TEXT NOT NULL,
|
||||
platform TEXT,
|
||||
start_ns INTEGER NOT NULL,
|
||||
end_ns INTEGER,
|
||||
end_reason TEXT,
|
||||
model_call_count INTEGER DEFAULT 0,
|
||||
tool_call_count INTEGER DEFAULT 0,
|
||||
error_count INTEGER DEFAULT 0,
|
||||
estimated_cost_usd REAL,
|
||||
cost_status TEXT,
|
||||
schema_v INTEGER NOT NULL DEFAULT 1
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS tel_spans (
|
||||
span_id TEXT PRIMARY KEY,
|
||||
trace_id TEXT NOT NULL,
|
||||
run_id TEXT NOT NULL,
|
||||
parent_span_id TEXT,
|
||||
name TEXT NOT NULL,
|
||||
kind TEXT,
|
||||
start_ns INTEGER NOT NULL,
|
||||
end_ns INTEGER,
|
||||
status TEXT,
|
||||
attrs_json TEXT
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS tel_model_calls (
|
||||
span_id TEXT PRIMARY KEY,
|
||||
run_id TEXT NOT NULL,
|
||||
provider TEXT, -- raw provider, e.g. "anthropic"
|
||||
model TEXT, -- raw model id, e.g. "claude-opus-4"
|
||||
base_url TEXT,
|
||||
input_tokens INTEGER DEFAULT 0,
|
||||
output_tokens INTEGER DEFAULT 0,
|
||||
cache_read_tokens INTEGER DEFAULT 0,
|
||||
cache_write_tokens INTEGER DEFAULT 0,
|
||||
reasoning_tokens INTEGER DEFAULT 0,
|
||||
latency_ms INTEGER,
|
||||
ttft_ms INTEGER,
|
||||
estimated_cost_usd REAL,
|
||||
cost_status TEXT,
|
||||
cost_source TEXT,
|
||||
end_reason TEXT,
|
||||
retry_count INTEGER DEFAULT 0
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS tel_tool_calls (
|
||||
span_id TEXT PRIMARY KEY,
|
||||
run_id TEXT NOT NULL,
|
||||
tool_name TEXT, -- raw tool name (e.g. "web_search")
|
||||
backend TEXT,
|
||||
duration_ms INTEGER,
|
||||
result_class TEXT,
|
||||
retry_count INTEGER DEFAULT 0,
|
||||
approval TEXT
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS tel_gateway_events (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT, run_id TEXT, ts_ns INTEGER NOT NULL,
|
||||
platform TEXT, direction TEXT, result TEXT,
|
||||
voice INTEGER DEFAULT 0, attachments INTEGER DEFAULT 0
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS tel_cron_events (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT, run_id TEXT, ts_ns INTEGER NOT NULL,
|
||||
kind TEXT, result TEXT
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS tel_skill_events (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT, run_id TEXT, ts_ns INTEGER NOT NULL,
|
||||
action TEXT, skill_name TEXT -- skill_name the local plane only
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS tel_memory_events (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT, run_id TEXT, ts_ns INTEGER NOT NULL,
|
||||
action TEXT, result TEXT
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS tel_feedback_events (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT, run_id TEXT, ts_ns INTEGER NOT NULL,
|
||||
kind TEXT
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS tel_error_events (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT, run_id TEXT, ts_ns INTEGER NOT NULL,
|
||||
error_class TEXT, subsystem TEXT, recovery TEXT -- enums only; NO raw message/stack
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS ix_tel_runs_trace ON tel_runs(trace_id);
|
||||
CREATE INDEX IF NOT EXISTS ix_tel_runs_session ON tel_runs(session_id);
|
||||
CREATE INDEX IF NOT EXISTS ix_tel_runs_start ON tel_runs(start_ns);
|
||||
CREATE INDEX IF NOT EXISTS ix_tel_spans_run ON tel_spans(run_id);
|
||||
CREATE INDEX IF NOT EXISTS ix_tel_spans_trace ON tel_spans(trace_id);
|
||||
CREATE INDEX IF NOT EXISTS ix_tel_model_run ON tel_model_calls(run_id);
|
||||
CREATE INDEX IF NOT EXISTS ix_tel_tool_run ON tel_tool_calls(run_id);
|
||||
"""
|
||||
|
||||
# Indexes that reference columns added in later schema versions must be
|
||||
|
||||
319
plugins/telemetry/__init__.py
Normal file
319
plugins/telemetry/__init__.py
Normal file
@@ -0,0 +1,319 @@
|
||||
"""Telemetry plugin — wires Hermes lifecycle hooks to the local-plane emitter.
|
||||
|
||||
This is the *only* instrumentation seam. It registers observational hooks (which core
|
||||
already invokes fail-open) and translates each into a typed local-plane telemetry
|
||||
event handed to ``agent.telemetry.emitter``. There are zero edits to core call sites:
|
||||
the hooks already carry model/provider/usage/duration/tool data.
|
||||
|
||||
Everything here is best-effort and fail-open — a raised exception in a hook callback is
|
||||
swallowed by core, and we additionally guard each callback so a telemetry bug can never
|
||||
disturb a session. No content, no network: local plane only.
|
||||
|
||||
Hooks consumed:
|
||||
on_session_start -> begin a run context (trace_id/run_id), buffer a run row
|
||||
post_api_request -> one model_call event (tokens, latency, raw provider/model)
|
||||
api_request_error -> one error event
|
||||
post_tool_call -> one tool_call event (raw tool name, duration, result class)
|
||||
on_session_finalize -> finalize the run row (end_reason, counts, cost)
|
||||
subagent_start/stop -> (reserved) lineage markers
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Per-run accumulators keyed by run_id, so on_session_finalize can roll up counts.
|
||||
_runs: Dict[str, Dict[str, Any]] = {}
|
||||
_runs_lock = threading.Lock()
|
||||
|
||||
|
||||
def _safe(fn):
|
||||
"""Decorator: never let a telemetry hook raise into core."""
|
||||
def wrapper(*args, **kwargs):
|
||||
try:
|
||||
return fn(*args, **kwargs)
|
||||
except Exception:
|
||||
logger.debug("telemetry hook %s failed", getattr(fn, "__name__", "?"), exc_info=True)
|
||||
return None
|
||||
wrapper.__name__ = getattr(fn, "__name__", "wrapper")
|
||||
return wrapper
|
||||
|
||||
|
||||
def _run_key(session_id: Optional[str], task_id: Optional[str]) -> str:
|
||||
return session_id or task_id or "default"
|
||||
|
||||
|
||||
# ── on_session_start ────────────────────────────────────────────────────────
|
||||
@_safe
|
||||
def _on_session_start(**kw: Any) -> None:
|
||||
from agent.telemetry import spans
|
||||
|
||||
session_id = kw.get("session_id") or ""
|
||||
platform = kw.get("platform") or kw.get("source") or ""
|
||||
ctx = spans.start_run()
|
||||
key = _run_key(session_id, kw.get("task_id"))
|
||||
with _runs_lock:
|
||||
_runs[key] = {
|
||||
"run_id": ctx.run_id,
|
||||
"trace_id": ctx.trace_id,
|
||||
"session_id": session_id or None,
|
||||
"entrypoint": _entrypoint_for(platform, kw.get("source")),
|
||||
"platform": platform or None,
|
||||
"start_ns": time.time_ns(),
|
||||
"model_call_count": 0,
|
||||
"tool_call_count": 0,
|
||||
"error_count": 0,
|
||||
}
|
||||
|
||||
|
||||
def _ensure_run(session_id: Optional[str], task_id: Optional[str], platform: str = "") -> Dict[str, Any]:
|
||||
"""Return the run accumulator, lazily creating one if session_start was missed."""
|
||||
from agent.telemetry import spans
|
||||
|
||||
key = _run_key(session_id, task_id)
|
||||
with _runs_lock:
|
||||
run = _runs.get(key)
|
||||
if run is None:
|
||||
rid = spans.current_run_id() or spans.new_id()
|
||||
tid = spans.current_trace_id() or spans.new_id()
|
||||
run = {
|
||||
"run_id": rid,
|
||||
"trace_id": tid,
|
||||
"session_id": session_id or None,
|
||||
"entrypoint": _entrypoint_for(platform),
|
||||
"platform": platform or None,
|
||||
"start_ns": time.time_ns(),
|
||||
"model_call_count": 0,
|
||||
"tool_call_count": 0,
|
||||
"error_count": 0,
|
||||
}
|
||||
_runs[key] = run
|
||||
return run
|
||||
|
||||
|
||||
def _entrypoint_for(platform: Optional[str], source: Optional[str] = None) -> str:
|
||||
"""Coarse entrypoint label (cli / gateway / tui / api / cron …).
|
||||
|
||||
This is a workflow *surface* label, not model/tool anonymization — it answers
|
||||
"where did this run come from", which is genuinely categorical.
|
||||
"""
|
||||
s = (source or platform or "").lower()
|
||||
if s in ("", "chat", "interactive", "cli", "desktop"):
|
||||
return "cli"
|
||||
if s in ("telegram", "discord", "slack", "whatsapp", "signal", "matrix",
|
||||
"email", "sms", "teams", "feishu", "wecom", "line", "google_chat"):
|
||||
return "gateway"
|
||||
if s == "tui":
|
||||
return "tui"
|
||||
if s in ("api", "api_server", "openai_api"):
|
||||
return "api"
|
||||
if s == "cron":
|
||||
return "cron"
|
||||
if s == "batch":
|
||||
return "batch"
|
||||
if s == "acp":
|
||||
return "acp"
|
||||
return "cli"
|
||||
|
||||
|
||||
# ── post_api_request -> model_call event ────────────────────────────────────
|
||||
@_safe
|
||||
def _on_post_api_request(**kw: Any) -> None:
|
||||
from agent.telemetry import emitter, spans
|
||||
from agent.telemetry.events import ModelCallEvent
|
||||
|
||||
session_id = kw.get("session_id") or ""
|
||||
platform = kw.get("platform") or ""
|
||||
run = _ensure_run(session_id, kw.get("task_id"), platform)
|
||||
|
||||
usage = kw.get("usage") or {}
|
||||
duration = kw.get("api_duration")
|
||||
latency_ms = int(duration * 1000) if isinstance(duration, (int, float)) else None
|
||||
|
||||
evt = ModelCallEvent(
|
||||
span_id=spans.new_span_id(),
|
||||
run_id=run["run_id"],
|
||||
provider=kw.get("provider"), # raw
|
||||
model=kw.get("model"), # raw
|
||||
base_url=kw.get("base_url"),
|
||||
input_tokens=int(usage.get("input_tokens") or 0),
|
||||
output_tokens=int(usage.get("output_tokens") or 0),
|
||||
cache_read_tokens=int(usage.get("cache_read_tokens") or 0),
|
||||
cache_write_tokens=int(usage.get("cache_write_tokens") or 0),
|
||||
reasoning_tokens=int(usage.get("reasoning_tokens") or 0),
|
||||
latency_ms=latency_ms,
|
||||
end_reason="completed",
|
||||
)
|
||||
with _runs_lock:
|
||||
run["model_call_count"] += 1
|
||||
emitter.emit(evt)
|
||||
|
||||
|
||||
# ── api_request_error -> error event ────────────────────────────────────────
|
||||
@_safe
|
||||
def _on_api_request_error(**kw: Any) -> None:
|
||||
from agent.telemetry import emitter
|
||||
from agent.telemetry.events import ErrorEvent
|
||||
|
||||
session_id = kw.get("session_id") or ""
|
||||
run = _ensure_run(session_id, kw.get("task_id"), kw.get("platform") or "")
|
||||
error_class = _coarse_error_class(kw.get("error_type") or kw.get("error") or "")
|
||||
with _runs_lock:
|
||||
run["error_count"] += 1
|
||||
emitter.emit(ErrorEvent(
|
||||
run_id=run["run_id"],
|
||||
error_class=error_class,
|
||||
subsystem="model_api",
|
||||
recovery=None,
|
||||
))
|
||||
|
||||
|
||||
def _coarse_error_class(raw: Any) -> str:
|
||||
s = str(raw).lower()
|
||||
if "timeout" in s:
|
||||
return "provider_timeout"
|
||||
if "rate" in s and "limit" in s:
|
||||
return "rate_limit"
|
||||
if any(k in s for k in ("auth", "401", "403", "unauthorized", "forbidden")):
|
||||
return "auth"
|
||||
if any(k in s for k in ("connection", "network", "dns", "socket")):
|
||||
return "network"
|
||||
if "context" in s and ("length" in s or "overflow" in s or "token" in s):
|
||||
return "context_overflow"
|
||||
if any(k in s for k in ("500", "502", "503", "server error", "provider")):
|
||||
return "provider_error"
|
||||
return "unknown"
|
||||
|
||||
|
||||
# ── post_tool_call -> tool_call event ───────────────────────────────────────
|
||||
@_safe
|
||||
def _on_post_tool_call(**kw: Any) -> None:
|
||||
from agent.telemetry import emitter, spans
|
||||
from agent.telemetry.events import ToolCallEvent
|
||||
|
||||
session_id = kw.get("session_id") or ""
|
||||
run = _ensure_run(session_id, kw.get("task_id"), kw.get("platform") or "")
|
||||
|
||||
function_name = kw.get("function_name") or kw.get("tool_name")
|
||||
duration_ms = kw.get("duration_ms")
|
||||
result = kw.get("result")
|
||||
result_class = _tool_result_class(result)
|
||||
|
||||
with _runs_lock:
|
||||
run["tool_call_count"] += 1
|
||||
if result_class == "error":
|
||||
run["error_count"] += 1
|
||||
|
||||
emitter.emit(ToolCallEvent(
|
||||
span_id=spans.new_span_id(),
|
||||
run_id=run["run_id"],
|
||||
tool_name=function_name, # raw tool name
|
||||
duration_ms=int(duration_ms) if isinstance(duration_ms, (int, float)) else None,
|
||||
result_class=result_class,
|
||||
))
|
||||
|
||||
|
||||
def _tool_result_class(result: Any) -> str:
|
||||
"""Classify a tool result without retaining content — error vs ok vs blocked."""
|
||||
try:
|
||||
import json
|
||||
if isinstance(result, str):
|
||||
r = result.strip()
|
||||
if r.startswith("{"):
|
||||
obj = json.loads(r)
|
||||
if isinstance(obj, dict):
|
||||
if obj.get("error") or obj.get("blocked"):
|
||||
return "blocked" if obj.get("blocked") else "error"
|
||||
if obj.get("timeout"):
|
||||
return "timeout"
|
||||
return "ok"
|
||||
if isinstance(result, dict):
|
||||
if result.get("error"):
|
||||
return "error"
|
||||
return "ok"
|
||||
except Exception:
|
||||
return "ok"
|
||||
return "ok"
|
||||
|
||||
|
||||
# ── on_session_finalize -> finalize the run row ─────────────────────────────
|
||||
@_safe
|
||||
def _on_session_finalize(**kw: Any) -> None:
|
||||
from agent.telemetry import emitter, spans
|
||||
from agent.telemetry.events import RunEvent
|
||||
|
||||
session_id = kw.get("session_id") or ""
|
||||
key = _run_key(session_id, kw.get("task_id"))
|
||||
with _runs_lock:
|
||||
run = _runs.pop(key, None)
|
||||
if run is None:
|
||||
run = _ensure_run(session_id, kw.get("task_id"), kw.get("platform") or "")
|
||||
with _runs_lock:
|
||||
_runs.pop(key, None)
|
||||
|
||||
end_reason = _coarse_end_reason(kw)
|
||||
emitter.emit(RunEvent(
|
||||
run_id=run["run_id"],
|
||||
trace_id=run["trace_id"],
|
||||
entrypoint=run.get("entrypoint", "cli"),
|
||||
session_id=run.get("session_id"),
|
||||
platform=run.get("platform"),
|
||||
start_ns=run.get("start_ns", time.time_ns()),
|
||||
end_ns=time.time_ns(),
|
||||
end_reason=end_reason,
|
||||
model_call_count=run.get("model_call_count", 0),
|
||||
tool_call_count=run.get("tool_call_count", 0),
|
||||
error_count=run.get("error_count", 0),
|
||||
estimated_cost_usd=_as_float(kw.get("estimated_cost_usd")),
|
||||
cost_status=kw.get("cost_status"),
|
||||
))
|
||||
spans.clear_run()
|
||||
|
||||
|
||||
def _coarse_end_reason(kw: Dict[str, Any]) -> str:
|
||||
if kw.get("interrupted"):
|
||||
return "interrupted"
|
||||
if kw.get("failed"):
|
||||
return "failed"
|
||||
reason = str(kw.get("turn_exit_reason") or "").lower()
|
||||
if "max_iteration" in reason:
|
||||
return "max_iterations"
|
||||
if "timeout" in reason:
|
||||
return "timeout"
|
||||
return "completed"
|
||||
|
||||
|
||||
def _as_float(v: Any) -> Optional[float]:
|
||||
try:
|
||||
return float(v) if v is not None else None
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
|
||||
|
||||
# ── subagent lineage (reserved) ─────────────────────────────────────────────
|
||||
@_safe
|
||||
def _on_subagent_start(**kw: Any) -> None:
|
||||
# Subagents inherit the run context via contextvars; no explicit handling needed.
|
||||
return None
|
||||
|
||||
|
||||
@_safe
|
||||
def _on_subagent_stop(**kw: Any) -> None:
|
||||
return None
|
||||
|
||||
|
||||
# ── registration ────────────────────────────────────────────────────────────
|
||||
def register(ctx) -> None:
|
||||
ctx.register_hook("on_session_start", _on_session_start)
|
||||
ctx.register_hook("post_api_request", _on_post_api_request)
|
||||
ctx.register_hook("api_request_error", _on_api_request_error)
|
||||
ctx.register_hook("post_tool_call", _on_post_tool_call)
|
||||
ctx.register_hook("on_session_finalize", _on_session_finalize)
|
||||
ctx.register_hook("subagent_start", _on_subagent_start)
|
||||
ctx.register_hook("subagent_stop", _on_subagent_stop)
|
||||
logger.debug("telemetry plugin registered 7 hooks")
|
||||
12
plugins/telemetry/plugin.yaml
Normal file
12
plugins/telemetry/plugin.yaml
Normal file
@@ -0,0 +1,12 @@
|
||||
name: telemetry
|
||||
version: 1.0.0
|
||||
description: "Local-first telemetry & observability. Records runs, model calls, tool calls, and errors to a local event log + state.db index via plugin hooks — no agent action, no content, no network. Powers /usage, /insights, and local dashboards."
|
||||
author: NousResearch
|
||||
hooks:
|
||||
- post_api_request
|
||||
- api_request_error
|
||||
- post_tool_call
|
||||
- on_session_start
|
||||
- on_session_finalize
|
||||
- subagent_start
|
||||
- subagent_stop
|
||||
@@ -154,6 +154,10 @@ edge-tts = ["edge-tts==7.2.7"]
|
||||
modal = ["modal==1.3.4"]
|
||||
daytona = ["daytona==0.155.0"]
|
||||
hindsight = ["hindsight-client==0.6.1"]
|
||||
# OTLP telemetry export (optional). Provides the OpenTelemetry SDK + OTLP/HTTP
|
||||
# exporter so `hermes telemetry export --otlp` can send spans to a Collector.
|
||||
# Lazy-imported by agent/telemetry/otlp_exporter.py; never a core dependency.
|
||||
otlp = ["opentelemetry-sdk==1.30.0", "opentelemetry-exporter-otlp-proto-http==1.30.0"]
|
||||
dev = ["debugpy==1.8.20", "pytest==9.0.2", "pytest-asyncio==1.3.0", "mcp==1.26.0", "starlette==1.0.1", "ty==0.0.21", "ruff==0.15.10", "setuptools==81.0.0"] # starlette: CVE-2026-48710; setuptools: latest <82 (torch >=2.11 caps setuptools<82)
|
||||
messaging = ["python-telegram-bot[webhooks]==22.6", "discord.py[voice]==2.7.1", "aiohttp==3.13.4", "brotlicffi==1.2.0.1", "slack-bolt==1.27.0", "slack-sdk==3.40.1", "qrcode==7.4.2"] # aiohttp: CVE-2026-34513/34518/34519/34520/34525
|
||||
cron = [] # croniter is now a core dependency; this extra kept for back-compat
|
||||
|
||||
@@ -760,7 +760,11 @@ class TestPluginHooks:
|
||||
mgr.discover_and_load()
|
||||
|
||||
assert mgr.has_hook("pre_api_request") is True
|
||||
assert mgr.has_hook("post_api_request") is False
|
||||
# Negative sentinel: a hook no plugin (user or bundled) registers, proving
|
||||
# that registering pre_api_request doesn't conjure an unrelated hook.
|
||||
# (post_api_request is now registered by the bundled telemetry plugin, so
|
||||
# it is no longer a valid "nothing registered this" sentinel.)
|
||||
assert mgr.has_hook("transform_terminal_output") is False
|
||||
results = mgr.invoke_hook(
|
||||
"pre_api_request",
|
||||
session_id="s1",
|
||||
|
||||
0
tests/telemetry/__init__.py
Normal file
0
tests/telemetry/__init__.py
Normal file
94
tests/telemetry/test_cli_telemetry.py
Normal file
94
tests/telemetry/test_cli_telemetry.py
Normal file
@@ -0,0 +1,94 @@
|
||||
"""`hermes telemetry` handler smoke tests (local-only; no upload)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sqlite3
|
||||
import time
|
||||
import types
|
||||
|
||||
import pytest
|
||||
|
||||
import hermes_state
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def home(tmp_path, monkeypatch):
|
||||
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
|
||||
# state.db with tel_* + seeded data
|
||||
db = tmp_path / "state.db"
|
||||
hermes_state.SessionDB(db_path=db)
|
||||
from agent.telemetry.emitter import TelemetryEmitter
|
||||
from agent.telemetry.events import RunEvent, ModelCallEvent, ToolCallEvent
|
||||
em = TelemetryEmitter(events_path=tmp_path / "telemetry" / "e.jsonl", db_path=db)
|
||||
now = time.time_ns()
|
||||
em.emit(RunEvent(run_id="r1", trace_id="t1", entrypoint="cli", end_reason="completed",
|
||||
start_ns=now - 60_000_000, end_ns=now, model_call_count=1,
|
||||
tool_call_count=1, estimated_cost_usd=0.3))
|
||||
em.emit(ModelCallEvent(span_id="m1", run_id="r1", provider="anthropic",
|
||||
model="claude-opus-4",
|
||||
input_tokens=20000, output_tokens=2000))
|
||||
em.emit(ToolCallEvent(span_id="w1", run_id="r1", tool_name="web_search",
|
||||
result_class="ok"))
|
||||
em.flush()
|
||||
em.close()
|
||||
yield tmp_path
|
||||
|
||||
|
||||
def _run(action, **kw):
|
||||
from hermes_cli.main import cmd_telemetry
|
||||
args = types.SimpleNamespace(telemetry_action=action, days=30, limit=10, json=False)
|
||||
for k, v in kw.items():
|
||||
setattr(args, k, v)
|
||||
cmd_telemetry(args)
|
||||
|
||||
|
||||
def test_status_runs(home, capsys):
|
||||
_run("status")
|
||||
out = capsys.readouterr().out
|
||||
assert "Telemetry status" in out
|
||||
assert "Upload:" in out and "DISABLED" in out
|
||||
assert "Local data:" in out
|
||||
|
||||
|
||||
def test_preview_shows_real_values(home, capsys):
|
||||
_run("preview")
|
||||
out = capsys.readouterr().out
|
||||
assert "NOT uploaded" in out
|
||||
assert "workflow_completed" in out
|
||||
# real model + tool names ARE shown (this is the user's own local data)
|
||||
assert "claude-opus-4" in out
|
||||
assert "web_search" in out
|
||||
|
||||
|
||||
def test_status_reflects_consent_set_via_config(home, capsys):
|
||||
# Opting in is a plain config write now (no `enable` verb). status should
|
||||
# reflect consent_state=aggregate as the aggregate plane being on.
|
||||
from hermes_cli.config import load_config, save_config
|
||||
cfg = load_config()
|
||||
cfg.setdefault("telemetry", {})["consent_state"] = "aggregate"
|
||||
save_config(cfg)
|
||||
_run("status")
|
||||
out = capsys.readouterr().out
|
||||
assert "consent_state=aggregate" in out
|
||||
assert "Aggregate plane: on" in out
|
||||
|
||||
|
||||
def test_status_shows_optin_hint_when_unknown(home, capsys):
|
||||
_run("status")
|
||||
out = capsys.readouterr().out
|
||||
assert "Aggregate plane: off" in out
|
||||
assert "config set telemetry.consent_state aggregate" in out
|
||||
|
||||
|
||||
def test_allow_aggregate_false_keeps_plane_off_in_status(home, capsys):
|
||||
# Even with consent opted in, a managed allow_aggregate:false wins.
|
||||
from hermes_cli.config import load_config, save_config
|
||||
cfg = load_config()
|
||||
tel = cfg.setdefault("telemetry", {})
|
||||
tel["consent_state"] = "aggregate"
|
||||
tel["allow_aggregate"] = False
|
||||
save_config(cfg)
|
||||
_run("status")
|
||||
out = capsys.readouterr().out
|
||||
assert "Aggregate plane: off" in out
|
||||
assert "allow_aggregate is false" in out
|
||||
107
tests/telemetry/test_emitter.py
Normal file
107
tests/telemetry/test_emitter.py
Normal file
@@ -0,0 +1,107 @@
|
||||
"""Emitter tests — the hot-path invariant is the one that matters most.
|
||||
|
||||
Invariant: emit() never blocks, never raises, and a broken writer cannot slow or
|
||||
break the caller. Plus: JSONL + SQLite round-trip, and the SQLite index is rebuildable
|
||||
from the JSONL source of truth.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sqlite3
|
||||
import time
|
||||
|
||||
import hermes_state
|
||||
from agent.telemetry.emitter import TelemetryEmitter
|
||||
from agent.telemetry.events import ModelCallEvent, RunEvent, ToolCallEvent
|
||||
|
||||
|
||||
def _fresh_db(tmp_path):
|
||||
db = tmp_path / "state.db"
|
||||
conn = sqlite3.connect(db)
|
||||
conn.executescript(hermes_state.SCHEMA_SQL)
|
||||
conn.close()
|
||||
return db
|
||||
|
||||
|
||||
def test_emit_is_fast_even_when_writer_is_broken(tmp_path, monkeypatch):
|
||||
"""The core guarantee: a writer that raises AND sleeps cannot stall emit()."""
|
||||
db = _fresh_db(tmp_path)
|
||||
em = TelemetryEmitter(events_path=tmp_path / "telemetry" / "events.jsonl", db_path=db)
|
||||
|
||||
# Sabotage the row indexer to raise after a long sleep.
|
||||
def broken(_conn, _ev):
|
||||
time.sleep(5.0)
|
||||
raise RuntimeError("writer exploded")
|
||||
|
||||
monkeypatch.setattr(em, "_index_one", broken)
|
||||
|
||||
start = time.monotonic()
|
||||
for i in range(50):
|
||||
em.emit(ModelCallEvent(span_id=f"s{i}", run_id="r1", input_tokens=10))
|
||||
elapsed = time.monotonic() - start
|
||||
|
||||
# 50 emits must complete in well under the writer's single 5s sleep.
|
||||
assert elapsed < 1.0, f"emit() blocked: {elapsed:.2f}s"
|
||||
em.close()
|
||||
|
||||
|
||||
def test_emit_never_raises_on_bad_event(tmp_path):
|
||||
db = _fresh_db(tmp_path)
|
||||
em = TelemetryEmitter(events_path=tmp_path / "telemetry" / "events.jsonl", db_path=db)
|
||||
# Non-serializable / wrong-shaped inputs must not raise out of emit().
|
||||
em.emit(object()) # no to_dict, not a mapping
|
||||
em.emit({"event": "run"}) # minimal dict
|
||||
em.close()
|
||||
|
||||
|
||||
def test_jsonl_and_sqlite_roundtrip(tmp_path):
|
||||
db = _fresh_db(tmp_path)
|
||||
jsonl = tmp_path / "telemetry" / "events.jsonl"
|
||||
em = TelemetryEmitter(events_path=jsonl, db_path=db)
|
||||
|
||||
em.emit(RunEvent(run_id="run1", trace_id="t1", entrypoint="cli", end_reason="completed"))
|
||||
em.emit(ModelCallEvent(span_id="m1", run_id="run1", provider="anthropic",
|
||||
model="claude-opus-4", input_tokens=100, output_tokens=20))
|
||||
em.emit(ToolCallEvent(span_id="tc1", run_id="run1", tool_name="web_search",
|
||||
duration_ms=120, result_class="ok"))
|
||||
em.flush()
|
||||
em.close()
|
||||
|
||||
# JSONL has all three lines
|
||||
lines = [l for l in jsonl.read_text(encoding="utf-8").splitlines() if l.strip()]
|
||||
assert len(lines) == 3
|
||||
|
||||
# SQLite index has the rows in the right tables
|
||||
conn = sqlite3.connect(db)
|
||||
assert conn.execute("SELECT COUNT(*) FROM tel_runs").fetchone()[0] == 1
|
||||
assert conn.execute("SELECT COUNT(*) FROM tel_model_calls").fetchone()[0] == 1
|
||||
assert conn.execute("SELECT COUNT(*) FROM tel_tool_calls").fetchone()[0] == 1
|
||||
row = conn.execute("SELECT provider, model, input_tokens FROM tel_model_calls").fetchone()
|
||||
assert row == ("anthropic", "claude-opus-4", 100)
|
||||
conn.close()
|
||||
|
||||
|
||||
def test_unknown_event_kind_is_ignored_not_fatal(tmp_path):
|
||||
db = _fresh_db(tmp_path)
|
||||
em = TelemetryEmitter(events_path=tmp_path / "telemetry" / "events.jsonl", db_path=db)
|
||||
em.emit({"event": "totally_unknown", "foo": "bar"})
|
||||
em.emit(RunEvent(run_id="r2", trace_id="t2", entrypoint="cli"))
|
||||
em.flush()
|
||||
em.close()
|
||||
conn = sqlite3.connect(db)
|
||||
# The unknown event is in JSONL but skipped by the indexer; the known one indexes.
|
||||
assert conn.execute("SELECT COUNT(*) FROM tel_runs").fetchone()[0] == 1
|
||||
conn.close()
|
||||
|
||||
|
||||
def test_disabled_emitter_writes_nothing(tmp_path):
|
||||
db = _fresh_db(tmp_path)
|
||||
jsonl = tmp_path / "telemetry" / "events.jsonl"
|
||||
em = TelemetryEmitter(events_path=jsonl, db_path=db, enabled=False)
|
||||
em.emit(RunEvent(run_id="r3", trace_id="t3", entrypoint="cli"))
|
||||
em.flush()
|
||||
em.close()
|
||||
assert not jsonl.exists()
|
||||
conn = sqlite3.connect(db)
|
||||
assert conn.execute("SELECT COUNT(*) FROM tel_runs").fetchone()[0] == 0
|
||||
conn.close()
|
||||
113
tests/telemetry/test_export_redaction.py
Normal file
113
tests/telemetry/test_export_redaction.py
Normal file
@@ -0,0 +1,113 @@
|
||||
"""Export redaction pipeline tests — the security-critical layer.
|
||||
|
||||
Invariants:
|
||||
* Secrets ALWAYS stripped, every mode, no flag disables it.
|
||||
* Content gated by the trajectories plane, not a redaction mode.
|
||||
* PII stripped in 'pii' mode; structure preserved (codec-aware).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
|
||||
from agent.telemetry import redaction as R
|
||||
|
||||
|
||||
# ── secrets are always redacted ─────────────────────────────────────────────
|
||||
def test_secrets_stripped_in_none_mode():
|
||||
text = "here is sk-ant-api03-SECRETKEY123 and a token"
|
||||
out = R.redact_for_export(text, content_mode=R.CONTENT_NONE)
|
||||
assert "SECRETKEY123" not in out
|
||||
|
||||
|
||||
def test_secrets_stripped_in_pii_mode():
|
||||
text = "Authorization: Bearer abcdef123456789secret"
|
||||
out = R.redact_for_export(text, content_mode=R.CONTENT_PII)
|
||||
assert "abcdef123456789secret" not in out
|
||||
|
||||
|
||||
def test_secret_redactor_fails_closed(monkeypatch):
|
||||
# If the underlying redactor raises, we must NOT return the raw string.
|
||||
import agent.redact as ar
|
||||
monkeypatch.setattr(ar, "redact_sensitive_text", lambda *a, **k: (_ for _ in ()).throw(RuntimeError()))
|
||||
out = R.redact_for_export("sk-secret-value", content_mode=R.CONTENT_NONE)
|
||||
assert "sk-secret-value" not in out
|
||||
assert out == "[redaction-unavailable]"
|
||||
|
||||
|
||||
# ── PII ─────────────────────────────────────────────────────────────────────
|
||||
def test_pii_mode_strips_email_and_phone():
|
||||
text = "contact alice@example.com or +1 415 555 1234"
|
||||
out = R.redact_for_export(text, content_mode=R.CONTENT_PII)
|
||||
assert "alice@example.com" not in out
|
||||
assert "[email]" in out
|
||||
assert "555" not in out or "[phone]" in out
|
||||
|
||||
|
||||
def test_none_mode_keeps_nonsecret_text_but_drops_via_message_path():
|
||||
# redact_for_export(none) scrubs secrets but doesn't strip ordinary words;
|
||||
# content *dropping* happens at the message layer (trajectories gate).
|
||||
out = R.redact_for_export("just ordinary words", content_mode=R.CONTENT_NONE)
|
||||
assert "ordinary" in out
|
||||
|
||||
|
||||
# ── trajectories gate (content_export_enabled) ──────────────────────────────
|
||||
def test_content_export_disabled_by_default():
|
||||
assert R.content_export_enabled({}) is False
|
||||
assert R.content_export_enabled({"telemetry": {}}) is False
|
||||
assert R.content_export_enabled({"telemetry": {"trajectories": {"enabled": False}}}) is False
|
||||
|
||||
|
||||
def test_content_export_enabled_when_trajectories_on():
|
||||
assert R.content_export_enabled({"telemetry": {"trajectories": {"enabled": True}}}) is True
|
||||
|
||||
|
||||
# ── codec-aware message redaction ───────────────────────────────────────────
|
||||
def test_message_structural_only_when_content_excluded():
|
||||
msg = {"role": "user", "content": "my email is bob@x.com and key sk-12345"}
|
||||
out = R.redact_message(msg, include_content=False)
|
||||
assert out["role"] == "user"
|
||||
assert "content" not in out # body dropped entirely
|
||||
assert out["content_chars"] == len(msg["content"]) # only the size remains
|
||||
assert "bob@x.com" not in json.dumps(out)
|
||||
|
||||
|
||||
def test_message_content_included_is_redacted():
|
||||
msg = {"role": "user", "content": "email bob@x.com secret sk-ant-SECRET999"}
|
||||
out = R.redact_message(msg, content_mode=R.CONTENT_PII, include_content=True)
|
||||
assert "content" in out
|
||||
assert "SECRET999" not in out["content"] # secret gone
|
||||
assert "bob@x.com" not in out["content"] # pii gone
|
||||
assert "[email]" in out["content"]
|
||||
|
||||
|
||||
def test_tool_calls_redacted_names_kept_args_scrubbed():
|
||||
msg = {
|
||||
"role": "assistant",
|
||||
"tool_calls": json.dumps([
|
||||
{"function": {"name": "web_search", "arguments": '{"q": "email me at z@z.com"}'}}
|
||||
]),
|
||||
}
|
||||
out = R.redact_message(msg, content_mode=R.CONTENT_PII, include_content=True)
|
||||
tc = out["tool_calls"]
|
||||
assert tc[0]["name"] == "web_search" # structure/name preserved
|
||||
assert "z@z.com" not in json.dumps(tc) # arg pii scrubbed
|
||||
|
||||
|
||||
def test_tool_calls_counted_when_content_excluded():
|
||||
msg = {
|
||||
"role": "assistant",
|
||||
"tool_calls": json.dumps([
|
||||
{"function": {"name": "a", "arguments": "{}"}},
|
||||
{"function": {"name": "b", "arguments": "{}"}},
|
||||
]),
|
||||
}
|
||||
out = R.redact_message(msg, include_content=False)
|
||||
assert out["tool_call_count"] == 2
|
||||
assert "tool_calls" not in out
|
||||
|
||||
|
||||
def test_content_mode_for_reads_config():
|
||||
assert R.content_mode_for({"telemetry": {"content_redaction": "pii"}}) == "pii"
|
||||
assert R.content_mode_for({"telemetry": {"content_redaction": "bogus"}}) == "none"
|
||||
assert R.content_mode_for({}) == "none"
|
||||
102
tests/telemetry/test_exporter_bulk.py
Normal file
102
tests/telemetry/test_exporter_bulk.py
Normal file
@@ -0,0 +1,102 @@
|
||||
"""Bulk export tests — telemetry always, content only behind the trajectories gate."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import io
|
||||
import json
|
||||
import sqlite3
|
||||
import time
|
||||
|
||||
import hermes_state
|
||||
from agent.telemetry import exporter_bulk
|
||||
from agent.telemetry.emitter import TelemetryEmitter
|
||||
from agent.telemetry.events import ModelCallEvent, RunEvent, ToolCallEvent
|
||||
|
||||
|
||||
def _seed(tmp_path, with_secret_content=True):
|
||||
db = tmp_path / "state.db"
|
||||
sdb = hermes_state.SessionDB(db_path=db)
|
||||
# telemetry
|
||||
em = TelemetryEmitter(events_path=tmp_path / "tel" / "e.jsonl", db_path=db)
|
||||
now = time.time_ns()
|
||||
em.emit(RunEvent(run_id="r1", trace_id="t1", entrypoint="cli", end_reason="completed",
|
||||
start_ns=now - 60_000_000, end_ns=now, model_call_count=1, tool_call_count=1))
|
||||
em.emit(ModelCallEvent(span_id="m1", run_id="r1", provider="anthropic",
|
||||
model="claude-sonnet-4", input_tokens=1000, output_tokens=100))
|
||||
em.emit(ToolCallEvent(span_id="w1", run_id="r1", tool_name="web_search", result_class="ok"))
|
||||
em.flush()
|
||||
em.close()
|
||||
# session + message content (with an embedded secret + email)
|
||||
sdb.create_session(session_id="s1", source="cli", model="anthropic/claude-sonnet-4")
|
||||
if with_secret_content:
|
||||
sdb.append_message("s1", role="user",
|
||||
content="my key is AKIAIOSFODNN7EXAMPLE and email me at carol@corp.com")
|
||||
sdb.append_message("s1", role="assistant", content="ok")
|
||||
sdb.close()
|
||||
return db
|
||||
|
||||
|
||||
def test_telemetry_exported_content_excluded_by_default(tmp_path):
|
||||
db = _seed(tmp_path)
|
||||
buf = io.StringIO()
|
||||
counts = exporter_bulk.export(buf, fmt="ndjson", include_content=False, config={}, db_path=db)
|
||||
assert counts["telemetry"] >= 3
|
||||
assert counts["content_included"] == 0
|
||||
text = buf.getvalue()
|
||||
# message bodies must NOT be present
|
||||
assert "carol@corp.com" not in text
|
||||
assert "AKIAIOSFODNN7EXAMPLE" not in text
|
||||
# but a session record (structural) is present with message structure
|
||||
lines = [json.loads(l) for l in text.splitlines() if l.strip()]
|
||||
sess = [r for r in lines if r.get("_kind") == "session"]
|
||||
assert sess and sess[0]["messages"]
|
||||
assert "content" not in sess[0]["messages"][0] # structural only
|
||||
assert "content_chars" in sess[0]["messages"][0]
|
||||
|
||||
|
||||
def test_include_content_ignored_without_trajectories(tmp_path):
|
||||
db = _seed(tmp_path)
|
||||
buf = io.StringIO()
|
||||
# request content but trajectories disabled -> forced off
|
||||
counts = exporter_bulk.export(buf, fmt="ndjson", include_content=True,
|
||||
config={"telemetry": {"trajectories": {"enabled": False}}}, db_path=db)
|
||||
assert counts["content_included"] == 0
|
||||
assert "carol@corp.com" not in buf.getvalue()
|
||||
|
||||
|
||||
def test_content_included_when_trajectories_on_but_redacted(tmp_path):
|
||||
db = _seed(tmp_path)
|
||||
buf = io.StringIO()
|
||||
cfg = {"telemetry": {"trajectories": {"enabled": True}, "content_redaction": "pii"}}
|
||||
counts = exporter_bulk.export(buf, fmt="ndjson", include_content=True, config=cfg, db_path=db)
|
||||
assert counts["content_included"] == 1
|
||||
text = buf.getvalue()
|
||||
# content present but secret + pii scrubbed
|
||||
assert "AKIAIOSFODNN7EXAMPLE" not in text # secret always gone
|
||||
assert "carol@corp.com" not in text # pii gone in pii mode
|
||||
lines = [json.loads(l) for l in text.splitlines() if l.strip()]
|
||||
sess = [r for r in lines if r.get("_kind") == "session"][0]
|
||||
# the user message now has a (redacted) content field
|
||||
user_msg = [m for m in sess["messages"] if m["role"] == "user"][0]
|
||||
assert "content" in user_msg
|
||||
assert "[email]" in user_msg["content"]
|
||||
|
||||
|
||||
def test_json_format_roundtrips(tmp_path):
|
||||
db = _seed(tmp_path)
|
||||
buf = io.StringIO()
|
||||
exporter_bulk.export(buf, fmt="json", include_content=False, config={}, db_path=db)
|
||||
obj = json.loads(buf.getvalue())
|
||||
assert "records" in obj
|
||||
assert any(r["_kind"] == "tel_runs" for r in obj["records"])
|
||||
|
||||
|
||||
def test_since_window_filters_runs(tmp_path):
|
||||
db = _seed(tmp_path)
|
||||
buf = io.StringIO()
|
||||
# since 1ns ago in the future-ish -> the run (start ~60ms ago) excluded
|
||||
future_ns = int((time.time() + 1) * 1e9)
|
||||
counts = exporter_bulk.export(buf, fmt="ndjson", since_ns=future_ns, config={}, db_path=db)
|
||||
lines = [json.loads(l) for l in buf.getvalue().splitlines() if l.strip()]
|
||||
runs = [r for r in lines if r.get("_kind") == "tel_runs"]
|
||||
assert runs == []
|
||||
98
tests/telemetry/test_governance.py
Normal file
98
tests/telemetry/test_governance.py
Normal file
@@ -0,0 +1,98 @@
|
||||
"""Export configuration visibility in `hermes telemetry status`.
|
||||
|
||||
The status Export block reports the current export configuration. Whether a key is
|
||||
locked is handled by the managed-scope layer, not repeated here; the allow_aggregate
|
||||
gate is covered by a test so a managed pin can't be regressed.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
import types
|
||||
|
||||
import pytest
|
||||
|
||||
import hermes_state
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def home(tmp_path, monkeypatch):
|
||||
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
|
||||
hermes_state.SessionDB(db_path=tmp_path / "state.db")
|
||||
yield tmp_path
|
||||
|
||||
|
||||
def _status(capsys):
|
||||
from hermes_cli.main import cmd_telemetry
|
||||
cmd_telemetry(types.SimpleNamespace(telemetry_action="status"))
|
||||
return capsys.readouterr().out
|
||||
|
||||
|
||||
def test_export_block_default_shows_otlp_disabled(home, capsys):
|
||||
out = _status(capsys)
|
||||
assert "Export" in out
|
||||
assert "OTLP:" in out and "disabled" in out
|
||||
assert "Content export: off" in out
|
||||
assert "Secret redaction: on (always)" in out
|
||||
|
||||
|
||||
def test_export_block_shows_endpoint_host_never_token(home, capsys, monkeypatch):
|
||||
from hermes_cli.config import load_config, save_config
|
||||
monkeypatch.setenv("CORP_OTLP_TOKEN", "supersecret-do-not-print")
|
||||
c = load_config()
|
||||
t = c.setdefault("telemetry", {})
|
||||
t.setdefault("export", {})["otlp"] = {
|
||||
"enabled": True,
|
||||
"endpoint": "https://collector.corp:4318/v1/traces",
|
||||
"headers_env": {"Authorization": "CORP_OTLP_TOKEN"},
|
||||
}
|
||||
save_config(c)
|
||||
out = _status(capsys)
|
||||
# endpoint host present
|
||||
assert "https://collector.corp:4318/v1/traces" in out
|
||||
# env var name + set-state present; the VALUE never printed
|
||||
assert "CORP_OTLP_TOKEN" in out
|
||||
assert "(set)" in out
|
||||
assert "supersecret-do-not-print" not in out
|
||||
|
||||
|
||||
def test_export_block_reflects_trajectories_gate(home, capsys):
|
||||
from hermes_cli.config import load_config, save_config
|
||||
c = load_config()
|
||||
c.setdefault("telemetry", {})["trajectories"] = {"enabled": True}
|
||||
save_config(c)
|
||||
out = _status(capsys)
|
||||
assert "Content export: on (trajectories plane)" in out
|
||||
|
||||
|
||||
def test_token_env_not_set_shows_not_set(home, capsys):
|
||||
from hermes_cli.config import load_config, save_config
|
||||
c = load_config()
|
||||
t = c.setdefault("telemetry", {})
|
||||
t.setdefault("export", {})["otlp"] = {
|
||||
"enabled": True,
|
||||
"endpoint": "https://x:4318/v1/traces",
|
||||
"headers_env": {"Authorization": "TOTALLY_UNSET_ENV_VAR_XYZ"},
|
||||
}
|
||||
save_config(c)
|
||||
out = _status(capsys)
|
||||
assert "(NOT set)" in out
|
||||
|
||||
|
||||
def test_allow_aggregate_pin_blocks_opt_in(home):
|
||||
"""A managed allow_aggregate:false pin overrides a consent_state opt-in.
|
||||
|
||||
Consent is set in config (as a user or managed-scope pin would); the hard gate
|
||||
still wins, so the aggregate plane resolves off and may_upload stays false.
|
||||
"""
|
||||
from hermes_cli.config import load_config, save_config
|
||||
from agent.telemetry import policy
|
||||
c = load_config()
|
||||
tel = c.setdefault("telemetry", {})
|
||||
tel["consent_state"] = "aggregate"
|
||||
tel["allow_aggregate"] = False
|
||||
save_config(c)
|
||||
d = policy.resolve(load_config())
|
||||
assert d.allow_aggregate is False
|
||||
assert d.aggregate_enabled is False
|
||||
assert d.may_upload_aggregate() is False
|
||||
82
tests/telemetry/test_insights_integration.py
Normal file
82
tests/telemetry/test_insights_integration.py
Normal file
@@ -0,0 +1,82 @@
|
||||
"""Insights ↔ telemetry integration: the observability section in /insights output."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
from hermes_state import SessionDB
|
||||
from agent.insights import InsightsEngine
|
||||
from agent.telemetry.emitter import TelemetryEmitter
|
||||
from agent.telemetry.events import ModelCallEvent, RunEvent, ToolCallEvent
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def db(tmp_path):
|
||||
session_db = SessionDB(db_path=tmp_path / "ins_tel.db")
|
||||
yield session_db
|
||||
session_db.close()
|
||||
|
||||
|
||||
def _seed_telemetry(db_path):
|
||||
em = TelemetryEmitter(events_path=db_path.parent / "tel" / "events.jsonl", db_path=db_path)
|
||||
now = time.time_ns()
|
||||
em.emit(RunEvent(run_id="r1", trace_id="t1", entrypoint="gateway",
|
||||
platform="telegram", end_reason="completed",
|
||||
start_ns=now - 90_000_000, end_ns=now))
|
||||
em.emit(RunEvent(run_id="r2", trace_id="t2", entrypoint="cli",
|
||||
end_reason="failed", start_ns=now - 11_000_000, end_ns=now))
|
||||
em.emit(ModelCallEvent(span_id="m1", run_id="r1", provider="anthropic",
|
||||
model="claude-opus-4", input_tokens=5000, output_tokens=800,
|
||||
cache_read_tokens=1000, latency_ms=2200))
|
||||
em.emit(ToolCallEvent(span_id="tc1", run_id="r1", tool_name="web_search", result_class="ok"))
|
||||
em.emit(ToolCallEvent(span_id="tc2", run_id="r1", tool_name="browser_navigate", result_class="error"))
|
||||
em.flush()
|
||||
em.close()
|
||||
|
||||
|
||||
def test_report_includes_telemetry_when_present(db):
|
||||
# A session so generate() isn't the empty branch
|
||||
db.create_session(session_id="s1", source="cli", model="anthropic/claude-sonnet-4")
|
||||
_seed_telemetry(db.db_path)
|
||||
|
||||
engine = InsightsEngine(db)
|
||||
report = engine.generate(days=30)
|
||||
tel = report.get("telemetry")
|
||||
assert tel, "telemetry section missing"
|
||||
assert tel["workflows"]["total_runs"] == 2
|
||||
assert tel["workflows"]["success_rate"] == 0.5
|
||||
assert tel["tool_calls"]["total"] == 2
|
||||
assert tel["tool_calls"]["failure_rate"] == 0.5
|
||||
assert tel["model_calls"]["by_provider"]["anthropic"] == 1
|
||||
|
||||
|
||||
def test_terminal_output_renders_observability_section(db):
|
||||
db.create_session(session_id="s1", source="cli", model="anthropic/claude-sonnet-4")
|
||||
_seed_telemetry(db.db_path)
|
||||
|
||||
engine = InsightsEngine(db)
|
||||
out = engine.format_terminal(engine.generate(days=30))
|
||||
assert "Observability" in out
|
||||
assert "Workflows:" in out
|
||||
assert "Failure rate:" in out
|
||||
assert "Providers:" in out
|
||||
|
||||
|
||||
def test_telemetry_section_absent_when_no_tel_rows(db):
|
||||
# Session present, but no telemetry events seeded.
|
||||
db.create_session(session_id="s1", source="cli", model="anthropic/claude-sonnet-4")
|
||||
engine = InsightsEngine(db)
|
||||
report = engine.generate(days=30)
|
||||
assert report.get("telemetry") == {}
|
||||
out = engine.format_terminal(report)
|
||||
assert "Observability" not in out
|
||||
|
||||
|
||||
def test_empty_report_has_telemetry_key(db):
|
||||
# No sessions at all -> empty branch still carries the key (renderer-safe).
|
||||
engine = InsightsEngine(db)
|
||||
report = engine.generate(days=30)
|
||||
assert report.get("empty") is True
|
||||
assert report.get("telemetry") == {}
|
||||
171
tests/telemetry/test_otlp_exporter.py
Normal file
171
tests/telemetry/test_otlp_exporter.py
Normal file
@@ -0,0 +1,171 @@
|
||||
"""OTLP exporter tests.
|
||||
|
||||
Skip cleanly when the optional OTel SDK isn't installed. When it is, verify:
|
||||
* event -> OTel span attribute mapping
|
||||
* headers_env resolves the value from the named environment variable, not config
|
||||
* a failing or slow subscriber never breaks the emitter hot path
|
||||
* is_enabled / is_available gating
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sqlite3
|
||||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
import hermes_state
|
||||
from agent.telemetry import otlp_exporter as OE
|
||||
from agent.telemetry.emitter import TelemetryEmitter
|
||||
from agent.telemetry.events import ModelCallEvent, RunEvent, ToolCallEvent
|
||||
|
||||
otel = pytest.importorskip("opentelemetry.sdk.trace", reason="otlp extra not installed")
|
||||
|
||||
|
||||
def _in_memory_provider():
|
||||
"""A TracerProvider with an in-memory span exporter (no network)."""
|
||||
from opentelemetry.sdk.trace import TracerProvider
|
||||
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
|
||||
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
|
||||
provider = TracerProvider()
|
||||
mem = InMemorySpanExporter()
|
||||
provider.add_span_processor(SimpleSpanProcessor(mem))
|
||||
return provider, mem
|
||||
|
||||
|
||||
def test_event_maps_to_span_with_real_attrs():
|
||||
provider, mem = _in_memory_provider()
|
||||
batch = [
|
||||
{"event": "run", "entrypoint": "gateway", "platform": "telegram",
|
||||
"end_reason": "completed", "model_call_count": 2, "tool_call_count": 3},
|
||||
{"event": "model_call", "provider": "anthropic", "model": "claude-opus-4",
|
||||
"input_tokens": 5000, "output_tokens": 800},
|
||||
{"event": "tool_call", "tool_name": "web_search", "result_class": "ok"},
|
||||
]
|
||||
n = OE.export_batch(provider, batch)
|
||||
assert n == 3
|
||||
spans = mem.get_finished_spans()
|
||||
names = {s.name for s in spans}
|
||||
assert names == {"hermes.run", "hermes.model_call", "hermes.tool_call"}
|
||||
# real values present as span attributes
|
||||
run = [s for s in spans if s.name == "hermes.run"][0]
|
||||
assert run.attributes["hermes.entrypoint"] == "gateway"
|
||||
assert run.attributes["hermes.platform"] == "telegram"
|
||||
model = [s for s in spans if s.name == "hermes.model_call"][0]
|
||||
assert model.attributes["hermes.model"] == "claude-opus-4"
|
||||
assert model.attributes["hermes.provider"] == "anthropic"
|
||||
tool = [s for s in spans if s.name == "hermes.tool_call"][0]
|
||||
assert tool.attributes["hermes.tool_name"] == "web_search"
|
||||
|
||||
|
||||
def test_headers_resolve_from_env_not_value(monkeypatch):
|
||||
monkeypatch.setenv("MY_OTLP_TOKEN", "supersecretvalue")
|
||||
resolved = OE._resolve_headers({"Authorization": "MY_OTLP_TOKEN"})
|
||||
assert resolved == {"Authorization": "supersecretvalue"}
|
||||
# missing env var -> skipped, not crashed
|
||||
assert OE._resolve_headers({"X": "NOPE_NOT_SET"}) == {}
|
||||
|
||||
|
||||
def test_is_enabled_requires_endpoint_and_flag():
|
||||
assert OE.is_enabled({"telemetry": {"export": {"otlp": {"enabled": True, "endpoint": "http://x"}}}}) is True
|
||||
assert OE.is_enabled({"telemetry": {"export": {"otlp": {"enabled": True}}}}) is False
|
||||
assert OE.is_enabled({"telemetry": {"export": {"otlp": {"enabled": False, "endpoint": "http://x"}}}}) is False
|
||||
assert OE.is_enabled({}) is False
|
||||
|
||||
|
||||
def test_require_sdk_routes_through_lazy_install(monkeypatch):
|
||||
# _require_sdk(auto_install=True) should call lazy_deps.ensure('export.otlp').
|
||||
import tools.lazy_deps as ld
|
||||
calls = []
|
||||
monkeypatch.setattr(ld, "ensure", lambda feature, **kw: calls.append((feature, kw)))
|
||||
OE._require_sdk(auto_install=True, prompt=False)
|
||||
assert calls == [("export.otlp", {"prompt": False})]
|
||||
|
||||
|
||||
def test_is_available_does_not_install(monkeypatch):
|
||||
# A pure availability check must NEVER trigger an install.
|
||||
import tools.lazy_deps as ld
|
||||
calls = []
|
||||
monkeypatch.setattr(ld, "ensure", lambda *a, **k: calls.append(a))
|
||||
OE.is_available()
|
||||
assert calls == []
|
||||
|
||||
|
||||
def test_export_otlp_feature_specs_match_pyproject():
|
||||
# The LAZY_DEPS entry must track the [otlp] extra in pyproject.toml.
|
||||
from tools.lazy_deps import feature_specs
|
||||
import pathlib, re
|
||||
specs = set(feature_specs("export.otlp"))
|
||||
pyproject = pathlib.Path(__file__).resolve().parents[2] / "pyproject.toml"
|
||||
text = pyproject.read_text(encoding="utf-8")
|
||||
m = re.search(r"^otlp\s*=\s*\[([^\]]*)\]", text, re.MULTILINE)
|
||||
assert m, "otlp extra not found in pyproject.toml"
|
||||
extra = set(re.findall(r'"([^"]+)"', m.group(1)))
|
||||
assert specs == extra, f"LAZY_DEPS {specs} != pyproject extra {extra}"
|
||||
|
||||
|
||||
def test_streamer_subscription_receives_events(tmp_path, monkeypatch):
|
||||
# Wire an OTLPStreamer-like subscriber via the in-memory provider.
|
||||
provider, mem = _in_memory_provider()
|
||||
|
||||
db = tmp_path / "state.db"
|
||||
conn = sqlite3.connect(db); conn.executescript(hermes_state.SCHEMA_SQL); conn.close()
|
||||
em = TelemetryEmitter(events_path=tmp_path / "t" / "e.jsonl", db_path=db)
|
||||
|
||||
def subscriber(batch):
|
||||
OE.export_batch(provider, batch)
|
||||
|
||||
em.subscribe(subscriber)
|
||||
em.emit(RunEvent(run_id="r1", trace_id="t1", entrypoint="cli", end_reason="completed"))
|
||||
em.emit(ModelCallEvent(span_id="m1", run_id="r1", provider="anthropic", model="claude-opus-4"))
|
||||
em.flush()
|
||||
em.close()
|
||||
spans = mem.get_finished_spans()
|
||||
assert {s.name for s in spans} == {"hermes.run", "hermes.model_call"}
|
||||
|
||||
|
||||
def test_failing_subscriber_never_breaks_hot_path(tmp_path):
|
||||
db = tmp_path / "state.db"
|
||||
conn = sqlite3.connect(db); conn.executescript(hermes_state.SCHEMA_SQL); conn.close()
|
||||
em = TelemetryEmitter(events_path=tmp_path / "t" / "e.jsonl", db_path=db)
|
||||
|
||||
def bad_subscriber(batch):
|
||||
time.sleep(0.2)
|
||||
raise RuntimeError("OTLP collector down")
|
||||
|
||||
em.subscribe(bad_subscriber)
|
||||
start = time.monotonic()
|
||||
for i in range(30):
|
||||
em.emit(ModelCallEvent(span_id=f"s{i}", run_id="r1", input_tokens=1))
|
||||
elapsed = time.monotonic() - start
|
||||
# emit() returns immediately regardless of the broken subscriber
|
||||
assert elapsed < 1.0
|
||||
em.flush()
|
||||
# durable writes still happened despite the subscriber raising
|
||||
conn = sqlite3.connect(db)
|
||||
assert conn.execute("SELECT COUNT(*) FROM tel_model_calls").fetchone()[0] == 30
|
||||
conn.close()
|
||||
em.close()
|
||||
|
||||
|
||||
def test_export_once_reads_db_and_returns_count(tmp_path, monkeypatch):
|
||||
db = tmp_path / "state.db"
|
||||
conn = sqlite3.connect(db); conn.executescript(hermes_state.SCHEMA_SQL); conn.close()
|
||||
em = TelemetryEmitter(events_path=tmp_path / "t" / "e.jsonl", db_path=db)
|
||||
em.emit(RunEvent(run_id="r1", trace_id="t1", entrypoint="cli", end_reason="completed",
|
||||
start_ns=time.time_ns(), end_ns=time.time_ns()))
|
||||
em.emit(ToolCallEvent(span_id="w1", run_id="r1", tool_name="web_search", result_class="ok"))
|
||||
em.flush(); em.close()
|
||||
|
||||
# Patch the provider builder to an in-memory one (no network). The processor
|
||||
# stand-in only needs force_flush(); provider.shutdown() works on the real one.
|
||||
provider, mem = _in_memory_provider()
|
||||
|
||||
class _Proc:
|
||||
def force_flush(self, *a, **k):
|
||||
return True
|
||||
|
||||
monkeypatch.setattr(OE, "_make_provider", lambda config: (provider, _Proc()))
|
||||
n = OE.export_once({"telemetry": {"export": {"otlp": {"enabled": True, "endpoint": "http://x"}}}}, db_path=db)
|
||||
assert n == 2
|
||||
assert len(mem.get_finished_spans()) == 2
|
||||
53
tests/telemetry/test_plugin_autoload.py
Normal file
53
tests/telemetry/test_plugin_autoload.py
Normal file
@@ -0,0 +1,53 @@
|
||||
"""Telemetry plugin auto-load gating: on by default, off when telemetry.local=false."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import hermes_cli.plugins as plugins_mod
|
||||
|
||||
|
||||
def test_local_enabled_defaults_true(monkeypatch):
|
||||
monkeypatch.setattr(plugins_mod, "load_config", lambda: {}, raising=False)
|
||||
# With no telemetry section, the default is on.
|
||||
monkeypatch.setattr(
|
||||
"hermes_cli.config.load_config", lambda: {}, raising=False
|
||||
)
|
||||
assert plugins_mod._telemetry_local_enabled() is True
|
||||
|
||||
|
||||
def test_local_disabled_when_config_false(monkeypatch):
|
||||
monkeypatch.setattr(
|
||||
"hermes_cli.config.load_config",
|
||||
lambda: {"telemetry": {"local": False}},
|
||||
raising=False,
|
||||
)
|
||||
assert plugins_mod._telemetry_local_enabled() is False
|
||||
|
||||
|
||||
def test_local_enabled_when_config_true(monkeypatch):
|
||||
monkeypatch.setattr(
|
||||
"hermes_cli.config.load_config",
|
||||
lambda: {"telemetry": {"local": True}},
|
||||
raising=False,
|
||||
)
|
||||
assert plugins_mod._telemetry_local_enabled() is True
|
||||
|
||||
|
||||
def test_malformed_config_defaults_on(monkeypatch):
|
||||
monkeypatch.setattr(
|
||||
"hermes_cli.config.load_config",
|
||||
lambda: {"telemetry": "not a dict"},
|
||||
raising=False,
|
||||
)
|
||||
assert plugins_mod._telemetry_local_enabled() is True
|
||||
|
||||
|
||||
def test_plugin_manifest_is_discoverable():
|
||||
"""The bundled telemetry plugin.yaml exists and declares the lifecycle hooks."""
|
||||
from pathlib import Path
|
||||
import hermes_cli.plugins as p
|
||||
bundled = p.get_bundled_plugins_dir()
|
||||
manifest = bundled / "telemetry" / "plugin.yaml"
|
||||
assert manifest.exists(), f"missing {manifest}"
|
||||
text = manifest.read_text(encoding="utf-8")
|
||||
for hook in ("post_api_request", "post_tool_call", "on_session_finalize"):
|
||||
assert hook in text
|
||||
135
tests/telemetry/test_plugin_hooks.py
Normal file
135
tests/telemetry/test_plugin_hooks.py
Normal file
@@ -0,0 +1,135 @@
|
||||
"""Telemetry plugin hook tests — feed realistic kwargs, assert tel_* rows + no leak."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sqlite3
|
||||
|
||||
import pytest
|
||||
|
||||
import hermes_state
|
||||
from agent.telemetry import emitter as emitter_mod
|
||||
from agent.telemetry.emitter import TelemetryEmitter
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def wired(tmp_path, monkeypatch):
|
||||
db = tmp_path / "state.db"
|
||||
conn = sqlite3.connect(db)
|
||||
conn.executescript(hermes_state.SCHEMA_SQL)
|
||||
conn.close()
|
||||
em = TelemetryEmitter(events_path=tmp_path / "telemetry" / "events.jsonl", db_path=db)
|
||||
emitter_mod.reset_emitter_for_tests(em)
|
||||
# reset the plugin's per-run accumulators between tests
|
||||
import plugins.telemetry as plug
|
||||
plug._runs.clear()
|
||||
yield db, em, plug
|
||||
em.flush()
|
||||
em.close()
|
||||
emitter_mod.reset_emitter_for_tests(None)
|
||||
|
||||
|
||||
def test_full_session_lifecycle_produces_rows(wired):
|
||||
db, em, plug = wired
|
||||
|
||||
plug._on_session_start(session_id="sess1", platform="telegram")
|
||||
plug._on_post_api_request(
|
||||
session_id="sess1", platform="telegram",
|
||||
provider="anthropic", base_url=None, model="claude-opus-4",
|
||||
api_duration=2.5,
|
||||
usage={"input_tokens": 5000, "output_tokens": 800, "cache_read_tokens": 1000,
|
||||
"cache_write_tokens": 0, "reasoning_tokens": 0},
|
||||
)
|
||||
plug._on_post_tool_call(
|
||||
session_id="sess1", platform="telegram",
|
||||
function_name="web_search", duration_ms=812, result="{\"data\": \"...\"}",
|
||||
)
|
||||
plug._on_session_finalize(
|
||||
session_id="sess1", platform="telegram",
|
||||
turn_exit_reason="completed", estimated_cost_usd=0.042, cost_status="known",
|
||||
)
|
||||
em.flush()
|
||||
|
||||
conn = sqlite3.connect(db)
|
||||
conn.row_factory = sqlite3.Row
|
||||
run = conn.execute("SELECT * FROM tel_runs").fetchone()
|
||||
assert run is not None
|
||||
assert run["entrypoint"] == "gateway"
|
||||
assert run["platform"] == "telegram"
|
||||
assert run["end_reason"] == "completed"
|
||||
assert run["model_call_count"] == 1
|
||||
assert run["tool_call_count"] == 1
|
||||
assert abs(run["estimated_cost_usd"] - 0.042) < 1e-9
|
||||
|
||||
mc = conn.execute("SELECT * FROM tel_model_calls").fetchone()
|
||||
assert mc["provider"] == "anthropic"
|
||||
assert mc["model"] == "claude-opus-4"
|
||||
assert mc["input_tokens"] == 5000
|
||||
|
||||
tc = conn.execute("SELECT * FROM tel_tool_calls").fetchone()
|
||||
assert tc["tool_name"] == "web_search"
|
||||
assert tc["result_class"] == "ok"
|
||||
conn.close()
|
||||
|
||||
|
||||
def test_tool_error_result_classified_and_counted(wired):
|
||||
db, em, plug = wired
|
||||
plug._on_session_start(session_id="s2", platform="cli")
|
||||
plug._on_post_tool_call(
|
||||
session_id="s2", function_name="terminal", duration_ms=10,
|
||||
result="{\"error\": \"command failed\"}",
|
||||
)
|
||||
plug._on_session_finalize(session_id="s2", turn_exit_reason="completed")
|
||||
em.flush()
|
||||
conn = sqlite3.connect(db)
|
||||
conn.row_factory = sqlite3.Row
|
||||
tc = conn.execute("SELECT result_class, tool_name FROM tel_tool_calls").fetchone()
|
||||
assert tc["result_class"] == "error"
|
||||
assert tc["tool_name"] == "terminal"
|
||||
run = conn.execute("SELECT error_count FROM tel_runs").fetchone()
|
||||
assert run["error_count"] == 1
|
||||
conn.close()
|
||||
|
||||
|
||||
def test_api_error_recorded(wired):
|
||||
db, em, plug = wired
|
||||
plug._on_session_start(session_id="s3", platform="cli")
|
||||
plug._on_api_request_error(session_id="s3", error_type="provider timeout after 60s")
|
||||
plug._on_session_finalize(session_id="s3", failed=True)
|
||||
em.flush()
|
||||
conn = sqlite3.connect(db)
|
||||
conn.row_factory = sqlite3.Row
|
||||
err = conn.execute("SELECT error_class, subsystem FROM tel_error_events").fetchone()
|
||||
assert err["error_class"] == "provider_timeout"
|
||||
assert err["subsystem"] == "model_api"
|
||||
run = conn.execute("SELECT end_reason FROM tel_runs").fetchone()
|
||||
assert run["end_reason"] == "failed"
|
||||
conn.close()
|
||||
|
||||
|
||||
def test_hooks_never_raise_on_garbage_kwargs(wired):
|
||||
_, em, plug = wired
|
||||
# Missing everything — must be swallowed by the _safe wrapper.
|
||||
plug._on_post_api_request()
|
||||
plug._on_post_tool_call()
|
||||
plug._on_session_finalize()
|
||||
plug._on_api_request_error()
|
||||
em.flush()
|
||||
|
||||
|
||||
def test_no_message_content_in_tool_rows(wired):
|
||||
"""The tool hook receives a result blob; only the classification persists, not content."""
|
||||
db, em, plug = wired
|
||||
plug._on_session_start(session_id="s4", platform="cli")
|
||||
secret = "{\"data\": \"USER SECRET sk-ABCDEF and /Users/alice/file.txt\"}"
|
||||
plug._on_post_tool_call(session_id="s4", function_name="web_search",
|
||||
duration_ms=5, result=secret)
|
||||
plug._on_session_finalize(session_id="s4")
|
||||
em.flush()
|
||||
# The whole tel_tool_calls row must contain none of the result content.
|
||||
conn = sqlite3.connect(db)
|
||||
row = conn.execute("SELECT * FROM tel_tool_calls").fetchone()
|
||||
conn.close()
|
||||
blob = " ".join(str(x) for x in row)
|
||||
assert "sk-ABCDEF" not in blob
|
||||
assert "/Users/alice" not in blob
|
||||
assert "SECRET" not in blob
|
||||
61
tests/telemetry/test_policy_consent.py
Normal file
61
tests/telemetry/test_policy_consent.py
Normal file
@@ -0,0 +1,61 @@
|
||||
"""Consent posture + org-policy enforcement tests.
|
||||
|
||||
Consent is a single field (``telemetry.consent_state``); the aggregate opt-in is
|
||||
expressed by setting it to ``"aggregate"`` (via ``hermes config set`` or a managed-scope
|
||||
pin). ``allow_aggregate`` is the hard gate.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from agent.telemetry import policy
|
||||
|
||||
|
||||
def _cfg(**telemetry):
|
||||
return {"telemetry": telemetry}
|
||||
|
||||
|
||||
def test_default_posture_is_local_only():
|
||||
d = policy.resolve(_cfg(local=True, consent_state="unknown"))
|
||||
assert d.local_enabled is True
|
||||
assert d.aggregate_enabled is False
|
||||
assert d.may_upload_aggregate() is False
|
||||
|
||||
|
||||
def test_unknown_consent_never_uploads():
|
||||
# A headless box with no choice recorded: stays unknown, never uploads.
|
||||
d = policy.resolve(_cfg(local=True, consent_state="unknown"))
|
||||
assert d.may_upload_aggregate() is False
|
||||
|
||||
|
||||
def test_opted_in_uploads():
|
||||
d = policy.resolve(_cfg(local=True, consent_state="aggregate"))
|
||||
assert d.aggregate_enabled is True
|
||||
assert d.may_upload_aggregate() is True
|
||||
|
||||
|
||||
def test_declined_does_not_upload():
|
||||
d = policy.resolve(_cfg(local=True, consent_state="local"))
|
||||
assert d.may_upload_aggregate() is False
|
||||
|
||||
|
||||
def test_allow_aggregate_false_overrides_opt_in():
|
||||
# An admin pins telemetry.allow_aggregate: false via managed scope.
|
||||
cfg = _cfg(local=True, consent_state="aggregate", allow_aggregate=False)
|
||||
d = policy.resolve(cfg)
|
||||
assert d.allow_aggregate is False
|
||||
assert d.aggregate_enabled is False
|
||||
assert d.may_upload_aggregate() is False # the hard gate wins
|
||||
|
||||
|
||||
def test_invalid_consent_state_treated_as_unknown():
|
||||
d = policy.resolve(_cfg(local=True, consent_state="bogus"))
|
||||
assert d.consent_state == "unknown"
|
||||
assert d.may_upload_aggregate() is False
|
||||
|
||||
|
||||
def test_install_id_minted_when_empty_and_stable_when_set():
|
||||
cfg = _cfg(install_id="")
|
||||
minted = policy.ensure_install_id(cfg)
|
||||
assert minted and len(minted) >= 32 # uuid4
|
||||
cfg2 = _cfg(install_id="fixed-id")
|
||||
assert policy.ensure_install_id(cfg2) == "fixed-id"
|
||||
88
tests/telemetry/test_rollup.py
Normal file
88
tests/telemetry/test_rollup.py
Normal file
@@ -0,0 +1,88 @@
|
||||
"""rollup tests: tel_* -> per-run summary events with REAL values (local only)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sqlite3
|
||||
import time
|
||||
|
||||
import hermes_state
|
||||
from agent.telemetry import rollup
|
||||
from agent.telemetry.emitter import TelemetryEmitter
|
||||
from agent.telemetry.events import ModelCallEvent, RunEvent, ToolCallEvent
|
||||
|
||||
|
||||
def _seed(tmp_path):
|
||||
db = tmp_path / "state.db"
|
||||
conn = sqlite3.connect(db)
|
||||
conn.executescript(hermes_state.SCHEMA_SQL)
|
||||
conn.close()
|
||||
em = TelemetryEmitter(events_path=tmp_path / "tel" / "e.jsonl", db_path=db)
|
||||
now = time.time_ns()
|
||||
em.emit(RunEvent(run_id="r1", trace_id="t1", entrypoint="gateway",
|
||||
platform="telegram", end_reason="completed",
|
||||
start_ns=now - 90_000_000, end_ns=now,
|
||||
model_call_count=2, tool_call_count=2, estimated_cost_usd=2.1))
|
||||
em.emit(ModelCallEvent(span_id="m1", run_id="r1", provider="anthropic",
|
||||
model="claude-opus-4", input_tokens=60000, output_tokens=8000))
|
||||
em.emit(ModelCallEvent(span_id="m2", run_id="r1", provider="anthropic",
|
||||
model="claude-opus-4", input_tokens=5000, output_tokens=500))
|
||||
em.emit(ToolCallEvent(span_id="tc1", run_id="r1", tool_name="web_search",
|
||||
result_class="ok"))
|
||||
em.emit(ToolCallEvent(span_id="tc2", run_id="r1", tool_name="browser_navigate",
|
||||
result_class="ok"))
|
||||
# an in-progress run (no end_ns) must be excluded
|
||||
em.emit(RunEvent(run_id="r2", trace_id="t2", entrypoint="cli", start_ns=now))
|
||||
em.flush()
|
||||
em.close()
|
||||
return db
|
||||
|
||||
|
||||
def test_builds_one_event_per_completed_run_with_real_values(tmp_path):
|
||||
db = _seed(tmp_path)
|
||||
events = rollup.build_aggregate_events(install_id="fixed-id", db_path=db,
|
||||
include_heartbeat=False)
|
||||
wf = [e for e in events if e["event_name"] == "workflow_completed"]
|
||||
assert len(wf) == 1 # r2 (no end_ns) excluded
|
||||
e = wf[0]
|
||||
assert e["entrypoint"] == "gateway"
|
||||
assert e["platform"] == "telegram"
|
||||
# REAL model id + provider, not a bucket/class
|
||||
models = {m["model"] for m in e["models_used"]}
|
||||
assert models == {"claude-opus-4"}
|
||||
assert e["models_used"][0]["provider"] == "anthropic"
|
||||
assert sorted(e["tools_used"]) == ["browser_navigate", "web_search"]
|
||||
# real token totals, not buckets
|
||||
assert e["input_tokens"] == 65000
|
||||
assert e["output_tokens"] == 8500
|
||||
|
||||
|
||||
def test_real_model_and_tool_names_present(tmp_path):
|
||||
db = _seed(tmp_path)
|
||||
events = rollup.build_aggregate_events(install_id="fixed-id", db_path=db)
|
||||
blob = " ".join(str(v) for e in events for v in e.values())
|
||||
assert "claude-opus-4" in blob
|
||||
assert "web_search" in blob
|
||||
|
||||
|
||||
def test_heartbeat_included_by_default(tmp_path):
|
||||
db = _seed(tmp_path)
|
||||
events = rollup.build_aggregate_events(install_id="fixed-id", db_path=db)
|
||||
assert any(e["event_name"] == "heartbeat" for e in events)
|
||||
|
||||
|
||||
def test_summarize_counts_by_event_name(tmp_path):
|
||||
db = _seed(tmp_path)
|
||||
events = rollup.build_aggregate_events(install_id="fixed-id", db_path=db)
|
||||
s = rollup.summarize(events)
|
||||
assert s["total"] == len(events)
|
||||
assert s["by_event_name"]["workflow_completed"] == 1
|
||||
assert s["by_event_name"]["heartbeat"] == 1
|
||||
|
||||
|
||||
def test_empty_db_yields_only_heartbeat(tmp_path):
|
||||
db = tmp_path / "state.db"
|
||||
conn = sqlite3.connect(db)
|
||||
conn.executescript(hermes_state.SCHEMA_SQL)
|
||||
conn.close()
|
||||
events = rollup.build_aggregate_events(install_id="x", db_path=db)
|
||||
assert [e["event_name"] for e in events] == ["heartbeat"]
|
||||
46
tests/telemetry/test_schema_migration.py
Normal file
46
tests/telemetry/test_schema_migration.py
Normal file
@@ -0,0 +1,46 @@
|
||||
"""Schema tests: tel_* tables exist after init; SCHEMA_VERSION bumped."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sqlite3
|
||||
|
||||
import hermes_state
|
||||
|
||||
|
||||
TEL_TABLES = {
|
||||
"tel_runs", "tel_spans", "tel_model_calls", "tel_tool_calls",
|
||||
"tel_gateway_events", "tel_cron_events", "tel_skill_events",
|
||||
"tel_memory_events", "tel_feedback_events", "tel_error_events",
|
||||
}
|
||||
|
||||
|
||||
def test_schema_version_is_17_or_higher():
|
||||
assert hermes_state.SCHEMA_VERSION >= 17
|
||||
|
||||
|
||||
def test_tel_tables_present_in_schema_sql():
|
||||
for tbl in TEL_TABLES:
|
||||
assert f"CREATE TABLE IF NOT EXISTS {tbl}" in hermes_state.SCHEMA_SQL
|
||||
|
||||
|
||||
def test_tel_tables_created_on_executescript(tmp_path):
|
||||
db = tmp_path / "state.db"
|
||||
conn = sqlite3.connect(db)
|
||||
conn.executescript(hermes_state.SCHEMA_SQL)
|
||||
rows = {
|
||||
r[0]
|
||||
for r in conn.execute(
|
||||
"SELECT name FROM sqlite_master WHERE type='table'"
|
||||
).fetchall()
|
||||
}
|
||||
conn.close()
|
||||
assert TEL_TABLES.issubset(rows), f"missing: {TEL_TABLES - rows}"
|
||||
|
||||
|
||||
def test_executescript_is_idempotent(tmp_path):
|
||||
# IF NOT EXISTS means re-running on an existing DB is a no-op, not an error.
|
||||
db = tmp_path / "state.db"
|
||||
conn = sqlite3.connect(db)
|
||||
conn.executescript(hermes_state.SCHEMA_SQL)
|
||||
conn.executescript(hermes_state.SCHEMA_SQL) # second run must not raise
|
||||
conn.close()
|
||||
@@ -116,6 +116,15 @@ LAZY_DEPS: dict[str, tuple[str, ...]] = {
|
||||
# ─── Image generation backends ─────────────────────────────────────────
|
||||
"image.fal": ("fal-client==0.13.1",),
|
||||
|
||||
# ─── Observability ─────────────────────────────────────────────────────
|
||||
# OTLP telemetry export. Lazily installed on first use of
|
||||
# `hermes telemetry export --otlp`. Tracks the `otlp` extra in
|
||||
# pyproject.toml — bump both together.
|
||||
"export.otlp": (
|
||||
"opentelemetry-sdk==1.30.0",
|
||||
"opentelemetry-exporter-otlp-proto-http==1.30.0",
|
||||
),
|
||||
|
||||
# ─── Memory providers ──────────────────────────────────────────────────
|
||||
"memory.honcho": ("honcho-ai==2.0.1",),
|
||||
"memory.hindsight": ("hindsight-client==0.6.1",),
|
||||
|
||||
Reference in New Issue
Block a user