Files
hermes-agent/tests/tools/test_kanban_tools.py
Teknium ff3d2773e2 feat(kanban): auto-subscribe gateway chat on tool-driven kanban_create (#19718)
Closes #19479.

When an orchestrator agent calls kanban_create from a gateway session
(e.g. a Telegram user delegating to an orchestrator profile), auto-
subscribe the originating (platform, chat, thread, user) to the new
task's terminal events. Mirrors the behavior of the /kanban create
slash command in gateway/run.py so tool-driven creation is at parity
with human-driven creation.

Without this, a user who interacts with an orchestrator exclusively
via the gateway never receives blocked / completed / gave_up
notifications for tasks the orchestrator created on their behalf —
silently breaking the gateway-first multi-agent flow the reporter
describes.

Reads the context-local HERMES_SESSION_* vars via get_session_env()
(not os.environ — those are contextvars for asyncio concurrency
safety). Falls through cleanly in CLI / cron contexts with no
session active (subscribed=False in the response). Best-effort: if
the gateway module isn't importable (test rigs stubbing gateway.*),
the task still creates, we just skip the subscription.

Response gains a 'subscribed' bool so the orchestrator knows whether
terminal events will land back in the originating chat or whether it
needs to poll / unblock manually.

Tests: 4 new in tests/tools/test_kanban_tools.py covering
CLI/no-subscribe, telegram/gateway-auto-subscribe, discord-DM/no-
thread subscribe, and partial-ctx/no-chat_id no-subscribe. 40/40
kanban tool tests pass.
2026-05-04 05:02:23 -07:00

713 lines
24 KiB
Python

"""Tests for the Kanban tool surface (tools/kanban_tools.py).
Verifies:
- Tools are gated on HERMES_KANBAN_TASK: a normal chat session sees
zero kanban tools in its schema; a worker session sees all seven.
- Each handler's happy path.
- Error paths (missing required args, bad metadata type, etc).
"""
from __future__ import annotations
import json
import os
import pytest
# ---------------------------------------------------------------------------
# Gating
# ---------------------------------------------------------------------------
def test_kanban_tools_hidden_without_env_var(monkeypatch, tmp_path):
    """An ordinary chat session must expose no ``kanban_*`` tools.

    With HERMES_KANBAN_TASK removed from the environment, the tool
    definitions resolved for the ``hermes-cli`` toolset must not contain
    any tool whose name starts with ``kanban_``.
    """
    monkeypatch.delenv("HERMES_KANBAN_TASK", raising=False)
    hermes_home = tmp_path / ".hermes"
    hermes_home.mkdir()
    monkeypatch.setenv("HERMES_HOME", str(hermes_home))

    # Imported after the environment is prepared; the bare import exists
    # only for its registration side effect.
    import tools.kanban_tools  # ensure registered
    from tools.registry import registry
    from toolsets import resolve_toolset

    enabled = set(resolve_toolset("hermes-cli"))
    definitions = registry.get_definitions(enabled, quiet=True)
    tool_names = {
        entry["function"].get("name")
        for entry in definitions
        if "function" in entry
    }
    kanban = {
        name for name in tool_names if name and name.startswith("kanban_")
    }
    assert kanban == set(), (
        f"kanban tools leaked into normal chat schema: {kanban}"
    )
def test_kanban_tools_visible_with_env_var(monkeypatch, tmp_path):
    """A worker session must see exactly the seven kanban tools.

    Setting HERMES_KANBAN_TASK marks the process as a worker; the
    ``hermes-cli`` toolset schema must then contain the full kanban set
    and no other ``kanban_*`` names.
    """
    monkeypatch.setenv("HERMES_KANBAN_TASK", "t_fake")
    hermes_home = tmp_path / ".hermes"
    hermes_home.mkdir()
    monkeypatch.setenv("HERMES_HOME", str(hermes_home))

    # Imported after the environment is prepared; the bare import exists
    # only for its registration side effect.
    import tools.kanban_tools  # ensure registered
    from tools.registry import registry
    from toolsets import resolve_toolset

    enabled = set(resolve_toolset("hermes-cli"))
    definitions = registry.get_definitions(enabled, quiet=True)
    tool_names = {
        entry["function"].get("name")
        for entry in definitions
        if "function" in entry
    }
    kanban = {
        name for name in tool_names if name and name.startswith("kanban_")
    }
    expected = {
        "kanban_show",
        "kanban_complete",
        "kanban_block",
        "kanban_heartbeat",
        "kanban_comment",
        "kanban_create",
        "kanban_link",
    }
    assert kanban == expected, f"expected {expected}, got {kanban}"
# ---------------------------------------------------------------------------
# Handler happy paths
# ---------------------------------------------------------------------------
@pytest.fixture
def worker_env(monkeypatch, tmp_path):
    """Set up an isolated worker environment and return its task id.

    Points HERMES_HOME at a fresh directory under ``tmp_path``, sets
    HERMES_PROFILE to ``test-worker``, redirects ``Path.home`` to
    ``tmp_path``, initialises the kanban database, creates and claims one
    task, and finally exports that task's id as HERMES_KANBAN_TASK.
    """
    from pathlib import Path as _Path

    hermes_home = tmp_path / ".hermes"
    hermes_home.mkdir()
    monkeypatch.setenv("HERMES_HOME", str(hermes_home))
    monkeypatch.setenv("HERMES_PROFILE", "test-worker")
    monkeypatch.setattr(_Path, "home", lambda: tmp_path)

    # Imported only once the environment above is in place.
    from hermes_cli import kanban_db as kb

    # NOTE(review): presumably forgets databases initialised by earlier
    # tests so init_db() targets this test's HERMES_HOME — confirm.
    kb._INITIALIZED_PATHS.clear()
    kb.init_db()

    db = kb.connect()
    try:
        task_id = kb.create_task(
            db, title="worker-test", assignee="test-worker"
        )
        kb.claim_task(db, task_id)
    finally:
        db.close()

    # The env var is set last because the id only exists after creation.
    monkeypatch.setenv("HERMES_KANBAN_TASK", task_id)
    return task_id
def test_show_defaults_to_env_task_id(worker_env):
    """``_handle_show`` with no arguments describes the worker's own task."""
    from tools import kanban_tools as kt

    shown = json.loads(kt._handle_show({}))

    assert "task" in shown
    task = shown["task"]
    assert task["id"] == worker_env
    assert task["status"] == "running"
    for section in ("worker_context", "runs"):
        assert section in shown
def test_show_explicit_task_id(worker_env):
    """An explicit ``task_id`` lets a worker inspect a task other than its own."""
    from hermes_cli import kanban_db as kb
    from tools import kanban_tools as kt

    db = kb.connect()
    try:
        other_id = kb.create_task(db, title="other task", assignee="peer")
    finally:
        db.close()

    shown = json.loads(kt._handle_show({"task_id": other_id}))
    assert shown["task"]["id"] == other_id
def test_complete_happy_path(worker_env):
    """Completing with a summary and metadata records both on the latest run."""
    from hermes_cli import kanban_db as kb
    from tools import kanban_tools as kt

    handoff = {
        "summary": "got the thing done",
        "metadata": {"files": 2},
    }
    reply = json.loads(kt._handle_complete(handoff))
    assert reply["ok"] is True
    assert reply["task_id"] == worker_env

    # Cross-check against the database rather than trusting the tool reply.
    db = kb.connect()
    try:
        latest = kb.latest_run(db, worker_env)
        assert latest.outcome == "completed"
        assert latest.summary == "got the thing done"
        assert latest.metadata == {"files": 2}
    finally:
        db.close()
def test_complete_with_result_only(worker_env):
    """A bare ``result`` (no ``summary``) is still accepted for legacy callers."""
    from tools import kanban_tools as kt

    reply = json.loads(kt._handle_complete({"result": "legacy result"}))
    assert reply["ok"] is True
def test_complete_rejects_no_handoff(worker_env):
    """Completing with neither ``summary`` nor ``result`` yields an error."""
    from tools import kanban_tools as kt

    reply = json.loads(kt._handle_complete({}))
    assert reply.get("error"), "should have errored"
def test_complete_rejects_non_dict_metadata(worker_env):
    """``metadata`` given as a list instead of a dict yields an error."""
    from tools import kanban_tools as kt

    bad_args = {"summary": "x", "metadata": [1, 2, 3]}
    reply = json.loads(kt._handle_complete(bad_args))
    assert reply.get("error")
def test_block_happy_path(worker_env):
    """Blocking with a reason succeeds and moves the task to ``blocked``."""
    from hermes_cli import kanban_db as kb
    from tools import kanban_tools as kt

    reply = json.loads(kt._handle_block({"reason": "need clarification"}))
    assert reply["ok"] is True

    db = kb.connect()
    try:
        task = kb.get_task(db, worker_env)
        assert task.status == "blocked"
    finally:
        db.close()
def test_block_rejects_empty_reason(worker_env):
    """An empty, whitespace-only, or missing ``reason`` yields an error.

    Each rejected value is named in the assertion message so a failure
    shows which input was wrongly accepted.
    """
    from tools import kanban_tools as kt

    for bad in ("", " ", None):
        reply = json.loads(kt._handle_block({"reason": bad}))
        assert reply.get("error"), f"reason={bad!r} should have been rejected"
def test_heartbeat_happy_path(worker_env):
    """A heartbeat carrying a note succeeds on the worker's own task."""
    from tools import kanban_tools as kt

    reply = json.loads(kt._handle_heartbeat({"note": "progress"}))
    assert reply["ok"] is True
def test_heartbeat_without_note(worker_env):
    """The ``note`` argument is optional: an empty call still succeeds."""
    from tools import kanban_tools as kt

    reply = json.loads(kt._handle_heartbeat({}))
    assert reply["ok"] is True
def test_comment_happy_path(worker_env):
    """A comment is stored with its body and a default author."""
    from hermes_cli import kanban_db as kb
    from tools import kanban_tools as kt

    request = {"task_id": worker_env, "body": "hello thread"}
    reply = json.loads(kt._handle_comment(request))
    assert reply["ok"] is True
    assert reply["comment_id"]

    db = kb.connect()
    try:
        stored = kb.list_comments(db, worker_env)
        assert len(stored) == 1
        only = stored[0]
        # With no explicit author, the HERMES_PROFILE value set by the
        # fixture is used.
        assert only.author == "test-worker"
        assert only.body == "hello thread"
    finally:
        db.close()
def test_comment_rejects_empty_body(worker_env):
    """A whitespace-only comment body yields an error."""
    from tools import kanban_tools as kt

    request = {"task_id": worker_env, "body": " "}
    reply = json.loads(kt._handle_comment(request))
    assert reply.get("error")
def test_comment_custom_author(worker_env):
    """An explicit ``author`` overrides the profile-derived default."""
    from hermes_cli import kanban_db as kb
    from tools import kanban_tools as kt

    request = {
        "task_id": worker_env,
        "body": "hi",
        "author": "custom-bot",
    }
    reply = json.loads(kt._handle_comment(request))
    assert reply["ok"]

    db = kb.connect()
    try:
        stored = kb.list_comments(db, worker_env)
        assert stored[0].author == "custom-bot"
    finally:
        db.close()
def test_create_happy_path(worker_env):
    """Creating a child of the running task stores it in ``todo`` status."""
    from hermes_cli import kanban_db as kb
    from tools import kanban_tools as kt

    spec = {
        "title": "child task",
        "assignee": "peer",
        "parents": [worker_env],
    }
    reply = json.loads(kt._handle_create(spec))
    assert reply["ok"] is True
    assert reply["task_id"]
    assert reply["status"] == "todo"  # parent isn't done yet

    db = kb.connect()
    try:
        created = kb.get_task(db, reply["task_id"])
        assert created.title == "child task"
        assert created.assignee == "peer"
    finally:
        db.close()
def test_create_rejects_no_title(worker_env):
    """A missing or whitespace-only ``title`` yields an error."""
    from tools import kanban_tools as kt

    missing_title = json.loads(kt._handle_create({"assignee": "x"}))
    assert missing_title.get("error")

    blank_title = json.loads(
        kt._handle_create({"title": " ", "assignee": "x"})
    )
    assert blank_title.get("error")
def test_create_rejects_no_assignee(worker_env):
    """Creating a task without an ``assignee`` yields an error."""
    from tools import kanban_tools as kt

    reply = json.loads(kt._handle_create({"title": "t"}))
    assert reply.get("error")
def test_create_rejects_non_list_parents(worker_env):
    """An integer ``parents`` value yields an error."""
    from tools import kanban_tools as kt

    spec = {"title": "t", "assignee": "a", "parents": 42}
    reply = json.loads(kt._handle_create(spec))
    assert reply.get("error")
def test_create_accepts_string_parent(worker_env):
    """A single parent id passed as a plain string is accepted."""
    from tools import kanban_tools as kt

    spec = {"title": "t", "assignee": "a", "parents": worker_env}
    reply = json.loads(kt._handle_create(spec))
    assert reply["ok"]
def test_create_accepts_skills_list(worker_env):
    """A list of skills passed to the tool is persisted on the task."""
    from tools import kanban_tools as kt
    from hermes_cli import kanban_db as kb

    out = kt._handle_create({
        "title": "skilled",
        "assignee": "linguist",
        "skills": ["translation", "github-code-review"],
    })
    d = json.loads(out)
    assert d["ok"] is True

    # Close the connection explicitly, as the other tests in this file do.
    # NOTE(review): if kb.connect() returns a plain sqlite3.Connection,
    # using it as a context manager only commits or rolls back and leaves
    # the connection open — confirm.
    conn = kb.connect()
    try:
        task = kb.get_task(conn, d["task_id"])
    finally:
        conn.close()
    assert task.skills == ["translation", "github-code-review"]
def test_create_accepts_skills_string(worker_env):
    """A single skill name passed as a string is stored as a one-item list."""
    from tools import kanban_tools as kt
    from hermes_cli import kanban_db as kb

    out = kt._handle_create({
        "title": "one-skill",
        "assignee": "a",
        "skills": "translation",
    })
    d = json.loads(out)
    assert d["ok"] is True

    # Close the connection explicitly, as the other tests in this file do.
    # NOTE(review): if kb.connect() returns a plain sqlite3.Connection,
    # using it as a context manager only commits or rolls back and leaves
    # the connection open — confirm.
    conn = kb.connect()
    try:
        task = kb.get_task(conn, d["task_id"])
    finally:
        conn.close()
    assert task.skills == ["translation"]
def test_create_rejects_non_list_skills(worker_env):
    """An integer ``skills`` value is an error rather than being dropped."""
    from tools import kanban_tools as kt

    spec = {"title": "t", "assignee": "a", "skills": 42}
    reply = json.loads(kt._handle_create(spec))
    assert reply.get("error")
def test_link_happy_path(worker_env):
    """Linking two independent tasks as parent and child succeeds."""
    from hermes_cli import kanban_db as kb
    from tools import kanban_tools as kt

    db = kb.connect()
    try:
        parent_id = kb.create_task(db, title="A", assignee="x")
        child_id = kb.create_task(db, title="B", assignee="x")
    finally:
        db.close()

    reply = json.loads(
        kt._handle_link({"parent_id": parent_id, "child_id": child_id})
    )
    assert reply["ok"] is True
def test_link_rejects_self_reference(worker_env):
    """A task cannot be linked as its own parent."""
    from tools import kanban_tools as kt

    same_both_ways = {"parent_id": worker_env, "child_id": worker_env}
    reply = json.loads(kt._handle_link(same_both_ways))
    assert reply.get("error")
def test_link_rejects_missing_args(worker_env):
    """Supplying only one of ``parent_id`` / ``child_id`` yields an error."""
    from tools import kanban_tools as kt

    parent_only = json.loads(kt._handle_link({"parent_id": "x"}))
    assert parent_only.get("error")

    child_only = json.loads(kt._handle_link({"child_id": "y"}))
    assert child_only.get("error")
def test_link_rejects_cycle(worker_env):
    """With A already the parent of B, linking B as parent of A is refused."""
    from hermes_cli import kanban_db as kb
    from tools import kanban_tools as kt

    db = kb.connect()
    try:
        first = kb.create_task(db, title="A", assignee="x")
        second = kb.create_task(
            db, title="B", assignee="x", parents=[first]
        )
    finally:
        db.close()

    # The reverse edge would close the loop first -> second -> first.
    reply = json.loads(
        kt._handle_link({"parent_id": second, "child_id": first})
    )
    assert reply.get("error")
# ---------------------------------------------------------------------------
# End-to-end: simulate a full worker lifecycle through the tools
# ---------------------------------------------------------------------------
def test_worker_lifecycle_through_tools(worker_env):
    """Drive the full claim -> heartbeat -> comment -> complete lifecycle
    exclusively through the tools, then verify the DB state matches what
    the dispatcher/notifier expect.

    The task is already claimed by the ``worker_env`` fixture; this test
    covers everything from orientation (show) to the final handoff.
    """
    from tools import kanban_tools as kt

    # 1. show — worker orientation
    show = json.loads(kt._handle_show({}))
    assert show["task"]["id"] == worker_env

    # 2. heartbeat during long op
    assert json.loads(kt._handle_heartbeat({"note": "warming up"}))["ok"]

    # 3. comment for a future peer
    assert json.loads(kt._handle_comment({
        "task_id": worker_env,
        "body": "note: using stdlib sqlite3 bindings",
    }))["ok"]

    # 4. spawn a child task for follow-up
    child_out = json.loads(kt._handle_create({
        "title": "write integration test",
        "assignee": "qa",
        "parents": [worker_env],
    }))
    assert child_out["ok"]

    # 5. complete with structured handoff
    comp = json.loads(kt._handle_complete({
        "summary": "implemented + spawned QA follow-up",
        "metadata": {"child_task": child_out["task_id"]},
    }))
    assert comp["ok"]

    # Verify final state
    from hermes_cli import kanban_db as kb

    conn = kb.connect()
    try:
        parent = kb.get_task(conn, worker_env)
        assert parent.status == "done"
        assert parent.current_run_id is None

        run = kb.latest_run(conn, worker_env)
        assert run.outcome == "completed"
        assert run.metadata == {"child_task": child_out["task_id"]}

        # The child's only parent is now done, so it must have been
        # promoted to "ready". Per the original note, complete_task runs
        # the readiness recompute internally, so no dispatcher tick is
        # needed for the promotion.
        child = kb.get_task(conn, child_out["task_id"])
        assert child.status == "ready", (
            f"child should be ready after parent done, got {child.status}"
        )

        # Comment is visible
        assert len(kb.list_comments(conn, worker_env)) == 1

        # Heartbeat event recorded
        hb = [
            e for e in kb.list_events(conn, worker_env)
            if e.kind == "heartbeat"
        ]
        assert len(hb) == 1
    finally:
        conn.close()
# ---------------------------------------------------------------------------
# System-prompt guidance injection
# ---------------------------------------------------------------------------
def test_kanban_guidance_not_in_normal_prompt(monkeypatch, tmp_path):
    """Without HERMES_KANBAN_TASK the system prompt carries no kanban guidance."""
    from pathlib import Path as _P

    monkeypatch.delenv("HERMES_KANBAN_TASK", raising=False)
    hermes_home = tmp_path / ".hermes"
    hermes_home.mkdir()
    monkeypatch.setenv("HERMES_HOME", str(hermes_home))
    monkeypatch.setattr(_P, "home", lambda: tmp_path)

    # Imported only once the environment above is in place.
    from run_agent import AIAgent

    agent = AIAgent(
        api_key="test",
        base_url="https://openrouter.ai/api/v1",
        quiet_mode=True,
        skip_context_files=True,
        skip_memory=True,
    )
    prompt = agent._build_system_prompt()

    for forbidden in ("You are a Kanban worker", "kanban_show()"):
        assert forbidden not in prompt
def test_kanban_guidance_in_worker_prompt(monkeypatch, tmp_path):
    """With HERMES_KANBAN_TASK set, the system prompt includes the full
    kanban lifecycle guidance."""
    from pathlib import Path as _P

    monkeypatch.setenv("HERMES_KANBAN_TASK", "t_fake")
    hermes_home = tmp_path / ".hermes"
    hermes_home.mkdir()
    monkeypatch.setenv("HERMES_HOME", str(hermes_home))
    monkeypatch.setattr(_P, "home", lambda: tmp_path)

    # Imported only once the environment above is in place.
    from run_agent import AIAgent

    agent = AIAgent(
        api_key="test",
        base_url="https://openrouter.ai/api/v1",
        quiet_mode=True,
        skip_context_files=True,
        skip_memory=True,
    )
    prompt = agent._build_system_prompt()

    # Header phrase (identity-free — SOUL.md owns identity, layer 3 is protocol)
    assert "Kanban task execution protocol" in prompt

    # Lifecycle signals: each tool name must appear in the guidance.
    lifecycle_markers = (
        "kanban_show()",
        "kanban_complete",
        "kanban_block",
        "kanban_create",
    )
    for marker in lifecycle_markers:
        assert marker in prompt

    # Anti-shell guidance: either wording is accepted.
    assert "Do not shell out" in prompt or "tools — they work" in prompt
def test_kanban_guidance_prompt_size_bounded(monkeypatch, tmp_path):
    """Sanity bound on the size of the guidance block.

    The length (in characters, as measured by ``len``) must be above 1,500
    and below 4,096: shorter suggests the text is missing, longer would
    bloat the cached prompt.
    """
    from pathlib import Path as _P

    monkeypatch.setenv("HERMES_KANBAN_TASK", "t_fake")
    hermes_home = tmp_path / ".hermes"
    hermes_home.mkdir()
    monkeypatch.setenv("HERMES_HOME", str(hermes_home))
    monkeypatch.setattr(_P, "home", lambda: tmp_path)

    from agent.prompt_builder import KANBAN_GUIDANCE

    within_bounds = 1_500 < len(KANBAN_GUIDANCE) < 4_096
    assert within_bounds, (
        f"KANBAN_GUIDANCE is {len(KANBAN_GUIDANCE)} chars — too short (missing?) or too long"
    )
# ---------------------------------------------------------------------------
# Worker task-ownership enforcement (regression tests for #19534)
# ---------------------------------------------------------------------------
#
# A worker process has HERMES_KANBAN_TASK set to its own task id. The
# destructive tools (kanban_complete, kanban_block, kanban_heartbeat)
# must refuse to operate on any OTHER task id, even if the caller
# supplies an explicit `task_id` argument. Workers legitimately call
# kanban_show / kanban_comment / kanban_create / kanban_link on other
# tasks, so those are unrestricted.
#
# Orchestrator profiles (no HERMES_KANBAN_TASK in env) are intentionally
# exempt — their job is routing, and they sometimes close out child
# tasks on behalf of the child.
def test_worker_complete_rejects_foreign_task_id(worker_env):
    """A worker may not complete a task other than its own (#19534)."""
    from hermes_cli import kanban_db as kb
    from tools import kanban_tools as kt

    # Create a sibling task and force it into 'ready'.
    db = kb.connect()
    try:
        sibling = kb.create_task(db, title="sibling")
        db.execute("UPDATE tasks SET status='ready' WHERE id=?", (sibling,))
        db.commit()
    finally:
        db.close()

    reply = json.loads(
        kt._handle_complete({"task_id": sibling, "summary": "HIJACK"})
    )
    assert reply.get("ok") is not True
    assert "refusing to mutate" in reply.get("error", "")

    # The refused call must have left the sibling exactly as it was.
    db = kb.connect()
    try:
        assert kb.get_task(db, sibling).status == "ready"
    finally:
        db.close()
def test_worker_block_rejects_foreign_task_id(worker_env):
    """A worker cannot block a task that isn't its own (#19534).

    Checks the same three things as the complete variant of this test:
    the reply is not a success, the error names the ownership refusal,
    and the sibling task's status is unchanged.
    """
    from hermes_cli import kanban_db as kb

    conn = kb.connect()
    try:
        other = kb.create_task(conn, title="sibling")
        conn.execute("UPDATE tasks SET status='ready' WHERE id=?", (other,))
        conn.commit()
    finally:
        conn.close()

    from tools import kanban_tools as kt

    out = kt._handle_block({"task_id": other, "reason": "evil"})
    d = json.loads(out)
    # A refusal must never also report success.
    assert d.get("ok") is not True
    assert "refusing to mutate" in d.get("error", "")

    # Sibling task must be untouched.
    conn = kb.connect()
    try:
        assert kb.get_task(conn, other).status == "ready"
    finally:
        conn.close()
def test_worker_heartbeat_rejects_foreign_task_id(worker_env):
    """A worker cannot heartbeat a task that isn't its own (#19534).

    Besides checking the refusal message, this confirms the reply is not a
    success and that no heartbeat event was recorded against the sibling.
    """
    from hermes_cli import kanban_db as kb

    conn = kb.connect()
    try:
        other = kb.create_task(conn, title="sibling")
        # Put sibling in running state so heartbeat would otherwise succeed.
        conn.execute("UPDATE tasks SET status='running' WHERE id=?", (other,))
        conn.commit()
    finally:
        conn.close()

    from tools import kanban_tools as kt

    out = kt._handle_heartbeat({"task_id": other})
    d = json.loads(out)
    # A refusal must never also report success.
    assert d.get("ok") is not True
    assert "refusing to mutate" in d.get("error", "")

    # Sibling task must be untouched: no heartbeat event may exist for it.
    conn = kb.connect()
    try:
        foreign_beats = [
            e for e in kb.list_events(conn, other) if e.kind == "heartbeat"
        ]
        assert foreign_beats == []
    finally:
        conn.close()
def test_worker_complete_own_task_still_works(worker_env):
    """The ownership check doesn't break the normal own-task happy path."""
    from tools import kanban_tools as kt

    # Only the explicit form (task_id equal to the env task) is exercised
    # here. The implicit form (no task_id argument) is covered by
    # test_complete_happy_path.
    out = kt._handle_complete({"task_id": worker_env, "summary": "explicit own"})
    d = json.loads(out)
    # Separate assertions so a failure shows which half went wrong.
    assert d.get("ok") is True
    assert d.get("task_id") == worker_env
def test_orchestrator_complete_any_task_allowed(monkeypatch, tmp_path):
    """Without HERMES_KANBAN_TASK (an orchestrator profile), any task can
    be completed by explicit ``task_id``; the ownership check applies to
    workers only."""
    from pathlib import Path as _P

    monkeypatch.delenv("HERMES_KANBAN_TASK", raising=False)
    hermes_home = tmp_path / ".hermes"
    hermes_home.mkdir()
    monkeypatch.setenv("HERMES_HOME", str(hermes_home))
    monkeypatch.setattr(_P, "home", lambda: tmp_path)

    # Imported only once the environment above is in place.
    from hermes_cli import kanban_db as kb

    kb._INITIALIZED_PATHS.clear()
    kb.init_db()

    db = kb.connect()
    try:
        task_id = kb.create_task(db, title="child to close out")
        db.execute("UPDATE tasks SET status='ready' WHERE id=?", (task_id,))
        db.commit()
    finally:
        db.close()

    from tools import kanban_tools as kt

    reply = json.loads(
        kt._handle_complete(
            {"task_id": task_id, "summary": "orchestrator close"}
        )
    )
    assert reply.get("ok") is True and reply.get("task_id") == task_id
# ---------------------------------------------------------------------------
# kanban_create auto-subscribe to gateway notifications (#19479)
# ---------------------------------------------------------------------------
#
# When an orchestrator agent (running under the gateway) calls kanban_create,
# the originating (platform, chat, thread, user) should be auto-subscribed to
# the new task's terminal events — matching the /kanban create slash-command
# behavior in gateway/run.py. In CLI / cron contexts (no session vars set),
# no subscription row is written.
def test_create_no_subscribe_in_cli_context(worker_env):
    """With no gateway session vars set, creation writes no subscription."""
    from hermes_cli import kanban_db as kb
    from tools import kanban_tools as kt

    reply = json.loads(
        kt._handle_create({"title": "cli task", "assignee": "peer"})
    )
    assert reply.get("ok") is True
    assert reply.get("subscribed") is False

    db = kb.connect()
    try:
        subscriptions = kb.list_notify_subs(db, reply["task_id"])
        assert subscriptions == []
    finally:
        db.close()
def test_create_auto_subscribes_in_gateway_context(worker_env):
    """With gateway session vars set, the originating chat is subscribed.

    The single subscription row must carry the platform, chat, thread and
    user values taken from the session context.
    """
    from gateway.session_context import clear_session_vars, set_session_vars
    from hermes_cli import kanban_db as kb
    from tools import kanban_tools as kt

    session_tokens = set_session_vars(
        platform="telegram",
        chat_id="1234567",
        thread_id="42",
        user_id="u_alice",
    )
    try:
        reply = json.loads(
            kt._handle_create({"title": "gateway task", "assignee": "peer"})
        )
        assert reply.get("ok") is True
        assert reply.get("subscribed") is True

        db = kb.connect()
        try:
            subscriptions = kb.list_notify_subs(db, reply["task_id"])
        finally:
            db.close()

        assert len(subscriptions) == 1
        row = subscriptions[0]
        assert row["platform"] == "telegram"
        assert row["chat_id"] == "1234567"
        assert row["thread_id"] == "42"
        assert row["user_id"] == "u_alice"
    finally:
        # Always restore the session context so later tests start clean.
        clear_session_vars(session_tokens)
def test_create_subscribe_without_thread_id(worker_env):
    """A session with no thread or user still subscribes.

    The stored row has an empty-string ``thread_id`` and a ``None``
    ``user_id``.
    """
    from gateway.session_context import clear_session_vars, set_session_vars
    from hermes_cli import kanban_db as kb
    from tools import kanban_tools as kt

    session_tokens = set_session_vars(platform="discord", chat_id="ch_dm_789")
    try:
        reply = json.loads(
            kt._handle_create({"title": "dm task", "assignee": "peer"})
        )
        assert reply.get("subscribed") is True

        db = kb.connect()
        try:
            subscriptions = kb.list_notify_subs(db, reply["task_id"])
        finally:
            db.close()

        assert len(subscriptions) == 1
        row = subscriptions[0]
        assert row["thread_id"] == ""
        assert row["user_id"] is None
    finally:
        # Always restore the session context so later tests start clean.
        clear_session_vars(session_tokens)
def test_create_no_subscribe_when_chat_id_missing(worker_env):
    """A platform with an empty ``chat_id`` is not enough to subscribe."""
    from gateway.session_context import clear_session_vars, set_session_vars
    from hermes_cli import kanban_db as kb
    from tools import kanban_tools as kt

    session_tokens = set_session_vars(platform="telegram", chat_id="")
    try:
        reply = json.loads(
            kt._handle_create({"title": "partial ctx", "assignee": "peer"})
        )
        assert reply.get("subscribed") is False

        db = kb.connect()
        try:
            subscriptions = kb.list_notify_subs(db, reply["task_id"])
            assert subscriptions == []
        finally:
            db.close()
    finally:
        # Always restore the session context so later tests start clean.
        clear_session_vars(session_tokens)