fix(delegate): make max_concurrent_children configurable + error on excess

`delegate_task` silently truncated batch tasks to 3 — the model sends
5 tasks, gets results for 3, never told 2 were dropped. Now returns a
clear tool_error explaining the limit and how to fix it.

The limit is configurable via:
  - delegation.max_concurrent_children in config.yaml (priority 1)
  - DELEGATION_MAX_CONCURRENT_CHILDREN env var (priority 2)
  - default: 3

Uses the same _load_config() path as the rest of delegate_task for
consistent config priority. Clamps to min 1, warns on non-integer
config values.

Also removes the hardcoded maxItems: 3 from the JSON schema — the
schema was blocking the model from even attempting >3 tasks before
the runtime check could fire. The runtime check gives a much more
actionable error message.

Backwards compatible: default remains 3, existing configs unchanged.
This commit is contained in:
angelos
2026-04-10 01:34:39 +00:00
committed by Teknium
parent 6c115440fd
commit 7ccdb74364
4 changed files with 60 additions and 17 deletions

View File

@@ -3021,7 +3021,7 @@ class AIAgent:
@staticmethod
def _cap_delegate_task_calls(tool_calls: list) -> list:
"""Truncate excess delegate_task calls to MAX_CONCURRENT_CHILDREN.
"""Truncate excess delegate_task calls to max_concurrent_children.
The delegate_tool caps the task list inside a single call, but the
model can emit multiple separate delegate_task tool_calls in one
@@ -3029,23 +3029,24 @@ class AIAgent:
Returns the original list if no truncation was needed.
"""
from tools.delegate_tool import MAX_CONCURRENT_CHILDREN
from tools.delegate_tool import _get_max_concurrent_children
max_children = _get_max_concurrent_children()
delegate_count = sum(1 for tc in tool_calls if tc.function.name == "delegate_task")
if delegate_count <= MAX_CONCURRENT_CHILDREN:
if delegate_count <= max_children:
return tool_calls
kept_delegates = 0
truncated = []
for tc in tool_calls:
if tc.function.name == "delegate_task":
if kept_delegates < MAX_CONCURRENT_CHILDREN:
if kept_delegates < max_children:
truncated.append(tc)
kept_delegates += 1
else:
truncated.append(tc)
logger.warning(
"Truncated %d excess delegate_task call(s) to enforce "
"MAX_CONCURRENT_CHILDREN=%d limit",
delegate_count - MAX_CONCURRENT_CHILDREN, MAX_CONCURRENT_CHILDREN,
"max_concurrent_children=%d limit",
delegate_count - max_children, max_children,
)
return truncated

View File

@@ -9,7 +9,9 @@ Covers three static methods on AIAgent (inspired by PR #1321 — @alireza78a):
import types
from run_agent import AIAgent
from tools.delegate_tool import MAX_CONCURRENT_CHILDREN
from tools.delegate_tool import _get_max_concurrent_children
MAX_CONCURRENT_CHILDREN = _get_max_concurrent_children()
# ---------------------------------------------------------------------------

View File

@@ -20,7 +20,7 @@ from unittest.mock import MagicMock, patch
from tools.delegate_tool import (
DELEGATE_BLOCKED_TOOLS,
DELEGATE_TASK_SCHEMA,
MAX_CONCURRENT_CHILDREN,
_get_max_concurrent_children,
MAX_DEPTH,
check_delegate_requirements,
delegate_task,
@@ -168,10 +168,13 @@ class TestDelegateTask(unittest.TestCase):
"summary": "Done", "api_calls": 1, "duration_seconds": 1.0
}
parent = _make_mock_parent()
tasks = [{"goal": f"Task {i}"} for i in range(5)]
limit = _get_max_concurrent_children()
tasks = [{"goal": f"Task {i}"} for i in range(limit + 2)]
result = json.loads(delegate_task(tasks=tasks, parent_agent=parent))
# Should only run 3 tasks (MAX_CONCURRENT_CHILDREN)
self.assertEqual(mock_run.call_count, 3)
# Should return an error instead of silently truncating
self.assertIn("error", result)
self.assertIn("Too many tasks", result["error"])
mock_run.assert_not_called()
@patch("tools.delegate_tool._run_single_child")
def test_batch_ignores_toplevel_goal(self, mock_run):
@@ -562,7 +565,7 @@ class TestBlockedTools(unittest.TestCase):
self.assertIn(tool, DELEGATE_BLOCKED_TOOLS)
def test_constants(self):
self.assertEqual(MAX_CONCURRENT_CHILDREN, 3)
self.assertEqual(_get_max_concurrent_children(), 3)
self.assertEqual(MAX_DEPTH, 2)

View File

@@ -35,8 +35,34 @@ DELEGATE_BLOCKED_TOOLS = frozenset([
"execute_code", # children should reason step-by-step, not write scripts
])
MAX_CONCURRENT_CHILDREN = 3
_DEFAULT_MAX_CONCURRENT_CHILDREN = 3
MAX_DEPTH = 2 # parent (0) -> child (1) -> grandchild rejected (2)
def _get_max_concurrent_children() -> int:
"""Read delegation.max_concurrent_children from config, falling back to
DELEGATION_MAX_CONCURRENT_CHILDREN env var, then the default (3).
Uses the same ``_load_config()`` path that the rest of ``delegate_task``
uses, keeping config priority consistent (config.yaml > env > default).
"""
cfg = _load_config()
val = cfg.get("max_concurrent_children")
if val is not None:
try:
return max(1, int(val))
except (TypeError, ValueError):
logger.warning(
"delegation.max_concurrent_children=%r is not a valid integer; "
"using default %d", val, _DEFAULT_MAX_CONCURRENT_CHILDREN,
)
env_val = os.getenv("DELEGATION_MAX_CONCURRENT_CHILDREN")
if env_val:
try:
return max(1, int(env_val))
except (TypeError, ValueError):
pass
return _DEFAULT_MAX_CONCURRENT_CHILDREN
DEFAULT_MAX_ITERATIONS = 50
_HEARTBEAT_INTERVAL = 30 # seconds between parent activity heartbeats during delegation
DEFAULT_TOOLSETS = ["terminal", "file", "web"]
@@ -600,8 +626,17 @@ def delegate_task(
return tool_error(str(exc))
# Normalize to task list
max_children = _get_max_concurrent_children()
if tasks and isinstance(tasks, list):
task_list = tasks[:MAX_CONCURRENT_CHILDREN]
if len(tasks) > max_children:
return tool_error(
f"Too many tasks: {len(tasks)} provided, but "
f"max_concurrent_children is {max_children}. "
f"Either reduce the task count, split into multiple "
f"delegate_task calls, or increase "
f"delegation.max_concurrent_children in config.yaml."
)
task_list = tasks
elif goal and isinstance(goal, str) and goal.strip():
task_list = [{"goal": goal, "context": context, "toolsets": toolsets}]
else:
@@ -661,7 +696,7 @@ def delegate_task(
completed_count = 0
spinner_ref = getattr(parent_agent, '_delegate_spinner', None)
with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_CHILDREN) as executor:
with ThreadPoolExecutor(max_workers=max_children) as executor:
futures = {}
for i, t, child in children:
future = executor.submit(
@@ -965,9 +1000,11 @@ DELEGATE_TASK_SCHEMA = {
},
"required": ["goal"],
},
"maxItems": 3,
# No maxItems — the runtime limit is configurable via
# delegation.max_concurrent_children (default 3) and
# enforced with a clear error in delegate_task().
"description": (
"Batch mode: up to 3 tasks to run in parallel. Each gets "
"Batch mode: tasks to run in parallel (limit configurable via delegation.max_concurrent_children, default 3). Each gets "
"its own subagent with isolated context and terminal session. "
"When provided, top-level goal/context/toolsets are ignored."
),