From 93f6f66872dc2eecc664c1e37d6231268497f388 Mon Sep 17 00:00:00 2001 From: helix4u <4317663+helix4u@users.noreply.github.com> Date: Tue, 14 Apr 2026 23:51:55 -0600 Subject: [PATCH] fix(interrupt): preserve pre-start terminal interrupts --- run_agent.py | 29 +++++++++++--- tests/run_agent/test_interrupt_propagation.py | 39 +++++++++++++++++-- 2 files changed, 60 insertions(+), 8 deletions(-) diff --git a/run_agent.py b/run_agent.py index b0bfa53dab..4d414587e8 100644 --- a/run_agent.py +++ b/run_agent.py @@ -754,6 +754,7 @@ class AIAgent: self._interrupt_requested = False self._interrupt_message = None # Optional message that triggered interrupt self._execution_thread_id: int | None = None # Set at run_conversation() start + self._interrupt_thread_signal_pending = False self._client_lock = threading.RLock() # Subagent delegation state @@ -2949,7 +2950,15 @@ class AIAgent: # Signal all tools to abort any in-flight operations immediately. # Scope the interrupt to this agent's execution thread so other # agents running in the same process (gateway) are not affected. - _set_interrupt(True, self._execution_thread_id) + if self._execution_thread_id is not None: + _set_interrupt(True, self._execution_thread_id) + self._interrupt_thread_signal_pending = False + else: + # The interrupt arrived before run_conversation() finished + # binding the agent to its execution thread. Defer the tool-level + # interrupt signal until startup completes instead of targeting + # the caller thread by mistake. + self._interrupt_thread_signal_pending = True # Propagate interrupt to any running child agents (subagent delegation) with self._active_children_lock: children_copy = list(self._active_children) @@ -2965,7 +2974,9 @@ class AIAgent: """Clear any pending interrupt request and the per-thread tool interrupt signal.""" self._interrupt_requested = False self._interrupt_message = None - _set_interrupt(False, self._execution_thread_id) + self._interrupt_thread_signal_pending = False + if self._execution_thread_id is not None: + _set_interrupt(False, self._execution_thread_id) def _touch_activity(self, desc: str) -> None: """Update the last-activity timestamp and description (thread-safe).""" @@ -8179,11 +8190,19 @@ class AIAgent: # Record the execution thread so interrupt()/clear_interrupt() can # scope the tool-level interrupt signal to THIS agent's thread only. - # Must be set before clear_interrupt() which uses it. + # Must be set before any thread-scoped interrupt syncing. self._execution_thread_id = threading.current_thread().ident - # Clear any stale interrupt state at start - self.clear_interrupt() + # Always clear stale per-thread state from a previous turn. If an + # interrupt arrived before startup finished, preserve it and bind it + # to this execution thread now instead of dropping it on the floor. + _set_interrupt(False, self._execution_thread_id) + if self._interrupt_requested: + _set_interrupt(True, self._execution_thread_id) + self._interrupt_thread_signal_pending = False + else: + self._interrupt_message = None + self._interrupt_thread_signal_pending = False # External memory provider: prefetch once before the tool loop. # Reuse the cached result on every iteration to avoid re-calling diff --git a/tests/run_agent/test_interrupt_propagation.py b/tests/run_agent/test_interrupt_propagation.py index a746efdac1..ed1f21bfa1 100644 --- a/tests/run_agent/test_interrupt_propagation.py +++ b/tests/run_agent/test_interrupt_propagation.py @@ -28,7 +28,8 @@ class TestInterruptPropagationToChild(unittest.TestCase): agent = AIAgent.__new__(AIAgent) agent._interrupt_requested = False agent._interrupt_message = None - agent._execution_thread_id = None # defaults to current thread in set_interrupt + agent._execution_thread_id = None + agent._interrupt_thread_signal_pending = False agent._active_children = [] agent._active_children_lock = threading.Lock() agent.quiet_mode = True @@ -46,15 +47,17 @@ class TestInterruptPropagationToChild(unittest.TestCase): assert parent._interrupt_requested is True assert child._interrupt_requested is True assert child._interrupt_message == "new user message" - assert is_interrupted() is True + assert is_interrupted() is False + assert parent._interrupt_thread_signal_pending is True def test_child_clear_interrupt_at_start_clears_thread(self): """child.clear_interrupt() at start of run_conversation clears the - per-thread interrupt flag for the current thread. + bound execution thread's interrupt flag. """ child = self._make_bare_agent() child._interrupt_requested = True child._interrupt_message = "msg" + child._execution_thread_id = threading.current_thread().ident # Interrupt for current thread is set set_interrupt(True) @@ -128,6 +131,36 @@ class TestInterruptPropagationToChild(unittest.TestCase): child_thread.join(timeout=1) set_interrupt(False) + def test_prestart_interrupt_binds_to_execution_thread(self): + """An interrupt that arrives before startup should bind to the agent thread.""" + agent = self._make_bare_agent() + barrier = threading.Barrier(2) + result = {} + + agent.interrupt("stop before start") + assert agent._interrupt_requested is True + assert agent._interrupt_thread_signal_pending is True + assert is_interrupted() is False + + def run_thread(): + from tools.interrupt import set_interrupt as _set_interrupt_for_test + + agent._execution_thread_id = threading.current_thread().ident + _set_interrupt_for_test(False, agent._execution_thread_id) + if agent._interrupt_requested: + _set_interrupt_for_test(True, agent._execution_thread_id) + agent._interrupt_thread_signal_pending = False + barrier.wait(timeout=5) + result["thread_interrupted"] = is_interrupted() + + t = threading.Thread(target=run_thread) + t.start() + barrier.wait(timeout=5) + t.join(timeout=2) + + assert result["thread_interrupted"] is True + assert agent._interrupt_thread_signal_pending is False + class TestPerThreadInterruptIsolation(unittest.TestCase): """Verify that interrupting one agent does NOT affect another agent's thread.