From 8322b42c6cd0f6ae9bc6721e8b0d8cbff4a856f2 Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Sat, 18 Apr 2026 01:52:06 -0700 Subject: [PATCH] fix(streaming): surface dropped tool-call on mid-stream stall (#12072) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When streaming died after text was already delivered to the user but before a tool-call's arguments finished streaming, the partial-stream stub at the end of _interruptible_streaming_api_call silently set `tool_calls=None` on the returned message and kept `finish_reason=stop`. The agent treated the turn as complete, the session exited cleanly with code 0, and the attempted action was lost with zero user-facing signal. Live-observed Apr 2026 with MiniMax M2.7 on a ~6-minute audit task: agent streamed 'Let me write the audit:', started emitting a write_file tool call, MiniMax stalled for 240s mid-arguments, the stale-stream detector killed the connection, the stub fired, session ended, no file written, no error shown. Fix: the streaming accumulator now records each tool-call's name into `result['partial_tool_names']` as soon as the name is known. When the stub builder fires after a partial delivery and finds any recorded tool names, it appends a human-visible warning to the stub's content — and also fires it as a live stream delta so the user sees it immediately, not only in the persisted transcript. The next turn's model also sees the warning in conversation history and can retry on its own. Text-only partial streams keep the original bare-recovery behaviour (no warning). Validation: | Scenario | Before | After | |---------------------------------------------|---------------------------|---------------------------------------------| | Stream dies mid tool-call, text already sent | Silent exit, no indication | User sees ⚠ warning naming the dropped tool | | Text-only partial stream | Bare recovered text | Unchanged | | tests/run_agent/test_streaming.py | 24 passed | 26 passed (2 new) | --- run_agent.py | 55 ++++++++++-- tests/run_agent/test_streaming.py | 135 ++++++++++++++++++++++++++++++ 2 files changed, 182 insertions(+), 8 deletions(-) diff --git a/run_agent.py b/run_agent.py index e8d23d39ca..d5ff125e33 100644 --- a/run_agent.py +++ b/run_agent.py @@ -5579,7 +5579,7 @@ class AIAgent: raise result["error"] return result["response"] - result = {"response": None, "error": None} + result = {"response": None, "error": None, "partial_tool_names": []} request_client_holder = {"client": None} first_delta_fired = {"done": False} deltas_were_sent = {"yes": False} # Track if any deltas were fired (for fallback) @@ -5751,6 +5751,14 @@ class AIAgent: tool_gen_notified.add(idx) _fire_first_delta() self._fire_tool_gen_started(name) + # Record the partial tool-call name so the outer + # stub-builder can surface a user-visible warning + # if streaming dies before this tool's arguments + # are fully delivered. Without this, a stall + # during tool-call JSON generation lets the stub + # at line ~6107 return `tool_calls=None`, silently + # discarding the attempted action. + result["partial_tool_names"].append(name) if chunk.choices[0].finish_reason: finish_reason = chunk.choices[0].finish_reason @@ -6117,13 +6125,44 @@ class AIAgent: _partial_text = ( getattr(self, "_current_streamed_assistant_text", "") or "" ).strip() or None - logger.warning( - "Partial stream delivered before error; returning stub " - "response with %s chars of recovered content to prevent " - "duplicate messages: %s", - len(_partial_text or ""), - result["error"], - ) + + # If the stream died while the model was emitting a tool call, + # the stub below will silently set `tool_calls=None` and the + # agent loop will treat the turn as complete — the attempted + # action is lost with no user-facing signal. Append a + # human-visible warning to the stub content so (a) the user + # knows something failed, and (b) the next turn's model sees + # in conversation history what was attempted and can retry. + _partial_names = list(result.get("partial_tool_names") or []) + if _partial_names: + _name_str = ", ".join(_partial_names[:3]) + if len(_partial_names) > 3: + _name_str += f", +{len(_partial_names) - 3} more" + _warn = ( + f"\n\n⚠ Stream stalled mid tool-call " + f"({_name_str}); the action was not executed. " + f"Ask me to retry if you want to continue." + ) + _partial_text = (_partial_text or "") + _warn + # Also fire as a streaming delta so the user sees it now + # instead of only in the persisted transcript. + try: + self._fire_stream_delta(_warn) + except Exception: + pass + logger.warning( + "Partial stream dropped tool call(s) %s after %s chars " + "of text; surfaced warning to user: %s", + _partial_names, len(_partial_text or ""), result["error"], + ) + else: + logger.warning( + "Partial stream delivered before error; returning stub " + "response with %s chars of recovered content to prevent " + "duplicate messages: %s", + len(_partial_text or ""), + result["error"], + ) _stub_msg = SimpleNamespace( role="assistant", content=_partial_text, tool_calls=None, reasoning_content=None, diff --git a/tests/run_agent/test_streaming.py b/tests/run_agent/test_streaming.py index 73a9872020..6afe36ee3a 100644 --- a/tests/run_agent/test_streaming.py +++ b/tests/run_agent/test_streaming.py @@ -952,3 +952,138 @@ class TestAnthropicStreamCallbacks: agent._interruptible_streaming_api_call({}) assert touch_calls.count("receiving stream response") == len(events) + + +class TestPartialToolCallWarning: + """Regression: when a stream dies mid tool-call argument generation after + text was already delivered, the partial-stream stub at run_agent.py + line ~6107 used to silently set ``tool_calls=None`` and return + ``finish_reason=stop``, losing the attempted action with zero user-facing + signal. Live-observed Apr 2026 with MiniMax M2.7 on a 6-minute audit + task — agent streamed commentary, emitted a write_file tool call, + MiniMax stalled for 240 s mid-arguments, stale-stream detector killed + the connection, the stub returned, session ended with no file written + and no error shown. + + Fix: when the stream accumulator captured any tool-call names before the + error, the stub now appends a user-visible warning to content AND fires + it as a stream delta so the user sees it immediately. + """ + + @patch("run_agent.AIAgent._create_request_openai_client") + @patch("run_agent.AIAgent._close_request_openai_client") + def test_partial_tool_call_surfaces_warning(self, mock_close, mock_create): + """Stream with text + partial tool-call name + mid-stream error + produces a stub whose content contains the user-visible warning + and whose tool_calls is None.""" + from run_agent import AIAgent + + class _StallError(RuntimeError): + pass + + def _stalling_stream(): + yield _make_stream_chunk(content="Let me write the audit: ") + yield _make_stream_chunk(tool_calls=[ + _make_tool_call_delta(index=0, tc_id="call_1", name="write_file"), + ]) + yield _make_stream_chunk(tool_calls=[ + _make_tool_call_delta(index=0, arguments='{"path": "/tmp/x", '), + ]) + raise _StallError("simulated upstream stall") + + mock_client = MagicMock() + mock_client.chat.completions.create.side_effect = lambda *a, **kw: _stalling_stream() + mock_create.return_value = mock_client + + agent = AIAgent( + api_key="test-key", + base_url="https://openrouter.ai/api/v1", + model="test/model", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + ) + agent.api_mode = "chat_completions" + agent._interrupt_requested = False + + fired_deltas: list = [] + agent._fire_stream_delta = lambda text: fired_deltas.append(text) + agent._current_streamed_assistant_text = "Let me write the audit: " + + import os as _os + _prev = _os.environ.get("HERMES_STREAM_RETRIES") + _os.environ["HERMES_STREAM_RETRIES"] = "0" + try: + response = agent._interruptible_streaming_api_call({}) + finally: + if _prev is None: + _os.environ.pop("HERMES_STREAM_RETRIES", None) + else: + _os.environ["HERMES_STREAM_RETRIES"] = _prev + + content = response.choices[0].message.content or "" + assert "Let me write the audit:" in content, ( + f"Partial text not preserved in stub: {content!r}" + ) + assert "Stream stalled mid tool-call" in content, ( + f"Stub content is missing the dropped-tool-call warning; users " + f"get silent failure. Got content={content!r}" + ) + assert "write_file" in content, ( + f"Warning should name the dropped tool. Got: {content!r}" + ) + assert response.choices[0].message.tool_calls is None + assert any("Stream stalled mid tool-call" in d for d in fired_deltas), ( + f"Warning was not surfaced as a live stream delta. " + f"fired_deltas={fired_deltas}" + ) + + @patch("run_agent.AIAgent._create_request_openai_client") + @patch("run_agent.AIAgent._close_request_openai_client") + def test_partial_text_only_no_warning(self, mock_close, mock_create): + """Text-only partial stream (no tool call mid-flight) keeps the + pre-fix behaviour: bare recovered text, no warning noise.""" + from run_agent import AIAgent + + class _StallError(RuntimeError): + pass + + def _stalling_stream(): + yield _make_stream_chunk(content="Here's my answer so far") + raise _StallError("simulated upstream stall") + + mock_client = MagicMock() + mock_client.chat.completions.create.side_effect = lambda *a, **kw: _stalling_stream() + mock_create.return_value = mock_client + + agent = AIAgent( + api_key="test-key", + base_url="https://openrouter.ai/api/v1", + model="test/model", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + ) + agent.api_mode = "chat_completions" + agent._interrupt_requested = False + agent._current_streamed_assistant_text = "Here's my answer so far" + + import os as _os + _prev = _os.environ.get("HERMES_STREAM_RETRIES") + _os.environ["HERMES_STREAM_RETRIES"] = "0" + try: + response = agent._interruptible_streaming_api_call({}) + finally: + if _prev is None: + _os.environ.pop("HERMES_STREAM_RETRIES", None) + else: + _os.environ["HERMES_STREAM_RETRIES"] = _prev + + content = response.choices[0].message.content or "" + assert content == "Here's my answer so far", ( + f"Pre-fix behaviour regressed for text-only partial streams: {content!r}" + ) + assert "Stream stalled" not in content, ( + f"Unexpected warning on text-only partial stream: {content!r}" + ) +