diff --git a/gateway/run.py b/gateway/run.py index ad05de18aa..8917d13a41 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -2489,7 +2489,7 @@ class GatewayRunner: logger.warning("kanban notifier: kanban_db not importable; notifier disabled") return - TERMINAL_KINDS = ("completed", "blocked", "spawn_auto_blocked", "crashed") + TERMINAL_KINDS = ("completed", "blocked", "gave_up", "crashed", "timed_out") # Initial delay so the gateway can finish wiring adapters. await asyncio.sleep(5) @@ -2560,12 +2560,12 @@ class GatewayRunner: if ev.payload and ev.payload.get("reason"): reason = f": {str(ev.payload['reason'])[:160]}" msg = f"⏸ Kanban {sub['task_id']} blocked{reason}" - elif kind == "spawn_auto_blocked": + elif kind == "gave_up": err = "" if ev.payload and ev.payload.get("error"): err = f"\n{str(ev.payload['error'])[:200]}" msg = ( - f"✖ Kanban {sub['task_id']} auto-blocked " + f"✖ Kanban {sub['task_id']} gave up " f"after repeated spawn failures{err}" ) elif kind == "crashed": @@ -2573,6 +2573,14 @@ class GatewayRunner: f"✖ Kanban {sub['task_id']} worker crashed " f"(pid gone); dispatcher will retry" ) + elif kind == "timed_out": + limit = 0 + if ev.payload and ev.payload.get("limit_seconds"): + limit = int(ev.payload["limit_seconds"]) + msg = ( + f"⏱ Kanban {sub['task_id']} timed out " + f"(max_runtime={limit}s); will retry" + ) else: continue metadata: dict[str, Any] = {} diff --git a/hermes_cli/kanban.py b/hermes_cli/kanban.py index 8cbe001c7f..af4cb210dd 100644 --- a/hermes_cli/kanban.py +++ b/hermes_cli/kanban.py @@ -135,6 +135,11 @@ def build_parser(parent_subparsers: argparse._SubParsersAction) -> argparse.Argu p_create.add_argument("--idempotency-key", default=None, help="Dedup key. If a non-archived task with this key exists, " "its id is returned instead of creating a duplicate.") + p_create.add_argument("--max-runtime", default=None, + help="Per-task runtime cap. Accepts seconds (300) or " + "durations (90s, 30m, 2h, 1d). 
def _parse_duration(val) -> Optional[int]:
    """Parse a duration into a whole number of seconds.

    Accepts a bare integer (``300`` → seconds) or a number followed by a
    unit suffix: ``s`` seconds, ``m`` minutes, ``h`` hours, ``d`` days
    (``30s`` / ``5m`` / ``1.5h`` / ``1d``). Case and surrounding
    whitespace are ignored; fractional results are truncated.

    Returns:
        The duration in seconds, or ``None`` for ``None`` / empty input.

    Raises:
        ValueError: on malformed, non-finite (``inf`` / ``nan``) or
            negative input, so the CLI can surface a usage error cleanly
            instead of storing a cap that would time a task out at once.
    """
    if val is None or val == "":
        return None
    text = str(val).strip().lower()
    units = {"s": 1, "m": 60, "h": 3600, "d": 86400}
    try:
        # Bare integer → seconds.
        seconds = int(text)
    except ValueError:
        # Suffixed form.
        if not text or text[-1] not in units:
            raise ValueError(
                f"malformed duration {val!r} (expected 30s, 5m, 2h, 1d, or a number)"
            ) from None
        try:
            # int() raises OverflowError for inf and ValueError for nan;
            # fold both into the single ValueError callers already catch.
            seconds = int(float(text[:-1]) * units[text[-1]])
        except (OverflowError, ValueError) as exc:
            raise ValueError(f"malformed duration {val!r}") from exc
    if seconds < 0:
        raise ValueError(f"duration {val!r} must not be negative")
    return seconds


def _cmd_heartbeat(args: argparse.Namespace) -> int:
    """Handle ``kanban heartbeat``: record a liveness signal for a task.

    Reads ``args.task_id`` and the optional ``args.note``. Returns 0 when
    ``kb.heartbeat_worker`` accepts the heartbeat, 1 (with a message on
    stderr) when it refuses it.
    """
    with kb.connect() as conn:
        ok = kb.heartbeat_worker(conn, args.task_id, note=getattr(args, "note", None))
    if not ok:
        print(f"cannot heartbeat {args.task_id} (not running?)", file=sys.stderr)
        return 1
    print(f"Heartbeat recorded for {args.task_id}")
    return 0


