hermes-agent/tests/plugins/test_kanban_dashboard_plugin.py
Teknium 0146cb2bd2 feat(kanban): runs as first-class (v1); structured handoffs; forward-compat for v2 workflows
Addresses vulcan-artivus's RFC review on issue #16102. Picks up the
structural changes that are expensive to retrofit later and zero-cost
to land now; defers workflow-template routing + per-stage lanes to v2
(kept forward-compat hooks in the schema).

Kernel
  - New `task_runs` table. Each claim opens a run (pid, claim_lock,
    heartbeat, max_runtime, started_at), each terminal transition
    closes it with an outcome (completed / blocked / crashed /
    timed_out / spawn_failed / gave_up / reclaimed). Multiple rows per
    task when retries happen, preserving full attempt history.
  - `tasks.current_run_id` points at the active run (NULL when idle);
    denormalised for cheap reads.
  - `task_events.run_id` carries the run a given event belongs to so
    UIs group events by attempt. claim/spawned/complete/block/crash/
    timeout/spawn_fail/gave_up/heartbeat events are all run-scoped;
    created/promoted/assigned/edited stay task-scoped (run_id=NULL).
  - Legacy DBs: migration adds the columns + indexes + synthesizes a
    run row for any task that's 'running' before the runs table
    existed, so subsequent complete/heartbeat/reclaim calls have a
    target. Idempotent.
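
The run lifecycle above can be sketched against an in-memory SQLite database. This is a minimal illustration, not the kernel's schema: the column names come from the list above, while the DDL, the column types, and the helper names `open_run` / `close_run` are assumptions made for the example.

```python
import sqlite3
import time

# Hypothetical DDL: column names follow the commit message, types are assumed.
SCHEMA = """
CREATE TABLE tasks (
    id TEXT PRIMARY KEY,
    status TEXT NOT NULL,
    current_run_id INTEGER          -- NULL when idle
);
CREATE TABLE task_runs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    task_id TEXT NOT NULL REFERENCES tasks(id),
    pid INTEGER,
    started_at REAL NOT NULL,
    ended_at REAL,
    outcome TEXT                    -- NULL while the run is open
);
"""

OUTCOMES = {"completed", "blocked", "crashed", "timed_out",
            "spawn_failed", "gave_up", "reclaimed"}


def open_run(conn, task_id, pid):
    """Claim: open a run and point tasks.current_run_id at it."""
    cur = conn.execute(
        "INSERT INTO task_runs (task_id, pid, started_at) VALUES (?, ?, ?)",
        (task_id, pid, time.time()),
    )
    conn.execute(
        "UPDATE tasks SET status = 'running', current_run_id = ? WHERE id = ?",
        (cur.lastrowid, task_id),
    )
    return cur.lastrowid


def close_run(conn, task_id, outcome, status):
    """Terminal transition: stamp the outcome and clear current_run_id."""
    if outcome not in OUTCOMES:
        raise ValueError(f"unknown outcome: {outcome}")
    conn.execute(
        "UPDATE task_runs SET outcome = ?, ended_at = ? "
        "WHERE id = (SELECT current_run_id FROM tasks WHERE id = ?)",
        (outcome, time.time(), task_id),
    )
    conn.execute(
        "UPDATE tasks SET status = ?, current_run_id = NULL WHERE id = ?",
        (status, task_id),
    )


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
conn.execute("INSERT INTO tasks (id, status) VALUES ('t1', 'ready')")

# Three attempts on one task: reclaimed, crashed, then completed.
for outcome, status in [("reclaimed", "ready"), ("crashed", "ready"),
                        ("completed", "done")]:
    open_run(conn, "t1", pid=1234)
    close_run(conn, "t1", outcome, status)

history = [row[0] for row in conn.execute(
    "SELECT outcome FROM task_runs WHERE task_id = 't1' ORDER BY id")]
```

Each retry leaves its own row, so `history` keeps every attempt in order while the task row only ever points at the run that is currently open.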

Structured handoff
  - `complete_task(summary=, metadata=)` persists both on the closing
    run. `summary` falls back to `result` when omitted so single-run
    callers don't duplicate. `metadata` is a free-form dict
    ({changed_files, tests_run, findings, ...}).
  - `build_worker_context` rewritten: it now includes a "Prior attempts
    on this task" section (closed runs: outcome, summary, error,
    metadata), and "Parent task results" pulls run.summary +
    run.metadata of the most-recent completed run per parent, falling
    back to task.result for legacy rows without runs. Retrying workers
    see why earlier attempts failed; downstream workers see parent
    handoffs structurally, not as loose `result` strings.
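
A rough sketch of the handoff shape described above. The `Run` dataclass, `close_with_handoff`, and `render_prior_attempts` are hypothetical names for illustration; only the summary-falls-back-to-result rule and the listed run fields come from this commit message, and the rendered text layout is an assumption.

```python
import json
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Run:
    """Hypothetical stand-in for a closed task_runs row."""
    outcome: str
    summary: Optional[str] = None
    error: Optional[str] = None
    metadata: dict = field(default_factory=dict)


def close_with_handoff(result=None, summary=None, metadata=None):
    """Mirror the described fallback: summary defaults to result when omitted."""
    return Run(
        outcome="completed",
        summary=summary if summary is not None else result,
        metadata=metadata or {},
    )


def render_prior_attempts(runs):
    """Render closed runs as a 'Prior attempts' block for a retrying worker."""
    lines = ["Prior attempts on this task:"]
    for n, run in enumerate(runs, start=1):
        line = f"  {n}. {run.outcome}"
        if run.summary:
            line += f": {run.summary}"
        if run.error:
            line += f" (error: {run.error})"
        if run.metadata:
            line += f" metadata={json.dumps(run.metadata, sort_keys=True)}"
        lines.append(line)
    return "\n".join(lines)


# Two failed attempts a retrying worker would see.
prior = [
    Run(outcome="crashed", error="OOM in test step"),
    Run(outcome="blocked", summary="need API key"),
]
context = render_prior_attempts(prior)

# A single-run caller passes only result; summary is filled from it.
handoff = close_with_handoff(result="done", metadata={"tests_run": 12})
```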

CLI
  - `hermes kanban complete <id> --summary "..." --metadata '{"files":1}'`.
    JSON is parsed and rejected with exit-2 if malformed.
  - New `hermes kanban runs <id> [--json]` verb. Shows per-run rows:
    outcome, profile, elapsed, summary, error. JSON mode serializes
    the full run dataclass for scripting.
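
One way to get the "reject malformed JSON with exit 2" behaviour is an argparse `type=` callable, since argparse exits with status 2 on argument errors. This is a sketch under that assumption, not the actual `hermes kanban complete` implementation; the parser name, the `json_object` helper, and the requirement that metadata be a JSON object are illustrative.

```python
import argparse
import json


def json_object(text):
    """argparse type: parse a JSON object, or raise so argparse exits with 2."""
    try:
        value = json.loads(text)
    except json.JSONDecodeError as exc:
        raise argparse.ArgumentTypeError(f"invalid JSON: {exc.msg}")
    if not isinstance(value, dict):
        raise argparse.ArgumentTypeError("metadata must be a JSON object")
    return value


def build_parser():
    # Hypothetical stand-in for the `complete` verb's arguments.
    parser = argparse.ArgumentParser(prog="kanban-complete-sketch")
    parser.add_argument("task_id")
    parser.add_argument("--summary")
    parser.add_argument("--metadata", type=json_object, default=None)
    return parser


def parse(argv):
    """Return (exit_code, namespace); exit_code is 0 on success."""
    try:
        return 0, build_parser().parse_args(argv)
    except SystemExit as exc:
        return exc.code, None


ok_code, args = parse(["t1", "--summary", "shipped", "--metadata", '{"files": 1}'])
bad_code, _ = parse(["t1", "--metadata", "{not json"])
```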

Dashboard plugin
  - GET /tasks/:id now carries a runs[] array alongside task / events /
    comments / links. Each run serialised with outcome, summary,
    metadata, worker_pid, elapsed fields.
  - New Run History section in the drawer. Outcome-coloured left
    border (green=active, blue=completed, amber=reclaimed,
    red=crashed/timed_out/gave_up/blocked). Collapsed when >3 runs
    with a '+N earlier' toggle. Shows summary + error + metadata
    inline.
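
The colour mapping and the collapse rule can be written down as plain data plus one helper. The drawer itself is front-end code; the sketch below uses Python only to stay in the language of the test file. The `grey` fallback (for outcomes the list above does not colour, such as spawn_failed) and the choice to keep the three most recent runs visible are assumptions.

```python
# Outcome -> border colour, as listed above. None stands for an active run.
OUTCOME_COLOUR = {
    None: "green",
    "completed": "blue",
    "reclaimed": "amber",
    "crashed": "red",
    "timed_out": "red",
    "gave_up": "red",
    "blocked": "red",
}

VISIBLE_RUNS = 3


def border_colour(outcome):
    """Look up the border colour; the grey fallback is an assumption."""
    return OUTCOME_COLOUR.get(outcome, "grey")


def collapse_runs(runs):
    """Return (visible_runs, toggle_label); label is None when nothing is hidden."""
    if len(runs) <= VISIBLE_RUNS:
        return runs, None
    hidden = len(runs) - VISIBLE_RUNS
    # Assumed: the most recent runs stay visible, earlier ones are hidden.
    return runs[-VISIBLE_RUNS:], f"+{hidden} earlier"


visible, label = collapse_runs(["r1", "r2", "r3", "r4", "r5"])
```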

Forward-compat for v2 (vulcan's workflow templates + stages)
  - `tasks.workflow_template_id` and `tasks.current_step_key` added as
    nullable columns. v1 kernel ignores them for routing; v2 will add
    workflow_templates + workflow_steps tables and wire the dispatcher
    to consult them. task_runs has a matching `step_key` column. Lets
    a v2 release land additively without another schema migration.
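
An additive, re-runnable column migration of this kind might look like the sketch below. The three column names come from the paragraph above; the `TEXT` type, the helper names, and the use of `PRAGMA table_info` to detect existing columns are assumptions, not the migration shipped in this commit.

```python
import sqlite3


def add_column_if_missing(conn, table, column, decl):
    """Add a nullable column only if it is absent, so re-running is a no-op."""
    existing = {row[1] for row in conn.execute(f"PRAGMA table_info({table})")}
    if column in existing:
        return False
    conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {decl}")
    return True


def migrate(conn):
    """Add the forward-compat columns; return the ones actually added."""
    added = []
    for table, column in [
        ("tasks", "workflow_template_id"),
        ("tasks", "current_step_key"),
        ("task_runs", "step_key"),
    ]:
        if add_column_if_missing(conn, table, column, "TEXT"):
            added.append(f"{table}.{column}")
    return added


# Simulate a pre-migration database.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tasks (id TEXT PRIMARY KEY, status TEXT)")
conn.execute("CREATE TABLE task_runs (id INTEGER PRIMARY KEY, task_id TEXT)")

first = migrate(conn)    # adds all three columns
second = migrate(conn)   # nothing left to add

# The new columns are writable even though nothing routes on them yet.
conn.execute("INSERT INTO tasks (id, status) VALUES ('t1', 'ready')")
conn.execute("UPDATE tasks SET current_step_key = 'review' WHERE id = 't1'")
step, template = conn.execute(
    "SELECT current_step_key, workflow_template_id FROM tasks WHERE id = 't1'"
).fetchone()
```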

Tests (+22 in test_kanban_core_functionality.py, +2 in dashboard)
  - run_created_on_claim / run_closed_on_complete_with_summary
  - run_summary_falls_back_to_result
  - multiple_attempts_preserved_as_runs (3 attempts: reclaimed →
    crashed → completed, all visible in list_runs)
  - run_on_block_with_reason / run_on_spawn_failure_records_failed_runs
    (5 spawn_failed runs + 1 gave_up run)
  - event_rows_carry_run_id (task-scoped vs run-scoped split)
  - build_worker_context_includes_prior_attempts
  - build_worker_context_uses_parent_run_summary (metadata JSON in context)
  - migration_backfills_inflight_run_for_legacy_db (simulates a
    pre-migration running task, re-runs init_db, asserts backfill)
  - forward_compat_columns_writable
  - cli_runs_verb + cli_runs_json
  - cli_complete_with_summary_and_metadata (JSON round-trip through
    shlex + argparse)
  - cli_complete_bad_metadata_exits_nonzero
  - task_detail_includes_runs / task_detail_runs_empty_before_claim

269/269 kanban suite pass under scripts/run_tests.sh. Live-smoke
covered: single-attempt complete → run closed + summary persisted;
retry scenario → two runs visible (blocked + completed); parent run
summary + metadata surfaced to child via build_worker_context;
forward-compat columns writable via UPDATE; GET /tasks/:id returns
runs[].

Docs
  - New 'Runs — one row per attempt' section in kanban.md: the
    why (full attempt history, structured metadata), the two-table
    model (task is logical, run is execution), the structured handoff
    shape (--summary / --metadata), example CLI + dashboard output,
    forward-compat note for v2.
  - Event reference updated to mention task_events.run_id.
  - CLI reference gains 'hermes kanban runs <id>'.

Not in v1 (deferred to v2):
  - Workflow templates (workflow_templates + workflow_steps tables,
    stage-based routing, success/failure step links).
  - 'stage' as a distinct axis from status in the UI.
  - Shared-by-default workspace binding across stages of the same
    workflow run.
  - Pipeline replacement for the kanban-orchestrator skill (the
    orchestrator's 'decompose, don't execute' guidance is still
    correct; it becomes partly redundant once workflows land).
2026-04-27 06:54:19 -07:00


"""Tests for the Kanban dashboard plugin backend (plugins/kanban/dashboard/plugin_api.py).
The plugin mounts as /api/plugins/kanban/ inside the dashboard's FastAPI app,
but here we attach its router to a bare FastAPI instance so we can test the
REST surface without spinning up the whole dashboard.
"""
from __future__ import annotations
import importlib.util
import os
import sys
import time
from pathlib import Path
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from hermes_cli import kanban_db as kb
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
def _load_plugin_router():
    """Dynamically load plugins/kanban/dashboard/plugin_api.py and return its router."""
    repo_root = Path(__file__).resolve().parents[2]
    plugin_file = repo_root / "plugins" / "kanban" / "dashboard" / "plugin_api.py"
    assert plugin_file.exists(), f"plugin file missing: {plugin_file}"
    spec = importlib.util.spec_from_file_location(
        "hermes_dashboard_plugin_kanban_test", plugin_file,
    )
    assert spec is not None and spec.loader is not None
    mod = importlib.util.module_from_spec(spec)
    sys.modules[spec.name] = mod
    spec.loader.exec_module(mod)
    return mod.router


@pytest.fixture
def kanban_home(tmp_path, monkeypatch):
    """Isolated HERMES_HOME with an empty kanban DB."""
    home = tmp_path / ".hermes"
    home.mkdir()
    monkeypatch.setenv("HERMES_HOME", str(home))
    monkeypatch.setattr(Path, "home", lambda: tmp_path)
    kb.init_db()
    return home


@pytest.fixture
def client(kanban_home):
    app = FastAPI()
    app.include_router(_load_plugin_router(), prefix="/api/plugins/kanban")
    return TestClient(app)


# ---------------------------------------------------------------------------
# GET /board on an empty DB
# ---------------------------------------------------------------------------
def test_board_empty(client):
    r = client.get("/api/plugins/kanban/board")
    assert r.status_code == 200
    data = r.json()
    # All canonical columns present (triage + the rest), each empty.
    names = [c["name"] for c in data["columns"]]
    for expected in ("triage", "todo", "ready", "running", "blocked", "done"):
        assert expected in names, f"missing column {expected}: {names}"
    assert all(len(c["tasks"]) == 0 for c in data["columns"])
    assert data["tenants"] == []
    assert data["assignees"] == []
    assert data["latest_event_id"] == 0


# ---------------------------------------------------------------------------
# POST /tasks then GET /board sees it
# ---------------------------------------------------------------------------
def test_create_task_appears_on_board(client):
    r = client.post(
        "/api/plugins/kanban/tasks",
        json={
            "title": "Research LLM caching",
            "assignee": "researcher",
            "priority": 3,
            "tenant": "acme",
        },
    )
    assert r.status_code == 200, r.text
    task = r.json()["task"]
    assert task["title"] == "Research LLM caching"
    assert task["assignee"] == "researcher"
    assert task["status"] == "ready"  # no parents -> immediately ready
    assert task["priority"] == 3
    assert task["tenant"] == "acme"
    task_id = task["id"]
    # Board now lists it under 'ready'.
    r = client.get("/api/plugins/kanban/board")
    assert r.status_code == 200
    data = r.json()
    ready = next(c for c in data["columns"] if c["name"] == "ready")
    assert len(ready["tasks"]) == 1
    assert ready["tasks"][0]["id"] == task_id
    assert "acme" in data["tenants"]
    assert "researcher" in data["assignees"]


def test_tenant_filter(client):
    client.post("/api/plugins/kanban/tasks", json={"title": "A", "tenant": "t1"})
    client.post("/api/plugins/kanban/tasks", json={"title": "B", "tenant": "t2"})
    r = client.get("/api/plugins/kanban/board?tenant=t1")
    counts = {c["name"]: len(c["tasks"]) for c in r.json()["columns"]}
    total = sum(counts.values())
    assert total == 1
    r = client.get("/api/plugins/kanban/board?tenant=t2")
    total = sum(len(c["tasks"]) for c in r.json()["columns"])
    assert total == 1


# ---------------------------------------------------------------------------
# GET /tasks/:id returns body + comments + events + links
# ---------------------------------------------------------------------------
def test_task_detail_includes_links_and_events(client):
    parent = client.post(
        "/api/plugins/kanban/tasks", json={"title": "parent"},
    ).json()["task"]
    child = client.post(
        "/api/plugins/kanban/tasks",
        json={"title": "child", "parents": [parent["id"]]},
    ).json()["task"]
    assert child["status"] == "todo"  # parent not done yet
    # Detail for the child shows the parent link.
    r = client.get(f"/api/plugins/kanban/tasks/{child['id']}")
    assert r.status_code == 200
    data = r.json()
    assert data["task"]["id"] == child["id"]
    assert parent["id"] in data["links"]["parents"]
    # Detail for the parent shows the child.
    r = client.get(f"/api/plugins/kanban/tasks/{parent['id']}")
    assert child["id"] in r.json()["links"]["children"]
    # Events exist from creation.
    assert len(data["events"]) >= 1


def test_task_detail_404_on_unknown(client):
    r = client.get("/api/plugins/kanban/tasks/does-not-exist")
    assert r.status_code == 404


# ---------------------------------------------------------------------------
# PATCH /tasks/:id — status transitions
# ---------------------------------------------------------------------------
def test_patch_status_complete(client):
    t = client.post("/api/plugins/kanban/tasks", json={"title": "x"}).json()["task"]
    r = client.patch(
        f"/api/plugins/kanban/tasks/{t['id']}",
        json={"status": "done", "result": "shipped"},
    )
    assert r.status_code == 200
    assert r.json()["task"]["status"] == "done"
    # Board reflects the move.
    done = next(
        c for c in client.get("/api/plugins/kanban/board").json()["columns"]
        if c["name"] == "done"
    )
    assert any(x["id"] == t["id"] for x in done["tasks"])


def test_patch_block_then_unblock(client):
    t = client.post("/api/plugins/kanban/tasks", json={"title": "x"}).json()["task"]
    r = client.patch(
        f"/api/plugins/kanban/tasks/{t['id']}",
        json={"status": "blocked", "block_reason": "need input"},
    )
    assert r.status_code == 200
    assert r.json()["task"]["status"] == "blocked"
    r = client.patch(
        f"/api/plugins/kanban/tasks/{t['id']}",
        json={"status": "ready"},
    )
    assert r.status_code == 200
    assert r.json()["task"]["status"] == "ready"


def test_patch_drag_drop_move_todo_to_ready(client):
    """Direct status write: the drag-drop path for statuses without a
    dedicated verb (e.g. manually promoting todo -> ready)."""
    parent = client.post("/api/plugins/kanban/tasks", json={"title": "p"}).json()["task"]
    child = client.post(
        "/api/plugins/kanban/tasks",
        json={"title": "c", "parents": [parent["id"]]},
    ).json()["task"]
    assert child["status"] == "todo"
    r = client.patch(
        f"/api/plugins/kanban/tasks/{child['id']}",
        json={"status": "ready"},
    )
    assert r.status_code == 200
    assert r.json()["task"]["status"] == "ready"


def test_patch_reassign(client):
    t = client.post(
        "/api/plugins/kanban/tasks",
        json={"title": "x", "assignee": "a"},
    ).json()["task"]
    r = client.patch(
        f"/api/plugins/kanban/tasks/{t['id']}",
        json={"assignee": "b"},
    )
    assert r.status_code == 200
    assert r.json()["task"]["assignee"] == "b"


def test_patch_priority_and_edit(client):
    t = client.post("/api/plugins/kanban/tasks", json={"title": "x"}).json()["task"]
    r = client.patch(
        f"/api/plugins/kanban/tasks/{t['id']}",
        json={"priority": 5, "title": "renamed"},
    )
    assert r.status_code == 200
    data = r.json()["task"]
    assert data["priority"] == 5
    assert data["title"] == "renamed"


def test_patch_invalid_status(client):
    t = client.post("/api/plugins/kanban/tasks", json={"title": "x"}).json()["task"]
    r = client.patch(
        f"/api/plugins/kanban/tasks/{t['id']}",
        json={"status": "banana"},
    )
    assert r.status_code == 400


# ---------------------------------------------------------------------------
# Comments + Links
# ---------------------------------------------------------------------------
def test_add_comment(client):
    t = client.post("/api/plugins/kanban/tasks", json={"title": "x"}).json()["task"]
    r = client.post(
        f"/api/plugins/kanban/tasks/{t['id']}/comments",
        json={"body": "how's progress?", "author": "teknium"},
    )
    assert r.status_code == 200
    r = client.get(f"/api/plugins/kanban/tasks/{t['id']}")
    comments = r.json()["comments"]
    assert len(comments) == 1
    assert comments[0]["body"] == "how's progress?"
    assert comments[0]["author"] == "teknium"


def test_add_comment_empty_rejected(client):
    t = client.post("/api/plugins/kanban/tasks", json={"title": "x"}).json()["task"]
    r = client.post(
        f"/api/plugins/kanban/tasks/{t['id']}/comments",
        json={"body": " "},
    )
    assert r.status_code == 400


def test_add_link_and_delete_link(client):
    a = client.post("/api/plugins/kanban/tasks", json={"title": "a"}).json()["task"]
    b = client.post("/api/plugins/kanban/tasks", json={"title": "b"}).json()["task"]
    r = client.post(
        "/api/plugins/kanban/links",
        json={"parent_id": a["id"], "child_id": b["id"]},
    )
    assert r.status_code == 200
    r = client.get(f"/api/plugins/kanban/tasks/{b['id']}")
    assert a["id"] in r.json()["links"]["parents"]
    r = client.delete(
        "/api/plugins/kanban/links",
        params={"parent_id": a["id"], "child_id": b["id"]},
    )
    assert r.status_code == 200
    assert r.json()["ok"] is True


def test_add_link_cycle_rejected(client):
    a = client.post("/api/plugins/kanban/tasks", json={"title": "a"}).json()["task"]
    b = client.post("/api/plugins/kanban/tasks", json={"title": "b"}).json()["task"]
    client.post(
        "/api/plugins/kanban/links",
        json={"parent_id": a["id"], "child_id": b["id"]},
    )
    r = client.post(
        "/api/plugins/kanban/links",
        json={"parent_id": b["id"], "child_id": a["id"]},
    )
    assert r.status_code == 400


# ---------------------------------------------------------------------------
# Dispatch nudge
# ---------------------------------------------------------------------------
def test_dispatch_dry_run(client):
    client.post(
        "/api/plugins/kanban/tasks",
        json={"title": "work", "assignee": "researcher"},
    )
    r = client.post("/api/plugins/kanban/dispatch?dry_run=true&max=4")
    assert r.status_code == 200
    body = r.json()
    # DispatchResult is serialized as a dataclass dict.
    assert isinstance(body, dict)


# ---------------------------------------------------------------------------
# Triage column (new v1 status)
# ---------------------------------------------------------------------------
def test_create_triage_lands_in_triage_column(client):
    r = client.post(
        "/api/plugins/kanban/tasks",
        json={"title": "rough idea, spec me", "triage": True},
    )
    assert r.status_code == 200
    task = r.json()["task"]
    assert task["status"] == "triage"
    r = client.get("/api/plugins/kanban/board")
    triage = next(c for c in r.json()["columns"] if c["name"] == "triage")
    assert len(triage["tasks"]) == 1
    assert triage["tasks"][0]["title"] == "rough idea, spec me"


def test_triage_task_not_promoted_to_ready(client):
    """Triage tasks must stay in triage even when they have no parents."""
    client.post(
        "/api/plugins/kanban/tasks",
        json={"title": "must stay put", "triage": True},
    )
    # Run the dispatcher — it should NOT promote the triage task.
    client.post("/api/plugins/kanban/dispatch?dry_run=false&max=4")
    r = client.get("/api/plugins/kanban/board")
    triage = next(c for c in r.json()["columns"] if c["name"] == "triage")
    ready = next(c for c in r.json()["columns"] if c["name"] == "ready")
    assert len(triage["tasks"]) == 1
    assert len(ready["tasks"]) == 0


def test_patch_status_triage_works(client):
    """A user (or specifier) can push a task back into triage, and out of it."""
    t = client.post(
        "/api/plugins/kanban/tasks", json={"title": "x"},
    ).json()["task"]
    # Normal creation is 'ready'; push to triage.
    r = client.patch(
        f"/api/plugins/kanban/tasks/{t['id']}", json={"status": "triage"},
    )
    assert r.status_code == 200
    assert r.json()["task"]["status"] == "triage"
    # Now promote to todo.
    r = client.patch(
        f"/api/plugins/kanban/tasks/{t['id']}", json={"status": "todo"},
    )
    assert r.status_code == 200
    assert r.json()["task"]["status"] == "todo"


# ---------------------------------------------------------------------------
# Progress rollup (done children / total children)
# ---------------------------------------------------------------------------
def test_board_progress_rollup(client):
    parent = client.post(
        "/api/plugins/kanban/tasks", json={"title": "parent"},
    ).json()["task"]
    child_a = client.post(
        "/api/plugins/kanban/tasks",
        json={"title": "a", "parents": [parent["id"]]},
    ).json()["task"]
    child_b = client.post(
        "/api/plugins/kanban/tasks",
        json={"title": "b", "parents": [parent["id"]]},
    ).json()["task"]
    # Children start as "todo" because the parent isn't done yet; promote
    # them to "ready" so complete_task will accept the transition.
    for cid in (child_a["id"], child_b["id"]):
        r = client.patch(
            f"/api/plugins/kanban/tasks/{cid}", json={"status": "ready"},
        )
        assert r.status_code == 200
    # 0/2 done.
    r = client.get("/api/plugins/kanban/board")
    parent_row = next(
        t for col in r.json()["columns"] for t in col["tasks"]
        if t["id"] == parent["id"]
    )
    assert parent_row["progress"] == {"done": 0, "total": 2}
    # Complete one child. 1/2.
    r = client.patch(
        f"/api/plugins/kanban/tasks/{child_a['id']}",
        json={"status": "done"},
    )
    assert r.status_code == 200
    r = client.get("/api/plugins/kanban/board")
    parent_row = next(
        t for col in r.json()["columns"] for t in col["tasks"]
        if t["id"] == parent["id"]
    )
    assert parent_row["progress"] == {"done": 1, "total": 2}
    # Childless tasks report progress=None, not {0/0}.
    assert next(
        t for col in r.json()["columns"] for t in col["tasks"]
        if t["id"] == child_b["id"]
    )["progress"] is None


# ---------------------------------------------------------------------------
# Auto-init on first board read
# ---------------------------------------------------------------------------
def test_board_auto_initializes_missing_db(tmp_path, monkeypatch):
    """If kanban.db doesn't exist yet, GET /board must create it, not 500."""
    home = tmp_path / ".hermes"
    home.mkdir()
    monkeypatch.setenv("HERMES_HOME", str(home))
    monkeypatch.setattr(Path, "home", lambda: tmp_path)
    # Deliberately DO NOT call kb.init_db().
    app = FastAPI()
    app.include_router(_load_plugin_router(), prefix="/api/plugins/kanban")
    c = TestClient(app)
    r = c.get("/api/plugins/kanban/board")
    assert r.status_code == 200
    assert (home / "kanban.db").exists(), "init_db wasn't invoked by /board"


# ---------------------------------------------------------------------------
# WebSocket auth (query-param token)
# ---------------------------------------------------------------------------
def test_ws_events_rejects_when_token_required(tmp_path, monkeypatch):
    """When _SESSION_TOKEN is set (normal dashboard context), a missing or
    wrong ?token= query param must be rejected with policy-violation."""
    home = tmp_path / ".hermes"
    home.mkdir()
    monkeypatch.setenv("HERMES_HOME", str(home))
    monkeypatch.setattr(Path, "home", lambda: tmp_path)
    kb.init_db()
    # Stub web_server so _check_ws_token has a token to compare against.
    import types
    stub = types.SimpleNamespace(_SESSION_TOKEN="secret-xyz")
    monkeypatch.setitem(sys.modules, "hermes_cli.web_server", stub)
    app = FastAPI()
    app.include_router(_load_plugin_router(), prefix="/api/plugins/kanban")
    c = TestClient(app)
    # No token → policy violation close.
    from starlette.websockets import WebSocketDisconnect
    with pytest.raises(WebSocketDisconnect) as exc:
        with c.websocket_connect("/api/plugins/kanban/events"):
            pass
    assert exc.value.code == 1008
    # Wrong token → policy violation close.
    with pytest.raises(WebSocketDisconnect) as exc:
        with c.websocket_connect("/api/plugins/kanban/events?token=nope"):
            pass
    assert exc.value.code == 1008
    # Correct token → accepted (connect then close cleanly from our side).
    with c.websocket_connect(
        "/api/plugins/kanban/events?token=secret-xyz"
    ) as ws:
        assert ws is not None  # handshake succeeded


# ---------------------------------------------------------------------------
# Bulk actions
# ---------------------------------------------------------------------------
def test_bulk_status_ready(client):
    a = client.post("/api/plugins/kanban/tasks", json={"title": "a"}).json()["task"]
    b = client.post("/api/plugins/kanban/tasks", json={"title": "b"}).json()["task"]
    c2 = client.post("/api/plugins/kanban/tasks", json={"title": "c"}).json()["task"]
    # Parent-less tasks land in "ready" already; push them to blocked first.
    for tid in (a["id"], b["id"], c2["id"]):
        client.patch(f"/api/plugins/kanban/tasks/{tid}",
                     json={"status": "blocked", "block_reason": "wait"})
    r = client.post("/api/plugins/kanban/tasks/bulk",
                    json={"ids": [a["id"], b["id"], c2["id"]], "status": "ready"})
    assert r.status_code == 200
    results = r.json()["results"]
    assert all(r["ok"] for r in results)
    # All three are now ready.
    board = client.get("/api/plugins/kanban/board").json()
    ready = next(col for col in board["columns"] if col["name"] == "ready")
    ids = {t["id"] for t in ready["tasks"]}
    assert {a["id"], b["id"], c2["id"]}.issubset(ids)


def test_bulk_archive(client):
    a = client.post("/api/plugins/kanban/tasks", json={"title": "a"}).json()["task"]
    b = client.post("/api/plugins/kanban/tasks", json={"title": "b"}).json()["task"]
    r = client.post("/api/plugins/kanban/tasks/bulk",
                    json={"ids": [a["id"], b["id"]], "archive": True})
    assert r.status_code == 200
    assert all(r["ok"] for r in r.json()["results"])
    # Default board (archived hidden) — both gone.
    board = client.get("/api/plugins/kanban/board").json()
    ids = {t["id"] for col in board["columns"] for t in col["tasks"]}
    assert a["id"] not in ids
    assert b["id"] not in ids


def test_bulk_reassign(client):
    a = client.post("/api/plugins/kanban/tasks",
                    json={"title": "a", "assignee": "old"}).json()["task"]
    b = client.post("/api/plugins/kanban/tasks",
                    json={"title": "b", "assignee": "old"}).json()["task"]
    r = client.post("/api/plugins/kanban/tasks/bulk",
                    json={"ids": [a["id"], b["id"]], "assignee": "new"})
    assert r.status_code == 200
    for tid in (a["id"], b["id"]):
        t = client.get(f"/api/plugins/kanban/tasks/{tid}").json()["task"]
        assert t["assignee"] == "new"


def test_bulk_unassign_via_empty_string(client):
    a = client.post("/api/plugins/kanban/tasks",
                    json={"title": "a", "assignee": "x"}).json()["task"]
    r = client.post("/api/plugins/kanban/tasks/bulk",
                    json={"ids": [a["id"]], "assignee": ""})
    assert r.status_code == 200
    t = client.get(f"/api/plugins/kanban/tasks/{a['id']}").json()["task"]
    assert t["assignee"] is None


def test_bulk_partial_failure_doesnt_abort_siblings(client):
    """One bad id in the middle of a batch must not prevent others from
    applying."""
    a = client.post("/api/plugins/kanban/tasks", json={"title": "a"}).json()["task"]
    c2 = client.post("/api/plugins/kanban/tasks", json={"title": "c"}).json()["task"]
    r = client.post("/api/plugins/kanban/tasks/bulk",
                    json={"ids": [a["id"], "bogus-id", c2["id"]], "priority": 7})
    assert r.status_code == 200
    results = r.json()["results"]
    assert len(results) == 3
    ok_ids = {r["id"] for r in results if r["ok"]}
    assert a["id"] in ok_ids
    assert c2["id"] in ok_ids
    assert any(not r["ok"] and r["id"] == "bogus-id" for r in results)
    # Good siblings actually got the priority bump.
    for tid in (a["id"], c2["id"]):
        t = client.get(f"/api/plugins/kanban/tasks/{tid}").json()["task"]
        assert t["priority"] == 7


def test_bulk_empty_ids_400(client):
    r = client.post("/api/plugins/kanban/tasks/bulk", json={"ids": []})
    assert r.status_code == 400


# ---------------------------------------------------------------------------
# /config endpoint
# ---------------------------------------------------------------------------
def test_config_returns_defaults_when_section_missing(client):
    r = client.get("/api/plugins/kanban/config")
    assert r.status_code == 200
    data = r.json()
    # Defaults when dashboard.kanban is missing.
    assert data["default_tenant"] == ""
    assert data["lane_by_profile"] is True
    assert data["include_archived_by_default"] is False
    assert data["render_markdown"] is True


def test_config_reads_dashboard_kanban_section(tmp_path, monkeypatch, client):
    home = Path(os.environ["HERMES_HOME"])
    # YAML nesting is significant: kanban sits under dashboard, keys under kanban.
    (home / "config.yaml").write_text(
        "dashboard:\n"
        "  kanban:\n"
        "    default_tenant: acme\n"
        "    lane_by_profile: false\n"
        "    include_archived_by_default: true\n"
        "    render_markdown: false\n"
    )
    r = client.get("/api/plugins/kanban/config")
    assert r.status_code == 200
    data = r.json()
    assert data["default_tenant"] == "acme"
    assert data["lane_by_profile"] is False
    assert data["include_archived_by_default"] is True
    assert data["render_markdown"] is False


# ---------------------------------------------------------------------------
# Runs surfacing (vulcan-artivus RFC feedback)
# ---------------------------------------------------------------------------
def test_task_detail_includes_runs(client):
    """GET /tasks/:id carries a runs[] array with the attempt history."""
    r = client.post("/api/plugins/kanban/tasks",
                    json={"title": "port x", "assignee": "worker"}).json()
    tid = r["task"]["id"]
    # Drive status running to force a run creation: PATCH to running
    # doesn't call claim_task (the PATCH path uses _set_status_direct),
    # so use the bulk/claim indirection via the kernel.
    import hermes_cli.kanban_db as _kb
    conn = _kb.connect()
    try:
        _kb.claim_task(conn, tid)
        _kb.complete_task(
            conn, tid,
            result="done",
            summary="tested on rate limiter",
            metadata={"changed_files": ["limiter.py"]},
        )
    finally:
        conn.close()
    d = client.get(f"/api/plugins/kanban/tasks/{tid}").json()
    assert "runs" in d
    assert len(d["runs"]) == 1
    run = d["runs"][0]
    assert run["outcome"] == "completed"
    assert run["profile"] == "worker"
    assert run["summary"] == "tested on rate limiter"
    assert run["metadata"] == {"changed_files": ["limiter.py"]}
    assert run["ended_at"] is not None


def test_task_detail_runs_empty_before_claim(client):
    """A task that's never been claimed has an empty runs[] list, not
    a missing key."""
    r = client.post("/api/plugins/kanban/tasks", json={"title": "fresh"}).json()
    d = client.get(f"/api/plugins/kanban/tasks/{r['task']['id']}").json()
    assert d["runs"] == []