Files
hermes-agent/tests/plugins/test_kanban_dashboard_plugin.py
Teknium 8ef2ae6502 fix(kanban): audit pass — close orphaned runs on archive / dashboard direct-status / drag-drop
Integration audit of the runs-as-first-class work (0146cb2bd) found five
bugs where structured runs got orphaned or dashboard parity was missing.
All behavioral fixes; no schema change needed.

Kernel
  - archive_task: when called on a running task, now closes the
    in-flight run with outcome='reclaimed' and clears current_run_id.
    Previously, dashboard bulk-archive or CLI `kanban archive <running>`
    would leave the task_runs row open with ended_at=NULL forever and
    strand the pointer. Adds the claim_lock / claim_expires / worker_pid
    clearing to the UPDATE so the task row is clean too.
  - complete_task: embeds the first-line handoff summary in the
    `completed` event payload (capped at 400 chars). Notifier can now
    render `✔ task done — <title>\n<summary>` without a second SQL hit,
    and the full summary still lives on the run row.

Dashboard plugin
  - _set_status_direct: drag-drop OFF 'running' (to 'ready', 'todo',
    'triage', 'done' — anywhere except back to 'running') now closes
    the active run with outcome='reclaimed'. Clears worker_pid too.
    Snapshots previous status + current_run_id before the UPDATE so
    the decision has the right before-state. status event rows now
    carry run_id when closing a run, NULL otherwise.
  - UpdateTaskBody: adds `summary` and `metadata` fields. PATCH
    /tasks/:id with status='done' now forwards them to complete_task,
    giving the dashboard parity with `hermes kanban complete --summary
    ... --metadata ...`. Previously these fields only existed on the
    CLI.

CLI
  - `hermes kanban complete a b c --summary X` or `--metadata Y`:
    refused with a clear stderr message instead of silently applying
    the same handoff to every task. Bulk-close without handoff flags
    still works. (Note: hermes_cli.main discards subcommand exit
    codes via `args.func(args)` without propagating; tracked
    separately. Side-effect check is the real guard.)

Gateway notifier
  - Completion message prefers run.summary (carried in event payload)
    over task.result. task.result remains the fallback for legacy rows
    written before runs shipped.
  - Docstring: renamed stale `spawn_auto_blocked` reference to
    `gave_up` / `timed_out` — matches the actual TERMINAL_KINDS
    tuple, which was already correct in code.

Tests (+8 in core functionality, +3 in dashboard plugin)
  - archive_of_running_task_closes_run
  - archive_of_ready_task_does_not_create_spurious_run
  - dashboard_direct_status_change_off_running_closes_run
  - dashboard_direct_status_change_within_same_state_is_noop_for_runs
  - cli_bulk_complete_with_summary_rejects (side-effect assertion)
  - cli_bulk_complete_without_summary_still_works
  - completed_event_payload_carries_summary
  - completed_event_payload_summary_none_when_missing
  - patch_status_done_with_summary_and_metadata
  - patch_status_done_without_summary_still_works (legacy path)
  - patch_status_archive_closes_running_run (E2E through FastAPI TestClient)

164/164 kanban suite pass under scripts/run_tests.sh. Live smoke
(execute_code with isolated HERMES_HOME) covered all five fixed paths
plus a re-claim-after-drag-drop to confirm the fresh run is tracked
correctly after the orphan close.
2026-04-27 07:44:39 -07:00

760 lines
26 KiB
Python

