#!/usr/bin/env python3
"""Drive the Hermes TUI under HERMES_DEV_PERF and summarize the pipeline.

Usage:
    scripts/profile-tui.py [--session SID] [--hold KEY] [--seconds N] [--rate HZ]

Defaults: picks the session with the most messages, holds PageUp for 8s at
~30 Hz (matching xterm key-repeat), and summarizes ~/.hermes/perf.log on exit.

The TUI build must exist (run `npm run build` in ui-tui first). This script
launches `node dist/entry.js` directly with HERMES_TUI_RESUME set so it
bypasses the hermes_cli wrapper — we want repeatable timing, not the CLI's
session-picker flow.

Environment overrides:
    HERMES_PERF_LOG   (default ~/.hermes/perf.log)
    HERMES_PERF_NODE  (default node from $PATH)
    HERMES_TUI_DIR    (default /home/bb/hermes-agent/ui-tui)

Exit code is 0 if the harness ran and parsed results, 2 if the TUI crashed or
produced no perf data (which suggests the HERMES_DEV_PERF wiring is broken).
"""
from __future__ import annotations

import argparse
import json
import os
import pty
import select
import signal
import sqlite3
import sys
import time
from pathlib import Path
from typing import Any

DEFAULT_TUI_DIR = Path(os.environ.get("HERMES_TUI_DIR", "/home/bb/hermes-agent/ui-tui"))
DEFAULT_LOG = Path(os.environ.get("HERMES_PERF_LOG", str(Path.home() / ".hermes" / "perf.log")))
DEFAULT_STATE_DB = Path.home() / ".hermes" / "state.db"

# Keystroke escape sequences. Matches what xterm/VT220 send when the
# terminal has bracketed paste disabled and the key-repeat handler fires.
KEYS = {
    "page_up": b"\x1b[5~",
    "page_down": b"\x1b[6~",
    "wheel_up": b"\x1b[M`!!",  # mouse wheel up (SGR-less) — best-effort
    "shift_up": b"\x1b[1;2A",
    "shift_down": b"\x1b[1;2B",
}


def pick_longest_session(db: Path) -> str:
    """Return the id of the session with the most messages."""
    conn = sqlite3.connect(db)
    try:
        row = conn.execute(
            "SELECT id FROM sessions s ORDER BY "
            "(SELECT COUNT(*) FROM messages m WHERE m.session_id = s.id) DESC LIMIT 1"
        ).fetchone()
    finally:
        conn.close()
    if not row:
        sys.exit(f"no sessions in {db}")
    return row[0]


def drain(fd: int, timeout: float) -> bytes:
    """Read whatever's available from fd within `timeout`, then return.

    A timeout of 0 polls once and reads a single available chunk, so callers
    in a tight loop still pull data off the PTY without blocking.
    """
    chunks = []
    deadline = time.monotonic() + timeout
    while True:
        wait = max(0.0, deadline - time.monotonic())
        r, _, _ = select.select([fd], [], [], wait)
        if not r:
            break
        try:
            data = os.read(fd, 4096)
        except OSError:
            break
        if not data:
            break
        chunks.append(data)
        # Hard-stop once the deadline has passed so a chatty child can't
        # pin us here forever.
        if time.monotonic() >= deadline:
            break
    return b"".join(chunks)


def hold_key(fd: int, seq: bytes, seconds: float, rate_hz: int) -> int:
    """Write `seq` to fd at ~rate_hz for `seconds`. Returns keystrokes sent."""
    interval = 1.0 / max(1, rate_hz)
    end = time.monotonic() + seconds
    sent = 0
    while time.monotonic() < end:
        try:
            os.write(fd, seq)
            sent += 1
        except OSError:
            break
        # Drain stdout to keep the PTY buffer flowing; ignore the content.
        drain(fd, 0)
        time.sleep(interval)
    return sent


def summarize(log: Path, since_ts_ms: int) -> dict[str, Any]:
    """Parse perf.log, keep only events newer than since_ts_ms, return stats."""
    react_events: list[dict[str, Any]] = []
    frame_events: list[dict[str, Any]] = []
    if not log.exists():
        return {"error": f"no log at {log}", "react": [], "frame": []}
    for line in log.read_text().splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            row = json.loads(line)
        except json.JSONDecodeError:
            continue
        if int(row.get("ts", 0)) < since_ts_ms:
            continue
        src = row.get("src")
        if src == "react":
            react_events.append(row)
        elif src == "frame":
            frame_events.append(row)
    return {"react": react_events, "frame": frame_events}


def pct(values: list[float], p: float) -> float:
    """Nearest-rank percentile: the sorted value at floor(len * p), clamped."""
    if not values:
        return 0.0
    s = sorted(values)
    idx = min(len(s) - 1, int(len(s) * p))
    return s[idx]
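
# A quick sanity check of pct()'s nearest-rank behavior (sample values are
# invented for illustration):
#
#   pct([5.0, 1.0, 3.0], 0.50)  # -> 3.0  (sorted [1, 3, 5], idx = 1)
#   pct([5.0, 1.0, 3.0], 0.99)  # -> 5.0  (idx clamps to len - 1)
#   pct([], 0.95)               # -> 0.0  (empty input is defined as 0)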

def format_report(data: dict[str, Any]) -> str:
    react = data.get("react") or []
    frames = data.get("frame") or []
    out = []

    out.append("═══ React Profiler ═══")
    if not react:
        out.append("  (no react events — HERMES_DEV_PERF wired? threshold too high?)")
    else:
        by_id: dict[str, list[float]] = {}
        for r in react:
            by_id.setdefault(r["id"], []).append(r["actualMs"])
        out.append(f"  {'pane':<14} {'count':>6} {'p50':>8} {'p95':>8} {'p99':>8} {'max':>8}")
        for pid, ms in sorted(by_id.items(), key=lambda kv: -pct(kv[1], 0.99)):
            out.append(
                f"  {pid:<14} {len(ms):>6} {pct(ms, 0.50):>8.2f} {pct(ms, 0.95):>8.2f} "
                f"{pct(ms, 0.99):>8.2f} {max(ms):>8.2f}"
            )

    out.append("")
    out.append("═══ Ink pipeline ═══")
    if not frames:
        out.append("  (no frame events — onFrame wiring broken?)")
    else:
        dur = [f["durationMs"] for f in frames]
        phases_present = any(f.get("phases") for f in frames)
        out.append(f"  frames captured: {len(frames)}")
        out.append(
            f"  durationMs p50={pct(dur, 0.50):.2f} p95={pct(dur, 0.95):.2f} "
            f"p99={pct(dur, 0.99):.2f} max={max(dur):.2f}"
        )
        # Effective FPS during the run: frames / elapsed seconds.
        ts = sorted(f["ts"] for f in frames)
        if len(ts) >= 2:
            elapsed_s = (ts[-1] - ts[0]) / 1000.0
            fps = len(frames) / elapsed_s if elapsed_s > 0 else float("inf")
            out.append(f"  throughput: {len(frames)} frames / {elapsed_s:.2f}s = {fps:.1f} fps")

        if phases_present:
            fields = ["yoga", "renderer", "diff", "optimize", "write", "commit"]
            out.append("")
            out.append(f"  {'phase':<10} {'p50':>8} {'p95':>8} {'p99':>8} {'max':>8} (ms)")
            for field in fields:
                vals = [f["phases"][field] for f in frames if f.get("phases")]
                if vals:
                    out.append(
                        f"  {field:<10} {pct(vals, 0.50):>8.2f} {pct(vals, 0.95):>8.2f} "
                        f"{pct(vals, 0.99):>8.2f} {max(vals):>8.2f}"
                    )

            # Derived: sum of phases vs durationMs (reveals hidden time).
            sum_ps = [sum(f["phases"][k] for k in fields) for f in frames if f.get("phases")]
            if sum_ps:
                dur_match = [f["durationMs"] for f in frames if f.get("phases")]
                deltas = [d - s for d, s in zip(dur_match, sum_ps)]
                out.append(
                    f"  {'dur-Σphases':<10} {pct(deltas, 0.50):>8.2f} {pct(deltas, 0.95):>8.2f} "
                    f"{pct(deltas, 0.99):>8.2f} {max(deltas):>8.2f} (unaccounted-for time)"
                )

            # Yoga counters
            visited = [f["phases"]["yogaVisited"] for f in frames if f.get("phases")]
            measured = [f["phases"]["yogaMeasured"] for f in frames if f.get("phases")]
            cache_hits = [f["phases"]["yogaCacheHits"] for f in frames if f.get("phases")]
            live = [f["phases"]["yogaLive"] for f in frames if f.get("phases")]
            out.append("")
            out.append("  Yoga counters (per frame):")
            for name, vals in (
                ("visited", visited),
                ("measured", measured),
                ("cacheHits", cache_hits),
                ("live", live),
            ):
                if vals:
                    out.append(
                        f"    {name:<11} p50={pct(vals, 0.5):.0f} p99={pct(vals, 0.99):.0f} max={max(vals)}"
                    )

            # Patch counts — a proxy for "how much changed each frame".
            patches = [f["phases"]["patches"] for f in frames if f.get("phases")]
            if patches:
                out.append(
                    f"  patches   p50={pct(patches, 0.5):.0f} p99={pct(patches, 0.99):.0f} "
                    f"max={max(patches)} total={sum(patches)}"
                )
            optimized = [f["phases"].get("optimizedPatches", 0) for f in frames if f.get("phases")]
            if any(optimized):
                out.append(
                    f"  optimized p50={pct(optimized, 0.5):.0f} p99={pct(optimized, 0.99):.0f} "
                    f"max={max(optimized)} total={sum(optimized)}"
                    f" (ratio: {sum(optimized) / max(1, sum(patches)):.2f})"
                )

            # Write bytes + drain telemetry — the outer-terminal bottleneck gauge.
            bytes_written = [f["phases"].get("writeBytes", 0) for f in frames if f.get("phases")]
            if any(bytes_written):
                total_b = sum(bytes_written)
                kb = total_b / 1024
                out.append(
                    f"  writeBytes p50={pct(bytes_written, 0.5):.0f}B p99={pct(bytes_written, 0.99):.0f}B "
                    f"max={max(bytes_written)}B total={kb:.1f}KB"
                )
            drains = [f["phases"].get("prevFrameDrainMs", 0) for f in frames if f.get("phases")]
            if any(d > 0 for d in drains):
                nonzero = [d for d in drains if d > 0]
                out.append(
                    f"  drainMs   p50={pct(nonzero, 0.5):.2f} p95={pct(nonzero, 0.95):.2f} "
                    f"p99={pct(nonzero, 0.99):.2f} max={max(nonzero):.2f} (terminal flush latency)"
                )
            backpressure = sum(1 for f in frames if f.get("phases", {}).get("backpressure"))
            if backpressure:
                out.append(
                    f"  backpressure: {backpressure}/{len(frames)} frames "
                    f"({100 * backpressure / len(frames):.0f}%) (Node stdout buffer full — terminal slow)"
                )

        # Flickers
        flicker_frames = [f for f in frames if f.get("flickers")]
        if flicker_frames:
            out.append("")
            out.append(f"  ⚠ flickers detected in {len(flicker_frames)} frames")
            reasons: dict[str, int] = {}
            for f in flicker_frames:
                for fl in f["flickers"]:
                    reasons[fl["reason"]] = reasons.get(fl["reason"], 0) + 1
            for reason, n in sorted(reasons.items(), key=lambda kv: -kv[1]):
                out.append(f"    {reason}: {n}")

    return "\n".join(out)
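
# Each perf.log record is one JSON object per line. The shapes below are
# illustrative only: the field names come from the parsing code above; the
# values and the "ChatPane" id are invented.
#
#   {"src": "frame", "ts": 1700000000000, "durationMs": 7.4,
#    "phases": {"yoga": 1.2, "renderer": 2.1, "diff": 0.8, "optimize": 0.3,
#               "write": 2.6, "commit": 0.4, "patches": 41}, "flickers": []}
#   {"src": "react", "ts": 1700000000012, "id": "ChatPane", "actualMs": 3.9}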

def key_metrics(data: dict[str, Any]) -> dict[str, float]:
    """Flatten the report into a dict of scalar metrics for A/B diffing."""
    metrics: dict[str, float] = {}
    frames = data.get("frame") or []
    react = data.get("react") or []
    if frames:
        durs = [f["durationMs"] for f in frames]
        metrics["frames"] = len(frames)
        metrics["dur_p50"] = pct(durs, 0.50)
        metrics["dur_p95"] = pct(durs, 0.95)
        metrics["dur_p99"] = pct(durs, 0.99)
        metrics["dur_max"] = max(durs)
        ts = sorted(f["ts"] for f in frames)
        if len(ts) >= 2:
            elapsed = (ts[-1] - ts[0]) / 1000.0
            metrics["fps_throughput"] = len(frames) / elapsed if elapsed > 0 else 0.0
            # Interframe gap distribution — a complementary view to throughput.
            gaps = [ts[i] - ts[i - 1] for i in range(1, len(ts))]
            if gaps:
                metrics["gap_p50_ms"] = pct(gaps, 0.50)
                metrics["gap_p99_ms"] = pct(gaps, 0.99)
                metrics["gaps_under_16ms"] = sum(1 for g in gaps if g < 16)
                metrics["gaps_over_200ms"] = sum(1 for g in gaps if g >= 200)
        for phase in ("renderer", "yoga", "diff", "write"):
            vals = [f["phases"][phase] for f in frames if f.get("phases")]
            if vals:
                metrics[f"{phase}_p99"] = pct(vals, 0.99)
                metrics[f"{phase}_max"] = max(vals)
        patches = [f["phases"]["patches"] for f in frames if f.get("phases")]
        if patches:
            metrics["patches_total"] = sum(patches)
            metrics["patches_p99"] = pct(patches, 0.99)
        optimized = [f["phases"].get("optimizedPatches", 0) for f in frames if f.get("phases")]
        if any(optimized):
            metrics["optimized_total"] = sum(optimized)
        bytes_list = [f["phases"].get("writeBytes", 0) for f in frames if f.get("phases")]
        if any(bytes_list):
            metrics["writeBytes_total"] = sum(bytes_list)
        drains = [f["phases"].get("prevFrameDrainMs", 0) for f in frames if f.get("phases")]
        drain_nonzero = [d for d in drains if d > 0]
        if drain_nonzero:
            metrics["drain_p99"] = pct(drain_nonzero, 0.99)
            metrics["drain_max"] = max(drain_nonzero)
        bp = sum(1 for f in frames if f.get("phases", {}).get("backpressure"))
        metrics["backpressure_frames"] = bp
    if react:
        for pid in set(e["id"] for e in react):
            ms = [e["actualMs"] for e in react if e["id"] == pid]
            metrics[f"react_{pid}_p99"] = pct(ms, 0.99)
            metrics[f"react_{pid}_max"] = max(ms)
    return metrics
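
# Illustrative shape of the flattened metrics dict (numbers invented; the
# react_* keys depend on which pane ids the TUI actually profiled):
#
#   {"frames": 212, "dur_p50": 6.90, "dur_p99": 21.40, "fps_throughput": 26.5,
#    "gap_p50_ms": 33.0, "gaps_over_200ms": 2, "renderer_p99": 9.80,
#    "patches_total": 8123, "backpressure_frames": 17, ...}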

def format_diff(before: dict[str, float], after: dict[str, float]) -> str:
    """Render a side-by-side A/B comparison table."""
    keys = sorted(set(before) | set(after))
    lines = [f"{'metric':<28} {'before':>12} {'after':>12} {'delta':>12} {'%':>7}"]
    lines.append("─" * 76)
    for k in keys:
        b = before.get(k, 0.0)
        a = after.get(k, 0.0)
        d = a - b
        pct_change = ((a / b) - 1) * 100 if b else (float("inf") if a else 0.0)
        # Direction arrows. For percentile / _max / _total / gaps_over /
        # backpressure / drain metrics, LOWER is better; for fps and
        # gaps_under, HIGHER is better. The arrow shows which way the value
        # moved; the better/worse reading depends on the metric.
        lower_is_better = any(
            token in k
            for token in (
                "p50",
                "p95",
                "p99",
                "_max",
                "_total",
                "gaps_over",
                "backpressure",
                "drain",
            )
        )
        higher_is_better = "fps_" in k or "gaps_under" in k
        mark = ""
        if d < 0 and lower_is_better:
            mark = "↓"  # improvement
        elif d > 0 and higher_is_better:
            mark = "↑"  # improvement
        elif d > 0 and lower_is_better:
            mark = "↑"  # regression
        elif d < 0 and higher_is_better:
            mark = "↓"  # regression
        pct_str = "—" if pct_change == float("inf") else f"{pct_change:+6.1f}%"
        lines.append(f"{k:<28} {b:>12.2f} {a:>12.2f} {d:>+12.2f} {pct_str:>7} {mark}")
    return "\n".join(lines)
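
# A minimal sketch of the A/B workflow these two helpers support (the log
# paths here are hypothetical): summarize two runs, flatten both, then diff.
#
#   baseline = key_metrics(summarize(Path("/tmp/perf-a.log"), 0))
#   candidate = key_metrics(summarize(Path("/tmp/perf-b.log"), 0))
#   print(format_diff(baseline, candidate))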

def run_once(args: argparse.Namespace) -> dict[str, Any]:
    tui_dir = Path(args.tui_dir).resolve()
    entry = tui_dir / "dist" / "entry.js"
    if not entry.exists():
        sys.exit(f"{entry} missing — run `npm run build` in {tui_dir} first")

    sid = args.session or pick_longest_session(DEFAULT_STATE_DB)
    print(f"• session: {sid}")
    print(f"• hold: {args.hold} x {args.rate}Hz for {args.seconds}s after {args.warmup}s warmup")
    print(f"• terminal: {args.cols}x{args.rows}")

    log = Path(args.log)
    if not args.keep_log and log.exists():
        log.unlink()
    since_ms = int(time.time() * 1000)

    env = os.environ.copy()
    env["HERMES_DEV_PERF"] = "1"
    env["HERMES_DEV_PERF_MS"] = str(args.threshold_ms)
    env["HERMES_DEV_PERF_LOG"] = str(log)
    env["HERMES_TUI_RESUME"] = sid
    env["COLUMNS"] = str(args.cols)
    env["LINES"] = str(args.rows)
    env["TERM"] = env.get("TERM", "xterm-256color")

    # Pass through extra flags the TUI wrapper recognizes (e.g. --no-fullscreen).
    # Stored on args as an `extra_flags` list.
    node = os.environ.get("HERMES_PERF_NODE", "node")
    node_args = [node, str(entry), *getattr(args, "extra_flags", [])]

    pid, fd = pty.fork()
    if pid == 0:
        # Child: exec the TUI. If exec fails, bail hard so the child never
        # falls through into the parent's harness code below.
        try:
            os.execvpe(node, node_args, env)
        finally:
            os._exit(127)

    try:
        import fcntl
        import struct
        import termios

        winsize = struct.pack("HHHH", args.rows, args.cols, 0, 0)
        fcntl.ioctl(fd, termios.TIOCSWINSZ, winsize)

        print(f"• pid: {pid} fd: {fd}")
        print(f"• warmup {args.warmup}s (drain startup output)…")
        drain(fd, args.warmup)
        print(f"• holding {args.hold}…")
        sent = hold_key(fd, KEYS[args.hold], args.seconds, args.rate)
        print(f"  sent {sent} keystrokes")
        drain(fd, 0.5)
    finally:
        try:
            os.kill(pid, signal.SIGTERM)
            for _ in range(10):
                pid_done, _ = os.waitpid(pid, os.WNOHANG)
                if pid_done == pid:
                    break
                time.sleep(0.1)
            else:
                os.kill(pid, signal.SIGKILL)
                os.waitpid(pid, 0)
        except (ProcessLookupError, ChildProcessError):
            pass
        try:
            os.close(fd)
        except OSError:
            pass

    time.sleep(0.2)
    return summarize(log, since_ms)


def main() -> int:
    p = argparse.ArgumentParser()
    p.add_argument("--session", help="session id to resume (default: longest in db)")
    p.add_argument("--hold", default="page_up", choices=sorted(KEYS.keys()), help="key to hold")
    p.add_argument("--seconds", type=float, default=8.0, help="how long to hold the key")
    p.add_argument("--rate", type=int, default=30, help="keystrokes per second")
    p.add_argument("--warmup", type=float, default=3.0, help="seconds to wait after launch before input")
    p.add_argument("--threshold-ms", type=float, default=0.0, help="HERMES_DEV_PERF_MS (0 = capture all)")
    p.add_argument("--cols", type=int, default=120)
    p.add_argument("--rows", type=int, default=40)
    p.add_argument("--keep-log", action="store_true", help="don't wipe perf.log before run")
    p.add_argument("--tui-dir", default=str(DEFAULT_TUI_DIR))
    p.add_argument("--log", default=str(DEFAULT_LOG))
    p.add_argument("--save", metavar="LABEL", help="save the final metrics as /tmp/perf-