Files
hermes-agent/tools/kanban_tools.py
teknium1 832ecde4b0 feat(kanban): structured tool surface for worker + orchestrator agents
Seven new tools in `tools/kanban_tools.py` that give kanban workers a
backend-portable, schema-filtered way to interact with the board from
inside their own Python process — no shelling out to `hermes kanban`.

Motivation
  The CLI path (`hermes kanban complete \$TASK --summary ...`) breaks
  on any remote terminal backend (Docker, Modal, Singularity, SSH).
  The terminal tool runs `hermes kanban` inside the container, where
  `hermes` isn't installed and `~/.hermes/kanban.db` isn't mounted.
  Tools run in the agent's own Python process, so they always reach
  the board regardless of backend. Also skips shell-quoting fragility
  on --metadata JSON and gives structured error returns the model can
  reason about.

The seven tools
  kanban_show        read current task (defaults to HERMES_KANBAN_TASK)
  kanban_complete    structured handoff: summary + metadata
  kanban_block       ask for human input
  kanban_heartbeat   signal liveness during long operations
  kanban_comment     append to task thread
  kanban_create      fan out into child tasks (orchestrator path)
  kanban_link        add parent→child dependency after the fact

Gating
  Each tool's check_fn returns True iff HERMES_KANBAN_TASK is set in
  the process env. The dispatcher sets it when spawning a worker;
  normal `hermes chat` sessions never have it. Empirically verified:
  a baseline hermes-cli schema is 27 tools; with HERMES_KANBAN_TASK
  set it grows to exactly 34 (+7). Zero leak into normal sessions.

Also set HERMES_PROFILE in the spawn env so the kanban_comment tool's
author default works cleanly (it's what the tool reads to attribute
comments).

Skill updates
  - `skills/devops/kanban-worker/SKILL.md`: lifecycle rewritten to use
    kanban_show / kanban_heartbeat / kanban_block / kanban_complete /
    kanban_comment / kanban_create directly. CLI fallback section
    added for human operators / scripts.
  - `skills/devops/kanban-orchestrator/SKILL.md`: all examples ported
    from CLI to tool form; top-banner note explaining tools are the
    primary surface. kanban_create / kanban_link throughout.

Docs
  `website/docs/user-guide/features/kanban.md`:
  new "How workers interact with the board" section explaining the
  tool surface, gating mechanism, and why tools vs CLI. The worker
  skill / orchestrator skill subsections are now nested under it.

Tests (+25 in tests/tools/test_kanban_tools.py)
  - Schema gating: kanban_tools_hidden_without_env_var,
    kanban_tools_visible_with_env_var.
  - Happy paths: show (default + explicit task_id), complete (with
    summary+metadata, with result only), block, heartbeat (with and
    without note), comment (default + custom author), create (with
    list parents, with string parent), link.
  - Error paths: complete rejects no-handoff and non-dict metadata,
    block rejects empty reason, comment rejects empty body, create
    rejects no title / no assignee / non-list parents, link rejects
    self-reference / missing args / cycles.
  - End-to-end: full worker lifecycle driven entirely through the
    tools, verified against DB state.

214/214 kanban suite pass under scripts/run_tests.sh.
2026-04-28 04:30:22 -07:00

