Compare commits

...

1 Commits

Author SHA1 Message Date
Teknium
58362361dd fix(delegation): stream subagent progress per-tool so live windows update mid-run
Subagent tool activity relayed two kinds of events: subagent.tool fired
live per tool call, but the subagent.progress running summary was buffered
5-deep (_BATCH_SIZE=5) and only flushed once 5 tools accumulated, with any
remainder flushed at end-of-run via _flush().

Most subagents run fewer than 5 tools (e.g. a short research or single-edit
task), so the progress summary never reached the threshold mid-run and only
appeared when the child finished — the live subagent window stayed silent
until the very end ("subagent output just appears all at once").

Lower _BATCH_SIZE to 1 so each tool's progress summary streams in step with
the per-tool subagent.tool events. _flush() stays as a harmless end-of-run
safety net (now a no-op in the common case).

Converts the three batch-of-5 change-detector tests into invariant tests
that assert live per-tool streaming and per-child summary isolation.
2026-06-16 09:35:13 -04:00
2 changed files with 56 additions and 32 deletions

View File

@@ -118,9 +118,17 @@ class TestBuildChildProgressCallback:
assert "💭" in output
assert "search for papers" in output
def test_gateway_batched_progress(self):
"""Gateway path: each tool.started relays a subagent.tool event, and a
subagent.progress summary fires once BATCH_SIZE tools accumulate."""
def test_gateway_progress_streams_per_tool(self):
"""Gateway path: each tool.started relays a subagent.tool event AND a
live subagent.progress summary, so a subagent window streams in step
with the child's work instead of only flushing at end-of-run.
Invariant under test: every tool start produces a paired
(subagent.tool, subagent.progress) relay — no batching delay. This is
the regression guard for "subagent output just appears all at once",
which happened when the progress summary was buffered 5-deep and short
subagents (0-4 tools) never reached the flush threshold mid-run.
"""
parent = MagicMock()
parent._delegate_spinner = None
parent_cb = MagicMock()
@@ -128,21 +136,19 @@ class TestBuildChildProgressCallback:
cb = _build_child_progress_callback(0, "test goal", parent)
# Each tool.started relays a subagent.tool event immediately (per-tool relay).
for i in range(4):
# Each tool.started relays subagent.tool immediately, then a live
# subagent.progress summary right behind it (no accumulation delay).
for i in range(3):
cb("tool.started", f"tool_{i}", f"arg_{i}", {})
# 4 per-tool relays so far, no batch summary yet (BATCH_SIZE=5)
events = [c.args[0] for c in parent_cb.call_args_list]
assert events == ["subagent.tool"] * 4
# 5th call triggers another per-tool relay PLUS the batch-size summary
cb("tool.started", "tool_4", "arg_4", {})
events = [c.args[0] for c in parent_cb.call_args_list]
assert events == ["subagent.tool"] * 5 + ["subagent.progress"]
# Live streaming: tool/progress pairs in order, one pair per tool.
assert events == ["subagent.tool", "subagent.progress"] * 3
# The most recent progress summary reflects the tool that just ran.
summary_call = parent_cb.call_args_list[-1]
summary_text = summary_call.kwargs.get("preview") or summary_call.args[2]
assert "tool_0" in summary_text
assert "tool_4" in summary_text
assert "tool_2" in summary_text
def test_thinking_relayed_to_gateway(self):
"""Thinking events are relayed as subagent.thinking events."""
@@ -159,7 +165,8 @@ class TestBuildChildProgressCallback:
assert parent_cb.call_args.args[2] == "some reasoning text"
def test_parallel_callbacks_independent(self):
"""Each child's callback batches tool names independently."""
"""Each child's callback streams its own tool/progress relays
independently — no cross-child interleaving in the per-child batch."""
parent = MagicMock()
parent._delegate_spinner = None
parent_cb = MagicMock()
@@ -168,15 +175,23 @@ class TestBuildChildProgressCallback:
cb0 = _build_child_progress_callback(0, "goal a", parent)
cb1 = _build_child_progress_callback(1, "goal b", parent)
# 3 tool.started per child = 6 per-tool relays; neither should hit
# the batch-size summary (batch size = 5, counted per-child).
# 3 tool.started per child. Each tool relays a subagent.tool plus a
# live subagent.progress summary (batch size = 1, counted per-child).
for i in range(3):
cb0("tool.started", f"tool_{i}", f"a_{i}", {})
cb1("tool.started", f"other_{i}", f"b_{i}", {})
events = [c.args[0] for c in parent_cb.call_args_list]
assert events.count("subagent.tool") == 6
assert "subagent.progress" not in events
# Each child's progress summary stays scoped to its own tools — child 0
# never names child 1's tools and vice versa.
progress_calls = [
(c.kwargs.get("preview") or c.args[2])
for c in parent_cb.call_args_list
if c.args[0] == "subagent.progress"
]
assert progress_calls, "expected live progress summaries to stream"
assert all("other_" not in p for p in progress_calls if "tool_" in p)
def test_task_index_prefix_in_batch_mode(self):
"""Batch mode (task_count > 1) should show 1-indexed prefix for all tasks."""
@@ -328,9 +343,14 @@ class TestThinkingCallback:
class TestBatchFlush:
"""Tests for gateway batch flush on subagent completion."""
def test_flush_sends_remaining_batch(self):
"""_flush should send a final subagent.progress summary of any unsent
tool names in the batch (less than BATCH_SIZE)."""
def test_progress_flushes_live_per_tool(self):
"""With per-tool streaming, each tool.started immediately emits its own
subagent.progress summary — nothing is left buffered for end-of-run.
This is the inverse of the old buffer-until-flush behavior: a live
subagent window must see progress as each tool runs, not in one burst
when the child finishes.
"""
parent = MagicMock()
parent._delegate_spinner = None
parent_cb = MagicMock()
@@ -338,22 +358,18 @@ class TestBatchFlush:
cb = _build_child_progress_callback(0, "test goal", parent)
# Send 3 tools (below batch size of 5) — each relays subagent.tool
# Each tool relays a paired subagent.tool + subagent.progress, live.
cb("tool.started", "web_search", "query1", {})
cb("tool.started", "read_file", "file.txt", {})
cb("tool.started", "write_file", "out.txt", {})
events = [c.args[0] for c in parent_cb.call_args_list]
assert events == ["subagent.tool"] * 3 # per-tool relays so far
assert "subagent.progress" not in events # no batch-size summary yet
assert events == ["subagent.tool", "subagent.progress"] * 3
# Flush should send the remaining 3 as a summary
# Nothing left to flush — the batch drained on each tool, so _flush is
# a no-op (no trailing end-of-run summary).
before = len(parent_cb.call_args_list)
cb._flush()
events = [c.args[0] for c in parent_cb.call_args_list]
assert events[-1] == "subagent.progress"
summary_call = parent_cb.call_args_list[-1]
summary_text = summary_call.kwargs.get("preview") or summary_call.args[2]
assert "web_search" in summary_text
assert "write_file" in summary_text
assert len(parent_cb.call_args_list) == before
def test_flush_noop_when_batch_empty(self):
"""_flush should not send anything when batch is empty."""

View File

@@ -773,8 +773,16 @@ def _build_child_progress_callback(
prefix = f"[{task_index + 1}] " if task_count > 1 else ""
goal_label = (goal or "").strip()
# Gateway: batch tool names, flush periodically
_BATCH_SIZE = 5
# Gateway: relay each tool's progress summary as it happens. Historically
# this batched 5 tool names before flushing a single subagent.progress
# line, with any remainder flushed only at completion via _flush(). That
# made short subagents (the common case — 0-4 tools) emit their progress
# rollup ONLY at end-of-run, so a live subagent window showed nothing until
# the child finished ("subagent output just appears all at once"). Flushing
# per tool keeps the window streaming in step with the child's work. The
# per-tool subagent.tool events already fire live; this aligns the progress
# summary with them instead of lagging behind a 5-deep buffer.
_BATCH_SIZE = 1
_batch: List[str] = []
_tool_count = [0] # per-subagent running counter (list for closure mutation)