diff --git a/hermes_cli/kanban.py b/hermes_cli/kanban.py index af4cb210dd5..5c35b69a611 100644 --- a/hermes_cli/kanban.py +++ b/hermes_cli/kanban.py @@ -194,6 +194,12 @@ def build_parser(parent_subparsers: argparse._SubParsersAction) -> argparse.Argu p_complete.add_argument("task_ids", nargs="+", help="One or more task ids (only --result applies to all of them)") p_complete.add_argument("--result", default=None, help="Result summary") + p_complete.add_argument("--summary", default=None, + help="Structured handoff summary for downstream tasks. " + "Falls back to --result if omitted.") + p_complete.add_argument("--metadata", default=None, + help='JSON dict of structured facts (e.g. \'{"changed_files": [...], ' + '"tests_run": 12}\'). Stored on the closing run.') p_block = sub.add_parser("block", help="Mark one or more tasks blocked") p_block.add_argument("task_id") @@ -301,6 +307,15 @@ def build_parser(parent_subparsers: argparse._SubParsersAction) -> argparse.Argu p_log.add_argument("--tail", type=int, default=None, help="Only print the last N bytes") + # --- runs (per-attempt history for a task) --- + p_runs = sub.add_parser( + "runs", + help="Show attempt history for a task (one row per run: profile, " + "outcome, elapsed, summary)", + ) + p_runs.add_argument("task_id") + p_runs.add_argument("--json", action="store_true") + # --- heartbeat (worker liveness signal) --- p_hb = sub.add_parser( "heartbeat", @@ -383,6 +398,7 @@ def kanban_command(args: argparse.Namespace) -> int: "watch": _cmd_watch, "stats": _cmd_stats, "log": _cmd_log, + "runs": _cmd_runs, "heartbeat": _cmd_heartbeat, "assignees": _cmd_assignees, "notify-subscribe": _cmd_notify_subscribe, @@ -694,10 +710,25 @@ def _cmd_complete(args: argparse.Namespace) -> int: if not ids: print("at least one task_id is required", file=sys.stderr) return 1 + metadata = None + raw_meta = getattr(args, "metadata", None) + if raw_meta: + try: + metadata = json.loads(raw_meta) + if not isinstance(metadata, dict): + raise 
ValueError("must be a JSON object") + except (ValueError, json.JSONDecodeError) as exc: + print(f"kanban: --metadata: {exc}", file=sys.stderr) + return 2 failed: list[str] = [] with kb.connect() as conn: for tid in ids: - if not kb.complete_task(conn, tid, result=args.result): + if not kb.complete_task( + conn, tid, + result=args.result, + summary=getattr(args, "summary", None), + metadata=metadata, + ): failed.append(tid) print(f"cannot complete {tid} (unknown id or terminal state)", file=sys.stderr) else: @@ -993,6 +1024,45 @@ def _cmd_log(args: argparse.Namespace) -> int: return 0 +def _cmd_runs(args: argparse.Namespace) -> int: + """Show attempt history for a task.""" + with kb.connect() as conn: + runs = kb.list_runs(conn, args.task_id) + if getattr(args, "json", False): + print(json.dumps([ + { + "id": r.id, "profile": r.profile, "status": r.status, + "outcome": r.outcome, "started_at": r.started_at, + "ended_at": r.ended_at, "summary": r.summary, + "error": r.error, "metadata": r.metadata, + "worker_pid": r.worker_pid, "step_key": r.step_key, + } for r in runs + ], indent=2, ensure_ascii=False)) + return 0 + if not runs: + print(f"(no runs yet for {args.task_id})") + return 0 + print(f"{'#':3s} {'OUTCOME':12s} {'PROFILE':16s} {'ELAPSED':>8s} STARTED") + for i, r in enumerate(runs, 1): + end = r.ended_at or int(time.time()) + elapsed = end - r.started_at + if elapsed < 60: + el = f"{elapsed}s" + elif elapsed < 3600: + el = f"{elapsed // 60}m" + else: + el = f"{elapsed / 3600:.1f}h" + outcome = r.outcome or ("(running)" if not r.ended_at else r.status) + print(f"{i:3d} {outcome:12s} {(r.profile or '-'):16s} {el:>8s} {_fmt_ts(r.started_at)}") + if r.summary: + # Indent and truncate long summaries to keep the table readable. 
+ summary = r.summary.splitlines()[0][:100] + print(f" → {summary}") + if r.error: + print(f" ✖ {r.error[:100]}") + return 0 + + def _cmd_context(args: argparse.Namespace) -> int: with kb.connect() as conn: text = kb.build_worker_context(conn, args.task_id) diff --git a/hermes_cli/kanban_db.py b/hermes_cli/kanban_db.py index dd7d82cb855..4d43017945e 100644 --- a/hermes_cli/kanban_db.py +++ b/hermes_cli/kanban_db.py @@ -90,6 +90,9 @@ class Task: last_spawn_error: Optional[str] = None max_runtime_seconds: Optional[int] = None last_heartbeat_at: Optional[int] = None + current_run_id: Optional[int] = None + workflow_template_id: Optional[str] = None + current_step_key: Optional[str] = None @classmethod def from_row(cls, row: sqlite3.Row) -> "Task": @@ -121,6 +124,69 @@ class Task: last_heartbeat_at=( row["last_heartbeat_at"] if "last_heartbeat_at" in keys else None ), + current_run_id=( + row["current_run_id"] if "current_run_id" in keys else None + ), + workflow_template_id=( + row["workflow_template_id"] if "workflow_template_id" in keys else None + ), + current_step_key=( + row["current_step_key"] if "current_step_key" in keys else None + ), + ) + + +@dataclass +class Run: + """In-memory view of a ``task_runs`` row. + + A run is one attempt to execute a task — created on claim, closed + on complete/block/crash/timeout/spawn_failure/reclaim. Multiple runs + per task when retries happen. Carries the claim machinery, PID, + heartbeat, and the structured handoff summary that downstream workers + read via ``build_worker_context``. 
+ """ + + id: int + task_id: str + profile: Optional[str] + step_key: Optional[str] + status: str + claim_lock: Optional[str] + claim_expires: Optional[int] + worker_pid: Optional[int] + max_runtime_seconds: Optional[int] + last_heartbeat_at: Optional[int] + started_at: int + ended_at: Optional[int] + outcome: Optional[str] + summary: Optional[str] + metadata: Optional[dict] + error: Optional[str] + + @classmethod + def from_row(cls, row: sqlite3.Row) -> "Run": + try: + meta = json.loads(row["metadata"]) if row["metadata"] else None + except Exception: + meta = None + return cls( + id=int(row["id"]), + task_id=row["task_id"], + profile=row["profile"], + step_key=row["step_key"], + status=row["status"], + claim_lock=row["claim_lock"], + claim_expires=row["claim_expires"], + worker_pid=row["worker_pid"], + max_runtime_seconds=row["max_runtime_seconds"], + last_heartbeat_at=row["last_heartbeat_at"], + started_at=int(row["started_at"]), + ended_at=(int(row["ended_at"]) if row["ended_at"] is not None else None), + outcome=row["outcome"], + summary=row["summary"], + metadata=meta, + error=row["error"], ) @@ -148,28 +214,36 @@ class Event: SCHEMA_SQL = """ CREATE TABLE IF NOT EXISTS tasks ( - id TEXT PRIMARY KEY, - title TEXT NOT NULL, - body TEXT, - assignee TEXT, - status TEXT NOT NULL, - priority INTEGER DEFAULT 0, - created_by TEXT, - created_at INTEGER NOT NULL, - started_at INTEGER, - completed_at INTEGER, - workspace_kind TEXT NOT NULL DEFAULT 'scratch', - workspace_path TEXT, - claim_lock TEXT, - claim_expires INTEGER, - tenant TEXT, - result TEXT, - idempotency_key TEXT, - spawn_failures INTEGER NOT NULL DEFAULT 0, - worker_pid INTEGER, - last_spawn_error TEXT, - max_runtime_seconds INTEGER, - last_heartbeat_at INTEGER + id TEXT PRIMARY KEY, + title TEXT NOT NULL, + body TEXT, + assignee TEXT, + status TEXT NOT NULL, + priority INTEGER DEFAULT 0, + created_by TEXT, + created_at INTEGER NOT NULL, + started_at INTEGER, + completed_at INTEGER, + workspace_kind TEXT 
NOT NULL DEFAULT 'scratch', + workspace_path TEXT, + claim_lock TEXT, + claim_expires INTEGER, + tenant TEXT, + result TEXT, + idempotency_key TEXT, + spawn_failures INTEGER NOT NULL DEFAULT 0, + worker_pid INTEGER, + last_spawn_error TEXT, + max_runtime_seconds INTEGER, + last_heartbeat_at INTEGER, + -- Pointer into task_runs for the currently-active run (NULL if no + -- run is in-flight). Denormalised for cheap reads. + current_run_id INTEGER, + -- Forward-compat for v2 workflow routing. In v1 the kernel writes + -- these when the task is opted into a template but otherwise ignores + -- them; the dispatcher doesn't consult them for routing yet. + workflow_template_id TEXT, + current_step_key TEXT ); CREATE TABLE IF NOT EXISTS task_links ( @@ -189,11 +263,41 @@ CREATE TABLE IF NOT EXISTS task_comments ( CREATE TABLE IF NOT EXISTS task_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, task_id TEXT NOT NULL, + run_id INTEGER, kind TEXT NOT NULL, payload TEXT, created_at INTEGER NOT NULL ); +-- Historical attempt record. Each time the dispatcher claims a task, a +-- new row is created here; claim state, PID, heartbeat, runtime cap, +-- and structured summary all live on the run, not the task. Multiple +-- rows per task id when the task was retried after crash/timeout/block. +-- v2 of the kanban schema will use ``step_key`` to drive per-stage +-- workflow routing; in v1 the column is nullable and unused (kernel +-- ignores it). 
+CREATE TABLE IF NOT EXISTS task_runs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + task_id TEXT NOT NULL, + profile TEXT, + step_key TEXT, + status TEXT NOT NULL, + -- status: running | done | blocked | crashed | timed_out | failed | released + claim_lock TEXT, + claim_expires INTEGER, + worker_pid INTEGER, + max_runtime_seconds INTEGER, + last_heartbeat_at INTEGER, + started_at INTEGER NOT NULL, + ended_at INTEGER, + outcome TEXT, + -- outcome: completed | blocked | crashed | timed_out | spawn_failed | + -- gave_up | reclaimed | (null while still running) + summary TEXT, + metadata TEXT, + error TEXT +); + -- Subscription from a gateway source (platform + chat + thread) to a -- task. The gateway's kanban-notifier watcher tails task_events and -- pushes ``completed`` / ``blocked`` / ``spawn_auto_blocked`` events to @@ -217,6 +321,9 @@ CREATE INDEX IF NOT EXISTS idx_links_child ON task_links(child_id); CREATE INDEX IF NOT EXISTS idx_links_parent ON task_links(parent_id); CREATE INDEX IF NOT EXISTS idx_comments_task ON task_comments(task_id, created_at); CREATE INDEX IF NOT EXISTS idx_events_task ON task_events(task_id, created_at); +CREATE INDEX IF NOT EXISTS idx_events_run ON task_events(run_id, id); +CREATE INDEX IF NOT EXISTS idx_runs_task ON task_runs(task_id, started_at); +CREATE INDEX IF NOT EXISTS idx_runs_status ON task_runs(status); CREATE INDEX IF NOT EXISTS idx_notify_task ON kanban_notify_subs(task_id); """ @@ -278,6 +385,60 @@ def _migrate_add_optional_columns(conn: sqlite3.Connection) -> None: conn.execute("ALTER TABLE tasks ADD COLUMN max_runtime_seconds INTEGER") if "last_heartbeat_at" not in cols: conn.execute("ALTER TABLE tasks ADD COLUMN last_heartbeat_at INTEGER") + if "current_run_id" not in cols: + conn.execute("ALTER TABLE tasks ADD COLUMN current_run_id INTEGER") + if "workflow_template_id" not in cols: + conn.execute("ALTER TABLE tasks ADD COLUMN workflow_template_id TEXT") + if "current_step_key" not in cols: + conn.execute("ALTER TABLE 
tasks ADD COLUMN current_step_key TEXT") + + # task_events gained a run_id column; back-fill it as NULL for + # historical events (they predate runs and can't be attributed). + ev_cols = {row["name"] for row in conn.execute("PRAGMA table_info(task_events)")} + if "run_id" not in ev_cols: + conn.execute("ALTER TABLE task_events ADD COLUMN run_id INTEGER") + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_events_run " + "ON task_events(run_id, id)" + ) + + # One-shot backfill: any task that is 'running' before runs existed + # had its claim_lock / claim_expires / worker_pid on the task row. + # Synthesize a matching task_runs row so subsequent end-run / heartbeat + # calls have something to write to. Safe to re-run: the check below + # skips tasks that already have a current_run_id. + runs_exist = conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='task_runs'" + ).fetchone() is not None + if runs_exist: + inflight = conn.execute( + "SELECT id, assignee, claim_lock, claim_expires, worker_pid, " + " max_runtime_seconds, last_heartbeat_at, started_at " + "FROM tasks " + "WHERE status = 'running' AND (current_run_id IS NULL)" + ).fetchall() + for row in inflight: + started = row["started_at"] or int(time.time()) + cur = conn.execute( + """ + INSERT INTO task_runs ( + task_id, profile, status, + claim_lock, claim_expires, worker_pid, + max_runtime_seconds, last_heartbeat_at, + started_at + ) VALUES (?, ?, 'running', ?, ?, ?, ?, ?, ?) + """, + ( + row["id"], row["assignee"], row["claim_lock"], + row["claim_expires"], row["worker_pid"], + row["max_runtime_seconds"], row["last_heartbeat_at"], + started, + ), + ) + conn.execute( + "UPDATE tasks SET current_run_id = ? WHERE id = ?", + (cur.lastrowid, row["id"]), + ) # One-shot event-kind rename pass. 
The old names ("ready", "priority", # "spawn_auto_blocked") still worked but were awkward on the wire; @@ -723,17 +884,89 @@ def _append_event( task_id: str, kind: str, payload: Optional[dict] = None, + *, + run_id: Optional[int] = None, ) -> None: - """Record an event row. Called from within an already-open txn.""" + """Record an event row. Called from within an already-open txn. + + ``run_id`` is optional: pass the current run id so UIs can group + events by attempt. For events that aren't scoped to a single run + (task created/edited/archived, dependency promotion) leave it None + and the row carries NULL. + """ now = int(time.time()) pl = json.dumps(payload, ensure_ascii=False) if payload else None conn.execute( - "INSERT INTO task_events (task_id, kind, payload, created_at) " - "VALUES (?, ?, ?, ?)", - (task_id, kind, pl, now), + "INSERT INTO task_events (task_id, run_id, kind, payload, created_at) " + "VALUES (?, ?, ?, ?, ?)", + (task_id, run_id, kind, pl, now), ) +def _end_run( + conn: sqlite3.Connection, + task_id: str, + *, + outcome: str, + summary: Optional[str] = None, + error: Optional[str] = None, + metadata: Optional[dict] = None, + status: Optional[str] = None, +) -> Optional[int]: + """Close the currently-active run for ``task_id`` and clear the pointer. + + ``outcome`` is the semantic result (completed / blocked / crashed / + timed_out / spawn_failed / gave_up / reclaimed). ``status`` is the + run-row status (usually just ``outcome``, but callers can pass it + explicitly). Returns the closed run_id or ``None`` if no active run + existed (e.g. a CLI user calling ``hermes kanban complete`` on a + task that was never claimed). 
+ """ + now = int(time.time()) + row = conn.execute( + "SELECT current_run_id FROM tasks WHERE id = ?", (task_id,), + ).fetchone() + if not row or not row["current_run_id"]: + return None + run_id = int(row["current_run_id"]) + conn.execute( + """ + UPDATE task_runs + SET status = ?, + outcome = ?, + summary = ?, + error = ?, + metadata = ?, + ended_at = ?, + claim_lock = NULL, + claim_expires = NULL, + worker_pid = NULL + WHERE id = ? + AND ended_at IS NULL + """, + ( + status or outcome, + outcome, + summary, + error, + json.dumps(metadata, ensure_ascii=False) if metadata else None, + now, + run_id, + ), + ) + conn.execute( + "UPDATE tasks SET current_run_id = NULL WHERE id = ?", (task_id,), + ) + return run_id + + +def _current_run_id(conn: sqlite3.Connection, task_id: str) -> Optional[int]: + row = conn.execute( + "SELECT current_run_id FROM tasks WHERE id = ?", (task_id,), + ).fetchone() + return int(row["current_run_id"]) if row and row["current_run_id"] else None + + # --------------------------------------------------------------------------- # Dependency resolution (todo -> ready) # --------------------------------------------------------------------------- @@ -802,7 +1035,41 @@ def claim_task( ) if cur.rowcount != 1: return None - _append_event(conn, task_id, "claimed", {"lock": lock, "expires": expires}) + # Look up the current task row so we can populate the run with + # its assignee / step / runtime cap. + trow = conn.execute( + "SELECT assignee, max_runtime_seconds, current_step_key " + "FROM tasks WHERE id = ?", + (task_id,), + ).fetchone() + run_cur = conn.execute( + """ + INSERT INTO task_runs ( + task_id, profile, step_key, status, + claim_lock, claim_expires, max_runtime_seconds, + started_at + ) VALUES (?, ?, ?, 'running', ?, ?, ?, ?) 
+ """, + ( + task_id, + trow["assignee"] if trow else None, + trow["current_step_key"] if trow else None, + lock, + expires, + trow["max_runtime_seconds"] if trow else None, + now, + ), + ) + run_id = run_cur.lastrowid + conn.execute( + "UPDATE tasks SET current_run_id = ? WHERE id = ?", + (run_id, task_id), + ) + _append_event( + conn, task_id, "claimed", + {"lock": lock, "expires": expires, "run_id": run_id}, + run_id=run_id, + ) return get_task(conn, task_id) @@ -826,7 +1093,15 @@ def heartbeat_claim( "WHERE id = ? AND status = 'running' AND claim_lock = ?", (expires, task_id, lock), ) - return cur.rowcount == 1 + if cur.rowcount == 1: + run_id = _current_run_id(conn, task_id) + if run_id is not None: + conn.execute( + "UPDATE task_runs SET claim_expires = ? WHERE id = ?", + (expires, run_id), + ) + return True + return False def release_stale_claims(conn: sqlite3.Connection) -> int: @@ -849,9 +1124,15 @@ def release_stale_claims(conn: sqlite3.Connection) -> int: "WHERE id = ? AND status = 'running'", (row["id"],), ) + run_id = _end_run( + conn, row["id"], + outcome="reclaimed", status="reclaimed", + error=f"stale_lock={row['claim_lock']}", + ) _append_event( conn, row["id"], "reclaimed", {"stale_lock": row["claim_lock"]}, + run_id=run_id, ) reclaimed += 1 return reclaimed @@ -862,12 +1143,21 @@ def complete_task( task_id: str, *, result: Optional[str] = None, + summary: Optional[str] = None, + metadata: Optional[dict] = None, ) -> bool: """Transition ``running|ready -> done`` and record ``result``. Accepts a task that's merely ``ready`` too, so a manual CLI completion (``hermes kanban complete ``) works without requiring a claim/start/complete sequence. + + ``summary`` and ``metadata`` are stored on the closing run (if any) + and surfaced to downstream children via :func:`build_worker_context`. + When ``summary`` is omitted we fall back to ``result`` so single-run + callers don't have to pass both. ``metadata`` is a free-form dict + (e.g. 
``{"changed_files": [...], "tests_run": [...]}``) — workers + are encouraged to use it for structured handoff facts. """ now = int(time.time()) with write_txn(conn): @@ -887,9 +1177,16 @@ def complete_task( ) if cur.rowcount != 1: return False + run_id = _end_run( + conn, task_id, + outcome="completed", status="done", + summary=summary if summary is not None else result, + metadata=metadata, + ) _append_event( conn, task_id, "completed", {"result_len": len(result) if result else 0}, + run_id=run_id, ) # Recompute ready status for dependents (separate txn so children see done). recompute_ready(conn) @@ -918,7 +1215,12 @@ def block_task( ) if cur.rowcount != 1: return False - _append_event(conn, task_id, "blocked", {"reason": reason}) + run_id = _end_run( + conn, task_id, + outcome="blocked", status="blocked", + summary=reason, + ) + _append_event(conn, task_id, "blocked", {"reason": reason}, run_id=run_id) return True @@ -1076,9 +1378,16 @@ def heartbeat_worker( ) if cur.rowcount != 1: return False + run_id = _current_run_id(conn, task_id) + if run_id is not None: + conn.execute( + "UPDATE task_runs SET last_heartbeat_at = ? 
WHERE id = ?", + (now, run_id), + ) _append_event( conn, task_id, "heartbeat", {"note": note} if note else None, + run_id=run_id, ) return True @@ -1154,14 +1463,20 @@ def enforce_max_runtime( (tid,), ) if cur.rowcount == 1: + payload = { + "pid": pid, + "elapsed_seconds": int(elapsed), + "limit_seconds": int(row["max_runtime_seconds"]), + "sigkill": killed, + } + run_id = _end_run( + conn, tid, + outcome="timed_out", status="timed_out", + error=f"elapsed {int(elapsed)}s > limit {int(row['max_runtime_seconds'])}s", + metadata=payload, + ) _append_event( - conn, tid, "timed_out", - { - "pid": pid, - "elapsed_seconds": int(elapsed), - "limit_seconds": int(row["max_runtime_seconds"]), - "sigkill": killed, - }, + conn, tid, "timed_out", payload, run_id=run_id, ) timed_out.append(tid) return timed_out @@ -1215,9 +1530,19 @@ def detect_crashed_workers(conn: sqlite3.Connection) -> list[str]: (row["id"],), ) if cur.rowcount == 1: + run_id = _end_run( + conn, row["id"], + outcome="crashed", status="crashed", + error=f"pid {int(row['worker_pid'])} not alive", + metadata={ + "pid": int(row["worker_pid"]), + "claimer": row["claim_lock"], + }, + ) _append_event( conn, row["id"], "crashed", {"pid": int(row["worker_pid"]), "claimer": row["claim_lock"]}, + run_id=run_id, ) crashed.append(row["id"]) return crashed @@ -1249,9 +1574,16 @@ def _record_spawn_failure( "WHERE id = ? AND status IN ('running', 'ready')", (failures, error[:500], task_id), ) + run_id = _end_run( + conn, task_id, + outcome="gave_up", status="gave_up", + error=error[:500], + metadata={"failures": failures}, + ) _append_event( conn, task_id, "gave_up", {"failures": failures, "error": error[:500]}, + run_id=run_id, ) blocked = True else: @@ -1262,9 +1594,16 @@ def _record_spawn_failure( "WHERE id = ? 
AND status = 'running'", (failures, error[:500], task_id), ) + run_id = _end_run( + conn, task_id, + outcome="spawn_failed", status="spawn_failed", + error=error[:500], + metadata={"failures": failures}, + ) _append_event( conn, task_id, "spawn_failed", {"error": error[:500], "failures": failures}, + run_id=run_id, ) return blocked @@ -1281,7 +1620,13 @@ def _set_worker_pid(conn: sqlite3.Connection, task_id: str, pid: int) -> None: "UPDATE tasks SET worker_pid = ? WHERE id = ?", (int(pid), task_id), ) - _append_event(conn, task_id, "spawned", {"pid": int(pid)}) + run_id = _current_run_id(conn, task_id) + if run_id is not None: + conn.execute( + "UPDATE task_runs SET worker_pid = ? WHERE id = ?", + (int(pid), run_id), + ) + _append_event(conn, task_id, "spawned", {"pid": int(pid)}, run_id=run_id) def _clear_spawn_failures(conn: sqlite3.Connection, task_id: str) -> None: @@ -1521,11 +1866,16 @@ def run_daemon( def build_worker_context(conn: sqlite3.Connection, task_id: str) -> str: """Return the full text a worker should read to understand its task. - Order (per design spec §8): + Order: 1. Task title (mandatory). 2. Task body (optional opening post). - 3. Every comment on the task, chronologically, with authors. - 4. Completion results of every done parent task. + 3. Prior attempts on THIS task (if any) — their outcome + summary + + error. Lets a retrying worker see why earlier attempts failed + and skip that path. + 4. Structured handoff results of every done parent task. Prefers + ``run.summary`` / ``run.metadata`` when the parent was executed + via a run; falls back to ``task.result`` for older data. + 5. Every comment on the task, chronologically, with authors. 
""" task = get_task(conn, task_id) if not task: @@ -1546,12 +1896,67 @@ def build_worker_context(conn: sqlite3.Connection, task_id: str) -> str: lines.append(task.body.strip()) lines.append("") - parents = parent_results(conn, task_id) - if parents: - lines.append("## Parent task results") - for pid, result in parents: + # Prior attempts — show closed runs so a retrying worker sees the + # history. Skip the currently-active run (that's this worker). + prior = [r for r in list_runs(conn, task_id) if r.ended_at is not None] + if prior: + lines.append("## Prior attempts on this task") + for idx, run in enumerate(prior, start=1): + ts = time.strftime("%Y-%m-%d %H:%M", time.localtime(run.started_at)) + profile = run.profile or "(unknown)" + outcome = run.outcome or run.status + lines.append(f"### Attempt {idx} — {outcome} ({profile}, {ts})") + if run.summary and run.summary.strip(): + lines.append(run.summary.strip()) + if run.error and run.error.strip(): + lines.append(f"_error_: {run.error.strip()}") + if run.metadata: + try: + meta_str = json.dumps(run.metadata, ensure_ascii=False, sort_keys=True) + lines.append(f"_metadata_: `{meta_str}`") + except Exception: + pass + lines.append("") + + # Parents: prefer the most-recent 'completed' run's summary + metadata, + # fall back to ``task.result`` when no run rows exist (legacy DBs, + # or tasks completed before the runs table landed). + parent_rows = conn.execute( + "SELECT parent_id FROM task_links WHERE child_id = ? 
ORDER BY parent_id", + (task_id,), + ).fetchall() + parent_ids = [r["parent_id"] for r in parent_rows] + + if parent_ids: + wrote_header = False + for pid in parent_ids: + pt = get_task(conn, pid) + if not pt or pt.status != "done": + continue + runs = [r for r in list_runs(conn, pid) if r.outcome == "completed"] + runs.sort(key=lambda r: r.started_at, reverse=True) + run = runs[0] if runs else None + + if not wrote_header: + lines.append("## Parent task results") + wrote_header = True lines.append(f"### {pid}") - lines.append((result or "(no result recorded)").strip()) + + body_lines: list[str] = [] + if run is not None and run.summary and run.summary.strip(): + body_lines.append(run.summary.strip()) + elif pt.result: + body_lines.append(pt.result.strip()) + else: + body_lines.append("(no result recorded)") + + if run is not None and run.metadata: + try: + meta_str = json.dumps(run.metadata, ensure_ascii=False, sort_keys=True) + body_lines.append(f"_metadata_: `{meta_str}`") + except Exception: + pass + lines.extend(body_lines) lines.append("") comments = list_comments(conn, task_id) @@ -1892,3 +2297,55 @@ def known_assignees(conn: sqlite3.Connection) -> list[dict]: } for name in names ] + + +# --------------------------------------------------------------------------- +# Runs (attempt history on a task) +# --------------------------------------------------------------------------- + +def list_runs( + conn: sqlite3.Connection, + task_id: str, + *, + include_active: bool = True, +) -> list[Run]: + """Return all runs for ``task_id`` in start order. + + ``include_active=True`` (default) includes the currently-running + attempt if any. Set False to return only closed runs (useful for + "how many prior attempts have there been?" checks). + """ + q = "SELECT * FROM task_runs WHERE task_id = ?" 
+ params: list[Any] = [task_id] + if not include_active: + q += " AND ended_at IS NOT NULL" + q += " ORDER BY started_at ASC, id ASC" + rows = conn.execute(q, params).fetchall() + return [Run.from_row(r) for r in rows] + + +def get_run(conn: sqlite3.Connection, run_id: int) -> Optional[Run]: + row = conn.execute( + "SELECT * FROM task_runs WHERE id = ?", (int(run_id),), + ).fetchone() + return Run.from_row(row) if row else None + + +def active_run(conn: sqlite3.Connection, task_id: str) -> Optional[Run]: + """Return the currently-open run for ``task_id`` (``ended_at IS NULL``).""" + row = conn.execute( + "SELECT * FROM task_runs WHERE task_id = ? AND ended_at IS NULL " + "ORDER BY started_at DESC LIMIT 1", + (task_id,), + ).fetchone() + return Run.from_row(row) if row else None + + +def latest_run(conn: sqlite3.Connection, task_id: str) -> Optional[Run]: + """Return the most recent run regardless of outcome (active or closed).""" + row = conn.execute( + "SELECT * FROM task_runs WHERE task_id = ? " + "ORDER BY started_at DESC, id DESC LIMIT 1", + (task_id,), + ).fetchone() + return Run.from_row(row) if row else None diff --git a/plugins/kanban/dashboard/dist/index.js b/plugins/kanban/dashboard/dist/index.js index abcea9afd3f..1395ebb1481 100644 --- a/plugins/kanban/dashboard/dist/index.js +++ b/plugins/kanban/dashboard/dist/index.js @@ -1171,6 +1171,68 @@ }), ), h(WorkerLogSection, { taskId: t.id }), + h(RunHistorySection, { runs: props.data.runs || [] }), + ); + } + + // Per-attempt history. Closed runs first (most recent last), then the + // active run if any. Each row shows profile / outcome / elapsed / + // summary. Collapsed by default when there are more than three runs. + function RunHistorySection(props) { + const runs = props.runs || []; + const [expanded, setExpanded] = useState(false); + if (runs.length === 0) return null; + const showAll = expanded || runs.length <= 3; + const visible = showAll ? 
runs : runs.slice(-3); + + const fmtElapsed = function (run) { + if (!run || !run.started_at) return ""; + const end = run.ended_at || Math.floor(Date.now() / 1000); + const secs = Math.max(0, end - run.started_at); + if (secs < 60) return `${secs}s`; + if (secs < 3600) return `${Math.round(secs / 60)}m`; + return `${(secs / 3600).toFixed(1)}h`; + }; + + return h("div", { className: "hermes-kanban-section" }, + h("div", { className: "hermes-kanban-section-head-row" }, + h("span", { className: "hermes-kanban-section-head" }, + `Run history (${runs.length})`), + !showAll + ? h("button", { + type: "button", + onClick: function () { setExpanded(true); }, + className: "hermes-kanban-edit-link", + title: "Show all attempts", + }, `+${runs.length - 3} earlier`) + : null, + ), + visible.map(function (r) { + const outcomeClass = r.ended_at + ? `hermes-kanban-run--${r.outcome || r.status || "ended"}` + : "hermes-kanban-run--active"; + return h("div", { key: r.id, className: cn("hermes-kanban-run", outcomeClass) }, + h("div", { className: "hermes-kanban-run-head" }, + h("span", { className: "hermes-kanban-run-outcome" }, + r.ended_at ? (r.outcome || r.status || "ended") : "active"), + h("span", { className: "hermes-kanban-run-profile" }, + r.profile ? `@${r.profile}` : "(no profile)"), + h("span", { className: "hermes-kanban-run-elapsed" }, fmtElapsed(r)), + h("span", { className: "hermes-kanban-run-ago" }, + timeAgo ? timeAgo(r.started_at) : ""), + ), + r.summary + ? h("div", { className: "hermes-kanban-run-summary" }, r.summary) + : null, + r.error + ? h("div", { className: "hermes-kanban-run-error" }, r.error) + : null, + r.metadata + ? 
h("code", { className: "hermes-kanban-run-meta" }, + JSON.stringify(r.metadata)) + : null, + ); + }), ); } diff --git a/plugins/kanban/dashboard/dist/style.css b/plugins/kanban/dashboard/dist/style.css index 8d1d83ebd93..802b19c87b8 100644 --- a/plugins/kanban/dashboard/dist/style.css +++ b/plugins/kanban/dashboard/dist/style.css @@ -682,3 +682,70 @@ font-size: 0.7rem; line-height: 1.45; } + + +/* ---- Run history (per-attempt log in the drawer) ------------------- */ + +.hermes-kanban-run { + border-left: 2px solid var(--color-border); + padding: 0.35rem 0.5rem; + margin-bottom: 0.4rem; + background: color-mix(in srgb, var(--color-foreground) 3%, transparent); + border-radius: var(--radius-sm, 0.25rem); +} +.hermes-kanban-run--active { border-left-color: #3fb97d; } +.hermes-kanban-run--completed { border-left-color: #4a8cd1; } +.hermes-kanban-run--blocked { border-left-color: var(--color-destructive, #d14a4a); } +.hermes-kanban-run--crashed, +.hermes-kanban-run--timed_out, +.hermes-kanban-run--gave_up, +.hermes-kanban-run--spawn_failed { + border-left-color: var(--color-destructive, #d14a4a); + background: color-mix(in srgb, var(--color-destructive, #d14a4a) 6%, transparent); +} +.hermes-kanban-run--reclaimed { border-left-color: #d4b348; } + +.hermes-kanban-run-head { + display: flex; + align-items: center; + gap: 0.6rem; + font-size: 0.7rem; +} +.hermes-kanban-run-outcome { + font-family: var(--font-mono, ui-monospace, monospace); + font-weight: 600; + text-transform: uppercase; + letter-spacing: 0.05em; + color: var(--color-foreground); +} +.hermes-kanban-run-profile { + color: var(--color-muted-foreground); +} +.hermes-kanban-run-elapsed { + font-variant-numeric: tabular-nums; + color: var(--color-muted-foreground); +} +.hermes-kanban-run-ago { + margin-left: auto; + color: var(--color-muted-foreground); +} +.hermes-kanban-run-summary { + font-size: 0.75rem; + padding: 0.2rem 0 0; + color: var(--color-foreground); +} +.hermes-kanban-run-error { + font-size: 
0.7rem; + color: var(--color-destructive, #d14a4a); + padding: 0.15rem 0 0; + font-family: var(--font-mono, ui-monospace, monospace); +} +.hermes-kanban-run-meta { + display: block; + font-size: 0.65rem; + padding: 0.15rem 0 0; + color: var(--color-muted-foreground); + white-space: pre-wrap; + word-break: break-word; + font-family: var(--font-mono, ui-monospace, monospace); +} diff --git a/plugins/kanban/dashboard/plugin_api.py b/plugins/kanban/dashboard/plugin_api.py index c54f81fb67f..0f35c92b1a5 100644 --- a/plugins/kanban/dashboard/plugin_api.py +++ b/plugins/kanban/dashboard/plugin_api.py @@ -127,6 +127,28 @@ def _comment_dict(c: kanban_db.Comment) -> dict[str, Any]: } +def _run_dict(r: kanban_db.Run) -> dict[str, Any]: + """Serialise a Run for the drawer's Run history section.""" + return { + "id": r.id, + "task_id": r.task_id, + "profile": r.profile, + "step_key": r.step_key, + "status": r.status, + "claim_lock": r.claim_lock, + "claim_expires": r.claim_expires, + "worker_pid": r.worker_pid, + "max_runtime_seconds": r.max_runtime_seconds, + "last_heartbeat_at": r.last_heartbeat_at, + "started_at": r.started_at, + "ended_at": r.ended_at, + "outcome": r.outcome, + "summary": r.summary, + "metadata": r.metadata, + "error": r.error, + } + + def _links_for(conn: sqlite3.Connection, task_id: str) -> dict[str, list[str]]: """Return {'parents': [...], 'children': [...]} for a task.""" parents = [ @@ -262,6 +284,7 @@ def get_task(task_id: str): "comments": [_comment_dict(c) for c in kanban_db.list_comments(conn, task_id)], "events": [_event_dict(e) for e in kanban_db.list_events(conn, task_id)], "links": _links_for(conn, task_id), + "runs": [_run_dict(r) for r in kanban_db.list_runs(conn, task_id)], } finally: conn.close() diff --git a/tests/hermes_cli/test_kanban_core_functionality.py b/tests/hermes_cli/test_kanban_core_functionality.py index f572162f928..113657d5263 100644 --- a/tests/hermes_cli/test_kanban_core_functionality.py +++ 
b/tests/hermes_cli/test_kanban_core_functionality.py @@ -1000,3 +1000,367 @@ def test_cli_create_max_runtime_via_duration(kanban_home): def test_cli_create_max_runtime_bad_format_exits_nonzero(kanban_home): out = run_slash("create 'bad' --max-runtime fish") assert "max-runtime" in out.lower() or "malformed" in out.lower() + + +# --------------------------------------------------------------------------- +# Runs as first-class (vulcan-artivus RFC feedback) +# --------------------------------------------------------------------------- + +def test_run_created_on_claim(kanban_home): + """claim_task opens a new task_runs row and points current_run_id at it.""" + conn = kb.connect() + try: + tid = kb.create_task(conn, title="x", assignee="worker") + assert kb.get_task(conn, tid).current_run_id is None + + claimed = kb.claim_task(conn, tid) + assert claimed is not None + + task = kb.get_task(conn, tid) + assert task.current_run_id is not None + + runs = kb.list_runs(conn, tid) + assert len(runs) == 1 + r = runs[0] + assert r.id == task.current_run_id + assert r.profile == "worker" + assert r.status == "running" + assert r.outcome is None + assert r.ended_at is None + assert r.claim_lock is not None and r.claim_expires is not None + finally: + conn.close() + + +def test_run_closed_on_complete_with_summary(kanban_home): + """complete_task ends the active run with outcome='completed' and + persists summary + metadata.""" + conn = kb.connect() + try: + tid = kb.create_task(conn, title="x", assignee="worker") + kb.claim_task(conn, tid) + ok = kb.complete_task( + conn, tid, + result="shipped", + summary="implemented rate limiter, tests pass", + metadata={"changed_files": ["limiter.py"], "tests_run": 12}, + ) + assert ok is True + + task = kb.get_task(conn, tid) + assert task.current_run_id is None + assert task.result == "shipped" + + runs = kb.list_runs(conn, tid) + assert len(runs) == 1 + r = runs[0] + assert r.status == "done" + assert r.outcome == "completed" + assert 
r.summary == "implemented rate limiter, tests pass" + assert r.metadata == {"changed_files": ["limiter.py"], "tests_run": 12} + assert r.ended_at is not None + finally: + conn.close() + + +def test_run_summary_falls_back_to_result(kanban_home): + """If the caller doesn't pass summary, we fall back to result so + single-run workflows don't need to pass the same string twice.""" + conn = kb.connect() + try: + tid = kb.create_task(conn, title="x", assignee="worker") + kb.claim_task(conn, tid) + kb.complete_task(conn, tid, result="only-arg") + r = kb.latest_run(conn, tid) + assert r.summary == "only-arg" + finally: + conn.close() + + +def test_multiple_attempts_preserved_as_runs(kanban_home): + """Crash / retry / complete flow produces one run per attempt, all + visible in list_runs in chronological order.""" + import hermes_cli.kanban_db as _kb + conn = kb.connect() + try: + tid = kb.create_task(conn, title="x", assignee="worker") + + # Attempt 1: claim then force the claim to be stale by backdating + # claim_expires, then let release_stale_claims reclaim it. + kb.claim_task(conn, tid) + with kb.write_txn(conn): + conn.execute( + "UPDATE tasks SET claim_expires = ? WHERE id = ?", + (int(time.time()) - 10, tid), + ) + conn.execute( + "UPDATE task_runs SET claim_expires = ? WHERE task_id = ?", + (int(time.time()) - 10, tid), + ) + kb.release_stale_claims(conn) + + # Attempt 2: claim then crash (simulated: pid dead). + kb.claim_task(conn, tid) + kb._set_worker_pid(conn, tid, 98765) + original_alive = _kb._pid_alive + _kb._pid_alive = lambda pid: False + try: + kb.detect_crashed_workers(conn) + finally: + _kb._pid_alive = original_alive + + # Attempt 3: claim then complete. 
+ kb.claim_task(conn, tid) + kb.complete_task(conn, tid, result="finally") + + runs = kb.list_runs(conn, tid) + assert len(runs) == 3 + assert [r.outcome for r in runs] == ["reclaimed", "crashed", "completed"] + assert runs[-1].summary == "finally" + assert kb.get_task(conn, tid).current_run_id is None + finally: + conn.close() + + +def test_run_on_block_with_reason(kanban_home): + conn = kb.connect() + try: + tid = kb.create_task(conn, title="x", assignee="worker") + kb.claim_task(conn, tid) + kb.block_task(conn, tid, reason="needs API key") + + r = kb.latest_run(conn, tid) + assert r.outcome == "blocked" + assert r.summary == "needs API key" + assert r.ended_at is not None + assert kb.get_task(conn, tid).current_run_id is None + finally: + conn.close() + + +def test_run_on_spawn_failure_records_failed_runs(kanban_home): + """Each spawn_failed event closes a run with outcome='spawn_failed', + and the Nth failure closes a run with outcome='gave_up'.""" + def _bad(task, ws): + raise RuntimeError("no PATH") + + conn = kb.connect() + try: + tid = kb.create_task(conn, title="x", assignee="worker") + for _ in range(5): + kb.dispatch_once(conn, spawn_fn=_bad, failure_limit=5) + + runs = kb.list_runs(conn, tid) + # 5 claim attempts → 5 runs. Final one is gave_up, earlier ones + # are spawn_failed. + assert len(runs) == 5 + assert runs[-1].outcome == "gave_up" + assert all(r.outcome == "spawn_failed" for r in runs[:-1]) + assert runs[-1].error and "no PATH" in runs[-1].error + finally: + conn.close() + + +def test_event_rows_carry_run_id(kanban_home): + """task_events.run_id is populated for run-scoped kinds and NULL for + task-scoped ones.""" + conn = kb.connect() + try: + tid = kb.create_task(conn, title="x", assignee="worker") + # task-scoped: 'created' — no run yet + # run-scoped: 'claimed' + 'completed' + kb.claim_task(conn, tid) + kb.complete_task(conn, tid, result="ok") + + rows = conn.execute( + "SELECT kind, run_id FROM task_events WHERE task_id = ? 
ORDER BY id", + (tid,), + ).fetchall() + by_kind = {r["kind"]: r["run_id"] for r in rows} + assert by_kind["created"] is None + assert by_kind["claimed"] is not None + assert by_kind["completed"] is not None + # Both belong to the same run. + assert by_kind["claimed"] == by_kind["completed"] + finally: + conn.close() + + +def test_build_worker_context_includes_prior_attempts(kanban_home): + """A worker spawned after a prior attempt sees that attempt's outcome + + summary in its context so it can skip the failed path.""" + conn = kb.connect() + try: + tid = kb.create_task(conn, title="port x", assignee="worker") + + # Attempt 1: blocked with a reason. + kb.claim_task(conn, tid) + kb.block_task(conn, tid, reason="needs clarification on IP vs user_id") + kb.unblock_task(conn, tid) + + # Attempt 2: claim (but don't complete yet) and read the context + # as this worker would see it. + kb.claim_task(conn, tid) + ctx = kb.build_worker_context(conn, tid) + + assert "Prior attempts on this task" in ctx + assert "blocked" in ctx + assert "needs clarification on IP vs user_id" in ctx + finally: + conn.close() + + +def test_build_worker_context_uses_parent_run_summary(kanban_home): + """Downstream children read the parent's run.summary + metadata, not + just task.result.""" + conn = kb.connect() + try: + parent = kb.create_task(conn, title="research", assignee="researcher") + child = kb.create_task( + conn, title="write", assignee="writer", parents=[parent], + ) + + kb.claim_task(conn, parent) + kb.complete_task( + conn, parent, + result="done", + summary="three angles explored; B looks strongest", + metadata={"sources": ["paper A", "paper B", "paper C"]}, + ) + + # child becomes ready via recompute_ready (runs inside complete_task) + ctx = kb.build_worker_context(conn, child) + assert "Parent task results" in ctx + assert "three angles explored; B looks strongest" in ctx + assert '"sources"' in ctx # metadata JSON serialized + finally: + conn.close() + + +def 
test_migration_backfills_inflight_run_for_legacy_db(kanban_home): + """An existing 'running' task from before task_runs existed should + get a synthesized run row so subsequent operations (complete, + heartbeat) have something to write to.""" + conn = kb.connect() + try: + tid = kb.create_task(conn, title="pre-migration", assignee="worker") + # Simulate legacy: set running + claim_lock directly, leave + # current_run_id NULL and delete the run row the claim created. + kb.claim_task(conn, tid) + with kb.write_txn(conn): + conn.execute("DELETE FROM task_runs WHERE task_id = ?", (tid,)) + conn.execute( + "UPDATE tasks SET current_run_id = NULL WHERE id = ?", + (tid,), + ) + + # Sanity: no runs, no pointer. + assert kb.list_runs(conn, tid) == [] + assert kb.get_task(conn, tid).current_run_id is None + + # Re-run init_db — migration backfill should kick in. + kb.init_db() + conn2 = kb.connect() + try: + runs = kb.list_runs(conn2, tid) + assert len(runs) == 1 + assert runs[0].status == "running" + assert runs[0].profile == "worker" + task = kb.get_task(conn2, tid) + assert task.current_run_id == runs[0].id + + # Subsequent complete closes the backfilled run cleanly. + kb.complete_task(conn2, tid, result="done", summary="ok") + r = kb.latest_run(conn2, tid) + assert r.outcome == "completed" + assert r.summary == "ok" + finally: + conn2.close() + finally: + conn.close() + + +def test_forward_compat_columns_writable(kanban_home): + """v2 will route by workflow_template_id + current_step_key. In v1 + these are nullable, kernel doesn't consult them for routing, but + they must be writable so a v2 client can populate them without + schema changes.""" + conn = kb.connect() + try: + tid = kb.create_task(conn, title="x") + with kb.write_txn(conn): + conn.execute( + "UPDATE tasks SET workflow_template_id = ?, current_step_key = ? 
" + "WHERE id = ?", + ("code-review-v1", "implement", tid), + ) + task = kb.get_task(conn, tid) + assert task.workflow_template_id == "code-review-v1" + assert task.current_step_key == "implement" + finally: + conn.close() + + +def test_cli_runs_verb(kanban_home): + conn = kb.connect() + try: + tid = kb.create_task(conn, title="x", assignee="worker") + kb.claim_task(conn, tid) + kb.complete_task(conn, tid, result="ok", summary="shipped") + finally: + conn.close() + out = run_slash(f"runs {tid}") + assert "completed" in out + assert "shipped" in out + assert "worker" in out + + +def test_cli_runs_json(kanban_home): + conn = kb.connect() + try: + tid = kb.create_task(conn, title="x", assignee="worker") + kb.claim_task(conn, tid) + kb.complete_task( + conn, tid, result="ok", summary="shipped", + metadata={"files": 1}, + ) + finally: + conn.close() + out = run_slash(f"runs {tid} --json") + data = json.loads(out) + assert len(data) == 1 + assert data[0]["outcome"] == "completed" + assert data[0]["metadata"] == {"files": 1} + + +def test_cli_complete_with_summary_and_metadata(kanban_home): + conn = kb.connect() + try: + tid = kb.create_task(conn, title="x", assignee="worker") + kb.claim_task(conn, tid) + finally: + conn.close() + # JSON metadata must round-trip through shlex + argparse. 
+ meta = '{"files": 3}' + out = run_slash( + "complete " + tid + " --summary \"done it\" --metadata '" + meta + "'" + ) + assert "Completed" in out + conn = kb.connect() + try: + r = kb.latest_run(conn, tid) + finally: + conn.close() + assert r.summary == "done it" + assert r.metadata == {"files": 3} + + +def test_cli_complete_bad_metadata_exits_nonzero(kanban_home): + conn = kb.connect() + try: + tid = kb.create_task(conn, title="x", assignee="worker") + kb.claim_task(conn, tid) + finally: + conn.close() + out = run_slash(f"complete {tid} --metadata not-json") + assert "metadata" in out.lower() diff --git a/tests/plugins/test_kanban_dashboard_plugin.py b/tests/plugins/test_kanban_dashboard_plugin.py index 6b45688d6e1..7f6c37df09b 100644 --- a/tests/plugins/test_kanban_dashboard_plugin.py +++ b/tests/plugins/test_kanban_dashboard_plugin.py @@ -627,3 +627,48 @@ def test_config_reads_dashboard_kanban_section(tmp_path, monkeypatch, client): assert data["lane_by_profile"] is False assert data["include_archived_by_default"] is True assert data["render_markdown"] is False + + +# --------------------------------------------------------------------------- +# Runs surfacing (vulcan-artivus RFC feedback) +# --------------------------------------------------------------------------- + +def test_task_detail_includes_runs(client): + """GET /tasks/:id carries a runs[] array with the attempt history.""" + r = client.post("/api/plugins/kanban/tasks", + json={"title": "port x", "assignee": "worker"}).json() + tid = r["task"]["id"] + + # Drive status running to force a run creation: PATCH to running + # doesn't call claim_task (the PATCH path uses _set_status_direct), + # so use the bulk/claim indirection via the kernel. 
+ import hermes_cli.kanban_db as _kb + conn = _kb.connect() + try: + _kb.claim_task(conn, tid) + _kb.complete_task( + conn, tid, + result="done", + summary="tested on rate limiter", + metadata={"changed_files": ["limiter.py"]}, + ) + finally: + conn.close() + + d = client.get(f"/api/plugins/kanban/tasks/{tid}").json() + assert "runs" in d + assert len(d["runs"]) == 1 + run = d["runs"][0] + assert run["outcome"] == "completed" + assert run["profile"] == "worker" + assert run["summary"] == "tested on rate limiter" + assert run["metadata"] == {"changed_files": ["limiter.py"]} + assert run["ended_at"] is not None + + +def test_task_detail_runs_empty_before_claim(client): + """A task that's never been claimed has an empty runs[] list, not + a missing key.""" + r = client.post("/api/plugins/kanban/tasks", json={"title": "fresh"}).json() + d = client.get(f"/api/plugins/kanban/tasks/{r['task']['id']}").json() + assert d["runs"] == [] diff --git a/website/docs/user-guide/features/kanban.md b/website/docs/user-guide/features/kanban.md index e35b00858c5..d1fcdca412a 100644 --- a/website/docs/user-guide/features/kanban.md +++ b/website/docs/user-guide/features/kanban.md @@ -283,6 +283,7 @@ hermes kanban tail # follow a single task's hermes kanban watch [--assignee P] [--tenant T] # live stream ALL events to the terminal [--kinds completed,blocked,…] [--interval SECS] hermes kanban heartbeat [--note "..."] # worker liveness signal for long ops +hermes kanban runs [--json] # attempt history (one row per run) hermes kanban assignees [--json] # profiles on disk + per-assignee task counts hermes kanban dispatch [--dry-run] [--max N] # one-shot pass [--failure-limit N] [--json] @@ -349,9 +350,45 @@ hermes kanban notify-unsubscribe t_abcd \ A subscription removes itself automatically once the task reaches `done` or `archived`; no cleanup needed. +## Runs — one row per attempt + +A task is a logical unit of work; a **run** is one attempt to execute it. 
When the dispatcher claims a ready task it creates a row in `task_runs` and points `tasks.current_run_id` at it. When that attempt ends — completed, blocked, crashed, timed out, spawn-failed, reclaimed — the run row closes with an `outcome` and the task's pointer clears. A task that's been attempted three times has three `task_runs` rows.
+
+Why two tables instead of just mutating the task: you need **full attempt history** for real-world postmortems ("the second reviewer attempt got to approve, the third merged"), and you need a clean place to hang per-attempt metadata — which files changed, which tests ran, which findings a reviewer noted. Those are run facts, not task facts.
+
+Runs are also where **structured handoff** lives. When a worker completes a task it can pass:
+
+- `--result "<text>"` — goes on the task row as before (for back-compat).
+- `--summary "<text>"` — goes on the run; downstream children see it in their `build_worker_context`.
+- `--metadata '{"changed_files": [...], "tests_run": 12}'` — JSON dict on the run; children see it serialized alongside the summary.
+
+Downstream children read the most recent completed run's summary + metadata for each parent. Retrying workers read the prior attempts on their own task (outcome, summary, error) so they don't repeat a path that already failed.
+ +```bash +# Worker completes with a structured handoff: +hermes kanban complete t_abcd \ + --result "rate limiter shipped" \ + --summary "implemented token bucket, keys on user_id with IP fallback, all tests pass" \ + --metadata '{"changed_files": ["limiter.py", "tests/test_limiter.py"], "tests_run": 14}' + +# Review the attempt history on a retried task: +hermes kanban runs t_abcd +# # OUTCOME PROFILE ELAPSED STARTED +# 1 blocked worker 12s 2026-04-27 14:02 +# → BLOCKED: need decision on rate-limit key +# 2 completed worker 8m 2026-04-27 15:18 +# → implemented token bucket, keys on user_id with IP fallback +``` + +Runs are exposed on the dashboard (Run History section in the drawer, one coloured row per attempt) and on the REST API (`GET /api/plugins/kanban/tasks/:id` returns a `runs[]` array). Task_events rows carry the run_id they belong to so the UI can group them by attempt. + +### Forward compatibility + +Two nullable columns on `tasks` are reserved for v2 workflow routing: `workflow_template_id` (which template this task belongs to) and `current_step_key` (which step in that template is active). The v1 kernel ignores them for routing but lets clients write them, so a v2 release can add the routing machinery without another schema migration. + ## Event reference -Every transition appends a row to `task_events`. The kinds group into three clusters so filtering is easy (`hermes kanban watch --kinds completed,gave_up,timed_out`): +Every transition appends a row to `task_events`. Each row carries an optional `run_id` so UIs can group events by attempt. Kinds group into three clusters so filtering is easy (`hermes kanban watch --kinds completed,gave_up,timed_out`): **Lifecycle** (what changed about the task as a logical unit):