705 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Kanban tools — structured tool-call surface for worker + orchestrator agents.
These tools are only registered into the model's schema when the agent is
running under the dispatcher (env var ``HERMES_KANBAN_TASK`` set). A
normal ``hermes chat`` session sees **zero** kanban tools in its schema.
Why tools instead of just shelling out to ``hermes kanban``?
1. **Backend portability.** A worker whose terminal tool points at Docker
/ Modal / Singularity / SSH would run ``hermes kanban complete …``
inside the container, where ``hermes`` isn't installed and the DB
isn't mounted. Tools run in the agent's Python process, so they
always reach ``~/.hermes/kanban.db`` regardless of terminal backend.
2. **No shell-quoting footguns.** Passing ``--metadata '{"x": [...]}'``
through shlex+argparse is fragile. Structured tool args skip it.
3. **Better errors.** Tool-call failures return structured JSON the
model can reason about, not stderr strings it has to parse.
Humans continue to use the CLI (``hermes kanban …``), the dashboard
(``hermes dashboard``), and the slash command (``/kanban …``) — all
three bypass the agent entirely. The tools are ONLY for the worker
agent's handoff back to the kernel.
"""
from __future__ import annotations
import json
import logging
import os
from typing import Any, Optional
from tools.registry import registry, tool_error
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Gating
# ---------------------------------------------------------------------------
def _check_kanban_mode() -> bool:
"""Tools are available iff the current process has ``HERMES_KANBAN_TASK``
set in its env, which the dispatcher sets when spawning a worker.
Humans running ``hermes chat`` see zero kanban tools. Workers spawned
by ``hermes kanban daemon`` see all seven.
"""
return bool(os.environ.get("HERMES_KANBAN_TASK"))
# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------
def _default_task_id(arg: Optional[str]) -> Optional[str]:
"""Resolve ``task_id`` arg or fall back to the env var the dispatcher set."""
if arg:
return arg
env_tid = os.environ.get("HERMES_KANBAN_TASK")
return env_tid or None
def _connect():
"""Import + connect lazily so the module imports cleanly in non-kanban
contexts (e.g. test rigs that import every tool module)."""
from hermes_cli import kanban_db as kb
return kb, kb.connect()
def _ok(**fields: Any) -> str:
return json.dumps({"ok": True, **fields})
# ---------------------------------------------------------------------------
# Handlers
# ---------------------------------------------------------------------------
def _handle_show(args: dict, **kw) -> str:
"""Read a task's full state: task row, parents, children, comments,
runs (attempt history), and the last N events."""
tid = _default_task_id(args.get("task_id"))
if not tid:
return tool_error(
"task_id is required (or set HERMES_KANBAN_TASK in the env)"
)
try:
kb, conn = _connect()
try:
task = kb.get_task(conn, tid)
if task is None:
return tool_error(f"task {tid} not found")
comments = kb.list_comments(conn, tid)
events = kb.list_events(conn, tid)
runs = kb.list_runs(conn, tid)
parents = kb.parent_ids(conn, tid)
children = kb.child_ids(conn, tid)
def _task_dict(t):
return {
"id": t.id, "title": t.title, "body": t.body,
"assignee": t.assignee, "status": t.status,
"tenant": t.tenant, "priority": t.priority,
"workspace_kind": t.workspace_kind,
"workspace_path": t.workspace_path,
"created_by": t.created_by, "created_at": t.created_at,
"started_at": t.started_at,
"completed_at": t.completed_at,
"result": t.result,
"current_run_id": t.current_run_id,
}
def _run_dict(r):
return {
"id": r.id, "profile": r.profile,
"status": r.status, "outcome": r.outcome,
"summary": r.summary, "error": r.error,
"metadata": r.metadata,
"started_at": r.started_at, "ended_at": r.ended_at,
}
return json.dumps({
"task": _task_dict(task),
"parents": parents,
"children": children,
"comments": [
{"author": c.author, "body": c.body,
"created_at": c.created_at}
for c in comments
],
"events": [
{"kind": e.kind, "payload": e.payload,
"created_at": e.created_at, "run_id": e.run_id}
for e in events[-50:] # cap; full log via CLI
],
"runs": [_run_dict(r) for r in runs],
# Also surface the worker's own context block so the
# agent can include it directly if it wants. This is
# the same string build_worker_context returns to the
# dispatcher at spawn time.
"worker_context": kb.build_worker_context(conn, tid),
})
finally:
conn.close()
except Exception as e:
logger.exception("kanban_show failed")
return tool_error(f"kanban_show: {e}")
def _handle_complete(args: dict, **kw) -> str:
"""Mark the current task done with a structured handoff."""
tid = _default_task_id(args.get("task_id"))
if not tid:
return tool_error(
"task_id is required (or set HERMES_KANBAN_TASK in the env)"
)
summary = args.get("summary")
metadata = args.get("metadata")
result = args.get("result")
if not (summary or result):
return tool_error(
"provide at least one of: summary (preferred), result"
)
if metadata is not None and not isinstance(metadata, dict):
return tool_error(
f"metadata must be an object/dict, got {type(metadata).__name__}"
)
try:
kb, conn = _connect()
try:
ok = kb.complete_task(
conn, tid,
result=result, summary=summary, metadata=metadata,
)
if not ok:
return tool_error(
f"could not complete {tid} (unknown id or already terminal)"
)
run = kb.latest_run(conn, tid)
return _ok(task_id=tid, run_id=run.id if run else None)
finally:
conn.close()
except Exception as e:
logger.exception("kanban_complete failed")
return tool_error(f"kanban_complete: {e}")
def _handle_block(args: dict, **kw) -> str:
"""Transition the task to blocked with a reason a human will read."""
tid = _default_task_id(args.get("task_id"))
if not tid:
return tool_error(
"task_id is required (or set HERMES_KANBAN_TASK in the env)"
)
reason = args.get("reason")
if not reason or not str(reason).strip():
return tool_error("reason is required — explain what input you need")
try:
kb, conn = _connect()
try:
ok = kb.block_task(conn, tid, reason=reason)
if not ok:
return tool_error(
f"could not block {tid} (unknown id or not in "
f"running/ready)"
)
run = kb.latest_run(conn, tid)
return _ok(task_id=tid, run_id=run.id if run else None)
finally:
conn.close()
except Exception as e:
logger.exception("kanban_block failed")
return tool_error(f"kanban_block: {e}")
def _handle_heartbeat(args: dict, **kw) -> str:
"""Signal that the worker is still alive during a long operation."""
tid = _default_task_id(args.get("task_id"))
if not tid:
return tool_error(
"task_id is required (or set HERMES_KANBAN_TASK in the env)"
)
note = args.get("note")
try:
kb, conn = _connect()
try:
ok = kb.heartbeat_worker(conn, tid, note=note)
if not ok:
return tool_error(
f"could not heartbeat {tid} (unknown id or not running)"
)
return _ok(task_id=tid)
finally:
conn.close()
except Exception as e:
logger.exception("kanban_heartbeat failed")
return tool_error(f"kanban_heartbeat: {e}")
def _handle_comment(args: dict, **kw) -> str:
"""Append a comment to a task's thread."""
tid = args.get("task_id")
if not tid:
return tool_error(
"task_id is required (use the current task id if that's what "
"you mean — pulls from env but kept explicit here)"
)
body = args.get("body")
if not body or not str(body).strip():
return tool_error("body is required")
author = args.get("author") or os.environ.get("HERMES_PROFILE") or "worker"
try:
kb, conn = _connect()
try:
cid = kb.add_comment(conn, tid, author=author, body=str(body))
return _ok(task_id=tid, comment_id=cid)
finally:
conn.close()
except Exception as e:
logger.exception("kanban_comment failed")
return tool_error(f"kanban_comment: {e}")
def _handle_create(args: dict, **kw) -> str:
"""Create a child task. Orchestrator workers use this to fan out.
``parents`` can be a list of task ids; dependency-gated promotion
works as usual.
"""
title = args.get("title")
if not title or not str(title).strip():
return tool_error("title is required")
assignee = args.get("assignee")
if not assignee:
return tool_error(
"assignee is required — name the profile that should execute this "
"task (the dispatcher will only spawn tasks with an assignee)"
)
body = args.get("body")
parents = args.get("parents") or []
tenant = args.get("tenant") or os.environ.get("HERMES_TENANT")
priority = args.get("priority")
workspace_kind = args.get("workspace_kind") or "scratch"
workspace_path = args.get("workspace_path")
triage = bool(args.get("triage"))
idempotency_key = args.get("idempotency_key")
max_runtime_seconds = args.get("max_runtime_seconds")
if isinstance(parents, str):
parents = [parents]
if not isinstance(parents, (list, tuple)):
return tool_error(
f"parents must be a list of task ids, got {type(parents).__name__}"
)
try:
kb, conn = _connect()
try:
new_tid = kb.create_task(
conn,
title=str(title).strip(),
body=body,
assignee=str(assignee),
parents=tuple(parents),
tenant=tenant,
priority=int(priority) if priority is not None else 0,
workspace_kind=str(workspace_kind),
workspace_path=workspace_path,
triage=triage,
idempotency_key=idempotency_key,
max_runtime_seconds=(
int(max_runtime_seconds)
if max_runtime_seconds is not None else None
),
created_by=os.environ.get("HERMES_PROFILE") or "worker",
)
new_task = kb.get_task(conn, new_tid)
return _ok(
task_id=new_tid,
status=new_task.status if new_task else None,
)
finally:
conn.close()
except Exception as e:
logger.exception("kanban_create failed")
return tool_error(f"kanban_create: {e}")
def _handle_link(args: dict, **kw) -> str:
"""Add a parent→child dependency edge after the fact."""
parent_id = args.get("parent_id")
child_id = args.get("child_id")
if not parent_id or not child_id:
return tool_error("both parent_id and child_id are required")
try:
kb, conn = _connect()
try:
kb.link_tasks(conn, parent_id=parent_id, child_id=child_id)
return _ok(parent_id=parent_id, child_id=child_id)
finally:
conn.close()
except ValueError as e:
# Covers cycle + self-parent rejections
return tool_error(f"kanban_link: {e}")
except Exception as e:
logger.exception("kanban_link failed")
return tool_error(f"kanban_link: {e}")
# ---------------------------------------------------------------------------
# Schemas
# ---------------------------------------------------------------------------
_DESC_TASK_ID_DEFAULT = (
"Task id. If omitted, defaults to HERMES_KANBAN_TASK from the env "
"(the task the dispatcher spawned you to work on)."
)
KANBAN_SHOW_SCHEMA = {
"name": "kanban_show",
"description": (
"Read a task's full state — title, body, assignee, parent task "
"handoffs, your prior attempts on this task if any, comments, "
"and recent events. Use this to (re)orient yourself before "
"starting work, especially on retries. The response includes a "
"pre-formatted ``worker_context`` string suitable for inclusion "
"verbatim in your reasoning."
),
"parameters": {
"type": "object",
"properties": {
"task_id": {
"type": "string",
"description": _DESC_TASK_ID_DEFAULT,
},
},
"required": [],
},
}
KANBAN_COMPLETE_SCHEMA = {
"name": "kanban_complete",
"description": (
"Mark your current task done with a structured handoff for "
"downstream workers and humans. Prefer ``summary`` for a "
"human-readable 1-3 sentence description of what you did; put "
"machine-readable facts in ``metadata`` (changed_files, "
"tests_run, decisions, findings, etc). At least one of "
"``summary`` or ``result`` is required."
),
"parameters": {
"type": "object",
"properties": {
"task_id": {
"type": "string",
"description": _DESC_TASK_ID_DEFAULT,
},
"summary": {
"type": "string",
"description": (
"Human-readable handoff, 1-3 sentences. Appears in "
"Run History on the dashboard and in downstream "
"workers' context."
),
},
"metadata": {
"type": "object",
"description": (
"Free-form dict of structured facts about this "
"attempt — {\"changed_files\": [...], \"tests_run\": 12, "
"\"findings\": [...]}. Surfaced to downstream "
"workers alongside ``summary``."
),
},
"result": {
"type": "string",
"description": (
"Short result log line (legacy field, maps to "
"task.result). Use ``summary`` instead when "
"possible; this exists for compatibility with "
"callers that still set --result on the CLI."
),
},
},
"required": [],
},
}
KANBAN_BLOCK_SCHEMA = {
"name": "kanban_block",
"description": (
"Transition the task to blocked because you need human input "
"to proceed. ``reason`` will be shown to the human on the "
"board and included in context when someone unblocks you. "
"Use for genuine blockers only — don't block on things you can "
"resolve yourself."
),
"parameters": {
"type": "object",
"properties": {
"task_id": {
"type": "string",
"description": _DESC_TASK_ID_DEFAULT,
},
"reason": {
"type": "string",
"description": (
"What you need answered, in one or two sentences. "
"Don't paste the whole conversation; the human has "
"the board and can ask follow-ups via comments."
),
},
},
"required": ["reason"],
},
}
KANBAN_HEARTBEAT_SCHEMA = {
"name": "kanban_heartbeat",
"description": (
"Signal that you're still alive during a long operation "
"(training, encoding, large crawls). Call every few minutes so "
"humans see liveness separately from PID checks. Pure side "
"effect — no work changes."
),
"parameters": {
"type": "object",
"properties": {
"task_id": {
"type": "string",
"description": _DESC_TASK_ID_DEFAULT,
},
"note": {
"type": "string",
"description": (
"Optional short note describing current progress. "
"Shown in the event log."
),
},
},
"required": [],
},
}
KANBAN_COMMENT_SCHEMA = {
"name": "kanban_comment",
"description": (
"Append a comment to a task's thread. Use for durable notes "
"that should outlive this run (questions for the next worker, "
"partial findings, rationale). Ephemeral reasoning doesn't "
"belong here — use your normal response instead."
),
"parameters": {
"type": "object",
"properties": {
"task_id": {
"type": "string",
"description": (
"Task id. Required (may be your own task or "
"another's — comment threads are per-task)."
),
},
"body": {
"type": "string",
"description": "Markdown-supported comment body.",
},
"author": {
"type": "string",
"description": (
"Override author name. Defaults to the current "
"profile (HERMES_PROFILE env)."
),
},
},
"required": ["task_id", "body"],
},
}
KANBAN_CREATE_SCHEMA = {
"name": "kanban_create",
"description": (
"Create a new kanban task, optionally as a child of the current "
"one (pass the current task id in ``parents``). Used by "
"orchestrator workers to fan out — decompose work into child "
"tasks with specific assignees, link them into a pipeline, "
"then complete your own task. The dispatcher picks up the new "
"tasks on its next tick and spawns the assigned profiles."
),
"parameters": {
"type": "object",
"properties": {
"title": {
"type": "string",
"description": "Short task title (required).",
},
"assignee": {
"type": "string",
"description": (
"Profile name that should execute this task "
"(e.g. 'researcher-a', 'reviewer', 'writer'). "
"Required — tasks without an assignee are never "
"dispatched."
),
},
"body": {
"type": "string",
"description": (
"Opening post: full spec, acceptance criteria, "
"links. The assigned worker reads this as part of "
"its context."
),
},
"parents": {
"type": "array",
"items": {"type": "string"},
"description": (
"Parent task ids. The new task stays in 'todo' "
"until every parent reaches 'done'; then it "
"auto-promotes to 'ready'. Typical fan-in: list "
"all the researcher task ids when creating a "
"synthesizer task."
),
},
"tenant": {
"type": "string",
"description": (
"Optional namespace for multi-project isolation. "
"Defaults to HERMES_TENANT env if set."
),
},
"priority": {
"type": "integer",
"description": (
"Dispatcher tiebreaker. Higher = picked sooner "
"when multiple ready tasks share an assignee."
),
},
"workspace_kind": {
"type": "string",
"enum": ["scratch", "dir", "worktree"],
"description": (
"Workspace flavor: 'scratch' (fresh tmp dir, "
"default), 'dir' (shared directory, requires "
"absolute workspace_path), 'worktree' (git worktree)."
),
},
"workspace_path": {
"type": "string",
"description": (
"Absolute path for 'dir' or 'worktree' workspace. "
"Relative paths are rejected at dispatch."
),
},
"triage": {
"type": "boolean",
"description": (
"If true, task lands in 'triage' instead of 'todo' "
"— a specifier profile is expected to flesh out "
"the body before work starts."
),
},
"idempotency_key": {
"type": "string",
"description": (
"If a non-archived task with this key already "
"exists, return that task's id instead of creating "
"a duplicate. Useful for retry-safe automation."
),
},
"max_runtime_seconds": {
"type": "integer",
"description": (
"Per-task runtime cap. When exceeded, the "
"dispatcher SIGTERMs the worker and re-queues the "
"task with outcome='timed_out'."
),
},
},
"required": ["title", "assignee"],
},
}
KANBAN_LINK_SCHEMA = {
"name": "kanban_link",
"description": (
"Add a parent→child dependency edge after both tasks already "
"exist. The child won't promote to 'ready' until all parents "
"are 'done'. Cycles and self-links are rejected."
),
"parameters": {
"type": "object",
"properties": {
"parent_id": {"type": "string", "description": "Parent task id."},
"child_id": {"type": "string", "description": "Child task id."},
},
"required": ["parent_id", "child_id"],
},
}
# ---------------------------------------------------------------------------
# Registration
# ---------------------------------------------------------------------------
registry.register(
name="kanban_show",
toolset="kanban",
schema=KANBAN_SHOW_SCHEMA,
handler=_handle_show,
check_fn=_check_kanban_mode,
emoji="📋",
)
registry.register(
name="kanban_complete",
toolset="kanban",
schema=KANBAN_COMPLETE_SCHEMA,
handler=_handle_complete,
check_fn=_check_kanban_mode,
emoji="",
)
registry.register(
name="kanban_block",
toolset="kanban",
schema=KANBAN_BLOCK_SCHEMA,
handler=_handle_block,
check_fn=_check_kanban_mode,
emoji="",
)
registry.register(
name="kanban_heartbeat",
toolset="kanban",
schema=KANBAN_HEARTBEAT_SCHEMA,
handler=_handle_heartbeat,
check_fn=_check_kanban_mode,
emoji="💓",
)
registry.register(
name="kanban_comment",
toolset="kanban",
schema=KANBAN_COMMENT_SCHEMA,
handler=_handle_comment,
check_fn=_check_kanban_mode,
emoji="💬",
)
registry.register(
name="kanban_create",
toolset="kanban",
schema=KANBAN_CREATE_SCHEMA,
handler=_handle_create,
check_fn=_check_kanban_mode,
emoji="",
)
registry.register(
name="kanban_link",
toolset="kanban",
schema=KANBAN_LINK_SCHEMA,
handler=_handle_link,
check_fn=_check_kanban_mode,
emoji="🔗",
)