diff --git a/tests/test_real_interrupt_subagent.py b/tests/test_real_interrupt_subagent.py new file mode 100644 index 00000000000..f1a16753a9a --- /dev/null +++ b/tests/test_real_interrupt_subagent.py @@ -0,0 +1,176 @@ +"""Test real interrupt propagation through delegate_task with actual AIAgent. + +This uses a real AIAgent with mocked HTTP responses to test the complete +interrupt flow through _run_single_child → child.run_conversation(). +""" + +import json +import os +import threading +import time +import unittest +from unittest.mock import MagicMock, patch, PropertyMock + +from tools.interrupt import set_interrupt, is_interrupted + + +def _make_slow_api_response(delay=5.0): + """Create a mock that simulates a slow API response (like a real LLM call).""" + def slow_create(**kwargs): + # Simulate a slow API call + time.sleep(delay) + # Return a simple text response (no tool calls) + resp = MagicMock() + resp.choices = [MagicMock()] + resp.choices[0].message = MagicMock() + resp.choices[0].message.content = "Done" + resp.choices[0].message.tool_calls = None + resp.choices[0].message.refusal = None + resp.choices[0].finish_reason = "stop" + resp.usage = MagicMock() + resp.usage.prompt_tokens = 100 + resp.usage.completion_tokens = 10 + resp.usage.total_tokens = 110 + resp.usage.prompt_tokens_details = None + return resp + return slow_create + + +class TestRealSubagentInterrupt(unittest.TestCase): + """Test interrupt with real AIAgent child through delegate_tool.""" + + def setUp(self): + set_interrupt(False) + os.environ.setdefault("OPENAI_API_KEY", "test-key") + + def tearDown(self): + set_interrupt(False) + + def test_interrupt_child_during_api_call(self): + """Real AIAgent child interrupted while making API call.""" + from run_agent import AIAgent, IterationBudget + + # Create a real parent agent (just enough to be a parent) + parent = AIAgent.__new__(AIAgent) + parent._interrupt_requested = False + parent._interrupt_message = None + parent._active_children = [] + parent.quiet_mode = True + parent.model = "test/model" + parent.base_url = "http://localhost:1" + parent.api_key = "test" + parent.provider = "test" + parent.api_mode = "chat_completions" + parent.platform = "cli" + parent.enabled_toolsets = ["terminal", "file"] + parent.providers_allowed = None + parent.providers_ignored = None + parent.providers_order = None + parent.provider_sort = None + parent.max_tokens = None + parent.reasoning_config = None + parent.prefill_messages = None + parent._session_db = None + parent._delegate_depth = 0 + parent._delegate_spinner = None + parent.tool_progress_callback = None + parent.iteration_budget = IterationBudget(max_total=100) + parent._client_kwargs = {"api_key": "test", "base_url": "http://localhost:1"} + + from tools.delegate_tool import _run_single_child + + child_started = threading.Event() + result_holder = [None] + error_holder = [None] + + def run_delegate(): + try: + # Patch the OpenAI client creation inside AIAgent.__init__ + with patch('run_agent.OpenAI') as MockOpenAI: + mock_client = MagicMock() + # API call takes 5 seconds — should be interrupted before that + mock_client.chat.completions.create = _make_slow_api_response(delay=5.0) + mock_client.close = MagicMock() + MockOpenAI.return_value = mock_client + + # Patch the instance method so it skips prompt assembly + with patch.object(AIAgent, '_build_system_prompt', return_value="You are a test agent"): + # Signal when child starts + original_run = AIAgent.run_conversation + + def patched_run(self_agent, *args, **kwargs): + child_started.set() + return original_run(self_agent, *args, **kwargs) + + with patch.object(AIAgent, 'run_conversation', patched_run): + result = _run_single_child( + task_index=0, + goal="Test task", + context=None, + toolsets=["terminal"], + model="test/model", + max_iterations=5, + parent_agent=parent, + task_count=1, + override_provider="test", + override_base_url="http://localhost:1", + override_api_key="test", + override_api_mode="chat_completions", + ) + result_holder[0] = result + except Exception as e: + import traceback + traceback.print_exc() + error_holder[0] = e + + agent_thread = threading.Thread(target=run_delegate, daemon=True) + agent_thread.start() + + # Wait for child to start run_conversation + started = child_started.wait(timeout=10) + if not started: + agent_thread.join(timeout=1) + if error_holder[0]: + raise error_holder[0] + self.fail("Child never started run_conversation") + + # Give child time to enter main loop and start API call + time.sleep(0.5) + + # Verify child is registered + print(f"Active children: {len(parent._active_children)}") + self.assertGreaterEqual(len(parent._active_children), 1, + "Child not registered in _active_children") + + # Interrupt! (simulating what CLI does) + start = time.monotonic() + parent.interrupt("User typed a new message") + + # Check propagation + child = parent._active_children[0] if parent._active_children else None + if child: + print(f"Child._interrupt_requested after parent.interrupt(): {child._interrupt_requested}") + self.assertTrue(child._interrupt_requested, + "Interrupt did not propagate to child!") + + # Wait for delegate to finish (should be fast since interrupted) + agent_thread.join(timeout=5) + elapsed = time.monotonic() - start + + if error_holder[0]: + raise error_holder[0] + + result = result_holder[0] + self.assertIsNotNone(result, "Delegate returned no result") + print(f"Result status: {result['status']}, elapsed: {elapsed:.2f}s") + print(f"Full result: {result}") + + # The child should have been interrupted, not completed the full 5s API call + self.assertLess(elapsed, 3.0, + f"Took {elapsed:.2f}s — interrupt was not detected quickly enough") + self.assertEqual(result["status"], "interrupted", + f"Expected 'interrupted', got '{result['status']}'") + + +if __name__ == "__main__": + unittest.main()