Compare commits

...

2 Commits

Author SHA1 Message Date
Teknium
e45e4107f6 fix: recover from partial stream delivery instead of duplicating
When streaming fails after tokens are already delivered to the platform,
the agent now attempts to continue the response:

  Option A: append partial content as an assistant message and make a
  non-streaming API call — the model sees its previous partial output
  and naturally continues from where it left off.

  Option B (fallback): if trailing assistant is rejected, inject a user
  'continue' instruction and retry — explicitly asks the model to
  resume without repeating.

  Last resort: if both fail, return the partial content as the final
  response (user sees what was delivered, no duplicate).

Tested with real Sonnet and Opus models via both the Anthropic native API
and OpenRouter — continuation works seamlessly on both providers.

Also adds partial text accumulation to the Anthropic streaming path
(previously only chat_completions tracked deltas_were_sent).

Inspired by PR #4871 (@trevorgordon981) which identified the bug.
2026-04-04 10:18:36 -07:00
Mibayy
d90178e21b fix(gateway): add message deduplication to Discord and Slack adapters (#4777)
Discord RESUME replays events after reconnects (~7/day observed),
and Slack Socket Mode can redeliver events if the ack was lost.
Neither adapter tracked which messages were already processed,
causing duplicate bot responses.

Add a _seen_messages dedup cache (message ID → timestamp) with a 5-min
TTL and a 2000-entry prune threshold to both adapters, matching the
pattern already used by Mattermost, Matrix, WeCom, Feishu, DingTalk,
and Email.

The check goes at the very top of the message handler, before any
other logic, so replayed events are silently dropped.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-03 14:27:31 -07:00
3 changed files with 175 additions and 0 deletions

View File

@@ -449,6 +449,11 @@ class DiscordAdapter(BasePlatformAdapter):
self._bot_task: Optional[asyncio.Task] = None
# Cap to prevent unbounded growth (Discord threads get archived).
self._MAX_TRACKED_THREADS = 500
# Dedup cache: message_id → timestamp. Prevents duplicate bot
# responses when Discord RESUME replays events after reconnects.
self._seen_messages: Dict[str, float] = {}
self._SEEN_TTL = 300 # 5 minutes
self._SEEN_MAX = 2000 # prune threshold
async def connect(self) -> bool:
"""Connect to Discord and start receiving events."""
@@ -539,6 +544,19 @@ class DiscordAdapter(BasePlatformAdapter):
@self._client.event
async def on_message(message: DiscordMessage):
# Dedup: Discord RESUME replays events after reconnects (#4777)
msg_id = str(message.id)
now = time.time()
if msg_id in adapter_self._seen_messages:
return
adapter_self._seen_messages[msg_id] = now
if len(adapter_self._seen_messages) > adapter_self._SEEN_MAX:
cutoff = now - adapter_self._SEEN_TTL
adapter_self._seen_messages = {
k: v for k, v in adapter_self._seen_messages.items()
if v > cutoff
}
# Always ignore our own messages
if message.author == self._client.user:
return

View File

@@ -13,6 +13,7 @@ import json
import logging
import os
import re
import time
from typing import Dict, Optional, Any
try:
@@ -78,6 +79,11 @@ class SlackAdapter(BasePlatformAdapter):
self._team_clients: Dict[str, AsyncWebClient] = {} # team_id → WebClient
self._team_bot_user_ids: Dict[str, str] = {} # team_id → bot_user_id
self._channel_team: Dict[str, str] = {} # channel_id → team_id
# Dedup cache: event_ts → timestamp. Prevents duplicate bot
# responses when Socket Mode reconnects redeliver events.
self._seen_messages: Dict[str, float] = {}
self._SEEN_TTL = 300 # 5 minutes
self._SEEN_MAX = 2000 # prune threshold
async def connect(self) -> bool:
"""Connect to Slack via Socket Mode."""
@@ -710,6 +716,20 @@ class SlackAdapter(BasePlatformAdapter):
async def _handle_slack_message(self, event: dict) -> None:
"""Handle an incoming Slack message event."""
# Dedup: Slack Socket Mode can redeliver events after reconnects (#4777)
event_ts = event.get("ts", "")
if event_ts:
now = time.time()
if event_ts in self._seen_messages:
return
self._seen_messages[event_ts] = now
if len(self._seen_messages) > self._SEEN_MAX:
cutoff = now - self._SEEN_TTL
self._seen_messages = {
k: v for k, v in self._seen_messages.items()
if v > cutoff
}
# Ignore bot messages (including our own)
if event.get("bot_id") or event.get("subtype") == "bot_message":
return

View File

@@ -4027,6 +4027,7 @@ class AIAgent:
request_client_holder = {"client": None}
first_delta_fired = {"done": False}
deltas_were_sent = {"yes": False} # Track if any deltas were fired (for fallback)
partial_streamed_text = {"content": ""} # Accumulates text delivered to platform
# Wall-clock timestamp of the last real streaming chunk. The outer
# poll loop uses this to detect stale connections that keep receiving
# SSE keep-alive pings but no actual data.
@@ -4114,6 +4115,7 @@ class AIAgent:
_fire_first_delta()
self._fire_stream_delta(delta.content)
deltas_were_sent["yes"] = True
partial_streamed_text["content"] += delta.content
else:
# Tool calls suppress regular content streaming (avoids
# displaying chatty "I'll use the tool..." text alongside
@@ -4264,6 +4266,8 @@ class AIAgent:
if text and not has_tool_use:
_fire_first_delta()
self._fire_stream_delta(text)
deltas_were_sent["yes"] = True
partial_streamed_text["content"] += text
elif delta_type == "thinking_delta":
thinking_text = getattr(delta, "thinking", "")
if thinking_text:
@@ -4473,6 +4477,14 @@ class AIAgent:
pass
raise InterruptedError("Agent interrupted during streaming API call")
if result["error"] is not None:
if deltas_were_sent["yes"] and partial_streamed_text["content"]:
# Streaming failed AFTER tokens were delivered to the platform.
# Attach the partial text so the outer retry loop can attempt
# continuation (Option A: trailing assistant, Option B: user
# "continue" prompt) instead of duplicating or losing content.
err = result["error"]
err._partial_content = partial_streamed_text["content"]
raise err
raise result["error"]
return result["response"]
@@ -7277,6 +7289,131 @@ class AIAgent:
break
except Exception as api_error:
# -----------------------------------------------------------
# Partial stream recovery. When streaming fails AFTER some
# tokens were delivered to the user, we attempt to continue
# the response rather than duplicate or lose content.
# Option A: append partial as assistant message, retry
# Option B: also inject user "continue" prompt (fallback)
# -----------------------------------------------------------
_partial = getattr(api_error, "_partial_content", None)
if _partial and not getattr(self, "_partial_recovery_done", False):
logger.warning(
"%sStream interrupted after %d chars delivered. "
"Attempting continuation (Option A)...",
self.log_prefix, len(_partial),
)
self._emit_status(
"⚡ Stream interrupted — attempting to continue..."
)
# Option A: trailing assistant message
api_messages.append({"role": "assistant", "content": _partial})
try:
_cont_resp = self._interruptible_api_call(
{**api_kwargs, "messages": api_messages}
)
# Merge: prepend partial to continuation
_cont_text = ""
if self.api_mode == "anthropic_messages":
for _blk in getattr(_cont_resp, "content", []):
if getattr(_blk, "type", None) == "text":
_cont_text = getattr(_blk, "text", "")
break
else:
_cont_text = getattr(
_cont_resp.choices[0].message, "content", ""
) or ""
if _cont_text:
# Fire the continuation to the platform stream
self._fire_stream_delta(_cont_text)
self._partial_recovery_done = True
# Build a merged response for the agent loop
_merged = _partial + _cont_text
_merged_msg = SimpleNamespace(
role="assistant", content=_merged,
tool_calls=None, reasoning_content=None,
)
response = SimpleNamespace(
id="partial-recovery",
model=getattr(_cont_resp, "model", self.model),
choices=[SimpleNamespace(
index=0, message=_merged_msg,
finish_reason="stop",
)],
usage=getattr(_cont_resp, "usage", None),
)
break # Success — exit retry loop
except Exception as opt_a_err:
logger.warning(
"%sOption A failed (%s), trying Option B...",
self.log_prefix, opt_a_err,
)
# Option B: add user "continue" instruction
api_messages.append({
"role": "user",
"content": (
"Your response was cut off mid-sentence "
"due to a connection error. Continue from "
"exactly where you stopped — do not repeat "
"what you already said."
),
})
try:
_cont_resp_b = self._interruptible_api_call(
{**api_kwargs, "messages": api_messages}
)
_cont_text_b = ""
if self.api_mode == "anthropic_messages":
for _blk in getattr(_cont_resp_b, "content", []):
if getattr(_blk, "type", None) == "text":
_cont_text_b = getattr(_blk, "text", "")
break
else:
_cont_text_b = getattr(
_cont_resp_b.choices[0].message,
"content", "",
) or ""
if _cont_text_b:
self._fire_stream_delta(_cont_text_b)
self._partial_recovery_done = True
_merged_b = _partial + _cont_text_b
_merged_msg_b = SimpleNamespace(
role="assistant", content=_merged_b,
tool_calls=None, reasoning_content=None,
)
response = SimpleNamespace(
id="partial-recovery-b",
model=getattr(_cont_resp_b, "model", self.model),
choices=[SimpleNamespace(
index=0, message=_merged_msg_b,
finish_reason="stop",
)],
usage=getattr(_cont_resp_b, "usage", None),
)
break # Success via Option B
except Exception as opt_b_err:
logger.warning(
"%sBoth recovery options failed. "
"Returning partial content as final response.",
self.log_prefix,
)
# Last resort: return what we have
_partial_msg = SimpleNamespace(
role="assistant", content=_partial,
tool_calls=None, reasoning_content=None,
)
response = SimpleNamespace(
id="partial-only",
model=getattr(self, "model", "unknown"),
choices=[SimpleNamespace(
index=0, message=_partial_msg,
finish_reason="stop",
)],
usage=None,
)
self._partial_recovery_done = True
break
# Stop spinner before printing error messages
if thinking_spinner:
thinking_spinner.stop("(╥_╥) error, retrying...")