def _cmd_assignees(args: argparse.Namespace) -> int:
    """Handle ``kanban assignees``: list known assignees and their counts.

    With ``--json`` the raw list from ``kb.known_assignees`` is dumped as
    JSON; otherwise a fixed-width table is printed, one row per assignee.
    Always returns 0.
    """
    with kb.connect() as conn:
        data = kb.known_assignees(conn)
    if getattr(args, "json", False):
        print(json.dumps(data, indent=2, ensure_ascii=False))
        return 0
    if not data:
        print("(no assignees — create a profile with `hermes -p setup`)")
        return 0
    # Header
    print(f"{'NAME':20s} {'ON DISK':8s} COUNTS")
    for entry in data:
        on_disk = "yes" if entry["on_disk"] else "no"
        counts = entry["counts"] or {}
        # Sorted so the per-status summary is stable between runs.
        count_str = ", ".join(f"{k}={v}" for k, v in sorted(counts.items())) or "(idle)"
        print(f"{entry['name']:20s} {on_disk:8s} {count_str}")
    return 0
f"promoted={res.promoted} spawned={len(res.spawned)} " f"auto_blocked={len(res.auto_blocked)}", flush=True, diff --git a/hermes_cli/kanban_db.py b/hermes_cli/kanban_db.py index c9f5126235..dd7d82cb85 100644 --- a/hermes_cli/kanban_db.py +++ b/hermes_cli/kanban_db.py @@ -88,6 +88,8 @@ class Task: spawn_failures: int = 0 worker_pid: Optional[int] = None last_spawn_error: Optional[str] = None + max_runtime_seconds: Optional[int] = None + last_heartbeat_at: Optional[int] = None @classmethod def from_row(cls, row: sqlite3.Row) -> "Task": @@ -113,6 +115,12 @@ class Task: spawn_failures=row["spawn_failures"] if "spawn_failures" in keys else 0, worker_pid=row["worker_pid"] if "worker_pid" in keys else None, last_spawn_error=row["last_spawn_error"] if "last_spawn_error" in keys else None, + max_runtime_seconds=( + row["max_runtime_seconds"] if "max_runtime_seconds" in keys else None + ), + last_heartbeat_at=( + row["last_heartbeat_at"] if "last_heartbeat_at" in keys else None + ), ) @@ -140,26 +148,28 @@ class Event: SCHEMA_SQL = """ CREATE TABLE IF NOT EXISTS tasks ( - id TEXT PRIMARY KEY, - title TEXT NOT NULL, - body TEXT, - assignee TEXT, - status TEXT NOT NULL, - priority INTEGER DEFAULT 0, - created_by TEXT, - created_at INTEGER NOT NULL, - started_at INTEGER, - completed_at INTEGER, - workspace_kind TEXT NOT NULL DEFAULT 'scratch', - workspace_path TEXT, - claim_lock TEXT, - claim_expires INTEGER, - tenant TEXT, - result TEXT, - idempotency_key TEXT, - spawn_failures INTEGER NOT NULL DEFAULT 0, - worker_pid INTEGER, - last_spawn_error TEXT + id TEXT PRIMARY KEY, + title TEXT NOT NULL, + body TEXT, + assignee TEXT, + status TEXT NOT NULL, + priority INTEGER DEFAULT 0, + created_by TEXT, + created_at INTEGER NOT NULL, + started_at INTEGER, + completed_at INTEGER, + workspace_kind TEXT NOT NULL DEFAULT 'scratch', + workspace_path TEXT, + claim_lock TEXT, + claim_expires INTEGER, + tenant TEXT, + result TEXT, + idempotency_key TEXT, + spawn_failures INTEGER NOT NULL 
DEFAULT 0, + worker_pid INTEGER, + last_spawn_error TEXT, + max_runtime_seconds INTEGER, + last_heartbeat_at INTEGER ); CREATE TABLE IF NOT EXISTS task_links ( @@ -264,6 +274,26 @@ def _migrate_add_optional_columns(conn: sqlite3.Connection) -> None: conn.execute("ALTER TABLE tasks ADD COLUMN worker_pid INTEGER") if "last_spawn_error" not in cols: conn.execute("ALTER TABLE tasks ADD COLUMN last_spawn_error TEXT") + if "max_runtime_seconds" not in cols: + conn.execute("ALTER TABLE tasks ADD COLUMN max_runtime_seconds INTEGER") + if "last_heartbeat_at" not in cols: + conn.execute("ALTER TABLE tasks ADD COLUMN last_heartbeat_at INTEGER") + + # One-shot event-kind rename pass. The old names ("ready", "priority", + # "spawn_auto_blocked") still worked but were awkward on the wire; + # rename them in-place so existing DBs migrate cleanly. Fires once + # per DB because after the UPDATE no rows match the old kinds. + _EVENT_RENAMES = ( + # (old, new) + ("ready", "promoted"), + ("priority", "reprioritized"), + ("spawn_auto_blocked", "gave_up"), + ) + for old, new in _EVENT_RENAMES: + conn.execute( + "UPDATE task_events SET kind = ? WHERE kind = ?", + (new, old), + ) @contextlib.contextmanager @@ -325,6 +355,7 @@ def create_task( parents: Iterable[str] = (), triage: bool = False, idempotency_key: Optional[str] = None, + max_runtime_seconds: Optional[int] = None, ) -> str: """Create a new task and optionally link it under parent tasks. @@ -338,6 +369,10 @@ def create_task( same key already exists, returns the existing task's id instead of creating a duplicate. Useful for retried webhooks / automation that should not double-write. + + ``max_runtime_seconds`` caps how long a worker may run before the + dispatcher SIGTERMs (then SIGKILLs after a grace window) and + re-queues the task. ``None`` means no cap (default). 
def heartbeat_worker(
    conn: sqlite3.Connection,
    task_id: str,
    *,
    note: Optional[str] = None,
) -> bool:
    """Record a ``heartbeat`` event and touch ``last_heartbeat_at``.

    Called by long-running workers as a liveness signal orthogonal to
    the PID check. A worker that forks a long-lived child (train loop,
    video encode, web crawl) can have its Python still alive while the
    actual work process is stuck; periodic heartbeats catch that.

    Args:
        conn: open kanban DB connection.
        task_id: id of the task being worked on.
        note: optional short text stored as the event payload; an empty
            or missing note stores no payload.

    Returns:
        True when the task row was in ``running`` status and was
        updated, False otherwise (unknown id or any other status).
        Only the status is checked; claim expiry is not consulted here.
    """
    now = int(time.time())
    with write_txn(conn):
        cur = conn.execute(
            "UPDATE tasks SET last_heartbeat_at = ? "
            "WHERE id = ? AND status = 'running'",
            (now, task_id),
        )
        if cur.rowcount != 1:
            return False
        _append_event(
            conn, task_id, "heartbeat",
            {"note": note} if note else None,
        )
    return True


def enforce_max_runtime(
    conn: sqlite3.Connection,
    *,
    signal_fn=None,
) -> list[str]:
    """Terminate workers whose per-task ``max_runtime_seconds`` has elapsed.

    For each overrun worker: send SIGTERM, poll for up to ~5 s, then send
    SIGKILL if the pid is still alive. The task is then dropped back to
    ``ready`` and a ``timed_out`` event is appended, but only if the row
    is still ``running`` at update time, so a task that changed status in
    the meantime is left alone.

    Runs host-local: only tasks whose ``claim_lock`` starts with this
    host's claimer prefix are candidates. A non-positive cap is treated
    as "no cap" (matching ``create_task``, which stores 0 as NULL) so a
    stray 0/negative value cannot make every tick kill the worker.

    Args:
        conn: open kanban DB connection.
        signal_fn: test hook called as ``signal_fn(pid, sig)``; defaults
            to ``os.kill``.

    Returns:
        Ids of the tasks that were re-queued by this call.
    """
    import signal

    # SIGKILL is not defined on Windows; fall back to SIGTERM there
    # instead of raising AttributeError in the middle of a dispatcher tick.
    hard_kill_sig = getattr(signal, "SIGKILL", signal.SIGTERM)
    kill = signal_fn if signal_fn is not None else getattr(os, "kill", None)

    timed_out: list[str] = []
    now = int(time.time())
    host_prefix = f"{_claimer_id().split(':', 1)[0]}:"

    rows = conn.execute(
        "SELECT id, worker_pid, started_at, max_runtime_seconds, claim_lock "
        "FROM tasks "
        "WHERE status = 'running' AND max_runtime_seconds IS NOT NULL "
        " AND started_at IS NOT NULL AND worker_pid IS NOT NULL"
    ).fetchall()
    for row in rows:
        lock = row["claim_lock"] or ""
        if not lock.startswith(host_prefix):
            continue
        limit = int(row["max_runtime_seconds"])
        if limit <= 0:
            continue
        elapsed = now - int(row["started_at"])
        if elapsed < limit:
            continue

        pid = int(row["worker_pid"])
        tid = row["id"]
        # SIGTERM then SIGKILL. Keep it simple: 5 s grace. Workers that
        # want a cleaner shutdown can install their own SIGTERM handler
        # before the grace expires.
        killed = False
        if kill is not None:
            try:
                kill(pid, signal.SIGTERM)
            except OSError:  # includes ProcessLookupError: already gone
                pass
            # Short polling wait, done before (not inside) the write txn.
            for _ in range(10):
                if not _pid_alive(pid):
                    break
                time.sleep(0.5)
            if _pid_alive(pid):
                try:
                    kill(pid, hard_kill_sig)
                    killed = True
                except OSError:
                    pass

        with write_txn(conn):
            cur = conn.execute(
                "UPDATE tasks SET status = 'ready', claim_lock = NULL, "
                "claim_expires = NULL, worker_pid = NULL, "
                "last_heartbeat_at = NULL "
                "WHERE id = ? AND status = 'running'",
                (tid,),
            )
            if cur.rowcount == 1:
                _append_event(
                    conn, tid, "timed_out",
                    {
                        "pid": pid,
                        "elapsed_seconds": int(elapsed),
                        "limit_seconds": limit,
                        "sigkill": killed,
                    },
                )
                timed_out.append(tid)
    return timed_out


def set_max_runtime(
    conn: sqlite3.Connection,
    task_id: str,
    seconds: Optional[int],
) -> bool:
    """Set or clear the per-task ``max_runtime_seconds``.

    ``None`` and ``0`` both clear the cap (stored as NULL), mirroring how
    ``create_task`` stores a falsy cap. Returns True when exactly one
    task row was updated.
    """
    with write_txn(conn):
        cur = conn.execute(
            "UPDATE tasks SET max_runtime_seconds = ? WHERE id = ?",
            (int(seconds) if seconds else None, task_id),
        )
    return cur.rowcount == 1


def _set_worker_pid(conn: sqlite3.Connection, task_id: str, pid: int) -> None:
    """Record the spawned child's pid + emit a ``spawned`` event.

    The event's payload carries the pid so a human reading ``hermes kanban
    tail`` can correlate log lines with OS-level traces without opening
    the drawer.
    """
    with write_txn(conn):
        conn.execute(
            "UPDATE tasks SET worker_pid = ? WHERE id = ?",
            (int(pid), task_id),
        )
        _append_event(conn, task_id, "spawned", {"pid": int(pid)})


# ---------------------------------------------------------------------------
# Assignee enumeration (known profiles + per-profile board stats)
# ---------------------------------------------------------------------------

def list_profiles_on_disk() -> list[str]:
    """Return the sorted names of the profiles discovered on disk.

    Reads ``~/.hermes/profiles/`` directly so this module has no import
    dependency on ``hermes_cli.profiles``. Only directories that contain
    a ``config.yaml`` are returned; a bare dir without config isn't a
    real profile. Returns an empty list when the home directory cannot
    be resolved or the profiles directory does not exist.
    """
    try:
        home = Path.home() / ".hermes" / "profiles"
    except (RuntimeError, KeyError, OSError):
        # Path.home() raises when the home directory can't be determined.
        return []
    if not home.is_dir():
        return []
    names: list[str] = []
    try:
        for entry in sorted(home.iterdir()):
            if entry.is_dir() and (entry / "config.yaml").is_file():
                names.append(entry.name)
    except OSError:
        # Best effort: keep whatever was collected before the failure.
        pass
    return names


def known_assignees(conn: sqlite3.Connection) -> list[dict]:
    """Return every assignee name known to the board or on disk.

    Each entry is ``{"name": str, "on_disk": bool, "counts": {status: n}}``.
    A name is included when it's a configured profile on disk OR when
    any non-archived task has it as the assignee. Entries are sorted by
    name; ``counts`` is empty for a profile with no non-archived tasks.

    ``conn`` must yield rows addressable by column name.
    """
    on_disk = set(list_profiles_on_disk())

    # Count tasks per (assignee, status), excluding archived.
    counts: dict[str, dict[str, int]] = {}
    for row in conn.execute(
        "SELECT assignee, status, COUNT(*) AS n FROM tasks "
        "WHERE status != 'archived' AND assignee IS NOT NULL "
        "GROUP BY assignee, status"
    ):
        counts.setdefault(row["assignee"], {})[row["status"]] = int(row["n"])

    names = sorted(on_disk | set(counts.keys()))
    return [
        {
            "name": name,
            "on_disk": name in on_disk,
            "counts": counts.get(name, {}),
        }
        for name in names
    ]
@router.get("/assignees")
def get_assignees():
    """List known profiles together with their per-status task counts.

    The payload is ``{"assignees": [...]}`` where the list comes straight
    from ``kanban_db.known_assignees``: the union of the profiles found
    on disk and every assignee currently used on the board. The
    connection opened for the request is always closed, even on error.
    """
    db = _conn()
    try:
        roster = kanban_db.known_assignees(db)
        return {"assignees": roster}
    finally:
        db.close()
# ---------------------------------------------------------------------------
# Max-runtime enforcement (item 1 from the Multica audit)
# ---------------------------------------------------------------------------

def test_max_runtime_terminates_overrun_worker(kanban_home):
    """A running task whose elapsed time exceeds max_runtime_seconds gets
    SIGTERM'd, emits a ``timed_out`` event, and goes back to ready."""
    killed = []
    def _signal_fn(pid, sig):
        killed.append((pid, sig))

    # We bypass _pid_alive by stubbing it so the grace-poll exits fast.
    import hermes_cli.kanban_db as _kb
    original_alive = _kb._pid_alive
    _kb._pid_alive = lambda pid: False  # pretend SIGTERM worked immediately

    try:
        conn = kb.connect()
        try:
            tid = kb.create_task(
                conn, title="long job", assignee="worker",
                max_runtime_seconds=1,  # one second cap
            )
            # Spawn by hand: claim + set pid + set started_at to the past.
            kb.claim_task(conn, tid)
            kb._set_worker_pid(conn, tid, os.getpid())  # any live pid works
            # Backdate started_at so elapsed > limit.
            with kb.write_txn(conn):
                conn.execute(
                    "UPDATE tasks SET started_at = ? WHERE id = ?",
                    (int(time.time()) - 30, tid),
                )

            timed_out = kb.enforce_max_runtime(conn, signal_fn=_signal_fn)
            assert tid in timed_out
            assert killed and killed[0][0] == os.getpid()

            task = kb.get_task(conn, tid)
            assert task.status == "ready", f"timed-out task should reset to ready, got {task.status}"
            assert task.worker_pid is None
            assert task.last_heartbeat_at is None

            events = kb.list_events(conn, tid)
            assert any(e.kind == "timed_out" for e in events)
            to_event = next(e for e in events if e.kind == "timed_out")
            assert to_event.payload["limit_seconds"] == 1
            assert to_event.payload["elapsed_seconds"] >= 30
        finally:
            conn.close()
    finally:
        _kb._pid_alive = original_alive


def test_max_runtime_none_means_no_cap(kanban_home):
    """A task with max_runtime_seconds=None is never timed out regardless
    of how long it runs."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="uncapped", assignee="worker")
        kb.claim_task(conn, tid)
        kb._set_worker_pid(conn, tid, os.getpid())
        # Backdate aggressively; no cap means we don't care.
        with kb.write_txn(conn):
            conn.execute(
                "UPDATE tasks SET started_at = ? WHERE id = ?",
                (int(time.time()) - 100_000, tid),
            )
        timed_out = kb.enforce_max_runtime(conn)
        assert timed_out == []
        task = kb.get_task(conn, tid)
        assert task.status == "running"
    finally:
        conn.close()


def test_create_task_persists_max_runtime(kanban_home):
    """The cap passed to create_task is readable back from the task row."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="x", max_runtime_seconds=600)
        task = kb.get_task(conn, tid)
        assert task.max_runtime_seconds == 600
    finally:
        conn.close()


def test_enforce_max_runtime_integrates_with_dispatch(kanban_home, monkeypatch):
    """enforce_max_runtime + dispatch_once integrate cleanly — a timed-out
    task goes through ``timed_out`` → ``ready`` and dispatch_once can then
    re-spawn it without re-reporting the timeout."""
    import hermes_cli.kanban_db as _kb
    # Leave _pid_alive=True so the crash detector doesn't steal the task
    # before timeout enforcement runs. After SIGTERM in enforce_max_runtime,
    # pretend the worker died so the grace wait exits fast.
    state = {"sent_term": False}
    def _alive(pid):
        return not state["sent_term"]
    def _signal(pid, sig):
        import signal as _sig
        if sig == _sig.SIGTERM:
            state["sent_term"] = True
    monkeypatch.setattr(_kb, "_pid_alive", _alive)

    conn = kb.connect()
    try:
        tid = kb.create_task(
            conn, title="timeout-me", assignee="worker",
            max_runtime_seconds=1,
        )
        kb.claim_task(conn, tid)
        kb._set_worker_pid(conn, tid, os.getpid())
        with kb.write_txn(conn):
            conn.execute(
                "UPDATE tasks SET started_at = ? WHERE id = ?",
                (int(time.time()) - 30, tid),
            )
        # dispatch_once uses the default os.kill, so the overrun is
        # enforced here first with the signal stub; dispatch_once then
        # runs against the already-released task.
        before = kb.enforce_max_runtime(conn, signal_fn=_signal)
        assert tid in before, "kernel enforce_max_runtime should catch the overrun"

        # The task is no longer running, so a following dispatch_once
        # must not report the same timeout a second time.
        res = kb.dispatch_once(conn, spawn_fn=lambda t, ws: None)
        assert tid not in res.timed_out
        task = kb.get_task(conn, tid)
        # After timeout, task is back in 'ready' and will be re-spawned
        # by the same pass. That's the intended behaviour.
        assert task.status in ("ready", "running")
    finally:
        conn.close()


# ---------------------------------------------------------------------------
# Heartbeat (item 2 from the Multica audit)
# ---------------------------------------------------------------------------

def test_heartbeat_on_running_task(kanban_home):
    """A heartbeat on a claimed task stamps the row and logs one event."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="x", assignee="worker")
        kb.claim_task(conn, tid)
        ok = kb.heartbeat_worker(conn, tid, note="step 3/10")
        assert ok is True
        task = kb.get_task(conn, tid)
        assert task.last_heartbeat_at is not None
        events = kb.list_events(conn, tid)
        hb = [e for e in events if e.kind == "heartbeat"]
        assert len(hb) == 1
        assert hb[0].payload == {"note": "step 3/10"}
    finally:
        conn.close()


def test_heartbeat_refused_when_not_running(kanban_home):
    """A heartbeat on a task that was never claimed is rejected."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="x")  # lands in ready, not running
        ok = kb.heartbeat_worker(conn, tid)
        assert ok is False
        task = kb.get_task(conn, tid)
        assert task.last_heartbeat_at is None
    finally:
        conn.close()


def test_cli_heartbeat_verb(kanban_home):
    """The ``heartbeat`` verb works with and without ``--note``."""
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="x", assignee="worker")
        kb.claim_task(conn, tid)
    finally:
        conn.close()
    out = run_slash(f"heartbeat {tid}")
    assert "Heartbeat recorded" in out

    # With --note.
    out = run_slash(f"heartbeat {tid} --note 'step 42'")
    assert "Heartbeat recorded" in out
    conn = kb.connect()
    try:
        events = kb.list_events(conn, tid)
        notes = [e.payload.get("note") for e in events if e.kind == "heartbeat" and e.payload]
        assert "step 42" in notes
    finally:
        conn.close()


