diff --git a/tests/tools/test_delegate.py b/tests/tools/test_delegate.py index ebdf60d296..623ee2534b 100644 --- a/tests/tools/test_delegate.py +++ b/tests/tools/test_delegate.py @@ -13,6 +13,7 @@ import json import os import sys import threading +import time import unittest from unittest.mock import MagicMock, patch @@ -1052,5 +1053,159 @@ class TestChildCredentialLeasing(unittest.TestCase): child._credential_pool.release_lease.assert_called_once_with("cred-a") +class TestDelegateHeartbeat(unittest.TestCase): + """Heartbeat propagates child activity to parent during delegation. + + Without the heartbeat, the gateway inactivity timeout fires because the + parent's _last_activity_ts freezes when delegate_task starts. + """ + + def test_heartbeat_touches_parent_activity_during_child_run(self): + """Parent's _touch_activity is called while child.run_conversation blocks.""" + from tools.delegate_tool import _run_single_child + + parent = _make_mock_parent() + touch_calls = [] + parent._touch_activity = lambda desc: touch_calls.append(desc) + + child = MagicMock() + child.get_activity_summary.return_value = { + "current_tool": "terminal", + "api_call_count": 3, + "max_iterations": 50, + "last_activity_desc": "executing tool: terminal", + } + + # Make run_conversation block long enough for heartbeats to fire + def slow_run(**kwargs): + time.sleep(0.25) + return {"final_response": "done", "completed": True, "api_calls": 3} + + child.run_conversation.side_effect = slow_run + + # Patch the heartbeat interval to fire quickly + with patch("tools.delegate_tool._HEARTBEAT_INTERVAL", 0.05): + _run_single_child( + task_index=0, + goal="Test heartbeat", + child=child, + parent_agent=parent, + ) + + # Heartbeat should have fired at least once during the 0.25s sleep + self.assertGreater(len(touch_calls), 0, + "Heartbeat did not propagate activity to parent") + # Verify the description includes child's current tool detail + self.assertTrue( + any("terminal" in desc for desc in touch_calls), + f"Heartbeat descriptions should include child tool info: {touch_calls}") + + def test_heartbeat_stops_after_child_completes(self): + """Heartbeat thread is cleaned up when the child finishes.""" + from tools.delegate_tool import _run_single_child + + parent = _make_mock_parent() + touch_calls = [] + parent._touch_activity = lambda desc: touch_calls.append(desc) + + child = MagicMock() + child.get_activity_summary.return_value = { + "current_tool": None, + "api_call_count": 1, + "max_iterations": 50, + "last_activity_desc": "done", + } + child.run_conversation.return_value = { + "final_response": "done", "completed": True, "api_calls": 1, + } + + with patch("tools.delegate_tool._HEARTBEAT_INTERVAL", 0.05): + _run_single_child( + task_index=0, + goal="Test cleanup", + child=child, + parent_agent=parent, + ) + + # Record count after completion, wait, and verify no more calls + count_after = len(touch_calls) + time.sleep(0.15) + self.assertEqual(len(touch_calls), count_after, + "Heartbeat continued firing after child completed") + + def test_heartbeat_stops_after_child_error(self): + """Heartbeat thread is cleaned up even when the child raises.""" + from tools.delegate_tool import _run_single_child + + parent = _make_mock_parent() + touch_calls = [] + parent._touch_activity = lambda desc: touch_calls.append(desc) + + child = MagicMock() + child.get_activity_summary.return_value = { + "current_tool": "web_search", + "api_call_count": 2, + "max_iterations": 50, + "last_activity_desc": "executing tool: web_search", + } + + def slow_fail(**kwargs): + time.sleep(0.15) + raise RuntimeError("network timeout") + + child.run_conversation.side_effect = slow_fail + + with patch("tools.delegate_tool._HEARTBEAT_INTERVAL", 0.05): + result = _run_single_child( + task_index=0, + goal="Test error cleanup", + child=child, + parent_agent=parent, + ) + + self.assertEqual(result["status"], "error") + + # Verify heartbeat stopped + count_after = len(touch_calls) + time.sleep(0.15) + self.assertEqual(len(touch_calls), count_after, + "Heartbeat continued firing after child error") + + def test_heartbeat_includes_child_activity_desc_when_no_tool(self): + """When child has no current_tool, heartbeat uses last_activity_desc.""" + from tools.delegate_tool import _run_single_child + + parent = _make_mock_parent() + touch_calls = [] + parent._touch_activity = lambda desc: touch_calls.append(desc) + + child = MagicMock() + child.get_activity_summary.return_value = { + "current_tool": None, + "api_call_count": 5, + "max_iterations": 90, + "last_activity_desc": "API call #5 completed", + } + + def slow_run(**kwargs): + time.sleep(0.15) + return {"final_response": "done", "completed": True, "api_calls": 5} + + child.run_conversation.side_effect = slow_run + + with patch("tools.delegate_tool._HEARTBEAT_INTERVAL", 0.05): + _run_single_child( + task_index=0, + goal="Test desc fallback", + child=child, + parent_agent=parent, + ) + + self.assertGreater(len(touch_calls), 0) + self.assertTrue( + any("API call #5 completed" in desc for desc in touch_calls), + f"Heartbeat should include last_activity_desc: {touch_calls}") + + if __name__ == "__main__": unittest.main() diff --git a/tools/delegate_tool.py b/tools/delegate_tool.py index a148a31f05..4ab3d26658 100644 --- a/tools/delegate_tool.py +++ b/tools/delegate_tool.py @@ -20,6 +20,7 @@ import json import logging logger = logging.getLogger(__name__) import os +import threading import time from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Any, Dict, List, Optional @@ -37,6 +38,7 @@ DELEGATE_BLOCKED_TOOLS = frozenset([ MAX_CONCURRENT_CHILDREN = 3 MAX_DEPTH = 2 # parent (0) -> child (1) -> grandchild rejected (2) DEFAULT_MAX_ITERATIONS = 50 +_HEARTBEAT_INTERVAL = 30 # seconds between parent activity heartbeats during delegation DEFAULT_TOOLSETS = ["terminal", "file", "web"] @@ -369,6 +371,44 @@ def _run_single_child( except Exception as exc: logger.debug("Failed to bind child to leased credential: %s", exc) + # Heartbeat: periodically propagate child activity to the parent so the + # gateway inactivity timeout doesn't fire while the subagent is working. + # Without this, the parent's _last_activity_ts freezes when delegate_task + # starts and the gateway eventually kills the agent for "no activity". + _heartbeat_stop = threading.Event() + + def _heartbeat_loop(): + while not _heartbeat_stop.wait(_HEARTBEAT_INTERVAL): + if parent_agent is None: + continue + touch = getattr(parent_agent, '_touch_activity', None) + if not touch: + continue + # Pull detail from the child's own activity tracker + desc = f"delegate_task: subagent {task_index} working" + try: + child_summary = child.get_activity_summary() + child_tool = child_summary.get("current_tool") + child_iter = child_summary.get("api_call_count", 0) + child_max = child_summary.get("max_iterations", 0) + if child_tool: + desc = (f"delegate_task: subagent running {child_tool} " + f"(iteration {child_iter}/{child_max})") + else: + child_desc = child_summary.get("last_activity_desc", "") + if child_desc: + desc = (f"delegate_task: subagent {child_desc} " + f"(iteration {child_iter}/{child_max})") + except Exception: + pass + try: + touch(desc) + except Exception: + pass + + _heartbeat_thread = threading.Thread(target=_heartbeat_loop, daemon=True) + _heartbeat_thread.start() + try: result = child.run_conversation(user_message=goal) @@ -479,6 +519,11 @@ def _run_single_child( } finally: + # Stop the heartbeat thread so it doesn't keep touching parent activity + # after the child has finished (or failed). + _heartbeat_stop.set() + _heartbeat_thread.join(timeout=5) + if child_pool is not None and leased_cred_id is not None: try: child_pool.release_lease(leased_cred_id)