mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-28 23:11:37 +08:00
feat(kanban): dashboard plugin — Linear/Fusion-style board UI
Ships plugins/kanban/dashboard/ as a bundled dashboard plugin. No core changes — uses the standard dashboard plugin contract (manifest.json + dist/index.js + plugin_api.py) documented in 'Extending the Dashboard'. What the tab gives you: - One column per kanban status (todo / ready / running / blocked / done; archived behind a toggle), column counts, coloured status dots. - Cards with id, title, priority badge, tenant tag, assignee, comment/link counts, 'created N ago'. - HTML5 drag-drop between columns — status change routes through the same kanban_db code the CLI /kanban verbs use, so the three surfaces (CLI, gateway, dashboard) can never drift. - Inline create per-column (title, assignee, priority). - Side drawer on card click: description, status action row (→ ready / → running / block / unblock / complete / archive), dependency links, comment thread with Enter-to-submit, last 20 events. - Toolbar: search, tenant filter, assignee filter, show-archived, nudge-dispatcher (skip the 60s wait), refresh. - Live updates via WebSocket tailing task_events — the board reflects CLI or gateway actions in real time. REST surface under /api/plugins/kanban/: GET /board, GET /tasks/:id, POST /tasks, PATCH /tasks/:id, POST /tasks/:id/comments, POST /links, DELETE /links, POST /dispatch, WS /events. Every handler is a thin wrapper around kanban_db — no new business logic. Visually theme-aware: the plugin CSS reads only --color-*, --radius, --font-mono etc. so it reskins with whichever dashboard theme is active. Tests (tests/plugins/test_kanban_dashboard_plugin.py, 16 tests): - empty board shape - create + appears in ready column with tenant/assignee rollups - tenant filter - detail includes parents/children/events - 404 on unknown task - PATCH status: complete / block / unblock / ready drag-drop / running - PATCH reassign, priority, edit, invalid-status rejection - POST comment (plus empty-body rejection) - POST link + DELETE link + cycle rejection - POST dispatch (dry run) All 76 kanban tests pass under scripts/run_tests.sh. Docs: website/docs/user-guide/features/kanban.md gains a full 'Dashboard (GUI)' section covering install, architecture, REST surface, live-updates mechanism, extending, and scope boundary.
This commit is contained in:
496
plugins/kanban/dashboard/plugin_api.py
Normal file
496
plugins/kanban/dashboard/plugin_api.py
Normal file
@@ -0,0 +1,496 @@
|
||||
"""Kanban dashboard plugin — backend API routes.
|
||||
|
||||
Mounted at /api/plugins/kanban/ by the dashboard plugin system.
|
||||
|
||||
This layer is intentionally thin: every handler is a small wrapper around
|
||||
``hermes_cli.kanban_db`` or a direct SQL query. Writes use the same code
|
||||
paths the CLI and gateway ``/kanban`` command use, so the three surfaces
|
||||
cannot drift.
|
||||
|
||||
Live updates arrive via the ``/events`` WebSocket, which tails the
|
||||
append-only ``task_events`` table on a short poll interval (WAL mode lets
|
||||
reads run alongside the dispatcher's IMMEDIATE write transactions).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import sqlite3
|
||||
import time
|
||||
from dataclasses import asdict
|
||||
from typing import Any, Optional
|
||||
|
||||
from fastapi import APIRouter, HTTPException, Query, WebSocket, WebSocketDisconnect
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from hermes_cli import kanban_db
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Serialization helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Columns shown by the dashboard, in left-to-right order. "archived" is
|
||||
# available via a filter toggle rather than a visible column.
|
||||
BOARD_COLUMNS: list[str] = ["todo", "ready", "running", "blocked", "done"]
|
||||
|
||||
|
||||
def _task_dict(task: kanban_db.Task) -> dict[str, Any]:
|
||||
d = asdict(task)
|
||||
# Keep body short on list endpoints; full body comes from /tasks/:id.
|
||||
return d
|
||||
|
||||
|
||||
def _event_dict(event: kanban_db.Event) -> dict[str, Any]:
|
||||
return {
|
||||
"id": event.id,
|
||||
"task_id": event.task_id,
|
||||
"kind": event.kind,
|
||||
"payload": event.payload,
|
||||
"created_at": event.created_at,
|
||||
}
|
||||
|
||||
|
||||
def _comment_dict(c: kanban_db.Comment) -> dict[str, Any]:
|
||||
return {
|
||||
"id": c.id,
|
||||
"task_id": c.task_id,
|
||||
"author": c.author,
|
||||
"body": c.body,
|
||||
"created_at": c.created_at,
|
||||
}
|
||||
|
||||
|
||||
def _links_for(conn: sqlite3.Connection, task_id: str) -> dict[str, list[str]]:
|
||||
"""Return {'parents': [...], 'children': [...]} for a task."""
|
||||
parents = [
|
||||
r["parent_id"]
|
||||
for r in conn.execute(
|
||||
"SELECT parent_id FROM task_links WHERE child_id = ? ORDER BY parent_id",
|
||||
(task_id,),
|
||||
)
|
||||
]
|
||||
children = [
|
||||
r["child_id"]
|
||||
for r in conn.execute(
|
||||
"SELECT child_id FROM task_links WHERE parent_id = ? ORDER BY child_id",
|
||||
(task_id,),
|
||||
)
|
||||
]
|
||||
return {"parents": parents, "children": children}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# GET /board
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@router.get("/board")
|
||||
def get_board(
|
||||
tenant: Optional[str] = Query(None, description="Filter to a single tenant"),
|
||||
include_archived: bool = Query(False),
|
||||
):
|
||||
"""Return the full board grouped by status column."""
|
||||
conn = kanban_db.connect()
|
||||
try:
|
||||
tasks = kanban_db.list_tasks(
|
||||
conn, tenant=tenant, include_archived=include_archived
|
||||
)
|
||||
# Pre-fetch link counts per task (cheap: one query).
|
||||
link_counts: dict[str, dict[str, int]] = {}
|
||||
for row in conn.execute(
|
||||
"SELECT parent_id, child_id FROM task_links"
|
||||
).fetchall():
|
||||
link_counts.setdefault(row["parent_id"], {"parents": 0, "children": 0})[
|
||||
"children"
|
||||
] += 1
|
||||
link_counts.setdefault(row["child_id"], {"parents": 0, "children": 0})[
|
||||
"parents"
|
||||
] += 1
|
||||
|
||||
# Comment + event counts (both cheap aggregates).
|
||||
comment_counts: dict[str, int] = {
|
||||
r["task_id"]: r["n"]
|
||||
for r in conn.execute(
|
||||
"SELECT task_id, COUNT(*) AS n FROM task_comments GROUP BY task_id"
|
||||
)
|
||||
}
|
||||
|
||||
latest_event_id = conn.execute(
|
||||
"SELECT COALESCE(MAX(id), 0) AS m FROM task_events"
|
||||
).fetchone()["m"]
|
||||
|
||||
columns: dict[str, list[dict]] = {c: [] for c in BOARD_COLUMNS}
|
||||
if include_archived:
|
||||
columns["archived"] = []
|
||||
|
||||
for t in tasks:
|
||||
d = _task_dict(t)
|
||||
d["link_counts"] = link_counts.get(t.id, {"parents": 0, "children": 0})
|
||||
d["comment_count"] = comment_counts.get(t.id, 0)
|
||||
col = t.status if t.status in columns else "todo"
|
||||
columns[col].append(d)
|
||||
|
||||
# Stable per-column ordering already applied by list_tasks
|
||||
# (priority DESC, created_at ASC), keep as-is.
|
||||
|
||||
# List of known tenants for the UI filter dropdown.
|
||||
tenants = [
|
||||
r["tenant"]
|
||||
for r in conn.execute(
|
||||
"SELECT DISTINCT tenant FROM tasks WHERE tenant IS NOT NULL ORDER BY tenant"
|
||||
)
|
||||
]
|
||||
# List of distinct assignees for the lane-by-profile sub-grouping.
|
||||
assignees = [
|
||||
r["assignee"]
|
||||
for r in conn.execute(
|
||||
"SELECT DISTINCT assignee FROM tasks WHERE assignee IS NOT NULL "
|
||||
"AND status != 'archived' ORDER BY assignee"
|
||||
)
|
||||
]
|
||||
|
||||
return {
|
||||
"columns": [
|
||||
{"name": name, "tasks": columns[name]} for name in columns.keys()
|
||||
],
|
||||
"tenants": tenants,
|
||||
"assignees": assignees,
|
||||
"latest_event_id": int(latest_event_id),
|
||||
"now": int(time.time()),
|
||||
}
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# GET /tasks/:id
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@router.get("/tasks/{task_id}")
|
||||
def get_task(task_id: str):
|
||||
conn = kanban_db.connect()
|
||||
try:
|
||||
task = kanban_db.get_task(conn, task_id)
|
||||
if task is None:
|
||||
raise HTTPException(status_code=404, detail=f"task {task_id} not found")
|
||||
return {
|
||||
"task": _task_dict(task),
|
||||
"comments": [_comment_dict(c) for c in kanban_db.list_comments(conn, task_id)],
|
||||
"events": [_event_dict(e) for e in kanban_db.list_events(conn, task_id)],
|
||||
"links": _links_for(conn, task_id),
|
||||
}
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# POST /tasks
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class CreateTaskBody(BaseModel):
|
||||
title: str
|
||||
body: Optional[str] = None
|
||||
assignee: Optional[str] = None
|
||||
tenant: Optional[str] = None
|
||||
priority: int = 0
|
||||
workspace_kind: str = "scratch"
|
||||
workspace_path: Optional[str] = None
|
||||
parents: list[str] = Field(default_factory=list)
|
||||
|
||||
|
||||
@router.post("/tasks")
|
||||
def create_task(payload: CreateTaskBody):
|
||||
conn = kanban_db.connect()
|
||||
try:
|
||||
task_id = kanban_db.create_task(
|
||||
conn,
|
||||
title=payload.title,
|
||||
body=payload.body,
|
||||
assignee=payload.assignee,
|
||||
created_by="dashboard",
|
||||
workspace_kind=payload.workspace_kind,
|
||||
workspace_path=payload.workspace_path,
|
||||
tenant=payload.tenant,
|
||||
priority=payload.priority,
|
||||
parents=payload.parents,
|
||||
)
|
||||
task = kanban_db.get_task(conn, task_id)
|
||||
return {"task": _task_dict(task) if task else None}
|
||||
except ValueError as e:
|
||||
raise HTTPException(status_code=400, detail=str(e))
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# PATCH /tasks/:id (status / assignee / priority / title / body)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class UpdateTaskBody(BaseModel):
|
||||
status: Optional[str] = None
|
||||
assignee: Optional[str] = None
|
||||
priority: Optional[int] = None
|
||||
title: Optional[str] = None
|
||||
body: Optional[str] = None
|
||||
result: Optional[str] = None
|
||||
block_reason: Optional[str] = None
|
||||
|
||||
|
||||
@router.patch("/tasks/{task_id}")
|
||||
def update_task(task_id: str, payload: UpdateTaskBody):
|
||||
conn = kanban_db.connect()
|
||||
try:
|
||||
task = kanban_db.get_task(conn, task_id)
|
||||
if task is None:
|
||||
raise HTTPException(status_code=404, detail=f"task {task_id} not found")
|
||||
|
||||
# --- assignee ----------------------------------------------------
|
||||
if payload.assignee is not None:
|
||||
try:
|
||||
ok = kanban_db.assign_task(
|
||||
conn, task_id, payload.assignee or None,
|
||||
)
|
||||
except RuntimeError as e:
|
||||
raise HTTPException(status_code=409, detail=str(e))
|
||||
if not ok:
|
||||
raise HTTPException(status_code=404, detail="task not found")
|
||||
|
||||
# --- status -------------------------------------------------------
|
||||
if payload.status is not None:
|
||||
s = payload.status
|
||||
ok = True
|
||||
if s == "done":
|
||||
ok = kanban_db.complete_task(conn, task_id, result=payload.result)
|
||||
elif s == "blocked":
|
||||
ok = kanban_db.block_task(conn, task_id, reason=payload.block_reason)
|
||||
elif s == "ready":
|
||||
# Re-open a blocked task, or just an explicit status set.
|
||||
current = kanban_db.get_task(conn, task_id)
|
||||
if current and current.status == "blocked":
|
||||
ok = kanban_db.unblock_task(conn, task_id)
|
||||
else:
|
||||
# Direct status write for drag-drop (todo -> ready etc).
|
||||
ok = _set_status_direct(conn, task_id, "ready")
|
||||
elif s == "archived":
|
||||
ok = kanban_db.archive_task(conn, task_id)
|
||||
elif s in ("todo", "running"):
|
||||
ok = _set_status_direct(conn, task_id, s)
|
||||
else:
|
||||
raise HTTPException(status_code=400, detail=f"unknown status: {s}")
|
||||
if not ok:
|
||||
raise HTTPException(
|
||||
status_code=409,
|
||||
detail=f"status transition to {s!r} not valid from current state",
|
||||
)
|
||||
|
||||
# --- priority -----------------------------------------------------
|
||||
if payload.priority is not None:
|
||||
with kanban_db.write_txn(conn):
|
||||
conn.execute(
|
||||
"UPDATE tasks SET priority = ? WHERE id = ?",
|
||||
(int(payload.priority), task_id),
|
||||
)
|
||||
conn.execute(
|
||||
"INSERT INTO task_events (task_id, kind, payload, created_at) "
|
||||
"VALUES (?, 'priority', ?, ?)",
|
||||
(task_id, json.dumps({"priority": int(payload.priority)}),
|
||||
int(time.time())),
|
||||
)
|
||||
|
||||
# --- title / body -------------------------------------------------
|
||||
if payload.title is not None or payload.body is not None:
|
||||
with kanban_db.write_txn(conn):
|
||||
sets, vals = [], []
|
||||
if payload.title is not None:
|
||||
if not payload.title.strip():
|
||||
raise HTTPException(status_code=400, detail="title cannot be empty")
|
||||
sets.append("title = ?")
|
||||
vals.append(payload.title.strip())
|
||||
if payload.body is not None:
|
||||
sets.append("body = ?")
|
||||
vals.append(payload.body)
|
||||
vals.append(task_id)
|
||||
conn.execute(
|
||||
f"UPDATE tasks SET {', '.join(sets)} WHERE id = ?", vals,
|
||||
)
|
||||
conn.execute(
|
||||
"INSERT INTO task_events (task_id, kind, payload, created_at) "
|
||||
"VALUES (?, 'edited', NULL, ?)",
|
||||
(task_id, int(time.time())),
|
||||
)
|
||||
|
||||
updated = kanban_db.get_task(conn, task_id)
|
||||
return {"task": _task_dict(updated) if updated else None}
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
def _set_status_direct(
|
||||
conn: sqlite3.Connection, task_id: str, new_status: str,
|
||||
) -> bool:
|
||||
"""Direct status write for drag-drop moves that aren't covered by the
|
||||
structured complete/block/unblock/archive verbs (e.g. todo<->ready,
|
||||
running<->ready). Appends a ``status`` event row for the live feed."""
|
||||
with kanban_db.write_txn(conn):
|
||||
cur = conn.execute(
|
||||
"UPDATE tasks SET status = ?, "
|
||||
" claim_lock = CASE WHEN ? = 'running' THEN claim_lock ELSE NULL END, "
|
||||
" claim_expires = CASE WHEN ? = 'running' THEN claim_expires ELSE NULL END "
|
||||
"WHERE id = ?",
|
||||
(new_status, new_status, new_status, task_id),
|
||||
)
|
||||
if cur.rowcount != 1:
|
||||
return False
|
||||
conn.execute(
|
||||
"INSERT INTO task_events (task_id, kind, payload, created_at) "
|
||||
"VALUES (?, 'status', ?, ?)",
|
||||
(task_id, json.dumps({"status": new_status}), int(time.time())),
|
||||
)
|
||||
# If we re-opened something, children may have gone stale.
|
||||
if new_status in ("done", "ready"):
|
||||
kanban_db.recompute_ready(conn)
|
||||
return True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Comments
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class CommentBody(BaseModel):
|
||||
body: str
|
||||
author: Optional[str] = "dashboard"
|
||||
|
||||
|
||||
@router.post("/tasks/{task_id}/comments")
|
||||
def add_comment(task_id: str, payload: CommentBody):
|
||||
if not payload.body.strip():
|
||||
raise HTTPException(status_code=400, detail="body is required")
|
||||
conn = kanban_db.connect()
|
||||
try:
|
||||
if kanban_db.get_task(conn, task_id) is None:
|
||||
raise HTTPException(status_code=404, detail=f"task {task_id} not found")
|
||||
kanban_db.add_comment(
|
||||
conn, task_id, author=payload.author or "dashboard", body=payload.body,
|
||||
)
|
||||
return {"ok": True}
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Links
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class LinkBody(BaseModel):
|
||||
parent_id: str
|
||||
child_id: str
|
||||
|
||||
|
||||
@router.post("/links")
|
||||
def add_link(payload: LinkBody):
|
||||
conn = kanban_db.connect()
|
||||
try:
|
||||
kanban_db.link_tasks(conn, payload.parent_id, payload.child_id)
|
||||
return {"ok": True}
|
||||
except ValueError as e:
|
||||
raise HTTPException(status_code=400, detail=str(e))
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
@router.delete("/links")
|
||||
def delete_link(parent_id: str = Query(...), child_id: str = Query(...)):
|
||||
conn = kanban_db.connect()
|
||||
try:
|
||||
ok = kanban_db.unlink_tasks(conn, parent_id, child_id)
|
||||
return {"ok": bool(ok)}
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Dispatch nudge (optional quick-path so the UI doesn't wait 60 s)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@router.post("/dispatch")
|
||||
def dispatch(dry_run: bool = Query(False), max_n: int = Query(8, alias="max")):
|
||||
conn = kanban_db.connect()
|
||||
try:
|
||||
result = kanban_db.dispatch_once(
|
||||
conn, dry_run=dry_run, max_spawn=max_n,
|
||||
)
|
||||
# DispatchResult is a dataclass.
|
||||
try:
|
||||
return asdict(result)
|
||||
except TypeError:
|
||||
return {"result": str(result)}
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# WebSocket: /events?since=<event_id>
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Poll interval for the event tail loop. SQLite WAL + 300 ms polling is
|
||||
# the simplest and most robust approach; it adds a fraction of a percent
|
||||
# of CPU and has no shared state to synchronize across workers.
|
||||
_EVENT_POLL_SECONDS = 0.3
|
||||
|
||||
|
||||
@router.websocket("/events")
|
||||
async def stream_events(ws: WebSocket):
|
||||
await ws.accept()
|
||||
try:
|
||||
since_raw = ws.query_params.get("since", "0")
|
||||
try:
|
||||
cursor = int(since_raw)
|
||||
except ValueError:
|
||||
cursor = 0
|
||||
|
||||
def _fetch_new(cursor_val: int) -> tuple[int, list[dict]]:
|
||||
conn = kanban_db.connect()
|
||||
try:
|
||||
rows = conn.execute(
|
||||
"SELECT id, task_id, kind, payload, created_at "
|
||||
"FROM task_events WHERE id > ? ORDER BY id ASC LIMIT 200",
|
||||
(cursor_val,),
|
||||
).fetchall()
|
||||
out: list[dict] = []
|
||||
new_cursor = cursor_val
|
||||
for r in rows:
|
||||
try:
|
||||
payload = json.loads(r["payload"]) if r["payload"] else None
|
||||
except Exception:
|
||||
payload = None
|
||||
out.append({
|
||||
"id": r["id"],
|
||||
"task_id": r["task_id"],
|
||||
"kind": r["kind"],
|
||||
"payload": payload,
|
||||
"created_at": r["created_at"],
|
||||
})
|
||||
new_cursor = r["id"]
|
||||
return new_cursor, out
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
while True:
|
||||
cursor, events = await asyncio.to_thread(_fetch_new, cursor)
|
||||
if events:
|
||||
await ws.send_json({"events": events, "cursor": cursor})
|
||||
await asyncio.sleep(_EVENT_POLL_SECONDS)
|
||||
except WebSocketDisconnect:
|
||||
return
|
||||
except Exception as exc: # defensive: never crash the dashboard worker
|
||||
log.warning("Kanban event stream error: %s", exc)
|
||||
try:
|
||||
await ws.close()
|
||||
except Exception:
|
||||
pass
|
||||
Reference in New Issue
Block a user