# ---------------------------------------------------------------------------
# Event vocab rename + spawned event (item 3 from Multica)
# ---------------------------------------------------------------------------

def test_recompute_ready_emits_promoted_not_ready(kanban_home):
    """Promotion of an unblocked child is logged as ``promoted``."""
    conn = kb.connect()
    try:
        parent = kb.create_task(conn, title="p")
        child = kb.create_task(conn, title="c", parents=[parent])
        kb.complete_task(conn, parent, result="ok")
        # recompute_ready runs inside complete_task too, but call it again
        # defensively.
        kb.recompute_ready(conn)
        events = kb.list_events(conn, child)
        kinds = [e.kind for e in events]
        assert "promoted" in kinds
        # Old name must not appear.
        assert "ready" not in kinds
    finally:
        conn.close()


def test_spawn_failure_circuit_breaker_emits_gave_up(kanban_home):
    """Repeated spawn failures are logged as ``gave_up``, not the old kind."""
    def _bad(task, ws):
        raise RuntimeError("nope")
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="x", assignee="worker")
        for _ in range(5):
            kb.dispatch_once(conn, spawn_fn=_bad, failure_limit=5)
        events = kb.list_events(conn, tid)
        kinds = [e.kind for e in events]
        assert "gave_up" in kinds
        assert "spawn_auto_blocked" not in kinds
    finally:
        conn.close()


