diff --git a/gateway/run.py b/gateway/run.py index 8917d13a411..9071d6b8502 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -2473,10 +2473,10 @@ class GatewayRunner: For each subscription row, fetches ``task_events`` newer than the stored cursor with kind in the terminal set (``completed``, - ``blocked``, ``spawn_auto_blocked``, ``crashed``). Sends one message - per new event to ``(platform, chat_id, thread_id)``, then advances - the cursor. When a task reaches a terminal state (``completed`` / - ``archived``), the subscription is removed. + ``blocked``, ``gave_up``, ``crashed``, ``timed_out``). Sends one + message per new event to ``(platform, chat_id, thread_id)``, + then advances the cursor. When a task reaches a terminal state + (``completed`` / ``archived``), the subscription is removed. Runs in the gateway event loop; all SQLite work is pushed to a thread via ``asyncio.to_thread`` so the loop never blocks on the @@ -2547,13 +2547,24 @@ class GatewayRunner: for ev in d["events"]: kind = ev.kind if kind == "completed": - result_preview = "" - if task and task.result: + # Prefer the run's summary (the worker's + # intentional human-facing handoff, carried + # in the event payload), then fall back to + # task.result for legacy rows written before + # runs shipped. 
+ handoff = "" + payload_summary = None + if ev.payload and ev.payload.get("summary"): + payload_summary = str(ev.payload["summary"]).strip() # blank -> falls back to task.result + if payload_summary: + h = payload_summary.splitlines()[0][:200] + handoff = f"\n{h}" + elif task and task.result: r = task.result.strip().splitlines()[0][:160] - result_preview = f"\n{r}" + handoff = f"\n{r}" msg = ( f"✔ Kanban {sub['task_id']} done" - f" — {title}{result_preview}" + f" — {title}{handoff}" ) elif kind == "blocked": reason = "" diff --git a/hermes_cli/kanban.py b/hermes_cli/kanban.py index 5c35b69a611..cd03a01d27a 100644 --- a/hermes_cli/kanban.py +++ b/hermes_cli/kanban.py @@ -710,8 +710,20 @@ def _cmd_complete(args: argparse.Namespace) -> int: if not ids: print("at least one task_id is required", file=sys.stderr) return 1 - metadata = None + summary = getattr(args, "summary", None) raw_meta = getattr(args, "metadata", None) + # Guard: structured handoff fields are per-run, so they'd be + # copy-pasted identically across N runs — almost always a footgun. + # Refuse instead of silently doing the wrong thing. + if len(ids) > 1 and (summary or raw_meta): + print( + "kanban: --summary / --metadata are per-task and can't be used " + "with multiple ids (would apply the same handoff to every task). 
" "Complete tasks one at a time, or drop the flags for the bulk close.", + file=sys.stderr, + ) + return 2 + metadata = None if raw_meta: try: metadata = json.loads(raw_meta) @@ -726,7 +738,7 @@ def _cmd_complete(args: argparse.Namespace) -> int: if not kb.complete_task( conn, tid, result=args.result, - summary=getattr(args, "summary", None), + summary=summary, metadata=metadata, ): failed.append(tid) diff --git a/hermes_cli/kanban_db.py b/hermes_cli/kanban_db.py index 4d43017945e..8dcd5354b19 100644 --- a/hermes_cli/kanban_db.py +++ b/hermes_cli/kanban_db.py @@ -1183,9 +1183,18 @@ def complete_task( summary=summary if summary is not None else result, metadata=metadata, ) + # Carry the handoff summary in the event payload so gateway + # notifiers and dashboard WS consumers can render it without a + # second SQL round-trip. First line only, 400 char cap — the full + # summary stays on the run row. Stripped first so blank text -> None. + ev_summary = ((summary if summary is not None else result) or "").strip() + ev_summary = ev_summary.splitlines()[0][:400] if ev_summary else "" _append_event( conn, task_id, "completed", - {"result_len": len(result) if result else 0}, + { + "result_len": len(result) if result else 0, + "summary": ev_summary or None, + }, run_id=run_id, ) # Recompute ready status for dependents (separate txn so children see done). @@ -1240,12 +1249,22 @@ def unblock_task(conn: sqlite3.Connection, task_id: str) -> bool: def archive_task(conn: sqlite3.Connection, task_id: str) -> bool: with write_txn(conn): cur = conn.execute( - "UPDATE tasks SET status = 'archived' WHERE id = ? AND status != 'archived'", + "UPDATE tasks SET status = 'archived', " + " claim_lock = NULL, claim_expires = NULL, worker_pid = NULL " + "WHERE id = ? AND status != 'archived'", (task_id,), ) if cur.rowcount != 1: return False - _append_event(conn, task_id, "archived", None) + # If archive happened while a run was still in flight (e.g. 
user + # archived a running task from the dashboard), close that run with + # outcome='reclaimed' so attempt history isn't orphaned. + run_id = _end_run( + conn, task_id, + outcome="reclaimed", status="reclaimed", + summary="task archived with run still active", + ) + _append_event(conn, task_id, "archived", None, run_id=run_id) return True diff --git a/plugins/kanban/dashboard/plugin_api.py b/plugins/kanban/dashboard/plugin_api.py index 0f35c92b1a5..25803998806 100644 --- a/plugins/kanban/dashboard/plugin_api.py +++ b/plugins/kanban/dashboard/plugin_api.py @@ -347,6 +347,11 @@ class UpdateTaskBody(BaseModel): body: Optional[str] = None result: Optional[str] = None block_reason: Optional[str] = None + # Structured handoff fields — forwarded to complete_task when status + # transitions to 'done'. Dashboard parity with ``hermes kanban + # complete --summary ... --metadata ...``. + summary: Optional[str] = None + metadata: Optional[dict] = None @router.patch("/tasks/{task_id}") @@ -373,7 +378,12 @@ def update_task(task_id: str, payload: UpdateTaskBody): s = payload.status ok = True if s == "done": - ok = kanban_db.complete_task(conn, task_id, result=payload.result) + ok = kanban_db.complete_task( + conn, task_id, + result=payload.result, + summary=payload.summary, + metadata=payload.metadata, + ) elif s == "blocked": ok = kanban_db.block_task(conn, task_id, reason=payload.block_reason) elif s == "ready": @@ -443,21 +453,45 @@ def _set_status_direct( ) -> bool: """Direct status write for drag-drop moves that aren't covered by the structured complete/block/unblock/archive verbs (e.g. todo<->ready, - running<->ready). Appends a ``status`` event row for the live feed.""" + running<->ready). Appends a ``status`` event row for the live feed. + + When this transitions OFF ``running`` to anything other than the + terminal verbs above (which own their own run closing), we close the + active run with outcome='reclaimed' so attempt history isn't + orphaned. 
``running -> ready`` via drag-drop is the common case + (user yanking a stuck worker back to the queue). + """ with kanban_db.write_txn(conn): + # Snapshot current state so we know whether to close a run. + prev = conn.execute( + "SELECT status, current_run_id FROM tasks WHERE id = ?", + (task_id,), + ).fetchone() + if prev is None: + return False + was_running = prev["status"] == "running" + cur = conn.execute( "UPDATE tasks SET status = ?, " " claim_lock = CASE WHEN ? = 'running' THEN claim_lock ELSE NULL END, " - " claim_expires = CASE WHEN ? = 'running' THEN claim_expires ELSE NULL END " + " claim_expires = CASE WHEN ? = 'running' THEN claim_expires ELSE NULL END, " + " worker_pid = CASE WHEN ? = 'running' THEN worker_pid ELSE NULL END " "WHERE id = ?", - (new_status, new_status, new_status, task_id), + (new_status, new_status, new_status, new_status, task_id), ) if cur.rowcount != 1: return False + run_id = None + if was_running and new_status != "running" and prev["current_run_id"]: + run_id = kanban_db._end_run( + conn, task_id, + outcome="reclaimed", status="reclaimed", + summary=f"status changed to {new_status} (dashboard/direct)", + ) conn.execute( - "INSERT INTO task_events (task_id, kind, payload, created_at) " - "VALUES (?, 'status', ?, ?)", - (task_id, json.dumps({"status": new_status}), int(time.time())), + "INSERT INTO task_events (task_id, run_id, kind, payload, created_at) " + "VALUES (?, ?, 'status', ?, ?)", + (task_id, run_id, json.dumps({"status": new_status}), int(time.time())), ) # If we re-opened something, children may have gone stale. 
if new_status in ("done", "ready"): diff --git a/tests/hermes_cli/test_kanban_core_functionality.py b/tests/hermes_cli/test_kanban_core_functionality.py index 113657d5263..ddaf36a193f 100644 --- a/tests/hermes_cli/test_kanban_core_functionality.py +++ b/tests/hermes_cli/test_kanban_core_functionality.py @@ -1364,3 +1364,165 @@ def test_cli_complete_bad_metadata_exits_nonzero(kanban_home): conn.close() out = run_slash(f"complete {tid} --metadata not-json") assert "metadata" in out.lower() + + +# ------------------------------------------------------------------------- +# Integration hardening (Apr 2026 audit fixes) +# ------------------------------------------------------------------------- + +def test_archive_of_running_task_closes_run(kanban_home): + """Archiving a claimed task must close the in-flight run with + outcome='reclaimed', not orphan it.""" + conn = kb.connect() + try: + tid = kb.create_task(conn, title="x", assignee="worker") + kb.claim_task(conn, tid) + run = kb.latest_run(conn, tid) + assert run.ended_at is None + open_run_id = run.id + + assert kb.archive_task(conn, tid) is True + + task = kb.get_task(conn, tid) + assert task.status == "archived" + assert task.current_run_id is None + # The previously-active run must now be closed. + closed = kb.get_run(conn, open_run_id) + assert closed.ended_at is not None + assert closed.outcome == "reclaimed" + finally: + conn.close() + + +def test_archive_of_ready_task_does_not_create_spurious_run(kanban_home): + """No active run → archive shouldn't synthesize one.""" + conn = kb.connect() + try: + tid = kb.create_task(conn, title="x", assignee="worker") + # Never claimed; no status move needed (task starts in 'ready' here). + assert kb.archive_task(conn, tid) is True + runs = kb.list_runs(conn, tid) + assert runs == [] # No run was ever opened; archive didn't fabricate one. 
+ finally: + conn.close() + + +def test_dashboard_direct_status_change_off_running_closes_run(kanban_home): + """Dashboard drag-drop running->ready must close the active run. + + Importing _set_status_direct directly to simulate the PATCH handler + without spinning up FastAPI. + """ + from plugins.kanban.dashboard.plugin_api import _set_status_direct + + conn = kb.connect() + try: + tid = kb.create_task(conn, title="x", assignee="worker") + kb.claim_task(conn, tid) + open_run = kb.latest_run(conn, tid) + assert open_run.ended_at is None + prev_run_id = open_run.id + + # Simulate yanking the worker back to the queue. + assert _set_status_direct(conn, tid, "ready") is True + + task = kb.get_task(conn, tid) + assert task.status == "ready" + assert task.current_run_id is None + closed = kb.get_run(conn, prev_run_id) + assert closed.ended_at is not None + assert closed.outcome == "reclaimed" + finally: + conn.close() + + +def test_dashboard_direct_status_change_within_same_state_is_noop_for_runs(kanban_home): + """todo -> ready on an unclaimed task must not create any run rows.""" + from plugins.kanban.dashboard.plugin_api import _set_status_direct + + conn = kb.connect() + try: + tid = kb.create_task(conn, title="x") + # Force to todo for the sake of the test. + conn.execute("UPDATE tasks SET status='todo' WHERE id=?", (tid,)) + conn.commit() + assert _set_status_direct(conn, tid, "ready") is True + assert kb.list_runs(conn, tid) == [] + finally: + conn.close() + + +def test_cli_bulk_complete_with_summary_rejects(kanban_home): + conn = kb.connect() + try: + a = kb.create_task(conn, title="a", assignee="worker") + b = kb.create_task(conn, title="b", assignee="worker") + kb.claim_task(conn, a); kb.claim_task(conn, b) + finally: + conn.close() + # Bulk + summary is refused (stderr message, no mutation). + # Note: hermes_cli.main doesn't propagate sub-command exit codes + # (args.func(args) discards the return value), so we check the side + # effects instead. 
+ from subprocess import run as _run + import os, sys + env = os.environ.copy() + r = _run( + [sys.executable, "-m", "hermes_cli.main", "kanban", + "complete", a, b, "--summary", "oops"], + capture_output=True, text=True, env=env, + ) + assert "per-task" in r.stderr, r.stderr + # The tasks must still be running (no partial apply). + conn = kb.connect() + try: + assert kb.get_task(conn, a).status == "running" + assert kb.get_task(conn, b).status == "running" + finally: + conn.close() + + +def test_cli_bulk_complete_without_summary_still_works(kanban_home): + """Bulk close with no per-task handoff is allowed — the common case.""" + conn = kb.connect() + try: + a = kb.create_task(conn, title="a", assignee="worker") + b = kb.create_task(conn, title="b", assignee="worker") + kb.claim_task(conn, a); kb.claim_task(conn, b) + finally: + conn.close() + out = run_slash(f"complete {a} {b}") + assert f"Completed {a}" in out + assert f"Completed {b}" in out + + +def test_completed_event_payload_carries_summary(kanban_home): + """The 'completed' event must embed the run summary so gateway + notifiers render structured handoffs without a second SQL hit.""" + conn = kb.connect() + try: + tid = kb.create_task(conn, title="x", assignee="worker") + kb.claim_task(conn, tid) + kb.complete_task(conn, tid, summary="handoff line 1\nextra", + metadata={"n": 3}) + events = kb.list_events(conn, tid) + comp = [e for e in events if e.kind == "completed"] + assert len(comp) == 1 + # First-line-only, within the 400-char cap, preserved verbatim. 
+ assert comp[0].payload["summary"] == "handoff line 1" + finally: + conn.close() + + +def test_completed_event_payload_summary_none_when_missing(kanban_home): + """If the caller passes no summary AND no result, payload.summary is None.""" + conn = kb.connect() + try: + tid = kb.create_task(conn, title="x", assignee="worker") + kb.claim_task(conn, tid) + kb.complete_task(conn, tid) # no summary, no result + events = kb.list_events(conn, tid) + comp = [e for e in events if e.kind == "completed"][0] + assert comp.payload.get("summary") is None + finally: + conn.close() diff --git a/tests/plugins/test_kanban_dashboard_plugin.py b/tests/plugins/test_kanban_dashboard_plugin.py index 7f6c37df09b..071f0aea35b 100644 --- a/tests/plugins/test_kanban_dashboard_plugin.py +++ b/tests/plugins/test_kanban_dashboard_plugin.py @@ -672,3 +672,88 @@ def test_task_detail_runs_empty_before_claim(client): r = client.post("/api/plugins/kanban/tasks", json={"title": "fresh"}).json() d = client.get(f"/api/plugins/kanban/tasks/{r['task']['id']}").json() assert d["runs"] == [] + + +def test_patch_status_done_with_summary_and_metadata(client): + """PATCH /tasks/:id with status=done + summary + metadata must + reach complete_task, so the dashboard has CLI parity.""" + # Create + claim. + r = client.post("/api/plugins/kanban/tasks", json={"title": "x", "assignee": "worker"}) + tid = r.json()["task"]["id"] + from hermes_cli import kanban_db as kb + conn = kb.connect() + try: + kb.claim_task(conn, tid) + finally: + conn.close() + + r = client.patch( + f"/api/plugins/kanban/tasks/{tid}", + json={ + "status": "done", + "summary": "shipped the thing", + "metadata": {"changed_files": ["a.py", "b.py"], "tests_run": 7}, + }, + ) + assert r.status_code == 200, r.text + + # The run must have the summary + metadata attached. 
+ conn = kb.connect() + try: + run = kb.latest_run(conn, tid) + assert run.outcome == "completed" + assert run.summary == "shipped the thing" + assert run.metadata == {"changed_files": ["a.py", "b.py"], "tests_run": 7} + finally: + conn.close() + + +def test_patch_status_done_without_summary_still_works(client): + """Back-compat: PATCH without the new fields still completes.""" + r = client.post("/api/plugins/kanban/tasks", json={"title": "y", "assignee": "worker"}) + tid = r.json()["task"]["id"] + from hermes_cli import kanban_db as kb + conn = kb.connect() + try: + kb.claim_task(conn, tid) + finally: + conn.close() + r = client.patch( + f"/api/plugins/kanban/tasks/{tid}", + json={"status": "done", "result": "legacy shape"}, + ) + assert r.status_code == 200, r.text + conn = kb.connect() + try: + run = kb.latest_run(conn, tid) + assert run.outcome == "completed" + assert run.summary == "legacy shape" # falls back to result + finally: + conn.close() + + +def test_patch_status_archive_closes_running_run(client): + """PATCH to archived while running must close the in-flight run.""" + r = client.post("/api/plugins/kanban/tasks", json={"title": "z", "assignee": "worker"}) + tid = r.json()["task"]["id"] + from hermes_cli import kanban_db as kb + conn = kb.connect() + try: + kb.claim_task(conn, tid) + open_run = kb.latest_run(conn, tid) + assert open_run.ended_at is None + finally: + conn.close() + r = client.patch( + f"/api/plugins/kanban/tasks/{tid}", + json={"status": "archived"}, + ) + assert r.status_code == 200, r.text + conn = kb.connect() + try: + task = kb.get_task(conn, tid) + assert task.status == "archived" + assert task.current_run_id is None + assert kb.latest_run(conn, tid).outcome == "reclaimed" + finally: + conn.close()