"""Tests for the Kanban dashboard plugin backend (plugins/kanban/dashboard/plugin_api.py).
The plugin mounts as /api/plugins/kanban/ inside the dashboard's FastAPI app,
but here we attach its router to a bare FastAPI instance so we can test the
REST surface without spinning up the whole dashboard.
"""
from __future__ import annotations
import importlib.util
import os
import sys
import time
from pathlib import Path
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from hermes_cli import kanban_db as kb
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
def _load_plugin_router():
"""Dynamically load plugins/kanban/dashboard/plugin_api.py and return its router."""
repo_root = Path(__file__).resolve().parents[2]
plugin_file = repo_root / "plugins" / "kanban" / "dashboard" / "plugin_api.py"
assert plugin_file.exists(), f"plugin file missing: {plugin_file}"
spec = importlib.util.spec_from_file_location(
"hermes_dashboard_plugin_kanban_test", plugin_file,
)
assert spec is not None and spec.loader is not None
mod = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = mod
spec.loader.exec_module(mod)
return mod.router
@pytest.fixture
def kanban_home(tmp_path, monkeypatch):
"""Isolated HERMES_HOME with an empty kanban DB."""
home = tmp_path / ".hermes"
home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(home))
monkeypatch.setattr(Path, "home", lambda: tmp_path)
kb.init_db()
return home
@pytest.fixture
def client(kanban_home):
app = FastAPI()
app.include_router(_load_plugin_router(), prefix="/api/plugins/kanban")
return TestClient(app)
# ---------------------------------------------------------------------------
# GET /board on an empty DB
# ---------------------------------------------------------------------------
def test_board_empty(client):
r = client.get("/api/plugins/kanban/board")
assert r.status_code == 200
data = r.json()
# All canonical columns present (triage + the rest), each empty.
names = [c["name"] for c in data["columns"]]
for expected in ("triage", "todo", "ready", "running", "blocked", "done"):
assert expected in names, f"missing column {expected}: {names}"
assert all(len(c["tasks"]) == 0 for c in data["columns"])
assert data["tenants"] == []
assert data["assignees"] == []
assert data["latest_event_id"] == 0
# ---------------------------------------------------------------------------
# POST /tasks then GET /board sees it
# ---------------------------------------------------------------------------
def test_create_task_appears_on_board(client):
r = client.post(
"/api/plugins/kanban/tasks",
json={
"title": "Research LLM caching",
"assignee": "researcher",
"priority": 3,
"tenant": "acme",
},
)
assert r.status_code == 200, r.text
task = r.json()["task"]
assert task["title"] == "Research LLM caching"
assert task["assignee"] == "researcher"
assert task["status"] == "ready" # no parents -> immediately ready
assert task["priority"] == 3
assert task["tenant"] == "acme"
task_id = task["id"]
# Board now lists it under 'ready'.
r = client.get("/api/plugins/kanban/board")
assert r.status_code == 200
data = r.json()
ready = next(c for c in data["columns"] if c["name"] == "ready")
assert len(ready["tasks"]) == 1
assert ready["tasks"][0]["id"] == task_id
assert "acme" in data["tenants"]
assert "researcher" in data["assignees"]
def test_tenant_filter(client):
client.post("/api/plugins/kanban/tasks", json={"title": "A", "tenant": "t1"})
client.post("/api/plugins/kanban/tasks", json={"title": "B", "tenant": "t2"})
r = client.get("/api/plugins/kanban/board?tenant=t1")
counts = {c["name"]: len(c["tasks"]) for c in r.json()["columns"]}
total = sum(counts.values())
assert total == 1
r = client.get("/api/plugins/kanban/board?tenant=t2")
total = sum(len(c["tasks"]) for c in r.json()["columns"])
assert total == 1
# ---------------------------------------------------------------------------
# GET /tasks/:id returns body + comments + events + links
# ---------------------------------------------------------------------------
def test_task_detail_includes_links_and_events(client):
parent = client.post(
"/api/plugins/kanban/tasks", json={"title": "parent"},
).json()["task"]
child = client.post(
"/api/plugins/kanban/tasks",
json={"title": "child", "parents": [parent["id"]]},
).json()["task"]
assert child["status"] == "todo" # parent not done yet
# Detail for the child shows the parent link.
r = client.get(f"/api/plugins/kanban/tasks/{child['id']}")
assert r.status_code == 200
data = r.json()
assert data["task"]["id"] == child["id"]
assert parent["id"] in data["links"]["parents"]
# Detail for the parent shows the child.
r = client.get(f"/api/plugins/kanban/tasks/{parent['id']}")
assert child["id"] in r.json()["links"]["children"]
# Events exist from creation.
assert len(data["events"]) >= 1
def test_task_detail_404_on_unknown(client):
r = client.get("/api/plugins/kanban/tasks/does-not-exist")
assert r.status_code == 404
# ---------------------------------------------------------------------------
# PATCH /tasks/:id — status transitions
# ---------------------------------------------------------------------------
def test_patch_status_complete(client):
t = client.post("/api/plugins/kanban/tasks", json={"title": "x"}).json()["task"]
r = client.patch(
f"/api/plugins/kanban/tasks/{t['id']}",
json={"status": "done", "result": "shipped"},
)
assert r.status_code == 200
assert r.json()["task"]["status"] == "done"
# Board reflects the move.
done = next(
c for c in client.get("/api/plugins/kanban/board").json()["columns"]
if c["name"] == "done"
)
assert any(x["id"] == t["id"] for x in done["tasks"])
def test_patch_block_then_unblock(client):
t = client.post("/api/plugins/kanban/tasks", json={"title": "x"}).json()["task"]
r = client.patch(
f"/api/plugins/kanban/tasks/{t['id']}",
json={"status": "blocked", "block_reason": "need input"},
)
assert r.status_code == 200
assert r.json()["task"]["status"] == "blocked"
r = client.patch(
f"/api/plugins/kanban/tasks/{t['id']}",
json={"status": "ready"},
)
assert r.status_code == 200
assert r.json()["task"]["status"] == "ready"
def test_patch_drag_drop_move_todo_to_ready(client):
"""Direct status write: the drag-drop path for statuses without a
dedicated verb (e.g. manually promoting todo -> ready)."""
parent = client.post("/api/plugins/kanban/tasks", json={"title": "p"}).json()["task"]
child = client.post(
"/api/plugins/kanban/tasks",
json={"title": "c", "parents": [parent["id"]]},
).json()["task"]
assert child["status"] == "todo"
r = client.patch(
f"/api/plugins/kanban/tasks/{child['id']}",
json={"status": "ready"},
)
assert r.status_code == 200
assert r.json()["task"]["status"] == "ready"
def test_patch_reassign(client):
t = client.post(
"/api/plugins/kanban/tasks",
json={"title": "x", "assignee": "a"},
).json()["task"]
r = client.patch(
f"/api/plugins/kanban/tasks/{t['id']}",
json={"assignee": "b"},
)
assert r.status_code == 200
assert r.json()["task"]["assignee"] == "b"
def test_patch_priority_and_edit(client):
t = client.post("/api/plugins/kanban/tasks", json={"title": "x"}).json()["task"]
r = client.patch(
f"/api/plugins/kanban/tasks/{t['id']}",
json={"priority": 5, "title": "renamed"},
)
assert r.status_code == 200
data = r.json()["task"]
assert data["priority"] == 5
assert data["title"] == "renamed"
def test_patch_invalid_status(client):
t = client.post("/api/plugins/kanban/tasks", json={"title": "x"}).json()["task"]
r = client.patch(
f"/api/plugins/kanban/tasks/{t['id']}",
json={"status": "banana"},
)
assert r.status_code == 400
# ---------------------------------------------------------------------------
# Comments + Links
# ---------------------------------------------------------------------------
def test_add_comment(client):
t = client.post("/api/plugins/kanban/tasks", json={"title": "x"}).json()["task"]
r = client.post(
f"/api/plugins/kanban/tasks/{t['id']}/comments",
json={"body": "how's progress?", "author": "teknium"},
)
assert r.status_code == 200
r = client.get(f"/api/plugins/kanban/tasks/{t['id']}")
comments = r.json()["comments"]
assert len(comments) == 1
assert comments[0]["body"] == "how's progress?"
assert comments[0]["author"] == "teknium"
def test_add_comment_empty_rejected(client):
t = client.post("/api/plugins/kanban/tasks", json={"title": "x"}).json()["task"]
r = client.post(
f"/api/plugins/kanban/tasks/{t['id']}/comments",
json={"body": " "},
)
assert r.status_code == 400
def test_add_link_and_delete_link(client):
a = client.post("/api/plugins/kanban/tasks", json={"title": "a"}).json()["task"]
b = client.post("/api/plugins/kanban/tasks", json={"title": "b"}).json()["task"]
r = client.post(
"/api/plugins/kanban/links",
json={"parent_id": a["id"], "child_id": b["id"]},
)
assert r.status_code == 200
r = client.get(f"/api/plugins/kanban/tasks/{b['id']}")
assert a["id"] in r.json()["links"]["parents"]
r = client.delete(
"/api/plugins/kanban/links",
params={"parent_id": a["id"], "child_id": b["id"]},
)
assert r.status_code == 200
assert r.json()["ok"] is True
def test_add_link_cycle_rejected(client):
a = client.post("/api/plugins/kanban/tasks", json={"title": "a"}).json()["task"]
b = client.post("/api/plugins/kanban/tasks", json={"title": "b"}).json()["task"]
client.post(
"/api/plugins/kanban/links",
json={"parent_id": a["id"], "child_id": b["id"]},
)
r = client.post(
"/api/plugins/kanban/links",
json={"parent_id": b["id"], "child_id": a["id"]},
)
assert r.status_code == 400
# ---------------------------------------------------------------------------
# Dispatch nudge
# ---------------------------------------------------------------------------
def test_dispatch_dry_run(client):
client.post(
"/api/plugins/kanban/tasks",
json={"title": "work", "assignee": "researcher"},
)
r = client.post("/api/plugins/kanban/dispatch?dry_run=true&max=4")
assert r.status_code == 200
body = r.json()
# DispatchResult is serialized as a dataclass dict.
assert isinstance(body, dict)
# ---------------------------------------------------------------------------
# Triage column (new v1 status)
# ---------------------------------------------------------------------------
def test_create_triage_lands_in_triage_column(client):
r = client.post(
"/api/plugins/kanban/tasks",
json={"title": "rough idea, spec me", "triage": True},
)
assert r.status_code == 200
task = r.json()["task"]
assert task["status"] == "triage"
r = client.get("/api/plugins/kanban/board")
triage = next(c for c in r.json()["columns"] if c["name"] == "triage")
assert len(triage["tasks"]) == 1
assert triage["tasks"][0]["title"] == "rough idea, spec me"
def test_triage_task_not_promoted_to_ready(client):
"""Triage tasks must stay in triage even when they have no parents."""
client.post(
"/api/plugins/kanban/tasks",
json={"title": "must stay put", "triage": True},
)
# Run the dispatcher — it should NOT promote the triage task.
client.post("/api/plugins/kanban/dispatch?dry_run=false&max=4")
r = client.get("/api/plugins/kanban/board")
triage = next(c for c in r.json()["columns"] if c["name"] == "triage")
ready = next(c for c in r.json()["columns"] if c["name"] == "ready")
assert len(triage["tasks"]) == 1
assert len(ready["tasks"]) == 0
def test_patch_status_triage_works(client):
"""A user (or specifier) can push a task back into triage, and out of it."""
t = client.post(
"/api/plugins/kanban/tasks", json={"title": "x"},
).json()["task"]
# Normal creation is 'ready'; push to triage.
r = client.patch(
f"/api/plugins/kanban/tasks/{t['id']}", json={"status": "triage"},
)
assert r.status_code == 200
assert r.json()["task"]["status"] == "triage"
# Now promote to todo.
r = client.patch(
f"/api/plugins/kanban/tasks/{t['id']}", json={"status": "todo"},
)
assert r.status_code == 200
assert r.json()["task"]["status"] == "todo"
# ---------------------------------------------------------------------------
# Progress rollup (done children / total children)
# ---------------------------------------------------------------------------
def test_board_progress_rollup(client):
parent = client.post(
"/api/plugins/kanban/tasks", json={"title": "parent"},
).json()["task"]
child_a = client.post(
"/api/plugins/kanban/tasks",
json={"title": "a", "parents": [parent["id"]]},
).json()["task"]
child_b = client.post(
"/api/plugins/kanban/tasks",
json={"title": "b", "parents": [parent["id"]]},
).json()["task"]
# Children start as "todo" because the parent isn't done yet; promote
# them to "ready" so complete_task will accept the transition.
for cid in (child_a["id"], child_b["id"]):
r = client.patch(
f"/api/plugins/kanban/tasks/{cid}", json={"status": "ready"},
)
assert r.status_code == 200
# 0/2 done.
r = client.get("/api/plugins/kanban/board")
parent_row = next(
t for col in r.json()["columns"] for t in col["tasks"]
if t["id"] == parent["id"]
)
assert parent_row["progress"] == {"done": 0, "total": 2}
# Complete one child. 1/2.
r = client.patch(
f"/api/plugins/kanban/tasks/{child_a['id']}",
json={"status": "done"},
)
assert r.status_code == 200
r = client.get("/api/plugins/kanban/board")
parent_row = next(
t for col in r.json()["columns"] for t in col["tasks"]
if t["id"] == parent["id"]
)
assert parent_row["progress"] == {"done": 1, "total": 2}
# Childless tasks report progress=None, not {0/0}.
assert next(
t for col in r.json()["columns"] for t in col["tasks"]
if t["id"] == child_b["id"]
)["progress"] is None
# ---------------------------------------------------------------------------
# Auto-init on first board read
# ---------------------------------------------------------------------------
def test_board_auto_initializes_missing_db(tmp_path, monkeypatch):
"""If kanban.db doesn't exist yet, GET /board must create it, not 500."""
home = tmp_path / ".hermes"
home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(home))
monkeypatch.setattr(Path, "home", lambda: tmp_path)
# Deliberately DO NOT call kb.init_db().
app = FastAPI()
app.include_router(_load_plugin_router(), prefix="/api/plugins/kanban")
c = TestClient(app)
r = c.get("/api/plugins/kanban/board")
assert r.status_code == 200
assert (home / "kanban.db").exists(), "init_db wasn't invoked by /board"
# ---------------------------------------------------------------------------
# WebSocket auth (query-param token)
# ---------------------------------------------------------------------------
def test_ws_events_rejects_when_token_required(tmp_path, monkeypatch):
"""When _SESSION_TOKEN is set (normal dashboard context), a missing or
wrong ?token= query param must be rejected with policy-violation."""
home = tmp_path / ".hermes"
home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(home))
monkeypatch.setattr(Path, "home", lambda: tmp_path)
kb.init_db()
# Stub web_server so _check_ws_token has a token to compare against.
import types
stub = types.SimpleNamespace(_SESSION_TOKEN="secret-xyz")
monkeypatch.setitem(sys.modules, "hermes_cli.web_server", stub)
app = FastAPI()
app.include_router(_load_plugin_router(), prefix="/api/plugins/kanban")
c = TestClient(app)
# No token → policy violation close.
from starlette.websockets import WebSocketDisconnect
with pytest.raises(WebSocketDisconnect) as exc:
with c.websocket_connect("/api/plugins/kanban/events"):
pass
assert exc.value.code == 1008
# Wrong token → policy violation close.
with pytest.raises(WebSocketDisconnect) as exc:
with c.websocket_connect("/api/plugins/kanban/events?token=nope"):
pass
assert exc.value.code == 1008
# Correct token → accepted (connect then close cleanly from our side).
with c.websocket_connect(
"/api/plugins/kanban/events?token=secret-xyz"
) as ws:
assert ws is not None # handshake succeeded
# ---------------------------------------------------------------------------
# Bulk actions
# ---------------------------------------------------------------------------
def test_bulk_status_ready(client):
a = client.post("/api/plugins/kanban/tasks", json={"title": "a"}).json()["task"]
b = client.post("/api/plugins/kanban/tasks", json={"title": "b"}).json()["task"]
c2 = client.post("/api/plugins/kanban/tasks", json={"title": "c"}).json()["task"]
# Parent-less tasks land in "ready" already; push them to blocked first.
for tid in (a["id"], b["id"], c2["id"]):
client.patch(f"/api/plugins/kanban/tasks/{tid}",
json={"status": "blocked", "block_reason": "wait"})
r = client.post("/api/plugins/kanban/tasks/bulk",
json={"ids": [a["id"], b["id"], c2["id"]], "status": "ready"})
assert r.status_code == 200
results = r.json()["results"]
assert all(r["ok"] for r in results)
# All three are now ready.
board = client.get("/api/plugins/kanban/board").json()
ready = next(col for col in board["columns"] if col["name"] == "ready")
ids = {t["id"] for t in ready["tasks"]}
assert {a["id"], b["id"], c2["id"]}.issubset(ids)
def test_bulk_archive(client):
a = client.post("/api/plugins/kanban/tasks", json={"title": "a"}).json()["task"]
b = client.post("/api/plugins/kanban/tasks", json={"title": "b"}).json()["task"]
r = client.post("/api/plugins/kanban/tasks/bulk",
json={"ids": [a["id"], b["id"]], "archive": True})
assert r.status_code == 200
assert all(r["ok"] for r in r.json()["results"])
# Default board (archived hidden) — both gone.
board = client.get("/api/plugins/kanban/board").json()
ids = {t["id"] for col in board["columns"] for t in col["tasks"]}
assert a["id"] not in ids
assert b["id"] not in ids
def test_bulk_reassign(client):
a = client.post("/api/plugins/kanban/tasks",
json={"title": "a", "assignee": "old"}).json()["task"]
b = client.post("/api/plugins/kanban/tasks",
json={"title": "b", "assignee": "old"}).json()["task"]
r = client.post("/api/plugins/kanban/tasks/bulk",
json={"ids": [a["id"], b["id"]], "assignee": "new"})
assert r.status_code == 200
for tid in (a["id"], b["id"]):
t = client.get(f"/api/plugins/kanban/tasks/{tid}").json()["task"]
assert t["assignee"] == "new"
def test_bulk_unassign_via_empty_string(client):
a = client.post("/api/plugins/kanban/tasks",
json={"title": "a", "assignee": "x"}).json()["task"]
r = client.post("/api/plugins/kanban/tasks/bulk",
json={"ids": [a["id"]], "assignee": ""})
assert r.status_code == 200
t = client.get(f"/api/plugins/kanban/tasks/{a['id']}").json()["task"]
assert t["assignee"] is None
def test_bulk_partial_failure_doesnt_abort_siblings(client):
"""One bad id in the middle of a batch must not prevent others from
applying."""
a = client.post("/api/plugins/kanban/tasks", json={"title": "a"}).json()["task"]
c2 = client.post("/api/plugins/kanban/tasks", json={"title": "c"}).json()["task"]
r = client.post("/api/plugins/kanban/tasks/bulk",
json={"ids": [a["id"], "bogus-id", c2["id"]], "priority": 7})
assert r.status_code == 200
results = r.json()["results"]
assert len(results) == 3
ok_ids = {r["id"] for r in results if r["ok"]}
assert a["id"] in ok_ids
assert c2["id"] in ok_ids
assert any(not r["ok"] and r["id"] == "bogus-id" for r in results)
# Good siblings actually got the priority bump.
for tid in (a["id"], c2["id"]):
t = client.get(f"/api/plugins/kanban/tasks/{tid}").json()["task"]
assert t["priority"] == 7
def test_bulk_empty_ids_400(client):
r = client.post("/api/plugins/kanban/tasks/bulk", json={"ids": []})
assert r.status_code == 400
# ---------------------------------------------------------------------------
# /config endpoint
# ---------------------------------------------------------------------------
def test_config_returns_defaults_when_section_missing(client):
r = client.get("/api/plugins/kanban/config")
assert r.status_code == 200
data = r.json()
# Defaults when dashboard.kanban is missing.
assert data["default_tenant"] == ""
assert data["lane_by_profile"] is True
assert data["include_archived_by_default"] is False
assert data["render_markdown"] is True
def test_config_reads_dashboard_kanban_section(tmp_path, monkeypatch, client):
home = Path(os.environ["HERMES_HOME"])
(home / "config.yaml").write_text(
"dashboard:\n"
" kanban:\n"
" default_tenant: acme\n"
" lane_by_profile: false\n"
" include_archived_by_default: true\n"
" render_markdown: false\n"
)
r = client.get("/api/plugins/kanban/config")
assert r.status_code == 200
data = r.json()
assert data["default_tenant"] == "acme"
assert data["lane_by_profile"] is False
assert data["include_archived_by_default"] is True
assert data["render_markdown"] is False
# ---------------------------------------------------------------------------
# Runs surfacing (vulcan-artivus RFC feedback)
# ---------------------------------------------------------------------------
def test_task_detail_includes_runs(client):
"""GET /tasks/:id carries a runs[] array with the attempt history."""
r = client.post("/api/plugins/kanban/tasks",
json={"title": "port x", "assignee": "worker"}).json()
tid = r["task"]["id"]
# Drive status running to force a run creation: PATCH to running
# doesn't call claim_task (the PATCH path uses _set_status_direct),
# so use the bulk/claim indirection via the kernel.
import hermes_cli.kanban_db as _kb
conn = _kb.connect()
try:
_kb.claim_task(conn, tid)
_kb.complete_task(
conn, tid,
result="done",
summary="tested on rate limiter",
metadata={"changed_files": ["limiter.py"]},
)
finally:
conn.close()
d = client.get(f"/api/plugins/kanban/tasks/{tid}").json()
assert "runs" in d
assert len(d["runs"]) == 1
run = d["runs"][0]
assert run["outcome"] == "completed"
assert run["profile"] == "worker"
assert run["summary"] == "tested on rate limiter"
assert run["metadata"] == {"changed_files": ["limiter.py"]}
assert run["ended_at"] is not None
def test_task_detail_runs_empty_before_claim(client):
"""A task that's never been claimed has an empty runs[] list, not
a missing key."""
r = client.post("/api/plugins/kanban/tasks", json={"title": "fresh"}).json()
d = client.get(f"/api/plugins/kanban/tasks/{r['task']['id']}").json()
assert d["runs"] == []
def test_patch_status_done_with_summary_and_metadata(client):
"""PATCH /tasks/:id with status=done + summary + metadata must
reach complete_task, so the dashboard has CLI parity."""
# Create + claim.
r = client.post("/api/plugins/kanban/tasks", json={"title": "x", "assignee": "worker"})
tid = r.json()["task"]["id"]
from hermes_cli import kanban_db as kb
conn = kb.connect()
try:
kb.claim_task(conn, tid)
finally:
conn.close()
r = client.patch(
f"/api/plugins/kanban/tasks/{tid}",
json={
"status": "done",
"summary": "shipped the thing",
"metadata": {"changed_files": ["a.py", "b.py"], "tests_run": 7},
},
)
assert r.status_code == 200, r.text
# The run must have the summary + metadata attached.
conn = kb.connect()
try:
run = kb.latest_run(conn, tid)
assert run.outcome == "completed"
assert run.summary == "shipped the thing"
assert run.metadata == {"changed_files": ["a.py", "b.py"], "tests_run": 7}
finally:
conn.close()
def test_patch_status_done_without_summary_still_works(client):
"""Back-compat: PATCH without the new fields still completes."""
r = client.post("/api/plugins/kanban/tasks", json={"title": "y", "assignee": "worker"})
tid = r.json()["task"]["id"]
from hermes_cli import kanban_db as kb
conn = kb.connect()
try:
kb.claim_task(conn, tid)
finally:
conn.close()
r = client.patch(
f"/api/plugins/kanban/tasks/{tid}",
json={"status": "done", "result": "legacy shape"},
)
assert r.status_code == 200, r.text
conn = kb.connect()
try:
run = kb.latest_run(conn, tid)
assert run.outcome == "completed"
assert run.summary == "legacy shape" # falls back to result
finally:
conn.close()
def test_patch_status_archive_closes_running_run(client):
"""PATCH to archived while running must close the in-flight run."""
r = client.post("/api/plugins/kanban/tasks", json={"title": "z", "assignee": "worker"})
tid = r.json()["task"]["id"]
from hermes_cli import kanban_db as kb
conn = kb.connect()
try:
kb.claim_task(conn, tid)
open_run = kb.latest_run(conn, tid)
assert open_run.ended_at is None
finally:
conn.close()
r = client.patch(
f"/api/plugins/kanban/tasks/{tid}",
json={"status": "archived"},
)
assert r.status_code == 200, r.text
conn = kb.connect()
try:
task = kb.get_task(conn, tid)
assert task.status == "archived"
assert task.current_run_id is None
assert kb.latest_run(conn, tid).outcome == "reclaimed"
finally:
conn.close()