def test_spawned_event_emitted_with_pid(kanban_home):
    """Successful spawn must append a ``spawned`` event with the pid in
    the payload so humans tailing events see pid tracking."""
    def _spawn_returns_pid(task, ws):
        return 98765
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="x", assignee="worker")
        kb.dispatch_once(conn, spawn_fn=_spawn_returns_pid)
        events = kb.list_events(conn, tid)
        spawned = [e for e in events if e.kind == "spawned"]
        assert len(spawned) == 1
        assert spawned[0].payload == {"pid": 98765}
    finally:
        conn.close()


def test_migration_renames_legacy_event_kinds(tmp_path, monkeypatch):
    """A DB created with the old vocab must have its event rows renamed
    in place on init_db()."""
    home = tmp_path / ".hermes"
    home.mkdir()
    monkeypatch.setenv("HERMES_HOME", str(home))
    monkeypatch.setattr(Path, "home", lambda: tmp_path)
    # Init fresh.
    kb.init_db()
    conn = kb.connect()
    try:
        tid = kb.create_task(conn, title="x")
        # Inject legacy event kinds directly.
        now = int(time.time())
        with kb.write_txn(conn):
            for old in ("ready", "priority", "spawn_auto_blocked"):
                conn.execute(
                    "INSERT INTO task_events (task_id, kind, payload, created_at) "
                    "VALUES (?, ?, NULL, ?)",
                    (tid, old, now),
                )
        # Re-run init_db — the migration pass should rename them.
        kb.init_db()
        rows = conn.execute(
            "SELECT kind FROM task_events WHERE task_id = ? ORDER BY id", (tid,),
        ).fetchall()
        kinds = [r["kind"] for r in rows]
        assert "ready" not in kinds
        assert "priority" not in kinds
        assert "spawn_auto_blocked" not in kinds
        assert "promoted" in kinds
        assert "reprioritized" in kinds
        assert "gave_up" in kinds
    finally:
        conn.close()


