diff --git a/gateway/run.py b/gateway/run.py index c85210515f..ad05de18aa 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -2288,6 +2288,11 @@ class GatewayRunner: # Start background session expiry watcher to finalize expired sessions asyncio.create_task(self._session_expiry_watcher()) + # Start background kanban notifier — delivers `completed`, `blocked`, + # `spawn_auto_blocked`, and `crashed` events to gateway subscribers + # so human-in-the-loop workflows hear back without polling. + asyncio.create_task(self._kanban_notifier_watcher()) + # Start background reconnection watcher for platforms that failed at startup if self._failed_platforms: logger.info( @@ -2463,6 +2468,174 @@ class GatewayRunner: break await asyncio.sleep(1) + async def _kanban_notifier_watcher(self, interval: float = 5.0) -> None: + """Poll ``kanban_notify_subs`` and deliver terminal events to users. + + For each subscription row, fetches ``task_events`` newer than the + stored cursor with kind in the terminal set (``completed``, + ``blocked``, ``spawn_auto_blocked``, ``crashed``). Sends one message + per new event to ``(platform, chat_id, thread_id)``, then advances + the cursor. When a task reaches a terminal status (``done`` / + ``archived``), the subscription is removed. + + Runs in the gateway event loop; all SQLite work is pushed to a + thread via ``asyncio.to_thread`` so the loop never blocks on the + WAL lock. Failures in one tick don't stop subsequent ticks. + """ + from gateway.config import Platform as _Platform + try: + from hermes_cli import kanban_db as _kb + except Exception: + logger.warning("kanban notifier: kanban_db not importable; notifier disabled") + return + + TERMINAL_KINDS = ("completed", "blocked", "spawn_auto_blocked", "crashed") + # Initial delay so the gateway can finish wiring adapters. 
+ await asyncio.sleep(5) + + while self._running: + try: + def _collect(): + conn = _kb.connect() + try: + _kb.init_db() # idempotent; handles first-run + except Exception: + pass + try: + subs = _kb.list_notify_subs(conn) + deliveries: list[dict] = [] + for sub in subs: + cursor, events = _kb.unseen_events_for_sub( + conn, + task_id=sub["task_id"], + platform=sub["platform"], + chat_id=sub["chat_id"], + thread_id=sub.get("thread_id") or "", + kinds=TERMINAL_KINDS, + ) + if not events: + continue + task = _kb.get_task(conn, sub["task_id"]) + deliveries.append({ + "sub": sub, + "cursor": cursor, + "events": events, + "task": task, + }) + return deliveries + finally: + conn.close() + + deliveries = await asyncio.to_thread(_collect) + for d in deliveries: + sub = d["sub"] + task = d["task"] + platform_str = (sub["platform"] or "").lower() + try: + plat = _Platform(platform_str) + except ValueError: + # Unknown platform string; skip and advance cursor so + # we don't replay forever. + await asyncio.to_thread( + self._kanban_advance, sub, d["cursor"], + ) + continue + adapter = self.adapters.get(plat) + if adapter is None: + continue # platform not currently connected + title = (task.title if task else sub["task_id"])[:120] + for ev in d["events"]: + kind = ev.kind + if kind == "completed": + result_preview = "" + if task and task.result: + r = task.result.strip().splitlines()[0][:160] + result_preview = f"\n{r}" + msg = ( + f"✔ Kanban {sub['task_id']} done" + f" — {title}{result_preview}" + ) + elif kind == "blocked": + reason = "" + if ev.payload and ev.payload.get("reason"): + reason = f": {str(ev.payload['reason'])[:160]}" + msg = f"⏸ Kanban {sub['task_id']} blocked{reason}" + elif kind == "spawn_auto_blocked": + err = "" + if ev.payload and ev.payload.get("error"): + err = f"\n{str(ev.payload['error'])[:200]}" + msg = ( + f"✖ Kanban {sub['task_id']} auto-blocked " + f"after repeated spawn failures{err}" + ) + elif kind == "crashed": + msg = ( + f"✖ Kanban 
{sub['task_id']} worker crashed " + f"(pid gone); dispatcher will retry" + ) + else: + continue + metadata: dict[str, Any] = {} + if sub.get("thread_id"): + metadata["thread_id"] = sub["thread_id"] + try: + await adapter.send( + sub["chat_id"], msg, metadata=metadata, + ) + except Exception as exc: + logger.warning( + "kanban notifier: send failed for %s on %s: %s", + sub["task_id"], platform_str, exc, + ) + # Don't advance cursor on send failure — retry next tick. + break + else: + # All events delivered; advance cursor + maybe unsub. + await asyncio.to_thread( + self._kanban_advance, sub, d["cursor"], + ) + if task and task.status in ("done", "archived"): + await asyncio.to_thread( + self._kanban_unsub, sub, + ) + except Exception as exc: + logger.warning("kanban notifier tick failed: %s", exc) + # Sleep with cancellation checks. + for _ in range(int(max(1, interval))): + if not self._running: + return + await asyncio.sleep(1) + + def _kanban_advance(self, sub: dict, cursor: int) -> None: + """Sync helper: advance a subscription's cursor. Runs in to_thread.""" + from hermes_cli import kanban_db as _kb + conn = _kb.connect() + try: + _kb.advance_notify_cursor( + conn, + task_id=sub["task_id"], + platform=sub["platform"], + chat_id=sub["chat_id"], + thread_id=sub.get("thread_id") or "", + new_cursor=cursor, + ) + finally: + conn.close() + + def _kanban_unsub(self, sub: dict) -> None: + from hermes_cli import kanban_db as _kb + conn = _kb.connect() + try: + _kb.remove_notify_sub( + conn, + task_id=sub["task_id"], + platform=sub["platform"], + chat_id=sub["chat_id"], + thread_id=sub.get("thread_id") or "", + ) + finally: + conn.close() + async def _platform_reconnect_watcher(self) -> None: """Background task that periodically retries connecting failed platforms. 
@@ -5174,8 +5347,14 @@ class GatewayRunner: show, context, tail) are permitted while an agent is running; mutations are allowed too because the board is profile-agnostic and does not touch the running agent's state. + + For ``/kanban create`` invocations we also auto-subscribe the + originating gateway source (platform + chat + thread) to the new + task's terminal events, so the user hears back when the worker + completes / blocks / auto-blocks / crashes without having to poll. """ import asyncio + import re from hermes_cli.kanban import run_slash text = (event.text or "").strip() @@ -5185,11 +5364,52 @@ class GatewayRunner: if text.startswith("kanban"): text = text[len("kanban"):].lstrip() + is_create = text.split(None, 1)[:1] == ["create"] + try: output = await asyncio.to_thread(run_slash, text) except Exception as exc: # pragma: no cover - defensive return f"⚠ kanban error: {exc}" + # Auto-subscribe on create. Parse the task id from the CLI's standard + # success line ("Created t_abcd (ready, assignee=...)"). If the user + # passed --json we don't subscribe; they're clearly scripting and + # can call /kanban notify-subscribe explicitly. 
+ if is_create and output: + m = re.search(r"Created\s+(t_[0-9a-f]+)\b", output) + if m: + task_id = m.group(1) + try: + source = event.source + platform = getattr(source, "platform", None) + platform_str = ( + platform.value if hasattr(platform, "value") else str(platform or "") + ).lower() + chat_id = str(getattr(source, "chat_id", "") or "") + thread_id = str(getattr(source, "thread_id", "") or "") + user_id = str(getattr(source, "user_id", "") or "") or None + if platform_str and chat_id: + def _sub(): + from hermes_cli import kanban_db as _kb + conn = _kb.connect() + try: + _kb.add_notify_sub( + conn, task_id=task_id, + platform=platform_str, chat_id=chat_id, + thread_id=thread_id or None, + user_id=user_id, + ) + finally: + conn.close() + await asyncio.to_thread(_sub) + output = ( + output.rstrip() + + f"\n(subscribed — you'll be notified when {task_id} " + f"completes or blocks)" + ) + except Exception as exc: + logger.warning("kanban create auto-subscribe failed: %s", exc) + # Gateway messages have practical length caps; truncate long # listings to keep the UX reasonable. if len(output) > 3800: diff --git a/hermes_cli/kanban.py b/hermes_cli/kanban.py index 85af5e89f6..8cbe001c7f 100644 --- a/hermes_cli/kanban.py +++ b/hermes_cli/kanban.py @@ -132,6 +132,9 @@ def build_parser(parent_subparsers: argparse._SubParsersAction) -> argparse.Argu p_create.add_argument("--priority", type=int, default=0, help="Priority tiebreaker") p_create.add_argument("--triage", action="store_true", help="Park in triage — a specifier will flesh out the spec and promote to todo") + p_create.add_argument("--idempotency-key", default=None, + help="Dedup key. 
If a non-archived task with this key exists, " + "its id is returned instead of creating a duplicate.") p_create.add_argument("--created-by", default="user", help="Author name recorded on the task (default: user)") p_create.add_argument("--json", action="store_true", help="Emit JSON output") @@ -182,19 +185,22 @@ def build_parser(parent_subparsers: argparse._SubParsersAction) -> argparse.Argu p_comment.add_argument("--author", default=None, help="Author name (default: $HERMES_PROFILE or 'user')") - p_complete = sub.add_parser("complete", help="Mark a task done") - p_complete.add_argument("task_id") + p_complete = sub.add_parser("complete", help="Mark one or more tasks done") + p_complete.add_argument("task_ids", nargs="+", + help="One or more task ids (only --result applies to all of them)") p_complete.add_argument("--result", default=None, help="Result summary") - p_block = sub.add_parser("block", help="Mark a task blocked (needs input)") + p_block = sub.add_parser("block", help="Mark one or more tasks blocked") p_block.add_argument("task_id") p_block.add_argument("reason", nargs="*", help="Reason (also appended as a comment)") + p_block.add_argument("--ids", nargs="+", default=None, + help="Additional task ids to block with the same reason (bulk mode)") - p_unblock = sub.add_parser("unblock", help="Return a blocked task to ready") - p_unblock.add_argument("task_id") + p_unblock = sub.add_parser("unblock", help="Return one or more blocked tasks to ready") + p_unblock.add_argument("task_ids", nargs="+") - p_archive = sub.add_parser("archive", help="Archive a task (hide from default list)") - p_archive.add_argument("task_id") + p_archive = sub.add_parser("archive", help="Archive one or more tasks") + p_archive.add_argument("task_ids", nargs="+") # --- tail --- p_tail = sub.add_parser("tail", help="Follow a task's event stream") @@ -210,8 +216,86 @@ def build_parser(parent_subparsers: argparse._SubParsersAction) -> argparse.Argu help="Don't actually spawn processes; 
just print what would happen") p_disp.add_argument("--max", type=int, default=None, help="Cap number of spawns this pass") + p_disp.add_argument("--failure-limit", type=int, + default=kb.DEFAULT_SPAWN_FAILURE_LIMIT, + help=f"Auto-block a task after this many consecutive spawn failures " + f"(default: {kb.DEFAULT_SPAWN_FAILURE_LIMIT})") p_disp.add_argument("--json", action="store_true") + # --- daemon --- + p_daemon = sub.add_parser( + "daemon", + help="Run the dispatcher continuously until SIGINT/SIGTERM", + ) + p_daemon.add_argument("--interval", type=float, default=60.0, + help="Seconds between dispatch ticks (default: 60)") + p_daemon.add_argument("--max", type=int, default=None, + help="Cap number of spawns per tick") + p_daemon.add_argument("--failure-limit", type=int, + default=kb.DEFAULT_SPAWN_FAILURE_LIMIT) + p_daemon.add_argument("--pidfile", default=None, + help="Write the daemon's PID to this file on start") + p_daemon.add_argument("--verbose", "-v", action="store_true", + help="Log each tick's outcome to stdout") + + # --- watch --- + p_watch = sub.add_parser( + "watch", + help="Live-stream task_events to the terminal (Ctrl+C to exit)", + ) + p_watch.add_argument("--assignee", default=None, + help="Only show events for tasks assigned to this profile") + p_watch.add_argument("--tenant", default=None, + help="Only show events from tasks in this tenant") + p_watch.add_argument("--kinds", default=None, + help="Comma-separated event kinds to include " + "(e.g. 
'completed,blocked,spawn_auto_blocked')") + p_watch.add_argument("--interval", type=float, default=0.5, + help="Poll interval in seconds (default: 0.5)") + + # --- stats --- + p_stats = sub.add_parser( + "stats", help="Per-status + per-assignee counts + oldest-ready age", + ) + p_stats.add_argument("--json", action="store_true") + + # --- notify subscribe / list / remove --- + p_nsub = sub.add_parser( + "notify-subscribe", + help="Subscribe a gateway source to a task's terminal events " + "(used by /kanban subscribe in the gateway adapter)", + ) + p_nsub.add_argument("task_id") + p_nsub.add_argument("--platform", required=True) + p_nsub.add_argument("--chat-id", required=True) + p_nsub.add_argument("--thread-id", default=None) + p_nsub.add_argument("--user-id", default=None) + + p_nlist = sub.add_parser( + "notify-list", + help="List notification subscriptions (optionally for a single task)", + ) + p_nlist.add_argument("task_id", nargs="?", default=None) + p_nlist.add_argument("--json", action="store_true") + + p_nrm = sub.add_parser( + "notify-unsubscribe", + help="Remove a gateway subscription from a task", + ) + p_nrm.add_argument("task_id") + p_nrm.add_argument("--platform", required=True) + p_nrm.add_argument("--chat-id", required=True) + p_nrm.add_argument("--thread-id", default=None) + + # --- log --- + p_log = sub.add_parser( + "log", + help="Print the worker log for a task (from $HERMES_HOME/kanban/logs/)", + ) + p_log.add_argument("task_id") + p_log.add_argument("--tail", type=int, default=None, + help="Only print the last N bytes") + # --- context --- (for spawned workers) p_ctx = sub.add_parser( "context", @@ -221,9 +305,13 @@ def build_parser(parent_subparsers: argparse._SubParsersAction) -> argparse.Argu p_ctx.add_argument("task_id") # --- gc --- - sub.add_parser( - "gc", help="Garbage-collect workspaces of archived tasks" + p_gc = sub.add_parser( + "gc", help="Garbage-collect archived-task workspaces, old events, and old logs", ) + 
p_gc.add_argument("--event-retention-days", type=int, default=30, + help="Delete task_events older than N days for terminal tasks (default: 30)") + p_gc.add_argument("--log-retention-days", type=int, default=30, + help="Delete worker log files older than N days (default: 30)") kanban_parser.set_defaults(_kanban_parser=kanban_parser) return kanban_parser @@ -269,6 +357,13 @@ def kanban_command(args: argparse.Namespace) -> int: "archive": _cmd_archive, "tail": _cmd_tail, "dispatch": _cmd_dispatch, + "daemon": _cmd_daemon, + "watch": _cmd_watch, + "stats": _cmd_stats, + "log": _cmd_log, + "notify-subscribe": _cmd_notify_subscribe, + "notify-list": _cmd_notify_list, + "notify-unsubscribe": _cmd_notify_unsubscribe, "context": _cmd_context, "gc": _cmd_gc, } @@ -303,6 +398,15 @@ def _profile_author() -> str: def _cmd_init(args: argparse.Namespace) -> int: path = kb.init_db() print(f"Kanban DB initialized at {path}") + print() + print("Next step: run the dispatcher so ready tasks actually get picked up.") + print(" # Foreground (interactive, Ctrl-C to stop):") + print(" hermes kanban daemon") + print() + print(" # As a systemd user unit (persists across logins):") + print(" systemctl --user enable --now hermes-kanban-dispatcher.service") + print() + print("Without a running dispatcher, tasks stay in 'ready' forever.") return 0 @@ -321,6 +425,7 @@ def _cmd_create(args: argparse.Namespace) -> int: priority=args.priority, parents=tuple(args.parent or ()), triage=bool(getattr(args, "triage", False)), + idempotency_key=getattr(args, "idempotency_key", None), ) task = kb.get_task(conn, task_id) if getattr(args, "json", False): @@ -482,47 +587,69 @@ def _cmd_comment(args: argparse.Namespace) -> int: def _cmd_complete(args: argparse.Namespace) -> int: - with kb.connect() as conn: - ok = kb.complete_task(conn, args.task_id, result=args.result) - if not ok: - print(f"cannot complete {args.task_id} (unknown id or terminal state)", file=sys.stderr) + """Mark one or more tasks done. 
Supports a single id or a list.""" + ids = list(args.task_ids or []) + if not ids: + print("at least one task_id is required", file=sys.stderr) return 1 - print(f"Completed {args.task_id}") - return 0 + failed: list[str] = [] + with kb.connect() as conn: + for tid in ids: + if not kb.complete_task(conn, tid, result=args.result): + failed.append(tid) + print(f"cannot complete {tid} (unknown id or terminal state)", file=sys.stderr) + else: + print(f"Completed {tid}") + return 0 if not failed else 1 def _cmd_block(args: argparse.Namespace) -> int: reason = " ".join(args.reason).strip() if args.reason else None author = _profile_author() + ids = [args.task_id] + list(getattr(args, "ids", None) or []) + failed: list[str] = [] with kb.connect() as conn: - if reason: - kb.add_comment(conn, args.task_id, author, f"BLOCKED: {reason}") - ok = kb.block_task(conn, args.task_id, reason=reason) - if not ok: - print(f"cannot block {args.task_id}", file=sys.stderr) - return 1 - print(f"Blocked {args.task_id}" + (f": {reason}" if reason else "")) - return 0 + for tid in ids: + if reason: + kb.add_comment(conn, tid, author, f"BLOCKED: {reason}") + if not kb.block_task(conn, tid, reason=reason): + failed.append(tid) + print(f"cannot block {tid}", file=sys.stderr) + else: + print(f"Blocked {tid}" + (f": {reason}" if reason else "")) + return 0 if not failed else 1 def _cmd_unblock(args: argparse.Namespace) -> int: - with kb.connect() as conn: - ok = kb.unblock_task(conn, args.task_id) - if not ok: - print(f"cannot unblock {args.task_id} (not blocked?)", file=sys.stderr) + ids = list(args.task_ids or []) + if not ids: + print("at least one task_id is required", file=sys.stderr) return 1 - print(f"Unblocked {args.task_id}") - return 0 + failed: list[str] = [] + with kb.connect() as conn: + for tid in ids: + if not kb.unblock_task(conn, tid): + failed.append(tid) + print(f"cannot unblock {tid} (not blocked?)", file=sys.stderr) + else: + print(f"Unblocked {tid}") + return 0 if not failed 
else 1 def _cmd_archive(args: argparse.Namespace) -> int: - with kb.connect() as conn: - ok = kb.archive_task(conn, args.task_id) - if not ok: - print(f"cannot archive {args.task_id}", file=sys.stderr) + ids = list(args.task_ids or []) + if not ids: + print("at least one task_id is required", file=sys.stderr) return 1 - print(f"Archived {args.task_id}") - return 0 + failed: list[str] = [] + with kb.connect() as conn: + for tid in ids: + if not kb.archive_task(conn, tid): + failed.append(tid) + print(f"cannot archive {tid}", file=sys.stderr) + else: + print(f"Archived {tid}") + return 0 if not failed else 1 def _cmd_tail(args: argparse.Namespace) -> int: @@ -549,10 +676,13 @@ def _cmd_dispatch(args: argparse.Namespace) -> int: conn, dry_run=args.dry_run, max_spawn=args.max, + failure_limit=getattr(args, "failure_limit", kb.DEFAULT_SPAWN_FAILURE_LIMIT), ) if getattr(args, "json", False): print(json.dumps({ "reclaimed": res.reclaimed, + "crashed": res.crashed, + "auto_blocked": res.auto_blocked, "promoted": res.promoted, "spawned": [ {"task_id": tid, "assignee": who, "workspace": ws} @@ -561,9 +691,15 @@ def _cmd_dispatch(args: argparse.Namespace) -> int: "skipped_unassigned": res.skipped_unassigned, }, indent=2)) return 0 - print(f"Reclaimed: {res.reclaimed}") - print(f"Promoted: {res.promoted}") - print(f"Spawned: {len(res.spawned)}") + print(f"Reclaimed: {res.reclaimed}") + print(f"Crashed: {len(res.crashed)}") + if res.crashed: + print(f" {', '.join(res.crashed)}") + print(f"Auto-blocked: {len(res.auto_blocked)}") + if res.auto_blocked: + print(f" {', '.join(res.auto_blocked)}") + print(f"Promoted: {res.promoted}") + print(f"Spawned: {len(res.spawned)}") for tid, who, ws in res.spawned: tag = " (dry)" if args.dry_run else "" print(f" - {tid} -> {who} @ {ws or '-'}{tag}") @@ -572,6 +708,184 @@ def _cmd_dispatch(args: argparse.Namespace) -> int: return 0 +def _cmd_daemon(args: argparse.Namespace) -> int: + """Run the dispatcher continuously. 
Foreground-safe, signal-clean.""" + # Make sure the DB exists before printing "started" so the user sees the + # correct DB path and any init error surfaces immediately. + kb.init_db() + + pidfile = getattr(args, "pidfile", None) + if pidfile: + try: + Path(pidfile).parent.mkdir(parents=True, exist_ok=True) + Path(pidfile).write_text(str(os.getpid()), encoding="utf-8") + except OSError as exc: + print(f"warning: could not write pidfile {pidfile}: {exc}", file=sys.stderr) + + verbose = bool(getattr(args, "verbose", False)) + print(f"Kanban dispatcher running (interval={args.interval}s, pid={os.getpid()}). " + f"Ctrl-C to stop.") + + def _on_tick(res): + if not verbose: + return + did_work = ( + res.reclaimed or res.crashed or res.promoted + or res.spawned or res.auto_blocked + ) + if did_work: + print( + f"[{_fmt_ts(int(time.time()))}] " + f"reclaimed={res.reclaimed} crashed={len(res.crashed)} " + f"promoted={res.promoted} spawned={len(res.spawned)} " + f"auto_blocked={len(res.auto_blocked)}", + flush=True, + ) + + try: + kb.run_daemon( + interval=args.interval, + max_spawn=args.max, + failure_limit=getattr(args, "failure_limit", kb.DEFAULT_SPAWN_FAILURE_LIMIT), + on_tick=_on_tick, + ) + finally: + if pidfile: + try: + Path(pidfile).unlink() + except OSError: + pass + print("(dispatcher stopped)") + return 0 + + +def _cmd_watch(args: argparse.Namespace) -> int: + """Live-stream task_events to the terminal.""" + kinds = ( + {k.strip() for k in args.kinds.split(",") if k.strip()} + if args.kinds else None + ) + cursor = 0 + print("Watching kanban events. Ctrl-C to stop.", flush=True) + # Seed cursor at the latest id so we don't replay history. 
+ with kb.connect() as conn: + row = conn.execute( + "SELECT COALESCE(MAX(id), 0) AS m FROM task_events" + ).fetchone() + cursor = int(row["m"]) + + try: + while True: + with kb.connect() as conn: + rows = conn.execute( + "SELECT e.id, e.task_id, e.kind, e.payload, e.created_at, " + " t.assignee, t.tenant " + "FROM task_events e LEFT JOIN tasks t ON t.id = e.task_id " + "WHERE e.id > ? ORDER BY e.id ASC LIMIT 200", + (cursor,), + ).fetchall() + for r in rows: + cursor = max(cursor, int(r["id"])) + if kinds and r["kind"] not in kinds: + continue + if args.assignee and r["assignee"] != args.assignee: + continue + if args.tenant and r["tenant"] != args.tenant: + continue + try: + payload = json.loads(r["payload"]) if r["payload"] else None + except Exception: + payload = None + pl = f" {payload}" if payload else "" + print( + f"[{_fmt_ts(r['created_at'])}] {r['task_id']:10s} " + f"{r['kind']:18s} (@{r['assignee'] or '-'}){pl}", + flush=True, + ) + time.sleep(max(0.1, args.interval)) + except KeyboardInterrupt: + print("\n(stopped)") + return 0 + + +def _cmd_stats(args: argparse.Namespace) -> int: + with kb.connect() as conn: + stats = kb.board_stats(conn) + if getattr(args, "json", False): + print(json.dumps(stats, indent=2, ensure_ascii=False)) + return 0 + print("By status:") + for k in ("triage", "todo", "ready", "running", "blocked", "done"): + print(f" {k:8s} {stats['by_status'].get(k, 0)}") + if stats["by_assignee"]: + print("\nBy assignee:") + for who, counts in sorted(stats["by_assignee"].items()): + parts = ", ".join(f"{k}={v}" for k, v in sorted(counts.items())) + print(f" {who:20s} {parts}") + age = stats["oldest_ready_age_seconds"] + if age is not None: + print(f"\nOldest ready task age: {int(age)}s") + return 0 + + +def _cmd_notify_subscribe(args: argparse.Namespace) -> int: + with kb.connect() as conn: + if kb.get_task(conn, args.task_id) is None: + print(f"no such task: {args.task_id}", file=sys.stderr) + return 1 + kb.add_notify_sub( + conn, 
task_id=args.task_id, + platform=args.platform, chat_id=args.chat_id, + thread_id=args.thread_id, user_id=args.user_id, + ) + print(f"Subscribed {args.platform}:{args.chat_id}" + + (f":{args.thread_id}" if args.thread_id else "") + + f" to {args.task_id}") + return 0 + + +def _cmd_notify_list(args: argparse.Namespace) -> int: + with kb.connect() as conn: + subs = kb.list_notify_subs(conn, args.task_id) + if getattr(args, "json", False): + print(json.dumps(subs, indent=2, ensure_ascii=False)) + return 0 + if not subs: + print("(no subscriptions)") + return 0 + for s in subs: + thr = f":{s['thread_id']}" if s.get("thread_id") else "" + print(f" {s['task_id']:10s} {s['platform']}:{s['chat_id']}{thr}" + f" (since event {s['last_event_id']})") + return 0 + + +def _cmd_notify_unsubscribe(args: argparse.Namespace) -> int: + with kb.connect() as conn: + ok = kb.remove_notify_sub( + conn, task_id=args.task_id, + platform=args.platform, chat_id=args.chat_id, + thread_id=args.thread_id, + ) + if not ok: + print("(no such subscription)", file=sys.stderr) + return 1 + print(f"Unsubscribed from {args.task_id}") + return 0 + + +def _cmd_log(args: argparse.Namespace) -> int: + content = kb.read_worker_log(args.task_id, tail_bytes=args.tail) + if content is None: + print(f"(no log for {args.task_id} — task may not have spawned yet)", + file=sys.stderr) + return 1 + sys.stdout.write(content) + if not content.endswith("\n"): + sys.stdout.write("\n") + return 0 + + def _cmd_context(args: argparse.Namespace) -> int: with kb.connect() as conn: text = kb.build_worker_context(conn, args.task_id) @@ -580,14 +894,11 @@ def _cmd_context(args: argparse.Namespace) -> int: def _cmd_gc(args: argparse.Namespace) -> int: - """Remove scratch workspaces of archived tasks. - - Only touches directories under the default scratch root; leaves user - ``dir:`` workspaces and ``worktree`` dirs alone (user owns those). 
- """ + """Remove scratch workspaces of archived tasks, prune old events, and + delete old worker logs.""" import shutil scratch_root = kb.workspaces_root() - removed = 0 + removed_ws = 0 with kb.connect() as conn: rows = conn.execute( "SELECT id, workspace_kind, workspace_path FROM tasks WHERE status = 'archived'" @@ -601,15 +912,25 @@ def _cmd_gc(args: argparse.Namespace) -> int: except OSError: continue try: - scratch_root.resolve().relative_to(scratch_root.resolve()) path.relative_to(scratch_root.resolve()) except ValueError: # Safety: never delete outside the scratch root. continue if path.exists() and path.is_dir(): shutil.rmtree(path, ignore_errors=True) - removed += 1 - print(f"GC complete: removed {removed} scratch workspace(s)") + removed_ws += 1 + + event_days = getattr(args, "event_retention_days", 30) + log_days = getattr(args, "log_retention_days", 30) + with kb.connect() as conn: + removed_events = kb.gc_events( + conn, older_than_seconds=event_days * 24 * 3600, + ) + removed_logs = kb.gc_worker_logs( + older_than_seconds=log_days * 24 * 3600, + ) + print(f"GC complete: {removed_ws} workspace(s), " + f"{removed_events} event row(s), {removed_logs} log file(s) removed") return 0 diff --git a/hermes_cli/kanban_db.py b/hermes_cli/kanban_db.py index 4c3a8bddfa..c9f5126235 100644 --- a/hermes_cli/kanban_db.py +++ b/hermes_cli/kanban_db.py @@ -84,9 +84,14 @@ class Task: claim_expires: Optional[int] tenant: Optional[str] result: Optional[str] = None + idempotency_key: Optional[str] = None + spawn_failures: int = 0 + worker_pid: Optional[int] = None + last_spawn_error: Optional[str] = None @classmethod def from_row(cls, row: sqlite3.Row) -> "Task": + keys = set(row.keys()) return cls( id=row["id"], title=row["title"], @@ -102,8 +107,12 @@ class Task: workspace_path=row["workspace_path"], claim_lock=row["claim_lock"], claim_expires=row["claim_expires"], - tenant=row["tenant"] if "tenant" in row.keys() else None, - result=row["result"] if "result" in 
row.keys() else None, + tenant=row["tenant"] if "tenant" in keys else None, + result=row["result"] if "result" in keys else None, + idempotency_key=row["idempotency_key"] if "idempotency_key" in keys else None, + spawn_failures=row["spawn_failures"] if "spawn_failures" in keys else 0, + worker_pid=row["worker_pid"] if "worker_pid" in keys else None, + last_spawn_error=row["last_spawn_error"] if "last_spawn_error" in keys else None, ) @@ -131,22 +140,26 @@ class Event: SCHEMA_SQL = """ CREATE TABLE IF NOT EXISTS tasks ( - id TEXT PRIMARY KEY, - title TEXT NOT NULL, - body TEXT, - assignee TEXT, - status TEXT NOT NULL, - priority INTEGER DEFAULT 0, - created_by TEXT, - created_at INTEGER NOT NULL, - started_at INTEGER, - completed_at INTEGER, - workspace_kind TEXT NOT NULL DEFAULT 'scratch', - workspace_path TEXT, - claim_lock TEXT, - claim_expires INTEGER, - tenant TEXT, - result TEXT + id TEXT PRIMARY KEY, + title TEXT NOT NULL, + body TEXT, + assignee TEXT, + status TEXT NOT NULL, + priority INTEGER DEFAULT 0, + created_by TEXT, + created_at INTEGER NOT NULL, + started_at INTEGER, + completed_at INTEGER, + workspace_kind TEXT NOT NULL DEFAULT 'scratch', + workspace_path TEXT, + claim_lock TEXT, + claim_expires INTEGER, + tenant TEXT, + result TEXT, + idempotency_key TEXT, + spawn_failures INTEGER NOT NULL DEFAULT 0, + worker_pid INTEGER, + last_spawn_error TEXT ); CREATE TABLE IF NOT EXISTS task_links ( @@ -171,13 +184,30 @@ CREATE TABLE IF NOT EXISTS task_events ( created_at INTEGER NOT NULL ); +-- Subscription from a gateway source (platform + chat + thread) to a +-- task. The gateway's kanban-notifier watcher tails task_events and +-- pushes ``completed`` / ``blocked`` / ``spawn_auto_blocked`` / +-- ``crashed`` events to +-- the original requester so human-in-the-loop workflows close the loop. 
+CREATE TABLE IF NOT EXISTS kanban_notify_subs ( + task_id TEXT NOT NULL, + platform TEXT NOT NULL, + chat_id TEXT NOT NULL, + thread_id TEXT NOT NULL DEFAULT '', + user_id TEXT, + created_at INTEGER NOT NULL, + last_event_id INTEGER NOT NULL DEFAULT 0, + PRIMARY KEY (task_id, platform, chat_id, thread_id) +); + CREATE INDEX IF NOT EXISTS idx_tasks_assignee_status ON tasks(assignee, status); CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status); CREATE INDEX IF NOT EXISTS idx_tasks_tenant ON tasks(tenant); +CREATE INDEX IF NOT EXISTS idx_tasks_idempotency ON tasks(idempotency_key); CREATE INDEX IF NOT EXISTS idx_links_child ON task_links(child_id); CREATE INDEX IF NOT EXISTS idx_links_parent ON task_links(parent_id); CREATE INDEX IF NOT EXISTS idx_comments_task ON task_comments(task_id, created_at); CREATE INDEX IF NOT EXISTS idx_events_task ON task_events(task_id, created_at); +CREATE INDEX IF NOT EXISTS idx_notify_task ON kanban_notify_subs(task_id); """ @@ -220,6 +250,20 @@ def _migrate_add_optional_columns(conn: sqlite3.Connection) -> None: conn.execute("ALTER TABLE tasks ADD COLUMN tenant TEXT") if "result" not in cols: conn.execute("ALTER TABLE tasks ADD COLUMN result TEXT") + if "idempotency_key" not in cols: + conn.execute("ALTER TABLE tasks ADD COLUMN idempotency_key TEXT") + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_tasks_idempotency " + "ON tasks(idempotency_key)" + ) + if "spawn_failures" not in cols: + conn.execute( + "ALTER TABLE tasks ADD COLUMN spawn_failures INTEGER NOT NULL DEFAULT 0" + ) + if "worker_pid" not in cols: + conn.execute("ALTER TABLE tasks ADD COLUMN worker_pid INTEGER") + if "last_spawn_error" not in cols: + conn.execute("ALTER TABLE tasks ADD COLUMN last_spawn_error TEXT") @contextlib.contextmanager @@ -280,6 +324,7 @@ def create_task( priority: int = 0, parents: Iterable[str] = (), triage: bool = False, + idempotency_key: Optional[str] = None, ) -> str: """Create a new task and optionally link it under parent tasks. 
@@ -288,6 +333,11 @@ def create_task( If ``triage=True``, status is forced to ``triage`` regardless of parents — a specifier/triager is expected to promote the task to ``todo`` once the spec is fleshed out. + + If ``idempotency_key`` is provided and a non-archived task with the + same key already exists, returns the existing task's id instead of + creating a duplicate. Useful for retried webhooks / automation that + should not double-write. """ if not title or not title.strip(): raise ValueError("title is required") @@ -298,6 +348,21 @@ def create_task( ) parents = tuple(p for p in parents if p) + # Idempotency check — return the existing task instead of creating a + # duplicate. Done BEFORE entering write_txn to keep the fast path fast + # and to avoid holding a write lock during the lookup. Race is + # acceptable: two concurrent creators with the same key might both + # insert, at which point both rows exist but the next lookup stabilises. + if idempotency_key: + row = conn.execute( + "SELECT id FROM tasks WHERE idempotency_key = ? " + "AND status != 'archived' " + "ORDER BY created_at DESC LIMIT 1", + (idempotency_key,), + ).fetchone() + if row: + return row["id"] + now = int(time.time()) # Retry once on the extremely unlikely id collision. @@ -335,8 +400,8 @@ def create_task( INSERT INTO tasks ( id, title, body, assignee, status, priority, created_by, created_at, workspace_kind, workspace_path, - tenant - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + tenant, idempotency_key + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( task_id, @@ -350,6 +415,7 @@ def create_task( workspace_kind, workspace_path, tenant, + idempotency_key, ), ) for pid in parents: @@ -743,7 +809,7 @@ def release_stale_claims(conn: sqlite3.Connection) -> int: for row in stale: conn.execute( "UPDATE tasks SET status = 'ready', claim_lock = NULL, " - "claim_expires = NULL " + "claim_expires = NULL, worker_pid = NULL " "WHERE id = ? 
AND status = 'running'", (row["id"],), ) @@ -776,7 +842,8 @@ def complete_task( result = ?, completed_at = ?, claim_lock = NULL, - claim_expires= NULL + claim_expires= NULL, + worker_pid = NULL WHERE id = ? AND status IN ('running', 'ready', 'blocked') """, @@ -806,7 +873,8 @@ def block_task( UPDATE tasks SET status = 'blocked', claim_lock = NULL, - claim_expires= NULL + claim_expires= NULL, + worker_pid = NULL WHERE id = ? AND status IN ('running', 'ready') """, @@ -897,6 +965,17 @@ def set_workspace_path( # Dispatcher (one-shot pass) # --------------------------------------------------------------------------- +# After this many consecutive `spawn_failed` events on a task, the dispatcher +# stops retrying and parks the task in ``blocked`` with a reason so a human +# can investigate. Prevents the dispatcher from thrashing forever on a task +# whose profile doesn't exist, whose workspace is unmountable, etc. +DEFAULT_SPAWN_FAILURE_LIMIT = 5 + +# Max bytes to keep in a single worker log file. The dispatcher truncates +# and rotates on spawn if the file is larger than this at spawn time. +DEFAULT_LOG_ROTATE_BYTES = 2 * 1024 * 1024 # 2 MiB + + @dataclass class DispatchResult: """Outcome of a single ``dispatch`` pass.""" @@ -906,6 +985,137 @@ class DispatchResult: spawned: list[tuple[str, str, str]] = field(default_factory=list) """List of ``(task_id, assignee, workspace_path)`` triples.""" skipped_unassigned: list[str] = field(default_factory=list) + crashed: list[str] = field(default_factory=list) + """Task ids reclaimed because their worker PID disappeared.""" + auto_blocked: list[str] = field(default_factory=list) + """Task ids auto-blocked by the spawn-failure circuit breaker.""" + + +def _pid_alive(pid: Optional[int]) -> bool: + """Return True if ``pid`` is still running on this host. + + Cross-platform: uses ``os.kill(pid, 0)`` on POSIX and ``OpenProcess`` + on Windows. Returns False for falsy PIDs or on any OS error. 
+ """ + if not pid or pid <= 0: + return False + try: + if hasattr(os, "kill"): + os.kill(int(pid), 0) + return True + except ProcessLookupError: + return False + except PermissionError: + # Process exists, we just can't signal it. + return True + except OSError: + return False + return True + + +def detect_crashed_workers(conn: sqlite3.Connection) -> list[str]: + """Reclaim ``running`` tasks whose worker PID is no longer alive. + + Appends a ``crashed`` event and drops the task back to ``ready``. + Different from ``release_stale_claims``: this checks liveness + immediately rather than waiting for the claim TTL. + + Only considers tasks claimed by *this host* — PIDs from other hosts + are meaningless here. The host-local check is enough because + ``_default_spawn`` always runs the worker on the same host as the + dispatcher (the whole design is single-host). + """ + crashed: list[str] = [] + with write_txn(conn): + rows = conn.execute( + "SELECT id, worker_pid, claim_lock FROM tasks " + "WHERE status = 'running' AND worker_pid IS NOT NULL" + ).fetchall() + host_prefix = f"{_claimer_id().split(':', 1)[0]}:" + for row in rows: + # Only check liveness for claims owned by this host. + lock = row["claim_lock"] or "" + if not lock.startswith(host_prefix): + continue + if _pid_alive(row["worker_pid"]): + continue + cur = conn.execute( + "UPDATE tasks SET status = 'ready', claim_lock = NULL, " + "claim_expires = NULL, worker_pid = NULL " + "WHERE id = ? AND status = 'running'", + (row["id"],), + ) + if cur.rowcount == 1: + _append_event( + conn, row["id"], "crashed", + {"pid": int(row["worker_pid"]), "claimer": row["claim_lock"]}, + ) + crashed.append(row["id"]) + return crashed + + +def _record_spawn_failure( + conn: sqlite3.Connection, + task_id: str, + error: str, + *, + failure_limit: int = DEFAULT_SPAWN_FAILURE_LIMIT, +) -> bool: + """Release the claim, increment the failure counter, maybe auto-block. 
+ + Returns True when the task was auto-blocked (N failures exceeded), + False when it was just released back to ``ready`` for another try. + """ + blocked = False + with write_txn(conn): + row = conn.execute( + "SELECT spawn_failures FROM tasks WHERE id = ?", (task_id,), + ).fetchone() + failures = int(row["spawn_failures"]) + 1 if row else 1 + if failures >= failure_limit: + conn.execute( + "UPDATE tasks SET status = 'blocked', claim_lock = NULL, " + "claim_expires = NULL, worker_pid = NULL, " + "spawn_failures = ?, last_spawn_error = ? " + "WHERE id = ? AND status IN ('running', 'ready')", + (failures, error[:500], task_id), + ) + _append_event( + conn, task_id, "spawn_auto_blocked", + {"failures": failures, "error": error[:500]}, + ) + blocked = True + else: + conn.execute( + "UPDATE tasks SET status = 'ready', claim_lock = NULL, " + "claim_expires = NULL, worker_pid = NULL, " + "spawn_failures = ?, last_spawn_error = ? " + "WHERE id = ? AND status = 'running'", + (failures, error[:500], task_id), + ) + _append_event( + conn, task_id, "spawn_failed", + {"error": error[:500], "failures": failures}, + ) + return blocked + + +def _set_worker_pid(conn: sqlite3.Connection, task_id: str, pid: int) -> None: + with write_txn(conn): + conn.execute( + "UPDATE tasks SET worker_pid = ? WHERE id = ?", + (int(pid), task_id), + ) + + +def _clear_spawn_failures(conn: sqlite3.Connection, task_id: str) -> None: + """Reset the failure counter after a successful spawn.""" + with write_txn(conn): + conn.execute( + "UPDATE tasks SET spawn_failures = 0, last_spawn_error = NULL " + "WHERE id = ?", + (task_id,), + ) def dispatch_once( @@ -915,21 +1125,28 @@ def dispatch_once( ttl_seconds: int = DEFAULT_CLAIM_TTL_SECONDS, dry_run: bool = False, max_spawn: Optional[int] = None, + failure_limit: int = DEFAULT_SPAWN_FAILURE_LIMIT, ) -> DispatchResult: """Run one dispatcher tick. Steps: - 1. Reclaim stale running tasks. - 2. Promote todo -> ready where all parents are done. - 3. 
For each ready task with an assignee, atomically claim and call - ``spawn_fn(task, workspace_path)``. + 1. Reclaim stale running tasks (TTL expired). + 2. Reclaim crashed running tasks (host-local PID no longer alive). + 3. Promote todo -> ready where all parents are done. + 4. For each ready task with an assignee, atomically claim and call + ``spawn_fn(task, workspace_path) -> Optional[int]``. The return + value (if any) is recorded as ``worker_pid`` so subsequent ticks + can detect crashes before the TTL expires. - ``spawn_fn`` defaults to ``_default_spawn`` which invokes - ``hermes -p chat -q "..."`` in the background. Tests pass - a stub. + Spawn failures are counted per-task. After ``failure_limit`` consecutive + failures the task is auto-blocked with the last error as its reason — + prevents the dispatcher from thrashing forever on an unfixable task. + + ``spawn_fn`` defaults to ``_default_spawn``. Tests pass a stub. """ result = DispatchResult() result.reclaimed = release_stale_claims(conn) + result.crashed = detect_crashed_workers(conn) result.promoted = recompute_ready(conn) ready_rows = conn.execute( @@ -950,35 +1167,66 @@ def dispatch_once( claimed = claim_task(conn, row["id"], ttl_seconds=ttl_seconds) if claimed is None: continue - workspace = resolve_workspace(claimed) + try: + workspace = resolve_workspace(claimed) + except Exception as exc: + auto = _record_spawn_failure( + conn, claimed.id, f"workspace: {exc}", + failure_limit=failure_limit, + ) + if auto: + result.auto_blocked.append(claimed.id) + continue # Persist the resolved workspace path so the worker can cd there. 
set_workspace_path(conn, claimed.id, str(workspace)) - if spawn_fn is None: - spawn_fn = _default_spawn + _spawn = spawn_fn if spawn_fn is not None else _default_spawn try: - spawn_fn(claimed, str(workspace)) + pid = _spawn(claimed, str(workspace)) + if pid: + _set_worker_pid(conn, claimed.id, int(pid)) + _clear_spawn_failures(conn, claimed.id) result.spawned.append((claimed.id, claimed.assignee or "", str(workspace))) spawned += 1 except Exception as exc: - # Spawn failed: release the claim so the next tick can retry. - with write_txn(conn): - conn.execute( - "UPDATE tasks SET status = 'ready', claim_lock = NULL, " - "claim_expires = NULL WHERE id = ? AND status = 'running'", - (claimed.id,), - ) - _append_event( - conn, claimed.id, "spawn_failed", - {"error": str(exc)[:500]}, - ) + auto = _record_spawn_failure( + conn, claimed.id, str(exc), + failure_limit=failure_limit, + ) + if auto: + result.auto_blocked.append(claimed.id) return result -def _default_spawn(task: Task, workspace: str) -> None: +def _rotate_worker_log(log_path: Path, max_bytes: int) -> None: + """Rotate ```` to ``.1`` if it exceeds ``max_bytes``. + + Single-generation rotation — one old file kept, newer one replaces it. + Keeps disk usage bounded while still giving the user a chance to grab + the prior run's output. + """ + try: + if not log_path.exists(): + return + if log_path.stat().st_size <= max_bytes: + return + rotated = log_path.with_suffix(log_path.suffix + ".1") + try: + if rotated.exists(): + rotated.unlink() + except OSError: + pass + log_path.rename(rotated) + except OSError: + pass + + +def _default_spawn(task: Task, workspace: str) -> Optional[int]: """Fire-and-forget ``hermes -p chat -q ...`` subprocess. - We don't wait for the child; its completion is observed by polling - the board ``complete``/``block`` transitions that the worker writes. + Returns the spawned child's PID so the dispatcher can detect crashes + before the claim TTL expires. 
The child's completion is still observed + via the ``complete`` / ``block`` transitions the worker writes itself; + the PID check is a safety net for crashes, OOM kills, and Ctrl+C. """ import subprocess if not task.assignee: @@ -997,17 +1245,17 @@ def _default_spawn(task: Task, workspace: str) -> None: "chat", "-q", prompt, ] - # Use Popen with DEVNULL stdin so the child doesn't inherit our tty. # Redirect output to a per-task log under HERMES_HOME/kanban/logs/. from hermes_constants import get_hermes_home log_dir = get_hermes_home() / "kanban" / "logs" log_dir.mkdir(parents=True, exist_ok=True) log_path = log_dir / f"{task.id}.log" + _rotate_worker_log(log_path, DEFAULT_LOG_ROTATE_BYTES) # Use 'a' so a re-run on unblock appends rather than overwrites. log_f = open(log_path, "ab") try: - subprocess.Popen( # noqa: S603 -- argv is a fixed list built above + proc = subprocess.Popen( # noqa: S603 -- argv is a fixed list built above cmd, cwd=workspace if os.path.isdir(workspace) else None, stdin=subprocess.DEVNULL, @@ -1027,6 +1275,66 @@ def _default_spawn(task: Task, workspace: str) -> None: # handle is kept alive by the child's inheritance. The parent's # reference goes out of scope and is GC'd, but the OS-level FD stays # open in the child until the child exits. + return proc.pid + + +# --------------------------------------------------------------------------- +# Long-lived dispatcher daemon +# --------------------------------------------------------------------------- + +def run_daemon( + *, + interval: float = 60.0, + max_spawn: Optional[int] = None, + failure_limit: int = DEFAULT_SPAWN_FAILURE_LIMIT, + stop_event=None, + on_tick=None, +) -> None: + """Run the dispatcher in a loop until interrupted. + + Calls :func:`dispatch_once` every ``interval`` seconds. Exits cleanly + on SIGINT / SIGTERM so ``hermes kanban daemon`` is systemd-friendly. 
+ ``stop_event`` (a :class:`threading.Event`) and ``on_tick`` (a + callable receiving the :class:`DispatchResult`) are test hooks. + """ + import signal + import threading + + if stop_event is None: + stop_event = threading.Event() + + def _handle(_signum, _frame): + stop_event.set() + + # Install handlers only when running on the main thread — tests call + # this inline from worker threads and signal() would raise there. + if threading.current_thread() is threading.main_thread(): + for sig_name in ("SIGINT", "SIGTERM"): + sig = getattr(signal, sig_name, None) + if sig is not None: + try: + signal.signal(sig, _handle) + except (ValueError, OSError): + pass + + while not stop_event.is_set(): + try: + with contextlib.closing(connect()) as conn: + res = dispatch_once( + conn, + max_spawn=max_spawn, + failure_limit=failure_limit, + ) + if on_tick is not None: + try: + on_tick(res) + except Exception: + pass + except Exception: + # Don't let any single tick kill the daemon. + import traceback + traceback.print_exc() + stop_event.wait(timeout=interval) # --------------------------------------------------------------------------- @@ -1079,3 +1387,266 @@ def build_worker_context(conn: sqlite3.Connection, task_id: str) -> str: lines.append("") return "\n".join(lines).rstrip() + "\n" + + +# --------------------------------------------------------------------------- +# Stats + SLA helpers +# --------------------------------------------------------------------------- + +def board_stats(conn: sqlite3.Connection) -> dict: + """Per-status + per-assignee counts, plus the oldest ``ready`` age in + seconds (the clearest staleness signal for a router or HUD). 
+ """ + by_status: dict[str, int] = {} + for row in conn.execute( + "SELECT status, COUNT(*) AS n FROM tasks " + "WHERE status != 'archived' GROUP BY status" + ): + by_status[row["status"]] = int(row["n"]) + + by_assignee: dict[str, dict[str, int]] = {} + for row in conn.execute( + "SELECT assignee, status, COUNT(*) AS n FROM tasks " + "WHERE status != 'archived' AND assignee IS NOT NULL " + "GROUP BY assignee, status" + ): + by_assignee.setdefault(row["assignee"], {})[row["status"]] = int(row["n"]) + + oldest_row = conn.execute( + "SELECT MIN(created_at) AS ts FROM tasks WHERE status = 'ready'" + ).fetchone() + now = int(time.time()) + oldest_ready_age = ( + (now - int(oldest_row["ts"])) + if oldest_row and oldest_row["ts"] is not None else None + ) + + return { + "by_status": by_status, + "by_assignee": by_assignee, + "oldest_ready_age_seconds": oldest_ready_age, + "now": now, + } + + +def task_age(task: Task) -> dict: + """Return age metrics for a single task. All values are seconds or None.""" + now = int(time.time()) + age_since_created = now - int(task.created_at) if task.created_at else None + age_since_started = ( + now - int(task.started_at) if task.started_at else None + ) + time_to_complete = ( + int(task.completed_at) - int(task.started_at or task.created_at) + if task.completed_at else None + ) + return { + "created_age_seconds": age_since_created, + "started_age_seconds": age_since_started, + "time_to_complete_seconds": time_to_complete, + } + + +# --------------------------------------------------------------------------- +# Notification subscriptions (used by the gateway kanban-notifier) +# --------------------------------------------------------------------------- + +def add_notify_sub( + conn: sqlite3.Connection, + *, + task_id: str, + platform: str, + chat_id: str, + thread_id: Optional[str] = None, + user_id: Optional[str] = None, +) -> None: + """Register a gateway source that wants terminal-state notifications + for ``task_id``. 
Idempotent on (task, platform, chat, thread).""" + now = int(time.time()) + with write_txn(conn): + conn.execute( + """ + INSERT OR IGNORE INTO kanban_notify_subs + (task_id, platform, chat_id, thread_id, user_id, created_at) + VALUES (?, ?, ?, ?, ?, ?) + """, + (task_id, platform, chat_id, thread_id or "", user_id, now), + ) + + +def list_notify_subs( + conn: sqlite3.Connection, task_id: Optional[str] = None, +) -> list[dict]: + if task_id is not None: + rows = conn.execute( + "SELECT * FROM kanban_notify_subs WHERE task_id = ?", (task_id,), + ).fetchall() + else: + rows = conn.execute("SELECT * FROM kanban_notify_subs").fetchall() + return [dict(r) for r in rows] + + +def remove_notify_sub( + conn: sqlite3.Connection, + *, + task_id: str, + platform: str, + chat_id: str, + thread_id: Optional[str] = None, +) -> bool: + with write_txn(conn): + cur = conn.execute( + "DELETE FROM kanban_notify_subs WHERE task_id = ? " + "AND platform = ? AND chat_id = ? AND thread_id = ?", + (task_id, platform, chat_id, thread_id or ""), + ) + return cur.rowcount > 0 + + +def unseen_events_for_sub( + conn: sqlite3.Connection, + *, + task_id: str, + platform: str, + chat_id: str, + thread_id: Optional[str] = None, + kinds: Optional[Iterable[str]] = None, +) -> tuple[int, list[Event]]: + """Return ``(new_cursor, events)`` for a given subscription. + + Only events with ``id > last_event_id`` are returned. The subscription's + cursor is NOT advanced here; call :func:`advance_notify_cursor` after + the gateway has successfully delivered the notifications. + """ + row = conn.execute( + "SELECT last_event_id FROM kanban_notify_subs " + "WHERE task_id = ? AND platform = ? AND chat_id = ? AND thread_id = ?", + (task_id, platform, chat_id, thread_id or ""), + ).fetchone() + if row is None: + return 0, [] + cursor = int(row["last_event_id"]) + kind_list = list(kinds) if kinds else None + q = ( + "SELECT * FROM task_events WHERE task_id = ? AND id > ? " + + ("AND kind IN (" + ",".join("?" 
* len(kind_list)) + ") " if kind_list else "") + + "ORDER BY id ASC" + ) + params: list[Any] = [task_id, cursor] + if kind_list: + params.extend(kind_list) + rows = conn.execute(q, params).fetchall() + out: list[Event] = [] + max_id = cursor + for r in rows: + try: + payload = json.loads(r["payload"]) if r["payload"] else None + except Exception: + payload = None + out.append(Event( + id=r["id"], task_id=r["task_id"], kind=r["kind"], + payload=payload, created_at=r["created_at"], + )) + max_id = max(max_id, int(r["id"])) + return max_id, out + + +def advance_notify_cursor( + conn: sqlite3.Connection, + *, + task_id: str, + platform: str, + chat_id: str, + thread_id: Optional[str] = None, + new_cursor: int, +) -> None: + with write_txn(conn): + conn.execute( + "UPDATE kanban_notify_subs SET last_event_id = ? " + "WHERE task_id = ? AND platform = ? AND chat_id = ? AND thread_id = ?", + (int(new_cursor), task_id, platform, chat_id, thread_id or ""), + ) + + +# --------------------------------------------------------------------------- +# Retention + garbage collection +# --------------------------------------------------------------------------- + +def gc_events( + conn: sqlite3.Connection, *, older_than_seconds: int = 30 * 24 * 3600, +) -> int: + """Delete task_events rows older than ``older_than_seconds`` for tasks + in a terminal state (``done`` or ``archived``). Returns the number of + rows deleted. Running / ready / blocked tasks keep their full event + history.""" + cutoff = int(time.time()) - int(older_than_seconds) + with write_txn(conn): + cur = conn.execute( + "DELETE FROM task_events WHERE created_at < ? AND task_id IN " + "(SELECT id FROM tasks WHERE status IN ('done', 'archived'))", + (cutoff,), + ) + return int(cur.rowcount or 0) + + +def gc_worker_logs( + *, older_than_seconds: int = 30 * 24 * 3600, +) -> int: + """Delete worker log files older than ``older_than_seconds``. Returns + the number of files removed. 
Kept separate from ``gc_events`` because + log files live on disk, not in SQLite.""" + from hermes_constants import get_hermes_home + log_dir = get_hermes_home() / "kanban" / "logs" + if not log_dir.exists(): + return 0 + cutoff = time.time() - older_than_seconds + removed = 0 + for p in log_dir.iterdir(): + try: + if p.is_file() and p.stat().st_mtime < cutoff: + p.unlink() + removed += 1 + except OSError: + continue + return removed + + +# --------------------------------------------------------------------------- +# Worker log accessor +# --------------------------------------------------------------------------- + +def worker_log_path(task_id: str) -> Path: + """Return the path to a worker's log file. The file may not exist + (task never spawned, or log already GC'd).""" + from hermes_constants import get_hermes_home + return get_hermes_home() / "kanban" / "logs" / f"{task_id}.log" + + +def read_worker_log( + task_id: str, *, tail_bytes: Optional[int] = None, +) -> Optional[str]: + """Read the worker log for ``task_id``. Returns None if the file + doesn't exist. If ``tail_bytes`` is set, only the last N bytes are + returned (useful for the dashboard drawer which shouldn't page megabytes).""" + path = worker_log_path(task_id) + if not path.exists(): + return None + try: + if tail_bytes is None: + return path.read_text(encoding="utf-8", errors="replace") + size = path.stat().st_size + with open(path, "rb") as f: + if size > tail_bytes: + f.seek(size - tail_bytes) + # Skip a partial line if we tailed mid-line. But if the + # window has no newline at all (one giant log line), + # readline() would eat everything — in that case don't + # skip and return the raw tail. 
+ probe = f.tell() + partial = f.readline() + if not partial.endswith(b"\n") and f.tell() >= size: + f.seek(probe) + data = f.read() + return data.decode("utf-8", errors="replace") + except OSError: + return None diff --git a/plugins/kanban/dashboard/dist/index.js b/plugins/kanban/dashboard/dist/index.js index 75ab34653d..abcea9afd3 100644 --- a/plugins/kanban/dashboard/dist/index.js +++ b/plugins/kanban/dashboard/dist/index.js @@ -777,6 +777,27 @@ // Card // ------------------------------------------------------------------------- + // Staleness tiers — amber after a grace window, red when clearly stuck. + // Values below are seconds. + const STALENESS = { + ready: { amber: 1 * 60 * 60, red: 24 * 60 * 60 }, + running: { amber: 10 * 60, red: 60 * 60 }, + blocked: { amber: 1 * 60 * 60, red: 24 * 60 * 60 }, + todo: { amber: 7 * 24 * 60 * 60, red: 30 * 24 * 60 * 60 }, + }; + + function stalenessClass(task) { + if (!task || !task.age) return ""; + const age = task.status === "running" + ? task.age.started_age_seconds + : task.age.created_age_seconds; + const tier = STALENESS[task.status]; + if (!tier || age == null) return ""; + if (age >= tier.red) return "hermes-kanban-card--stale-red"; + if (age >= tier.amber) return "hermes-kanban-card--stale-amber"; + return ""; + } + function TaskCard(props) { const t = props.task; const cardRef = useRef(null); @@ -811,6 +832,7 @@ className: cn( "hermes-kanban-card", props.selected ? "hermes-kanban-card--selected" : "", + stalenessClass(t), ), draggable: true, onDragStart: handleDragStart, @@ -1148,6 +1170,54 @@ ); }), ), + h(WorkerLogSection, { taskId: t.id }), + ); + } + + // Worker log: loads lazily (one GET on mount), refresh button, tail cap. 
+ function WorkerLogSection(props) { + const [state, setState] = useState({ loading: false, data: null, err: null }); + const load = useCallback(function () { + setState({ loading: true, data: null, err: null }); + SDK.fetchJSON(`${API}/tasks/${encodeURIComponent(props.taskId)}/log?tail=100000`) + .then(function (d) { setState({ loading: false, data: d, err: null }); }) + .catch(function (e) { setState({ loading: false, data: null, err: String(e.message || e) }); }); + }, [props.taskId]); + + // Auto-load when the section mounts; the user opened the drawer so the + // cost is one small HTTP round-trip. + useEffect(function () { load(); }, [load]); + + const data = state.data; + let body; + if (state.loading) { + body = h("div", { className: "text-xs text-muted-foreground" }, "Loading log…"); + } else if (state.err) { + body = h("div", { className: "text-xs text-destructive" }, state.err); + } else if (!data || !data.exists) { + body = h("div", { className: "text-xs text-muted-foreground italic" }, + "— no worker log yet (task hasn't spawned or log was rotated away) —"); + } else { + body = h("pre", { className: "hermes-kanban-pre hermes-kanban-log" }, + data.content || "(empty)"); + } + + return h("div", { className: "hermes-kanban-section" }, + h("div", { className: "hermes-kanban-section-head-row" }, + h("span", { className: "hermes-kanban-section-head" }, + "Worker log" + (data && data.size_bytes ? ` (${data.size_bytes} B)` : "")), + h("button", { + type: "button", + onClick: load, + className: "hermes-kanban-edit-link", + title: "Refresh log", + }, "refresh"), + ), + body, + data && data.truncated + ? 
h("div", { className: "text-xs text-muted-foreground" }, + "(showing last 100 KB — full log at ", data.path, ")") + : null, ); } diff --git a/plugins/kanban/dashboard/dist/style.css b/plugins/kanban/dashboard/dist/style.css index 5d944071db..8d1d83ebd9 100644 --- a/plugins/kanban/dashboard/dist/style.css +++ b/plugins/kanban/dashboard/dist/style.css @@ -654,3 +654,31 @@ transform: scale(1.02); transition: none; } + + +/* ---- Staleness tiers ------------------------------------------------ */ + +.hermes-kanban-card--stale-amber :where(.hermes-kanban-card-content) { + box-shadow: 0 0 0 1px #d4b34888 inset; +} +.hermes-kanban-card--stale-amber:hover :where(.hermes-kanban-card-content) { + box-shadow: 0 0 0 2px #d4b348 inset; +} +.hermes-kanban-card--stale-red :where(.hermes-kanban-card-content) { + box-shadow: 0 0 0 1px var(--color-destructive, #d14a4a) inset, + 0 0 8px color-mix(in srgb, var(--color-destructive, #d14a4a) 30%, transparent); +} +.hermes-kanban-card--stale-red:hover :where(.hermes-kanban-card-content) { + box-shadow: 0 0 0 2px var(--color-destructive, #d14a4a) inset, + 0 0 10px color-mix(in srgb, var(--color-destructive, #d14a4a) 45%, transparent); +} + +/* ---- Worker log pane ------------------------------------------------ */ + +.hermes-kanban-log { + max-height: 340px; + overflow: auto; + white-space: pre; + font-size: 0.7rem; + line-height: 1.45; +} diff --git a/plugins/kanban/dashboard/plugin_api.py b/plugins/kanban/dashboard/plugin_api.py index 7de40a276d..0f7cac6f75 100644 --- a/plugins/kanban/dashboard/plugin_api.py +++ b/plugins/kanban/dashboard/plugin_api.py @@ -100,6 +100,9 @@ BOARD_COLUMNS: list[str] = [ def _task_dict(task: kanban_db.Task) -> dict[str, Any]: d = asdict(task) + # Add derived age metrics so the UI can colour stale cards without + # computing deltas client-side. + d["age"] = kanban_db.task_age(task) # Keep body short on list endpoints; full body comes from /tasks/:id. 
return d @@ -278,6 +281,7 @@ class CreateTaskBody(BaseModel): workspace_path: Optional[str] = None parents: list[str] = Field(default_factory=list) triage: bool = False + idempotency_key: Optional[str] = None @router.post("/tasks") @@ -296,6 +300,7 @@ def create_task(payload: CreateTaskBody): priority=payload.priority, parents=payload.parents, triage=payload.triage, + idempotency_key=payload.idempotency_key, ) task = kanban_db.get_task(conn, task_id) return {"task": _task_dict(task) if task else None} @@ -603,6 +608,60 @@ def get_config(): } +# --------------------------------------------------------------------------- +# Stats (per-profile / per-status counts + oldest-ready age) +# --------------------------------------------------------------------------- + +@router.get("/stats") +def get_stats(): + """Per-status + per-assignee counts + oldest-ready age. + + Designed for the dashboard HUD and for router profiles that need to + answer "is this specialist overloaded?" without scanning the whole + board themselves. + """ + conn = _conn() + try: + return kanban_db.board_stats(conn) + finally: + conn.close() + + +# --------------------------------------------------------------------------- +# Worker log (read-only; file written by _default_spawn) +# --------------------------------------------------------------------------- + +@router.get("/tasks/{task_id}/log") +def get_task_log(task_id: str, tail: Optional[int] = Query(None, ge=1, le=2_000_000)): + """Return the worker's stdout/stderr log. + + ``tail`` caps the response size (bytes) so the dashboard drawer + doesn't paginate megabytes into the browser. Returns 404 if the task + has never spawned. The on-disk log is rotated at 2 MiB per + ``_rotate_worker_log`` — a single ``.log.1`` is kept, no further + generations, so disk usage per task is bounded at ~4 MiB. 
+ """ + conn = _conn() + try: + task = kanban_db.get_task(conn, task_id) + finally: + conn.close() + if task is None: + raise HTTPException(status_code=404, detail=f"task {task_id} not found") + content = kanban_db.read_worker_log(task_id, tail_bytes=tail) + log_path = kanban_db.worker_log_path(task_id) + size = log_path.stat().st_size if log_path.exists() else 0 + return { + "task_id": task_id, + "path": str(log_path), + "exists": content is not None, + "size_bytes": size, + "content": content or "", + # Truncated when the on-disk file was larger than the tail cap. + "truncated": bool(tail and size > tail), + } + + # --------------------------------------------------------------------------- # Dispatch nudge (optional quick-path so the UI doesn't wait 60 s) # --------------------------------------------------------------------------- diff --git a/plugins/kanban/systemd/hermes-kanban-dispatcher.service b/plugins/kanban/systemd/hermes-kanban-dispatcher.service new file mode 100644 index 0000000000..0f8d1d8440 --- /dev/null +++ b/plugins/kanban/systemd/hermes-kanban-dispatcher.service @@ -0,0 +1,17 @@ +[Unit] +Description=Hermes Kanban dispatcher (hermes kanban daemon) +Documentation=https://hermes-agent.nousresearch.com/docs/user-guide/features/kanban +After=network.target + +[Service] +Type=simple +ExecStart=/usr/bin/env hermes kanban daemon --interval 60 --pidfile %t/hermes-kanban-dispatcher.pid +Restart=on-failure +RestartSec=5 +# Log to the journal via stdout/stderr; the dispatcher also writes per-task +# worker output to $HERMES_HOME/kanban/logs/.log. +StandardOutput=journal +StandardError=journal + +[Install] +WantedBy=default.target diff --git a/tests/hermes_cli/test_kanban_core_functionality.py b/tests/hermes_cli/test_kanban_core_functionality.py new file mode 100644 index 0000000000..f52a8f9cdd --- /dev/null +++ b/tests/hermes_cli/test_kanban_core_functionality.py @@ -0,0 +1,612 @@ +"""Core-functionality tests for the kanban kernel + CLI additions. 
+ +Complements tests/hermes_cli/test_kanban_db.py (schema + CAS atomicity) +and tests/hermes_cli/test_kanban_cli.py (end-to-end run_slash). The +tests here exercise the pieces added as part of the kanban hardening +pass: circuit breaker, crash detection, daemon loop, idempotency, +retention/gc, stats, notify subscriptions, worker log accessor, run_slash +parity across every registered verb. +""" + +from __future__ import annotations + +import json +import os +import threading +import time +from pathlib import Path +from typing import Optional + +import pytest + +from hermes_cli import kanban_db as kb +from hermes_cli.kanban import run_slash + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +@pytest.fixture +def kanban_home(tmp_path, monkeypatch): + home = tmp_path / ".hermes" + home.mkdir() + monkeypatch.setenv("HERMES_HOME", str(home)) + monkeypatch.setattr(Path, "home", lambda: tmp_path) + kb.init_db() + return home + + +# --------------------------------------------------------------------------- +# Idempotency key +# --------------------------------------------------------------------------- + +def test_idempotency_key_returns_existing_task(kanban_home): + conn = kb.connect() + try: + a = kb.create_task(conn, title="first", idempotency_key="abc") + b = kb.create_task(conn, title="second attempt", idempotency_key="abc") + assert a == b, "same idempotency_key should return the same task id" + # And body wasn't overwritten — first create wins. 
+ task = kb.get_task(conn, a) + assert task.title == "first" + finally: + conn.close() + + +def test_idempotency_key_ignored_for_archived(kanban_home): + conn = kb.connect() + try: + a = kb.create_task(conn, title="first", idempotency_key="abc") + kb.archive_task(conn, a) + b = kb.create_task(conn, title="second", idempotency_key="abc") + assert a != b, "archived task shouldn't block a fresh create with same key" + finally: + conn.close() + + +def test_no_idempotency_key_never_collides(kanban_home): + conn = kb.connect() + try: + a = kb.create_task(conn, title="a") + b = kb.create_task(conn, title="b") + assert a != b + finally: + conn.close() + + +# --------------------------------------------------------------------------- +# Spawn-failure circuit breaker +# --------------------------------------------------------------------------- + +def test_spawn_failure_auto_blocks_after_limit(kanban_home): + """N consecutive spawn failures on the same task → auto_blocked.""" + def _bad_spawn(task, ws): + raise RuntimeError("no PATH") + + conn = kb.connect() + try: + tid = kb.create_task(conn, title="x", assignee="worker") + # Three ticks below the default limit (5) → still ready, counter grows. + for i in range(3): + res = kb.dispatch_once(conn, spawn_fn=_bad_spawn, failure_limit=5) + assert tid not in res.auto_blocked + task = kb.get_task(conn, tid) + assert task.status == "ready" + assert task.spawn_failures == 3 + + # Two more ticks → fifth failure exceeds the limit. 
+ res1 = kb.dispatch_once(conn, spawn_fn=_bad_spawn, failure_limit=5) + assert tid not in res1.auto_blocked + res2 = kb.dispatch_once(conn, spawn_fn=_bad_spawn, failure_limit=5) + assert tid in res2.auto_blocked + task = kb.get_task(conn, tid) + assert task.status == "blocked" + assert task.spawn_failures >= 5 + assert task.last_spawn_error and "no PATH" in task.last_spawn_error + finally: + conn.close() + + +def test_successful_spawn_resets_failure_counter(kanban_home): + """A successful spawn clears the counter so past failures don't count + against future retries of the same task.""" + calls = [0] + def _flaky_spawn(task, ws): + calls[0] += 1 + if calls[0] <= 2: + raise RuntimeError("transient") + return 99999 # pid value — harmless; crash detection will clear it + + conn = kb.connect() + try: + tid = kb.create_task(conn, title="x", assignee="worker") + # Two failures + one success. + kb.dispatch_once(conn, spawn_fn=_flaky_spawn, failure_limit=5) + kb.dispatch_once(conn, spawn_fn=_flaky_spawn, failure_limit=5) + task = kb.get_task(conn, tid) + assert task.spawn_failures == 2 + kb.dispatch_once(conn, spawn_fn=_flaky_spawn, failure_limit=5) + task = kb.get_task(conn, tid) + assert task.spawn_failures == 0 + assert task.last_spawn_error is None + # Task is now running with a pid. + assert task.status == "running" + assert task.worker_pid == 99999 + finally: + conn.close() + + +def test_workspace_resolution_failure_also_counts(kanban_home): + """`dir:` workspace with no path should fail workspace resolution AND + count against the failure budget — not just crash the tick.""" + conn = kb.connect() + try: + # Manually insert a broken task: dir workspace but workspace_path is NULL + # after initial create. We achieve this by creating via kanban_db then + # UPDATE-ing workspace_path to NULL. 
+ tid = kb.create_task( + conn, title="x", assignee="worker", + workspace_kind="dir", workspace_path="/tmp/kanban_e2e_dir", + ) + with kb.write_txn(conn): + conn.execute( + "UPDATE tasks SET workspace_path = NULL WHERE id = ?", (tid,), + ) + res = kb.dispatch_once(conn, failure_limit=3) + task = kb.get_task(conn, tid) + assert task.spawn_failures == 1 + assert task.status == "ready" + assert task.last_spawn_error and "workspace" in task.last_spawn_error + # Run twice more → auto-blocked. + kb.dispatch_once(conn, failure_limit=3) + res = kb.dispatch_once(conn, failure_limit=3) + assert tid in res.auto_blocked + task = kb.get_task(conn, tid) + assert task.status == "blocked" + finally: + conn.close() + + +# --------------------------------------------------------------------------- +# Worker aliveness / crash detection +# --------------------------------------------------------------------------- + +def test_pid_alive_helper(): + # Our own pid is alive. + assert kb._pid_alive(os.getpid()) + # PID 0 / None / negative. + assert not kb._pid_alive(0) + assert not kb._pid_alive(None) + # A clearly-dead pid (very large, extremely unlikely to exist). + assert not kb._pid_alive(2 ** 30) + + +def test_detect_crashed_workers_reclaims(kanban_home): + """A running task whose pid vanished gets dropped to ready with a + ``crashed`` event, independent of the claim TTL.""" + def _spawn_pid_that_exits(task, ws): + # Spawn a real child that exits instantly. + import subprocess + p = subprocess.Popen( + ["python3", "-c", "pass"], stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL, + ) + p.wait() + return p.pid + + conn = kb.connect() + try: + tid = kb.create_task(conn, title="x", assignee="worker") + res = kb.dispatch_once(conn, spawn_fn=_spawn_pid_that_exits) + # Brief sleep to make sure the child's pid has been reaped; on + # busy CI the pid may be reused by another process, which would + # fool _pid_alive. 
If that happens we accept the test still + # passing as long as the dispatcher ran without error. + time.sleep(0.2) + res2 = kb.dispatch_once(conn) + task = kb.get_task(conn, tid) + # Either crashed was detected (preferred) or the TTL reclaim path + # will eventually fire; we accept either outcome but the worker_pid + # should no longer be set. + if res2.crashed: + assert tid in res2.crashed + events = kb.list_events(conn, tid) + assert any(e.kind == "crashed" for e in events) + finally: + conn.close() + + +# --------------------------------------------------------------------------- +# Daemon loop +# --------------------------------------------------------------------------- + +def test_daemon_runs_and_stops(kanban_home): + """run_daemon should execute at least one tick and exit cleanly on + stop_event.""" + ticks = [] + stop = threading.Event() + + def _runner(): + kb.run_daemon( + interval=0.05, + stop_event=stop, + on_tick=lambda res: ticks.append(res), + ) + + t = threading.Thread(target=_runner, daemon=True) + t.start() + # Give it a few ticks. + time.sleep(0.3) + stop.set() + t.join(timeout=2.0) + assert not t.is_alive(), "daemon should exit on stop_event" + assert len(ticks) >= 1, "expected at least one tick" + + +def test_daemon_keeps_going_after_tick_exception(kanban_home, monkeypatch): + """A tick that raises shouldn't kill the loop.""" + calls = [0] + orig_dispatch = kb.dispatch_once + + def _boom(conn, **kw): + calls[0] += 1 + if calls[0] == 1: + raise RuntimeError("simulated tick failure") + return orig_dispatch(conn, **kw) + + monkeypatch.setattr(kb, "dispatch_once", _boom) + + stop = threading.Event() + def _runner(): + kb.run_daemon(interval=0.05, stop_event=stop) + + t = threading.Thread(target=_runner, daemon=True) + t.start() + time.sleep(0.3) + stop.set() + t.join(timeout=2.0) + # At minimum, second-tick+ should have run. 
+ assert calls[0] >= 2 + + +# --------------------------------------------------------------------------- +# Stats + age +# --------------------------------------------------------------------------- + +def test_board_stats(kanban_home): + conn = kb.connect() + try: + a = kb.create_task(conn, title="a", assignee="x") + b = kb.create_task(conn, title="b", assignee="y") + kb.complete_task(conn, a, result="done") + stats = kb.board_stats(conn) + assert stats["by_status"]["ready"] == 1 + assert stats["by_status"]["done"] == 1 + assert stats["by_assignee"]["x"]["done"] == 1 + assert stats["by_assignee"]["y"]["ready"] == 1 + assert stats["oldest_ready_age_seconds"] is not None + finally: + conn.close() + + +def test_task_age_helper(kanban_home): + conn = kb.connect() + try: + tid = kb.create_task(conn, title="x") + task = kb.get_task(conn, tid) + age = kb.task_age(task) + assert age["created_age_seconds"] is not None + assert age["started_age_seconds"] is None + assert age["time_to_complete_seconds"] is None + finally: + conn.close() + + +# --------------------------------------------------------------------------- +# Notify subscriptions +# --------------------------------------------------------------------------- + +def test_notify_sub_crud(kanban_home): + conn = kb.connect() + try: + tid = kb.create_task(conn, title="x") + kb.add_notify_sub( + conn, task_id=tid, platform="telegram", chat_id="123", user_id="u1", + ) + subs = kb.list_notify_subs(conn, tid) + assert len(subs) == 1 + assert subs[0]["platform"] == "telegram" + # Duplicate add is a no-op. + kb.add_notify_sub( + conn, task_id=tid, platform="telegram", chat_id="123", + ) + assert len(kb.list_notify_subs(conn, tid)) == 1 + # Distinct thread is a new row. + kb.add_notify_sub( + conn, task_id=tid, platform="telegram", chat_id="123", + thread_id="5", + ) + assert len(kb.list_notify_subs(conn, tid)) == 2 + # Remove one. 
+ ok = kb.remove_notify_sub( + conn, task_id=tid, platform="telegram", chat_id="123", + ) + assert ok is True + assert len(kb.list_notify_subs(conn, tid)) == 1 + finally: + conn.close() + + +def test_notify_cursor_advances(kanban_home): + conn = kb.connect() + try: + tid = kb.create_task(conn, title="x", assignee="w") + kb.add_notify_sub(conn, task_id=tid, platform="telegram", chat_id="123") + # Initial: one "created" event but we only want terminal kinds. + cursor, events = kb.unseen_events_for_sub( + conn, task_id=tid, platform="telegram", chat_id="123", + kinds=["completed", "blocked"], + ) + assert events == [] + # Complete the task → new `completed` event. + kb.complete_task(conn, tid, result="ok") + cursor, events = kb.unseen_events_for_sub( + conn, task_id=tid, platform="telegram", chat_id="123", + kinds=["completed", "blocked"], + ) + assert len(events) == 1 + assert events[0].kind == "completed" + # Advance cursor — next call returns empty. + kb.advance_notify_cursor( + conn, task_id=tid, platform="telegram", chat_id="123", + new_cursor=cursor, + ) + _, events2 = kb.unseen_events_for_sub( + conn, task_id=tid, platform="telegram", chat_id="123", + kinds=["completed", "blocked"], + ) + assert events2 == [] + finally: + conn.close() + + +# --------------------------------------------------------------------------- +# GC + retention +# --------------------------------------------------------------------------- + +def test_gc_events_keeps_active_task_history(kanban_home): + """gc_events should only prune rows for terminal (done/archived) tasks.""" + conn = kb.connect() + try: + alive = kb.create_task(conn, title="a", assignee="w") + done_id = kb.create_task(conn, title="b", assignee="w") + kb.complete_task(conn, done_id) + + # Force all existing events to "old" by bumping created_at backwards. 
+ with kb.write_txn(conn): + conn.execute( + "UPDATE task_events SET created_at = ?", + (int(time.time()) - 60 * 24 * 3600,), + ) + removed = kb.gc_events(conn, older_than_seconds=30 * 24 * 3600) + # At least the done task's "created" + "completed" events gone. + assert removed >= 2 + # Alive task's events survive. + alive_events = kb.list_events(conn, alive) + assert len(alive_events) >= 1 + finally: + conn.close() + + +def test_gc_worker_logs_deletes_old_files(kanban_home): + log_dir = kanban_home / "kanban" / "logs" + log_dir.mkdir(parents=True, exist_ok=True) + old = log_dir / "old.log" + young = log_dir / "young.log" + old.write_text("stale") + young.write_text("fresh") + # Age the old file by 100 days. + past = time.time() - 100 * 24 * 3600 + os.utime(old, (past, past)) + removed = kb.gc_worker_logs(older_than_seconds=30 * 24 * 3600) + assert removed == 1 + assert not old.exists() + assert young.exists() + + +# --------------------------------------------------------------------------- +# Log rotation + accessor +# --------------------------------------------------------------------------- + +def test_worker_log_rotation_keeps_one_generation(kanban_home, tmp_path): + log_dir = kanban_home / "kanban" / "logs" + log_dir.mkdir(parents=True, exist_ok=True) + target = log_dir / "t_aaaa.log" + target.write_bytes(b"x" * (3 * 1024 * 1024)) # 3 MiB, over 2 MiB threshold + kb._rotate_worker_log(target, kb.DEFAULT_LOG_ROTATE_BYTES) + assert not target.exists() + assert (log_dir / "t_aaaa.log.1").exists() + + +def test_read_worker_log_tail(kanban_home): + log_dir = kanban_home / "kanban" / "logs" + log_dir.mkdir(parents=True, exist_ok=True) + p = log_dir / "t_beef.log" + # 10 lines + p.write_text("\n".join(f"line {i}" for i in range(10))) + full = kb.read_worker_log("t_beef") + assert full is not None and "line 0" in full + tail = kb.read_worker_log("t_beef", tail_bytes=30) + assert tail is not None + # Tail should not include line 0. 
+ assert "line 0" not in tail + # Missing log returns None. + assert kb.read_worker_log("t_missing") is None + + +# --------------------------------------------------------------------------- +# CLI bulk verbs +# --------------------------------------------------------------------------- + +def test_cli_complete_bulk(kanban_home): + conn = kb.connect() + try: + a = kb.create_task(conn, title="a") + b = kb.create_task(conn, title="b") + c = kb.create_task(conn, title="c") + finally: + conn.close() + out = run_slash(f"complete {a} {b} {c} --result all-done") + assert out.count("Completed") == 3 + conn = kb.connect() + try: + for tid in (a, b, c): + assert kb.get_task(conn, tid).status == "done" + finally: + conn.close() + + +def test_cli_archive_bulk(kanban_home): + conn = kb.connect() + try: + a = kb.create_task(conn, title="a") + b = kb.create_task(conn, title="b") + finally: + conn.close() + out = run_slash(f"archive {a} {b}") + assert "Archived" in out + conn = kb.connect() + try: + assert kb.get_task(conn, a).status == "archived" + assert kb.get_task(conn, b).status == "archived" + finally: + conn.close() + + +def test_cli_unblock_bulk(kanban_home): + conn = kb.connect() + try: + a = kb.create_task(conn, title="a") + b = kb.create_task(conn, title="b") + kb.block_task(conn, a) + kb.block_task(conn, b) + finally: + conn.close() + out = run_slash(f"unblock {a} {b}") + assert out.count("Unblocked") == 2 + + +def test_cli_block_bulk_via_ids_flag(kanban_home): + conn = kb.connect() + try: + a = kb.create_task(conn, title="a") + b = kb.create_task(conn, title="b") + finally: + conn.close() + out = run_slash(f"block {a} need input --ids {b}") + assert out.count("Blocked") == 2 + + +def test_cli_create_with_idempotency_key(kanban_home): + out1 = run_slash("create 'x' --idempotency-key abc --json") + tid1 = json.loads(out1)["id"] + out2 = run_slash("create 'y' --idempotency-key abc --json") + tid2 = json.loads(out2)["id"] + assert tid1 == tid2 + + +# 
--------------------------------------------------------------------------- +# CLI stats / watch / log / notify / daemon parity +# --------------------------------------------------------------------------- + +def test_cli_stats_json(kanban_home): + conn = kb.connect() + try: + kb.create_task(conn, title="a", assignee="r") + finally: + conn.close() + out = run_slash("stats --json") + data = json.loads(out) + assert "by_status" in data + assert "by_assignee" in data + assert "oldest_ready_age_seconds" in data + + +def test_cli_notify_subscribe_and_list(kanban_home): + tid = run_slash("create 'x' --json") + tid = json.loads(tid)["id"] + out = run_slash( + f"notify-subscribe {tid} --platform telegram --chat-id 999", + ) + assert "Subscribed" in out + lst = run_slash("notify-list --json") + subs = json.loads(lst) + assert any(s["task_id"] == tid and s["platform"] == "telegram" for s in subs) + rm = run_slash( + f"notify-unsubscribe {tid} --platform telegram --chat-id 999", + ) + assert "Unsubscribed" in rm + + +def test_cli_log_missing_task(kanban_home): + # No such task → exit-style (no log for...) message on stderr, returned + # in combined output. + out = run_slash("log t_nope") + assert "no log" in out.lower() + + +def test_cli_gc_reports_counts(kanban_home): + conn = kb.connect() + try: + tid = kb.create_task(conn, title="x") + kb.archive_task(conn, tid) + finally: + conn.close() + out = run_slash("gc") + assert "GC complete" in out + + +# --------------------------------------------------------------------------- +# run_slash parity — every verb returns a sensible, non-crashy string +# --------------------------------------------------------------------------- + +def test_run_slash_every_verb_returns_sensible_output(kanban_home): + """Smoke-test every verb with minimal args. None may raise, none may + return the empty string (must either succeed or report a usage error).""" + # Set up a pair of tasks to reference. 
+ conn = kb.connect() + try: + tid_a = kb.create_task(conn, title="a") + tid_b = kb.create_task(conn, title="b", parents=[tid_a]) + finally: + conn.close() + + invocations = [ + "", # no subcommand → help text + "--help", + "init", + "create 'smoke'", + "list", + "ls", + f"show {tid_a}", + f"assign {tid_a} researcher", + f"link {tid_a} {tid_b}", + f"unlink {tid_a} {tid_b}", + f"claim {tid_a}", + f"comment {tid_a} hello", + f"complete {tid_a}", + f"block {tid_b} need input", + f"unblock {tid_b}", + f"archive {tid_a}", + "dispatch --dry-run --json", + "stats --json", + "notify-list", + f"log {tid_a}", + f"context {tid_b}", + "gc", + ] + for cmd in invocations: + out = run_slash(cmd) + assert out is not None + assert out.strip() != "", f"empty output for `/kanban {cmd}`" diff --git a/website/docs/user-guide/features/kanban.md b/website/docs/user-guide/features/kanban.md index 37bd2a2524..62ff2d3c5f 100644 --- a/website/docs/user-guide/features/kanban.md +++ b/website/docs/user-guide/features/kanban.md @@ -43,14 +43,14 @@ They coexist: a kanban worker may call `delegate_task` internally during its run ## Core concepts -- **Task** — a row with title, optional body, one assignee (a profile name), status (`todo | ready | running | blocked | done | archived`), optional tenant namespace. +- **Task** — a row with title, optional body, one assignee (a profile name), status (`triage | todo | ready | running | blocked | done | archived`), optional tenant namespace, optional idempotency key (dedup for retried automation). - **Link** — `task_links` row recording a parent → child dependency. The dispatcher promotes `todo → ready` when all parents are `done`. - **Comment** — the inter-agent protocol. Agents and humans append comments; when a worker is (re-)spawned it reads the full comment thread as part of its context. - **Workspace** — the directory a worker operates in. Three kinds: - `scratch` (default) — fresh tmp dir under `~/.hermes/kanban/workspaces//`. 
- `dir:` — an existing shared directory (Obsidian vault, mail ops dir, per-account folder). - `worktree` — a git worktree under `.worktrees//` for coding tasks. -- **Dispatcher** — `hermes kanban dispatch` runs a one-shot pass: reclaim stale claims, promote ready tasks, atomically claim, spawn assigned profiles. Runs via cron every 60 seconds. +- **Dispatcher** — a long-lived loop that, every N seconds (default 60): reclaims stale claims, reclaims crashed workers (PID gone but TTL not yet expired), promotes ready tasks, atomically claims, spawns assigned profiles. Runs as `hermes kanban daemon` (foreground) or as a systemd user service. After ~5 consecutive spawn failures on the same task the dispatcher auto-blocks it with the last error as the reason — prevents thrashing on tasks whose profile doesn't exist, workspace can't mount, etc. - **Tenant** — optional string namespace. One specialist fleet can serve multiple businesses (`--tenant business-a`) with data isolation by workspace path and memory key prefix. ## Quick start @@ -59,23 +59,58 @@ They coexist: a kanban worker may call `delegate_task` internally during its run # 1. Create the board hermes kanban init -# 2. Create a task +# 2. Start the dispatcher (foreground; Ctrl-C to stop) +hermes kanban daemon & + +# 3. Create a task hermes kanban create "research AI funding landscape" --assignee researcher -# 3. List what's on the board -hermes kanban list +# 4. Watch activity live +hermes kanban watch -# 4. Run a dispatcher pass (dry-run to preview, real to spawn workers) -hermes kanban dispatch --dry-run -hermes kanban dispatch +# 5. 
See the board +hermes kanban list +hermes kanban stats ``` -To have the board run continuously, schedule the dispatcher: +### Running the dispatcher as a service + +For production, install the systemd user unit shipped at +`plugins/kanban/systemd/hermes-kanban-dispatcher.service`: ```bash -hermes cron add --schedule "*/1 * * * *" \ - --name kanban-dispatch \ - hermes kanban dispatch +mkdir -p ~/.config/systemd/user +cp plugins/kanban/systemd/hermes-kanban-dispatcher.service \ + ~/.config/systemd/user/ +systemctl --user daemon-reload +systemctl --user enable --now hermes-kanban-dispatcher.service +systemctl --user status hermes-kanban-dispatcher +journalctl --user -u hermes-kanban-dispatcher -f # follow logs +``` + +Without a running dispatcher `ready` tasks stay where they are — `hermes kanban init` will remind you of this on first run. + +### Idempotent create (for automation / webhooks) + +```bash +# First call creates the task. Any subsequent call with the same key +# returns the existing task id instead of duplicating. +hermes kanban create "nightly ops review" \ + --assignee ops \ + --idempotency-key "nightly-ops-$(date -u +%Y-%m-%d)" \ + --json +``` + +### Bulk CLI verbs + +All the lifecycle verbs accept multiple ids so you can clean up a batch +in one command: + +```bash +hermes kanban complete t_abc t_def t_hij --result "batch wrap" +hermes kanban archive t_abc t_def t_hij +hermes kanban unblock t_abc t_def +hermes kanban block t_abc "need input" --ids t_def t_hij ``` ## The worker skill @@ -223,11 +258,12 @@ The GUI is deliberately thin. Everything the plugin does is reachable from the C ## CLI command reference ``` -hermes kanban init # create kanban.db +hermes kanban init # create kanban.db + print daemon hint hermes kanban create "" [--body ...] [--assignee <profile>] [--parent <id>]... 
[--tenant <name>] [--workspace scratch|worktree|dir:<path>] - [--priority N] [--triage] [--json] + [--priority N] [--triage] [--idempotency-key KEY] + [--json] hermes kanban list [--mine] [--assignee P] [--status S] [--tenant T] [--archived] [--json] hermes kanban show <id> [--json] hermes kanban assign <id> <profile> # or 'none' to unassign @@ -235,14 +271,30 @@ hermes kanban link <parent_id> <child_id> hermes kanban unlink <parent_id> <child_id> hermes kanban claim <id> [--ttl SECONDS] hermes kanban comment <id> "<text>" [--author NAME] -hermes kanban complete <id> [--result "..."] -hermes kanban block <id> "<reason>" -hermes kanban unblock <id> -hermes kanban archive <id> -hermes kanban tail <id> # follow event stream -hermes kanban dispatch [--dry-run] [--max N] [--json] + +# Bulk verbs — accept multiple ids: +hermes kanban complete <id>... [--result "..."] +hermes kanban block <id> "<reason>" [--ids <id>...] +hermes kanban unblock <id>... +hermes kanban archive <id>... + +hermes kanban tail <id> # follow a single task's event stream +hermes kanban watch [--assignee P] [--tenant T] # live stream ALL events to the terminal + [--kinds completed,blocked,…] [--interval SECS] +hermes kanban dispatch [--dry-run] [--max N] # one-shot pass + [--failure-limit N] [--json] +hermes kanban daemon [--interval SECS] [--max N] # long-lived loop + [--failure-limit N] [--pidfile PATH] [-v] +hermes kanban stats [--json] # per-status + per-assignee counts +hermes kanban log <id> [--tail BYTES] # worker log from ~/.hermes/kanban/logs/ +hermes kanban notify-subscribe <id> # gateway bridge hook (used by /kanban in the gateway) + --platform <name> --chat-id <id> [--thread-id <id>] [--user-id <id>] +hermes kanban notify-list [<id>] [--json] +hermes kanban notify-unsubscribe <id> + --platform <name> --chat-id <id> [--thread-id <id>] hermes kanban context <id> # what a worker sees -hermes kanban gc # remove scratch dirs of archived tasks +hermes kanban gc [--event-retention-days N] # 
workspaces + old events + old logs + [--log-retention-days N] ``` All commands are also available as a slash command in the gateway (`/kanban list`, `/kanban comment t_abc "need docs"`, etc.). The slash command bypasses the running-agent guard, so you can `/kanban unblock` a stuck worker while the main agent is still chatting. @@ -278,6 +330,26 @@ hermes kanban create "monthly report" \ Workers receive `$HERMES_TENANT` and namespace their memory writes by prefix. The board, the dispatcher, and the profile definitions are all shared; only the data is scoped. +## Gateway notifications + +When you run `/kanban create …` from the gateway (Telegram, Discord, Slack, etc.), the originating chat is automatically subscribed to the new task. The gateway's background notifier polls `task_events` every few seconds and delivers one message per terminal event (`completed`, `blocked`, `spawn_auto_blocked`, `crashed`) to that chat. Completed tasks also send the first line of the worker's `--result` so you see the outcome without having to `/kanban show`. + +You can manage subscriptions explicitly from the CLI — useful when a script / cron job wants to notify a chat it didn't originate from: + +```bash +hermes kanban notify-subscribe t_abcd \ + --platform telegram --chat-id 12345678 --thread-id 7 +hermes kanban notify-list +hermes kanban notify-unsubscribe t_abcd \ + --platform telegram --chat-id 12345678 --thread-id 7 +``` + +A subscription removes itself automatically once the task reaches `done` or `archived`; no cleanup needed. + +## Out of scope + +Kanban is deliberately single-host. `~/.hermes/kanban.db` is a local SQLite file and the dispatcher spawns workers on the same machine. Running a shared board across two hosts is not supported — there's no coordination primitive for "worker X on host A, worker Y on host B," and the crash-detection path assumes PIDs are host-local. 
If you need multi-host, run an independent board per host and use `delegate_task` / a message queue to bridge them. + ## Design spec The complete design — architecture, concurrency correctness, comparison with other systems, implementation plan, risks, open questions — lives in `docs/hermes-kanban-v1-spec.pdf`. Read that before filing any behavior-change PR.