diff --git a/hermes_cli/kanban.py b/hermes_cli/kanban.py index cd03a01d27a..7f0e4ea397f 100644 --- a/hermes_cli/kanban.py +++ b/hermes_cli/kanban.py @@ -260,7 +260,7 @@ def build_parser(parent_subparsers: argparse._SubParsersAction) -> argparse.Argu help="Only show events from tasks in this tenant") p_watch.add_argument("--kinds", default=None, help="Comma-separated event kinds to include " - "(e.g. 'completed,blocked,spawn_auto_blocked')") + "(e.g. 'completed,blocked,gave_up,crashed,timed_out')") p_watch.add_argument("--interval", type=float, default=0.5, help="Poll interval in seconds (default: 0.5)") diff --git a/hermes_cli/kanban_db.py b/hermes_cli/kanban_db.py index 8dcd5354b19..f59eec1221f 100644 --- a/hermes_cli/kanban_db.py +++ b/hermes_cli/kanban_db.py @@ -206,6 +206,7 @@ class Event: kind: str payload: Optional[dict] created_at: int + run_id: Optional[int] = None # --------------------------------------------------------------------------- @@ -874,6 +875,7 @@ def list_events(conn: sqlite3.Connection, task_id: str) -> list[Event]: kind=r["kind"], payload=payload, created_at=r["created_at"], + run_id=(int(r["run_id"]) if "run_id" in r.keys() and r["run_id"] is not None else None), ) ) return out @@ -967,6 +969,57 @@ def _current_run_id(conn: sqlite3.Connection, task_id: str) -> Optional[int]: return int(row["current_run_id"]) if row and row["current_run_id"] else None +def _synthesize_ended_run( + conn: sqlite3.Connection, + task_id: str, + *, + outcome: str, + summary: Optional[str] = None, + error: Optional[str] = None, + metadata: Optional[dict] = None, +) -> int: + """Insert a zero-duration, already-closed run row. + + Used when a terminal transition happens on a task that was never + claimed (CLI user calling ``hermes kanban complete + --summary X``, or dashboard "mark done" on a ready task). Without + this, the handoff fields (summary / metadata / error) would be + silently dropped: ``_end_run`` is a no-op because there's no + current run. + + The synthetic run has ``started_at == ended_at == now`` so it + shows up in attempt history as "instant" and doesn't skew elapsed + stats. Caller is responsible for leaving ``current_run_id`` NULL + (or for clearing it elsewhere in the same txn) since this + function does NOT touch the tasks row. + """ + now = int(time.time()) + trow = conn.execute( + "SELECT assignee, current_step_key FROM tasks WHERE id = ?", + (task_id,), + ).fetchone() + profile = trow["assignee"] if trow else None + step_key = trow["current_step_key"] if trow else None + cur = conn.execute( + """ + INSERT INTO task_runs ( + task_id, profile, step_key, + status, outcome, + summary, error, metadata, + started_at, ended_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + task_id, profile, step_key, + outcome, outcome, + summary, error, + json.dumps(metadata, ensure_ascii=False) if metadata else None, + now, now, + ), + ) + return int(cur.lastrowid or 0) + + # --------------------------------------------------------------------------- # Dependency resolution (todo -> ready) # --------------------------------------------------------------------------- @@ -1020,6 +1073,26 @@ def claim_task( lock = claimer or _claimer_id() expires = now + int(ttl_seconds) with write_txn(conn): + # Defensive: if a prior run somehow leaked (invariant violation from + # an unknown code path), close it as 'reclaimed' so we don't strand + # it when the CAS resets the pointer below. No-op when the invariant + # holds (the common case). + stale = conn.execute( + "SELECT current_run_id FROM tasks WHERE id = ? AND status = 'ready'", + (task_id,), + ).fetchone() + if stale and stale["current_run_id"]: + conn.execute( + """ + UPDATE task_runs + SET status = 'reclaimed', outcome = 'reclaimed', + summary = COALESCE(summary, 'invariant recovery on re-claim'), + ended_at = ?, + claim_lock = NULL, claim_expires = NULL, worker_pid = NULL + WHERE id = ? AND ended_at IS NULL + """, + (now, int(stale["current_run_id"])), + ) cur = conn.execute( """ UPDATE tasks @@ -1183,6 +1256,17 @@ def complete_task( summary=summary if summary is not None else result, metadata=metadata, ) + # If complete_task was called on a never-claimed task (ready or + # blocked → done with no run in flight), synthesize a + # zero-duration run so the handoff fields are persisted in + # attempt history instead of silently lost. + if run_id is None and (summary or metadata or result): + run_id = _synthesize_ended_run( + conn, task_id, + outcome="completed", + summary=summary if summary is not None else result, + metadata=metadata, + ) # Carry the handoff summary in the event payload so gateway # notifiers and dashboard WS consumers can render it without a # second SQL round-trip. First line only, 400 char cap — the @@ -1229,6 +1313,14 @@ def block_task( outcome="blocked", status="blocked", summary=reason, ) + # Synthesize a run when blocking a never-claimed task so the + # reason is preserved in attempt history. + if run_id is None and reason: + run_id = _synthesize_ended_run( + conn, task_id, + outcome="blocked", + summary=reason, + ) _append_event(conn, task_id, "blocked", {"reason": reason}, run_id=run_id) return True @@ -2147,6 +2239,7 @@ def unseen_events_for_sub( out.append(Event( id=r["id"], task_id=r["task_id"], kind=r["kind"], payload=payload, created_at=r["created_at"], + run_id=(int(r["run_id"]) if "run_id" in r.keys() and r["run_id"] is not None else None), )) max_id = max(max_id, int(r["id"])) return max_id, out diff --git a/plugins/kanban/dashboard/dist/index.js b/plugins/kanban/dashboard/dist/index.js index 1395ebb1481..58bab0404cb 100644 --- a/plugins/kanban/dashboard/dist/index.js +++ b/plugins/kanban/dashboard/dist/index.js @@ -259,6 +259,11 @@ const [selectedTaskId, setSelectedTaskId] = useState(null); const [selectedIds, setSelectedIds] = useState(() => new Set()); + // Per-task event counter incremented whenever the WS stream reports + // a new event for that task id. TaskDrawer useEffect-depends on its + // own task's counter so it reloads itself on live events instead of + // showing stale data. + const [taskEventTick, setTaskEventTick] = useState({}); const cursorRef = useRef(0); const reloadTimerRef = useRef(null); @@ -339,6 +344,14 @@ const msg = JSON.parse(ev.data); if (msg && Array.isArray(msg.events) && msg.events.length > 0) { cursorRef.current = msg.cursor || cursorRef.current; + // Stamp per-task signal so the TaskDrawer can reload itself. + setTaskEventTick(function (prev) { + const next = Object.assign({}, prev); + for (const e of msg.events) { + if (e && e.task_id) next[e.task_id] = (next[e.task_id] || 0) + 1; + } + return next; + }); scheduleReload(); } } catch (_e) { /* ignore */ } @@ -507,6 +520,7 @@ onRefresh: loadBoard, renderMarkdown: renderMd, allTasks: board.columns.reduce(function (acc, c) { return acc.concat(c.tasks); }, []), + eventTick: taskEventTick[selectedTaskId] || 0, }) : null, ), ); @@ -981,7 +995,10 @@ .finally(function () { setLoading(false); }); }, [props.taskId]); - useEffect(function () { load(); }, [load]); + // Reload when the WS stream reports new events for this task id + // (completion, block, crash, etc. — anything that'd make the drawer + // show stale data if we only loaded on mount). + useEffect(function () { load(); }, [load, props.eventTick]); useEffect(function () { function onKey(e) { if (e.key === "Escape" && !editing) props.onClose(); } window.addEventListener("keydown", onKey); diff --git a/plugins/kanban/dashboard/dist/style.css b/plugins/kanban/dashboard/dist/style.css index 802b19c87b8..6ac7f5d4b44 100644 --- a/plugins/kanban/dashboard/dist/style.css +++ b/plugins/kanban/dashboard/dist/style.css @@ -695,6 +695,7 @@ } .hermes-kanban-run--active { border-left-color: #3fb97d; } .hermes-kanban-run--completed { border-left-color: #4a8cd1; } +.hermes-kanban-run--ended { border-left-color: #6b7280; } /* generic fallback when outcome is unset */ .hermes-kanban-run--blocked { border-left-color: var(--color-destructive, #d14a4a); } .hermes-kanban-run--crashed, .hermes-kanban-run--timed_out, diff --git a/plugins/kanban/dashboard/plugin_api.py b/plugins/kanban/dashboard/plugin_api.py index 25803998806..e9e532f8118 100644 --- a/plugins/kanban/dashboard/plugin_api.py +++ b/plugins/kanban/dashboard/plugin_api.py @@ -114,6 +114,7 @@ def _event_dict(event: kanban_db.Event) -> dict[str, Any]: "kind": event.kind, "payload": event.payload, "created_at": event.created_at, + "run_id": event.run_id, } @@ -788,7 +789,7 @@ async def stream_events(ws: WebSocket): conn = kanban_db.connect() try: rows = conn.execute( - "SELECT id, task_id, kind, payload, created_at " + "SELECT id, task_id, run_id, kind, payload, created_at " "FROM task_events WHERE id > ? ORDER BY id ASC LIMIT 200", (cursor_val,), ).fetchall() @@ -802,6 +803,7 @@ async def stream_events(ws: WebSocket): out.append({ "id": r["id"], "task_id": r["task_id"], + "run_id": r["run_id"], "kind": r["kind"], "payload": payload, "created_at": r["created_at"], diff --git a/tests/hermes_cli/test_kanban_core_functionality.py b/tests/hermes_cli/test_kanban_core_functionality.py index ddaf36a193f..2f721af0283 100644 --- a/tests/hermes_cli/test_kanban_core_functionality.py +++ b/tests/hermes_cli/test_kanban_core_functionality.py @@ -1526,3 +1526,157 @@ def test_completed_event_payload_summary_none_when_missing(kanban_home): assert comp.payload.get("summary") is None finally: conn.close() + + +# ------------------------------------------------------------------------- +# Deep-scan fixes (Apr 2026 second audit) +# ------------------------------------------------------------------------- + +def test_complete_never_claimed_task_synthesizes_run(kanban_home): + """complete_task on a ready (never-claimed) task must persist the + handoff instead of silently dropping summary/metadata.""" + conn = kb.connect() + try: + tid = kb.create_task(conn, title="skip claim", assignee="worker") + # Task is in 'ready' state with no run opened. + assert kb.list_runs(conn, tid) == [] + ok = kb.complete_task( + conn, tid, + summary="did it manually", + metadata={"reason": "human intervention"}, + ) + assert ok is True + + runs = kb.list_runs(conn, tid) + assert len(runs) == 1, f"expected 1 synthetic run, got {len(runs)}" + r = runs[0] + assert r.outcome == "completed" + assert r.summary == "did it manually" + assert r.metadata == {"reason": "human intervention"} + # Zero-duration synthetic run. + assert r.started_at == r.ended_at + # Task pointer still NULL (we never claimed, never opened a run). + assert kb.get_task(conn, tid).current_run_id is None + + # Event carries the synthetic run_id. + evts = [e for e in kb.list_events(conn, tid) if e.kind == "completed"] + assert len(evts) == 1 + assert evts[0].run_id == r.id + finally: + conn.close() + + +def test_block_never_claimed_task_synthesizes_run(kanban_home): + """block_task on a ready task must persist --reason on a synthetic run.""" + conn = kb.connect() + try: + tid = kb.create_task(conn, title="drop this", assignee="worker") + ok = kb.block_task(conn, tid, reason="deprioritized") + assert ok is True + + runs = kb.list_runs(conn, tid) + assert len(runs) == 1 + r = runs[0] + assert r.outcome == "blocked" + assert r.summary == "deprioritized" + assert r.started_at == r.ended_at + + evts = [e for e in kb.list_events(conn, tid) if e.kind == "blocked"] + assert evts[0].run_id == r.id + finally: + conn.close() + + +def test_complete_never_claimed_without_handoff_skips_synthesis(kanban_home): + """If a bulk-complete passes no summary/metadata/result, don't spam + the runs table with empty synthetic rows.""" + conn = kb.connect() + try: + tid = kb.create_task(conn, title="simple", assignee="worker") + ok = kb.complete_task(conn, tid) # no handoff fields + assert ok is True + assert kb.list_runs(conn, tid) == [] # no synthetic row + finally: + conn.close() + + +def test_event_dataclass_carries_run_id(kanban_home): + """list_events and the Event dataclass must expose run_id so + downstream consumers (notifier, dashboard) can group by attempt.""" + conn = kb.connect() + try: + tid = kb.create_task(conn, title="x", assignee="worker") + kb.claim_task(conn, tid) + run_id = kb.latest_run(conn, tid).id + kb.complete_task(conn, tid, summary="done") + + events = kb.list_events(conn, tid) + kinds_with_run = { + e.kind: e.run_id for e in events if e.run_id is not None + } + # 'created' should NOT have a run_id (task-scoped). + created = [e for e in events if e.kind == "created"][0] + assert created.run_id is None + # 'claimed' and 'completed' must have run_id. + assert kinds_with_run.get("claimed") == run_id + assert kinds_with_run.get("completed") == run_id + finally: + conn.close() + + +def test_unseen_events_for_sub_includes_run_id(kanban_home): + """Gateway notifier path must also surface run_id on events.""" + conn = kb.connect() + try: + tid = kb.create_task(conn, title="notify test", assignee="worker") + kb.add_notify_sub( + conn, task_id=tid, platform="telegram", + chat_id="12345", thread_id="", + ) + kb.claim_task(conn, tid) + run_id = kb.latest_run(conn, tid).id + kb.complete_task(conn, tid, summary="notify-ready") + + cursor, events = kb.unseen_events_for_sub( + conn, task_id=tid, platform="telegram", + chat_id="12345", thread_id="", + kinds=("completed",), + ) + assert len(events) == 1 + assert events[0].run_id == run_id + finally: + conn.close() + + +def test_claim_task_recovers_from_invariant_leak(kanban_home): + """Belt-and-suspenders: if a prior run somehow leaked (stranded + current_run_id on a ready task), claim_task should recover rather + than strand it further.""" + conn = kb.connect() + try: + tid = kb.create_task(conn, title="invariant test", assignee="worker") + # Manually engineer the invariant violation: create a run, then + # flip status back to 'ready' without closing the run. + kb.claim_task(conn, tid) + leaked_run_id = kb.latest_run(conn, tid).id + conn.execute( + "UPDATE tasks SET status = 'ready', claim_lock = NULL, " + "claim_expires = NULL " + "WHERE id = ?", (tid,), + ) + conn.commit() + # The leaked run is still open. + assert kb.get_run(conn, leaked_run_id).ended_at is None + + # Now re-claim — the defensive recovery must close the leak. + claimed = kb.claim_task(conn, tid) + assert claimed is not None + leaked = kb.get_run(conn, leaked_run_id) + assert leaked.ended_at is not None + assert leaked.outcome == "reclaimed" + # New run opened and pointed to. + new_run = kb.latest_run(conn, tid) + assert new_run.id != leaked_run_id + assert new_run.ended_at is None + finally: + conn.close() diff --git a/tests/plugins/test_kanban_dashboard_plugin.py b/tests/plugins/test_kanban_dashboard_plugin.py index 071f0aea35b..043a1d6e13f 100644 --- a/tests/plugins/test_kanban_dashboard_plugin.py +++ b/tests/plugins/test_kanban_dashboard_plugin.py @@ -757,3 +757,27 @@ def test_patch_status_archive_closes_running_run(client): assert kb.latest_run(conn, tid).outcome == "reclaimed" finally: conn.close() + + +def test_event_dict_includes_run_id(client): + """GET /tasks/:id returns events with run_id populated.""" + r = client.post("/api/plugins/kanban/tasks", json={"title": "e", "assignee": "worker"}) + tid = r.json()["task"]["id"] + from hermes_cli import kanban_db as kb + conn = kb.connect() + try: + kb.claim_task(conn, tid) + run_id = kb.latest_run(conn, tid).id + kb.complete_task(conn, tid, summary="wss") + finally: + conn.close() + + r = client.get(f"/api/plugins/kanban/tasks/{tid}") + assert r.status_code == 200 + events = r.json()["events"] + # Every event in the response must have a run_id key (None or int). + for e in events: + assert "run_id" in e, f"missing run_id in event: {e}" + # completed event must have the actual run_id. + comp = [e for e in events if e["kind"] == "completed"] + assert comp[0]["run_id"] == run_id diff --git a/website/docs/user-guide/features/kanban.md b/website/docs/user-guide/features/kanban.md index 0873bb9c421..a0d7f0b94d8 100644 --- a/website/docs/user-guide/features/kanban.md +++ b/website/docs/user-guide/features/kanban.md @@ -386,6 +386,10 @@ Runs are exposed on the dashboard (Run History section in the drawer, one colour **Reclaimed runs from status changes.** If you drag a running task off `running` in the dashboard (back to `ready`, or straight to `todo`), or archive a task that was still running, the in-flight run closes with `outcome='reclaimed'` rather than being orphaned. The `task_runs` row is always in a terminal state when `tasks.current_run_id` is `NULL`, and vice versa — that invariant holds across CLI, dashboard, dispatcher, and notifier. +**Synthetic runs for never-claimed completions.** Completing or blocking a task that was never claimed (e.g. a human closes a `ready` task from the dashboard with a summary, or a CLI user runs `hermes kanban complete --summary X`) would otherwise drop the handoff. Instead the kernel inserts a zero-duration run row (`started_at == ended_at`) carrying the summary / metadata / reason so attempt history stays complete. The `completed` / `blocked` event's `run_id` points at that row. + +**Live drawer refresh.** When the dashboard's WebSocket event stream reports new events for the task the user is currently viewing, the drawer reloads itself (via a per-task event counter threaded into its `useEffect` dependency list). Closing and reopening is no longer required to see a run's new row or updated outcome. + ### Forward compatibility Two nullable columns on `tasks` are reserved for v2 workflow routing: `workflow_template_id` (which template this task belongs to) and `current_step_key` (which step in that template is active). The v1 kernel ignores them for routing but lets clients write them, so a v2 release can add the routing machinery without another schema migration. @@ -398,12 +402,12 @@ Every transition appends a row to `task_events`. Each row carries an optional `r | Kind | Payload | When | |---|---|---| -| `created` | `{assignee, status, parents, tenant}` | Task inserted. | -| `promoted` | — | `todo → ready` because all parents hit `done`. | +| `created` | `{assignee, status, parents, tenant}` | Task inserted. `run_id` is `NULL`. | +| `promoted` | — | `todo → ready` because all parents hit `done`. `run_id` is `NULL`. | | `claimed` | `{lock, expires, run_id}` | Dispatcher atomically claimed a `ready` task for spawn. | -| `completed` | `{result_len, summary?}` | Worker wrote `--result` / `--summary` and task hit `done`. `summary` is the first-line handoff (400-char cap); full version lives on the run row. | -| `blocked` | `{reason}` | Worker or human flipped the task to `blocked`. | -| `unblocked` | — | `blocked → ready`, either manually or via `/unblock`. | +| `completed` | `{result_len, summary?}` | Worker wrote `--result` / `--summary` and task hit `done`. `summary` is the first-line handoff (400-char cap); full version lives on the run row. If `complete_task` is called on a never-claimed task with handoff fields, a zero-duration run is synthesized so `run_id` still points at something. | +| `blocked` | `{reason}` | Worker or human flipped the task to `blocked`. Synthesizes a zero-duration run when called on a never-claimed task with `--reason`. | +| `unblocked` | — | `blocked → ready`, either manually or via `/unblock`. `run_id` is `NULL`. | | `archived` | — | Hidden from the default board. If the task was still running, carries the `run_id` of the run that was reclaimed as a side effect. | **Edits** (human-driven changes that aren't transitions):