# ---------------------------------------------------------------------------
# Assignees (item 4 from Multica)
# ---------------------------------------------------------------------------

def test_list_profiles_on_disk(tmp_path, monkeypatch):
    """list_profiles_on_disk returns directories under ~/.hermes/profiles/
    that contain a config.yaml."""
    monkeypatch.setattr(Path, "home", lambda: tmp_path)
    profiles = tmp_path / ".hermes" / "profiles"
    profiles.mkdir(parents=True)
    (profiles / "researcher").mkdir()
    (profiles / "researcher" / "config.yaml").write_text("model: {}\n")
    (profiles / "writer").mkdir()
    (profiles / "writer" / "config.yaml").write_text("model: {}\n")
    (profiles / "empty_dir").mkdir()
    # A stray file; should be ignored.
    (profiles / "stray.txt").write_text("noise")

    names = kb.list_profiles_on_disk()
    assert names == ["researcher", "writer"]


def test_known_assignees_merges_disk_and_board(tmp_path, monkeypatch):
    """known_assignees unions profiles on disk with currently-assigned
    names, and reports per-status counts."""
    monkeypatch.setattr(Path, "home", lambda: tmp_path)
    profiles = tmp_path / ".hermes" / "profiles"
    profiles.mkdir(parents=True)
    monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))

    for name in ("researcher", "writer"):
        d = profiles / name
        d.mkdir()
        (d / "config.yaml").write_text("model: {}\n")

    kb.init_db()
    conn = kb.connect()
    try:
        # writer has a ready task; on_board_only has a task but no profile dir.
        kb.create_task(conn, title="a", assignee="writer")
        kb.create_task(conn, title="b", assignee="on_board_only")
        data = kb.known_assignees(conn)
    finally:
        conn.close()

    by_name = {d["name"]: d for d in data}
    assert by_name["researcher"]["on_disk"] is True
    assert by_name["researcher"]["counts"] == {}
    assert by_name["writer"]["on_disk"] is True
    assert by_name["writer"]["counts"] == {"ready": 1}
    assert by_name["on_board_only"]["on_disk"] is False
    assert by_name["on_board_only"]["counts"] == {"ready": 1}


