Compare commits

...

1 Commit

Author SHA1 Message Date
kshitijk4poor
2579245ae2 fix: /stop now immediately aborts streaming retry loop
When a user sends /stop during a streaming API call, the outer poll loop
detects _interrupt_requested and closes the HTTP connection. However, the
inner _call() thread catches the connection error and enters its retry
loop — opening a FRESH connection without checking the interrupt flag.

On slow providers like ollama-cloud, each retry attempt blocks for the
full stream-read timeout (120s+). With 3 retry attempts this caused
510+ second delays between /stop and actual response — the agent appeared
completely unresponsive despite the stop being acknowledged.

Fix: add an _interrupt_requested check at the top of the streaming retry
loop so the agent exits immediately instead of retrying.

Also fix log truncation: all session key logging in gateway/run.py used
[:20] or [:30] slices, which truncated 'agent:main:telegram:dm:5690190437'
(33 chars) to 'agent:main:telegram:' — losing the identifying chat type
and user ID. Replace with full keys to make logs debuggable.

Reported by user Sidharth Pulipaka via Telegram on ollama-cloud provider.
2026-04-25 17:58:24 +05:30
3 changed files with 201 additions and 29 deletions

View File

@@ -1103,7 +1103,7 @@ class GatewayRunner:
if override_runtime.get("api_key"):
logger.debug(
"Session model override (fast): session=%s config_model=%s -> override_model=%s provider=%s",
(resolved_session_key or "")[:30], model, override_model,
resolved_session_key or "", model, override_model,
override_runtime.get("provider"),
)
return override_model, override_runtime
@@ -1111,12 +1111,12 @@ class GatewayRunner:
# resolution and apply model/provider from the override on top.
logger.debug(
"Session model override (no api_key, fallback): session=%s config_model=%s override_model=%s",
(resolved_session_key or "")[:30], model, override_model,
resolved_session_key or "", model, override_model,
)
else:
logger.debug(
"No session model override: session=%s config_model=%s override_keys=%s",
(resolved_session_key or "")[:30], model,
resolved_session_key or "", model,
list(self._session_model_overrides.keys())[:5] if self._session_model_overrides else "[]",
)
@@ -1687,7 +1687,7 @@ class GatewayRunner:
continue
try:
agent.interrupt(reason)
logger.debug("Interrupted running agent for session %s during shutdown", session_key[:20])
logger.debug("Interrupted running agent for session %s during shutdown", session_key)
except Exception as e:
logger.debug("Failed interrupting agent during shutdown: %s", e)
@@ -1859,7 +1859,7 @@ class GatewayRunner:
logger.warning(
"Auto-suspended stuck session %s (active across %d "
"consecutive restarts — likely a stuck loop)",
session_key[:30], counts[session_key],
session_key, counts[session_key],
)
except Exception:
pass
@@ -2681,7 +2681,7 @@ class GatewayRunner:
except Exception as _e:
logger.debug(
"mark_resume_pending failed for %s: %s",
_sk[:20], _e,
_sk, _e,
)
self._interrupt_running_agents(
_INTERRUPT_REASON_GATEWAY_RESTART if self._restart_requested else _INTERRUPT_REASON_GATEWAY_SHUTDOWN
@@ -3347,7 +3347,7 @@ class GatewayRunner:
logger.warning(
"Evicting stale _running_agents entry for %s "
"(age: %.0fs, idle: %.0fs, timeout: %.0fs)%s",
_quick_key[:30], _stale_age, _stale_idle,
_quick_key, _stale_age, _stale_idle,
_raw_stale_timeout, _stale_detail,
)
self._invalidate_session_run_generation(
@@ -3383,7 +3383,7 @@ class GatewayRunner:
interrupt_reason=_INTERRUPT_REASON_STOP,
invalidation_reason="stop_command",
)
logger.info("STOP for session %s — agent interrupted, session lock released", _quick_key[:20])
logger.info("STOP for session %s — agent interrupted, session lock released", _quick_key)
return "⚡ Stopped. You can continue this session."
# /reset and /new must bypass the running-agent guard so they
@@ -3449,7 +3449,7 @@ class GatewayRunner:
try:
accepted = running_agent.steer(steer_text)
except Exception as exc:
logger.warning("Steer failed for session %s: %s", _quick_key[:20], exc)
logger.warning("Steer failed for session %s: %s", _quick_key, exc)
return f"⚠️ Steer failed: {exc}"
if accepted:
preview = steer_text[:60] + ("..." if len(steer_text) > 60 else "")
@@ -3532,7 +3532,7 @@ class GatewayRunner:
)
if event.message_type == MessageType.PHOTO:
logger.debug("PRIORITY photo follow-up for session %s — queueing without interrupt", _quick_key[:20])
logger.debug("PRIORITY photo follow-up for session %s — queueing without interrupt", _quick_key)
adapter = self.adapters.get(source.platform)
if adapter:
merge_pending_message_event(adapter._pending_messages, _quick_key, event)
@@ -3552,7 +3552,7 @@ class GatewayRunner:
logger.debug(
"Telegram follow-up arrived %.2fs after run start for %s — queueing without interrupt",
time.time() - _started_at,
_quick_key[:20],
_quick_key,
)
adapter = self.adapters.get(source.platform)
if adapter:
@@ -3570,7 +3570,7 @@ class GatewayRunner:
if event.get_command() == "stop":
# Force-clean the sentinel so the session is unlocked.
self._release_running_agent_state(_quick_key)
logger.info("HARD STOP (pending) for session %s — sentinel cleared", _quick_key[:20])
logger.info("HARD STOP (pending) for session %s — sentinel cleared", _quick_key)
return "⚡ Force-stopped. The agent was still starting — session unlocked."
# Queue the message so it will be picked up after the
# agent starts.
@@ -3592,10 +3592,10 @@ class GatewayRunner:
else f"⏳ Gateway is {self._status_action_gerund()} and is not accepting another turn right now."
)
if self._busy_input_mode == "queue":
logger.debug("PRIORITY queue follow-up for session %s", _quick_key[:20])
logger.debug("PRIORITY queue follow-up for session %s", _quick_key)
self._queue_or_replace_pending_event(_quick_key, event)
return None
logger.debug("PRIORITY interrupt for session %s", _quick_key[:20])
logger.debug("PRIORITY interrupt for session %s", _quick_key)
running_agent.interrupt(event.text)
if _quick_key in self._pending_messages:
self._pending_messages[_quick_key] += "\n" + event.text
@@ -4593,7 +4593,7 @@ class GatewayRunner:
if not self._is_session_run_current(_quick_key, run_generation):
logger.info(
"Discarding stale agent result for %s — generation %d is no longer current",
_quick_key[:20] if _quick_key else "?",
_quick_key or "?",
run_generation,
)
_stale_adapter = self.adapters.get(source.platform)
@@ -4644,7 +4644,7 @@ class GatewayRunner:
except Exception as _e:
logger.debug(
"clear_resume_pending failed for %s: %s",
session_key[:20], _e,
session_key, _e,
)
# Surface error details when the agent failed silently (final_response=None)
@@ -5291,7 +5291,7 @@ class GatewayRunner:
interrupt_reason=_INTERRUPT_REASON_STOP,
invalidation_reason="stop_command_pending",
)
logger.info("STOP (pending) for session %s — sentinel cleared", session_key[:20])
logger.info("STOP (pending) for session %s — sentinel cleared", session_key)
return "⚡ Stopped. The agent hadn't started yet — you can continue this session."
if agent:
# Force-clean the session lock so a truly hung agent doesn't
@@ -8798,7 +8798,7 @@ class GatewayRunner:
if reason:
logger.info(
"Invalidated run generation for %s%d (%s)",
session_key[:20],
session_key,
generation,
reason,
)
@@ -9205,7 +9205,7 @@ class GatewayRunner:
if not _run_still_current():
logger.info(
"Discarding stale proxy stream for %s — generation %d is no longer current",
session_key[:20] if session_key else "?",
session_key or "?",
run_generation or 0,
)
return {
@@ -9269,7 +9269,7 @@ class GatewayRunner:
if not _run_still_current():
logger.info(
"Discarding stale proxy result for %s — generation %d is no longer current",
session_key[:20] if session_key else "?",
session_key or "?",
run_generation or 0,
)
return {
@@ -9711,7 +9711,7 @@ class GatewayRunner:
)
logger.debug(
"run_agent resolved: model=%s provider=%s session=%s",
model, runtime_kwargs.get("provider"), (session_key or "")[:30],
model, runtime_kwargs.get("provider"), session_key or "",
)
except Exception as exc:
return {
@@ -10322,7 +10322,7 @@ class GatewayRunner:
):
logger.info(
"Skipping stale agent promotion for %s — generation %s is no longer current",
(session_key or "")[:20],
session_key or "",
run_generation,
)
return
@@ -10469,7 +10469,7 @@ class GatewayRunner:
logger.info(
"Backup interrupt detected for session %s "
"(monitor task state: %s)",
session_key[:20],
session_key,
"done" if interrupt_monitor.done() else "running",
)
_backup_agent.interrupt(_bp_text)
@@ -10529,7 +10529,7 @@ class GatewayRunner:
logger.info(
"Backup interrupt detected for session %s "
"(monitor task state: %s)",
session_key[:20],
session_key,
"done" if interrupt_monitor.done() else "running",
)
_backup_agent.interrupt(_bp_text)
@@ -10631,7 +10631,7 @@ class GatewayRunner:
if _is_control_interrupt_message(interrupt_message):
logger.info(
"Ignoring control interrupt message for session %s: %s",
session_key[:20] if session_key else "?",
session_key or "?",
interrupt_message,
)
else:
@@ -10675,7 +10675,7 @@ class GatewayRunner:
if self._draining and (pending_event or pending):
logger.info(
"Discarding pending follow-up for session %s during gateway %s",
session_key[:20] if session_key else "?",
session_key or "?",
self._status_action_label(),
)
pending_event = None
@@ -10732,7 +10732,7 @@ class GatewayRunner:
try:
logger.info(
"Queued follow-up for session %s: final stream delivery not confirmed; sending first response before continuing.",
session_key[:20] if session_key else "?",
session_key or "?",
)
await adapter.send(
source.chat_id,
@@ -10744,7 +10744,7 @@ class GatewayRunner:
elif first_response:
logger.info(
"Queued follow-up for session %s: skipping resend because final streamed delivery was confirmed.",
session_key[:20] if session_key else "?",
session_key or "?",
)
# Release deferred bg-review notifications now that the
# first response has been delivered. Pop from the
@@ -10879,7 +10879,7 @@ class GatewayRunner:
if not _is_empty_sentinel and (_streamed or _previewed):
logger.info(
"Suppressing normal final send for session %s: final delivery already confirmed (streamed=%s previewed=%s).",
session_key[:20] if session_key else "?",
session_key or "?",
_streamed,
_previewed,
)

View File

@@ -5137,6 +5137,8 @@ class AIAgent:
# response.incomplete instead of response.completed).
self._codex_streamed_text_parts: list = []
for attempt in range(max_stream_retries + 1):
if self._interrupt_requested:
raise InterruptedError("Agent interrupted before Codex stream retry")
collected_output_items: list = []
try:
with active_client.responses.stream(**api_kwargs) as stream:
@@ -6306,6 +6308,14 @@ class AIAgent:
try:
for _stream_attempt in range(_max_stream_retries + 1):
# Check for interrupt before each retry attempt. Without
# this, /stop closes the HTTP connection (outer poll loop),
# but the retry loop opens a FRESH connection — negating the
# interrupt entirely. On slow providers (ollama-cloud) each
# retry can block for the full stream-read timeout (120s+),
# causing multi-minute delays between /stop and response.
if self._interrupt_requested:
raise InterruptedError("Agent interrupted before stream retry")
try:
if self.api_mode == "anthropic_messages":
self._try_refresh_anthropic_client_credentials()

View File

@@ -0,0 +1,162 @@
"""Tests that /stop interrupts streaming retry loops immediately.
When the agent is interrupted during a streaming API call, the outer poll
loop closes the HTTP connection. The inner `_call()` thread sees a
connection error and enters its retry loop. Before this fix, the retry
loop would open a FRESH connection without checking `_interrupt_requested`,
making /stop take multiple retry cycles × read-timeout to actually stop
(510+ seconds observed on slow ollama-cloud providers).
The fix adds an `_interrupt_requested` check at the top of the retry loop
so the agent exits immediately instead of retrying.
"""
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
import pytest
def _make_agent(**kwargs):
"""Create a minimal AIAgent for streaming tests."""
from run_agent import AIAgent
defaults = dict(
api_key="test-key",
base_url="https://example.com/v1",
model="test/model",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
defaults.update(kwargs)
agent = AIAgent(**defaults)
agent.api_mode = "chat_completions"
return agent
class TestStreamInterruptBeforeRetry:
"""Verify _interrupt_requested is checked before each streaming retry."""
@pytest.mark.filterwarnings(
"ignore::pytest.PytestUnhandledThreadExceptionWarning"
)
@patch("run_agent.AIAgent._create_request_openai_client")
@patch("run_agent.AIAgent._close_request_openai_client")
def test_interrupt_prevents_stream_retry(self, mock_close, mock_create):
"""When _interrupt_requested is set during a transient stream error,
the retry loop must NOT retry — it should raise InterruptedError
immediately instead of opening a fresh connection."""
import httpx
attempt_count = [0]
def fail_once_then_interrupt(*args, **kwargs):
attempt_count[0] += 1
if attempt_count[0] == 1:
# First attempt: simulate normal failure, then set interrupt
# (as if /stop arrived while the retry loop processes the error)
agent._interrupt_requested = True
raise httpx.ConnectError("connection reset by /stop")
# Should never reach here — the interrupt check should fire first
raise httpx.ConnectError("unexpected retry — interrupt not checked!")
mock_client = MagicMock()
mock_client.chat.completions.create.side_effect = fail_once_then_interrupt
mock_create.return_value = mock_client
agent = _make_agent()
agent._interrupt_requested = False
with pytest.raises(InterruptedError, match="interrupted"):
agent._interruptible_streaming_api_call({})
# Only 1 attempt should have been made — the interrupt should prevent retry
assert attempt_count[0] == 1, (
f"Expected 1 attempt but got {attempt_count[0]}. "
"The retry loop retried despite _interrupt_requested being set."
)
@pytest.mark.filterwarnings(
"ignore::pytest.PytestUnhandledThreadExceptionWarning"
)
@patch("run_agent.AIAgent._create_request_openai_client")
@patch("run_agent.AIAgent._close_request_openai_client")
def test_interrupt_before_first_attempt(self, mock_close, mock_create):
"""If _interrupt_requested is already set when the streaming call
starts, it should exit immediately without making any API call."""
mock_client = MagicMock()
mock_create.return_value = mock_client
agent = _make_agent()
agent._interrupt_requested = True # Pre-set before call
with pytest.raises(InterruptedError, match="interrupted"):
agent._interruptible_streaming_api_call({})
# No API call should have been made at all
assert mock_client.chat.completions.create.call_count == 0
@patch("run_agent.AIAgent._create_request_openai_client")
@patch("run_agent.AIAgent._close_request_openai_client")
def test_normal_retry_still_works_without_interrupt(self, mock_close, mock_create):
"""Without an interrupt, transient errors should still retry normally."""
import httpx
attempts = [0]
def fail_twice_then_succeed(*args, **kwargs):
attempts[0] += 1
if attempts[0] <= 2:
raise httpx.ConnectError("transient failure")
# Third attempt succeeds
chunks = [
SimpleNamespace(
choices=[
SimpleNamespace(
index=0,
delta=SimpleNamespace(
content="ok",
tool_calls=None,
reasoning_content=None,
reasoning=None,
),
finish_reason=None,
)
],
model="test/model",
usage=None,
),
SimpleNamespace(
choices=[
SimpleNamespace(
index=0,
delta=SimpleNamespace(
content=None,
tool_calls=None,
reasoning_content=None,
reasoning=None,
),
finish_reason="stop",
)
],
model="test/model",
usage=None,
),
]
stream = MagicMock()
stream.__iter__ = MagicMock(return_value=iter(chunks))
stream.response = MagicMock()
stream.response.headers = {}
return stream
mock_client = MagicMock()
mock_client.chat.completions.create.side_effect = fail_twice_then_succeed
mock_create.return_value = mock_client
agent = _make_agent()
agent._interrupt_requested = False
# Should succeed on the third attempt
result = agent._interruptible_streaming_api_call({})
assert result is not None
assert attempts[0] == 3