def test_cli_assignees_json(kanban_home):
    """``assignees --json`` emits a JSON list that includes board names."""
    conn = kb.connect()
    try:
        kb.create_task(conn, title="x", assignee="someone")
    finally:
        conn.close()
    out = run_slash("assignees --json")
    data = json.loads(out)
    names = [e["name"] for e in data]
    assert "someone" in names


# ---------------------------------------------------------------------------
# CLI --max-runtime flag + duration parser
# ---------------------------------------------------------------------------

def test_parse_duration_accepts_formats():
    """Bare seconds and every supported unit suffix parse to seconds."""
    from hermes_cli.kanban import _parse_duration
    assert _parse_duration(None) is None
    assert _parse_duration("") is None
    assert _parse_duration("42") == 42
    assert _parse_duration("30s") == 30
    assert _parse_duration("5m") == 300
    assert _parse_duration("2h") == 7200
    assert _parse_duration("1d") == 86400
    assert _parse_duration("1.5h") == 5400


def test_parse_duration_rejects_garbage():
    """Unparseable durations raise ValueError."""
    from hermes_cli.kanban import _parse_duration
    import pytest as _p
    with _p.raises(ValueError):
        _parse_duration("tenminutes")
    with _p.raises(ValueError):
        _parse_duration("fish")
== 7200 + finally: + conn.close() + + +def test_cli_create_max_runtime_bad_format_exits_nonzero(kanban_home): + out = run_slash("create 'bad' --max-runtime fish") + assert "max-runtime" in out.lower() or "malformed" in out.lower() diff --git a/website/docs/user-guide/features/kanban.md b/website/docs/user-guide/features/kanban.md index 62ff2d3c5f..e35b00858c 100644 --- a/website/docs/user-guide/features/kanban.md +++ b/website/docs/user-guide/features/kanban.md @@ -263,6 +263,7 @@ hermes kanban create "" [--body ...] [--assignee <profile>] [--parent <id>]... [--tenant <name>] [--workspace scratch|worktree|dir:<path>] [--priority N] [--triage] [--idempotency-key KEY] + [--max-runtime 30m|2h|1d|<seconds>] [--json] hermes kanban list [--mine] [--assignee P] [--status S] [--tenant T] [--archived] [--json] hermes kanban show <id> [--json] @@ -281,6 +282,8 @@ hermes kanban archive <id>... hermes kanban tail <id> # follow a single task's event stream hermes kanban watch [--assignee P] [--tenant T] # live stream ALL events to the terminal [--kinds completed,blocked,…] [--interval SECS] +hermes kanban heartbeat <id> [--note "..."] # worker liveness signal for long ops +hermes kanban assignees [--json] # profiles on disk + per-assignee task counts hermes kanban dispatch [--dry-run] [--max N] # one-shot pass [--failure-limit N] [--json] hermes kanban daemon [--interval SECS] [--max N] # long-lived loop @@ -332,7 +335,7 @@ Workers receive `$HERMES_TENANT` and namespace their memory writes by prefix. Th ## Gateway notifications -When you run `/kanban create …` from the gateway (Telegram, Discord, Slack, etc.), the originating chat is automatically subscribed to the new task. The gateway's background notifier polls `task_events` every few seconds and delivers one message per terminal event (`completed`, `blocked`, `spawn_auto_blocked`, `crashed`) to that chat. 
Completed tasks also send the first line of the worker's `--result` so you see the outcome without having to `/kanban show`. +When you run `/kanban create …` from the gateway (Telegram, Discord, Slack, etc.), the originating chat is automatically subscribed to the new task. The gateway's background notifier polls `task_events` every few seconds and delivers one message per terminal event (`completed`, `blocked`, `gave_up`, `crashed`, `timed_out`) to that chat. Completed tasks also send the first line of the worker's `--result` so you see the outcome without having to `/kanban show`. You can manage subscriptions explicitly from the CLI — useful when a script / cron job wants to notify a chat it didn't originate from: @@ -346,6 +349,45 @@ hermes kanban notify-unsubscribe t_abcd \ A subscription removes itself automatically once the task reaches `done` or `archived`; no cleanup needed. +## Event reference + +Every transition appends a row to `task_events`. The kinds group into three clusters so filtering is easy (`hermes kanban watch --kinds completed,gave_up,timed_out`): + +**Lifecycle** (what changed about the task as a logical unit): + +| Kind | When | +|---|---| +| `created` | Task inserted. | +| `promoted` | `todo → ready` because all parents hit `done`. | +| `claimed` | Dispatcher atomically claimed a `ready` task for spawn. | +| `completed` | Worker wrote `--result` and task hit `done`. | +| `blocked` | Worker or human flipped the task to `blocked`. | +| `unblocked` | `blocked → ready`, either manually or via `/unblock`. | +| `archived` | Hidden from the default board. | + +**Edits** (human-driven changes that aren't transitions): + +| Kind | When | +|---|---| +| `assigned` | Assignee changed (including unassignment). | +| `edited` | Title or body updated. | +| `reprioritized` | Priority changed. | +| `status` | Dashboard drag-drop wrote a status directly (e.g. `todo → ready`). 
| + +**Worker telemetry** (about the execution process, not the logical task): + +| Kind | Payload | When | +|---|---|---| +| `spawned` | `{pid}` | Dispatcher successfully started a worker process. | +| `heartbeat` | `{note?}` | Worker called `hermes kanban heartbeat $TASK` to signal liveness during long operations. | +| `reclaimed` | `{stale_lock}` | Claim TTL expired without a completion; task goes back to `ready`. | +| `crashed` | `{pid, claimer}` | Worker PID no longer alive but TTL hadn't expired yet. | +| `timed_out` | `{pid, elapsed_seconds, limit_seconds, sigkill}` | `max_runtime_seconds` exceeded; dispatcher SIGTERM'd (then SIGKILL'd after 5 s grace) and re-queued. | +| `spawn_failed` | `{error, failures}` | One spawn attempt failed (missing PATH, workspace unmountable, …). Counter increments; task returns to `ready` for retry. | +| `gave_up` | `{failures, error}` | Circuit breaker fired after N consecutive `spawn_failed`. Task auto-blocks with the last error. Default N = 5; override via `--failure-limit`. | + +`hermes kanban tail <id>` shows these for a single task. `hermes kanban watch` streams them board-wide. + ## Out of scope Kanban is deliberately single-host. `~/.hermes/kanban.db` is a local SQLite file and the dispatcher spawns workers on the same machine. Running a shared board across two hosts is not supported — there's no coordination primitive for "worker X on host A, worker Y on host B," and the crash-detection path assumes PIDs are host-local. If you need multi-host, run an independent board per host and use `delegate_task` / a message queue to bridge them.