Compare commits

...

1 Commit

Author SHA1 Message Date
Teknium
ad85285915 fix(streaming): adaptive backoff + cursor strip to prevent message truncation
Telegram flood control during streaming caused messages to be cut off
mid-response. The old behavior permanently disabled edits after a single
flood-control failure, losing the remainder of the response.

Changes:
- Adaptive backoff: on flood-control edit failures, double the edit interval
  (capped at 10s) instead of immediately disabling edits. Only permanently
  disable edits after 3 consecutive failures (_MAX_FLOOD_STRIKES).
- Cursor strip: when entering fallback mode, make a best-effort edit to remove
  the cursor (▉) from the last visible message so it doesn't appear stuck.
- Fallback send retry: _send_fallback_final retries each chunk once on
  flood-control failures (3s delay) before giving up.
- Default edit_interval increased from 0.3s to 1.0s. Telegram rate-limits
  edits at ~1/s per message; 0.3s was virtually guaranteed to trigger flood
  control on any non-trivial response.
- _send_or_edit returns bool so the overflow split loop knows not to
  truncate accumulated text when an edit fails (prevents content loss).

Fixes: messages being cut off or stopping mid-response on Telegram,
especially with streaming enabled.
2026-04-11 03:50:33 -07:00
2 changed files with 117 additions and 25 deletions

View File

@@ -190,7 +190,7 @@ class StreamingConfig:
"""Configuration for real-time token streaming to messaging platforms.""" """Configuration for real-time token streaming to messaging platforms."""
enabled: bool = False enabled: bool = False
transport: str = "edit" # "edit" (progressive editMessageText) or "off" transport: str = "edit" # "edit" (progressive editMessageText) or "off"
edit_interval: float = 0.3 # Seconds between message edits edit_interval: float = 1.0 # Seconds between message edits (Telegram rate-limits at ~1/s)
buffer_threshold: int = 40 # Chars before forcing an edit buffer_threshold: int = 40 # Chars before forcing an edit
cursor: str = "" # Cursor shown during streaming cursor: str = "" # Cursor shown during streaming
@@ -210,7 +210,7 @@ class StreamingConfig:
return cls( return cls(
enabled=data.get("enabled", False), enabled=data.get("enabled", False),
transport=data.get("transport", "edit"), transport=data.get("transport", "edit"),
edit_interval=float(data.get("edit_interval", 0.3)), edit_interval=float(data.get("edit_interval", 1.0)),
buffer_threshold=int(data.get("buffer_threshold", 40)), buffer_threshold=int(data.get("buffer_threshold", 40)),
cursor=data.get("cursor", ""), cursor=data.get("cursor", ""),
) )

View File

@@ -36,7 +36,7 @@ _NEW_SEGMENT = object()
@dataclass @dataclass
class StreamConsumerConfig: class StreamConsumerConfig:
"""Runtime config for a single stream consumer instance.""" """Runtime config for a single stream consumer instance."""
edit_interval: float = 0.3 edit_interval: float = 1.0
buffer_threshold: int = 40 buffer_threshold: int = 40
cursor: str = "" cursor: str = ""
@@ -56,6 +56,10 @@ class GatewayStreamConsumer:
await task # wait for final edit await task # wait for final edit
""" """
# After this many consecutive flood-control failures, permanently disable
# progressive edits for the remainder of the stream.
_MAX_FLOOD_STRIKES = 3
def __init__( def __init__(
self, self,
adapter: Any, adapter: Any,
@@ -76,6 +80,8 @@ class GatewayStreamConsumer:
self._last_sent_text = "" # Track last-sent text to skip redundant edits self._last_sent_text = "" # Track last-sent text to skip redundant edits
self._fallback_final_send = False self._fallback_final_send = False
self._fallback_prefix = "" self._fallback_prefix = ""
self._flood_strikes = 0 # Consecutive flood-control edit failures
self._current_edit_interval = self.cfg.edit_interval # Adaptive backoff
@property @property
def already_sent(self) -> bool: def already_sent(self) -> bool:
@@ -129,7 +135,7 @@ class GatewayStreamConsumer:
should_edit = ( should_edit = (
got_done got_done
or got_segment_break or got_segment_break
or (elapsed >= self.cfg.edit_interval or (elapsed >= self._current_edit_interval
and self._accumulated) and self._accumulated)
or len(self._accumulated) >= self.cfg.buffer_threshold or len(self._accumulated) >= self.cfg.buffer_threshold
) )
@@ -173,12 +179,13 @@ class GatewayStreamConsumer:
if split_at < _safe_limit // 2: if split_at < _safe_limit // 2:
split_at = _safe_limit split_at = _safe_limit
chunk = self._accumulated[:split_at] chunk = self._accumulated[:split_at]
await self._send_or_edit(chunk) ok = await self._send_or_edit(chunk)
if self._fallback_final_send: if self._fallback_final_send or not ok:
# Edit failed while attempting to split an oversized # Edit failed (or backed off due to flood control)
# message. Keep the full accumulated text intact so # while attempting to split an oversized message.
# the fallback final-send path can deliver the # Keep the full accumulated text intact so the
# remaining continuation without dropping content. # fallback final-send path can deliver the remaining
# continuation without dropping content.
break break
self._accumulated = self._accumulated[split_at:].lstrip("\n") self._accumulated = self._accumulated[split_at:].lstrip("\n")
self._message_id = None self._message_id = None
@@ -322,7 +329,10 @@ class GatewayStreamConsumer:
return chunks return chunks
async def _send_fallback_final(self, text: str) -> None: async def _send_fallback_final(self, text: str) -> None:
"""Send the final continuation after streaming edits stop working.""" """Send the final continuation after streaming edits stop working.
Retries each chunk once on flood-control failures with a short delay.
"""
final_text = self._clean_for_display(text) final_text = self._clean_for_display(text)
continuation = self._continuation_text(final_text) continuation = self._continuation_text(final_text)
self._fallback_final_send = False self._fallback_final_send = False
@@ -339,12 +349,25 @@ class GatewayStreamConsumer:
last_successful_chunk = "" last_successful_chunk = ""
sent_any_chunk = False sent_any_chunk = False
for chunk in chunks: for chunk in chunks:
# Try sending with one retry on flood-control errors.
result = None
for attempt in range(2):
result = await self.adapter.send( result = await self.adapter.send(
chat_id=self.chat_id, chat_id=self.chat_id,
content=chunk, content=chunk,
metadata=self.metadata, metadata=self.metadata,
) )
if not result.success: if result.success:
break
if attempt == 0 and self._is_flood_error(result):
logger.debug(
"Flood control on fallback send, retrying in 3s"
)
await asyncio.sleep(3.0)
else:
break # non-flood error or second attempt failed
if not result or not result.success:
if sent_any_chunk: if sent_any_chunk:
# Some continuation text already reached the user. Suppress # Some continuation text already reached the user. Suppress
# the base gateway final-send path so we don't resend the # the base gateway final-send path so we don't resend the
@@ -370,20 +393,52 @@ class GatewayStreamConsumer:
self._last_sent_text = chunks[-1] self._last_sent_text = chunks[-1]
self._fallback_prefix = "" self._fallback_prefix = ""
async def _send_or_edit(self, text: str) -> None: def _is_flood_error(self, result) -> bool:
"""Send or edit the streaming message.""" """Check if a SendResult failure is due to flood control / rate limiting."""
err = getattr(result, "error", "") or ""
err_lower = err.lower()
return "flood" in err_lower or "retry after" in err_lower or "rate" in err_lower
async def _try_strip_cursor(self) -> None:
"""Best-effort edit to remove the cursor from the last visible message.
Called when entering fallback mode so the user doesn't see a stuck
cursor (▉) in the partial message.
"""
if not self._message_id or self._message_id == "__no_edit__":
return
prefix = self._visible_prefix()
if not prefix or not prefix.strip():
return
try:
await self.adapter.edit_message(
chat_id=self.chat_id,
message_id=self._message_id,
content=prefix,
)
self._last_sent_text = prefix
except Exception:
pass # best-effort — don't let this block the fallback path
async def _send_or_edit(self, text: str) -> bool:
"""Send or edit the streaming message.
Returns True if the text was successfully delivered (sent or edited),
False otherwise. Callers like the overflow split loop use this to
decide whether to advance past the delivered chunk.
"""
# Strip MEDIA: directives so they don't appear as visible text. # Strip MEDIA: directives so they don't appear as visible text.
# Media files are delivered as native attachments after the stream # Media files are delivered as native attachments after the stream
# finishes (via _deliver_media_from_response in gateway/run.py). # finishes (via _deliver_media_from_response in gateway/run.py).
text = self._clean_for_display(text) text = self._clean_for_display(text)
if not text.strip(): if not text.strip():
return return True # nothing to send is "success"
try: try:
if self._message_id is not None: if self._message_id is not None:
if self._edit_supported: if self._edit_supported:
# Skip if text is identical to what we last sent # Skip if text is identical to what we last sent
if text == self._last_sent_text: if text == self._last_sent_text:
return return True
# Edit existing message # Edit existing message
result = await self.adapter.edit_message( result = await self.adapter.edit_message(
chat_id=self.chat_id, chat_id=self.chat_id,
@@ -393,19 +448,52 @@ class GatewayStreamConsumer:
if result.success: if result.success:
self._already_sent = True self._already_sent = True
self._last_sent_text = text self._last_sent_text = text
# Successful edit — reset flood strike counter
self._flood_strikes = 0
return True
else: else:
# If an edit fails mid-stream (especially Telegram flood control), # Edit failed. If this looks like flood control / rate
# stop progressive edits and send only the missing tail once the # limiting, use adaptive backoff: double the edit interval
# and retry on the next cycle. Only permanently disable
# edits after _MAX_FLOOD_STRIKES consecutive failures.
if self._is_flood_error(result):
self._flood_strikes += 1
self._current_edit_interval = min(
self._current_edit_interval * 2, 10.0,
)
logger.debug(
"Flood control on edit (strike %d/%d), "
"backoff interval → %.1fs",
self._flood_strikes,
self._MAX_FLOOD_STRIKES,
self._current_edit_interval,
)
if self._flood_strikes < self._MAX_FLOOD_STRIKES:
# Don't disable edits yet — just slow down.
# Update _last_edit_time so the next edit
# respects the new interval.
self._last_edit_time = time.monotonic()
return False
# Non-flood error OR flood strikes exhausted: enter
# fallback mode — send only the missing tail once the
# final response is available. # final response is available.
logger.debug("Edit failed, disabling streaming for this adapter") logger.debug(
"Edit failed (strikes=%d), entering fallback mode",
self._flood_strikes,
)
self._fallback_prefix = self._visible_prefix() self._fallback_prefix = self._visible_prefix()
self._fallback_final_send = True self._fallback_final_send = True
self._edit_supported = False self._edit_supported = False
self._already_sent = True self._already_sent = True
# Best-effort: strip the cursor from the last visible
# message so the user doesn't see a stuck ▉.
await self._try_strip_cursor()
return False
else: else:
# Editing not supported — skip intermediate updates. # Editing not supported — skip intermediate updates.
# The final response will be sent by the fallback path. # The final response will be sent by the fallback path.
pass return False
else: else:
# First message — send new # First message — send new
result = await self.adapter.send( result = await self.adapter.send(
@@ -417,6 +505,7 @@ class GatewayStreamConsumer:
self._message_id = result.message_id self._message_id = result.message_id
self._already_sent = True self._already_sent = True
self._last_sent_text = text self._last_sent_text = text
return True
elif result.success: elif result.success:
# Platform accepted the message but returned no message_id # Platform accepted the message but returned no message_id
# (e.g. Signal). Can't edit without an ID — switch to # (e.g. Signal). Can't edit without an ID — switch to
@@ -428,8 +517,11 @@ class GatewayStreamConsumer:
self._fallback_final_send = True self._fallback_final_send = True
# Sentinel prevents re-entering this branch on every delta # Sentinel prevents re-entering this branch on every delta
self._message_id = "__no_edit__" self._message_id = "__no_edit__"
return True # platform accepted, just can't edit
else: else:
# Initial send failed — disable streaming for this session # Initial send failed — disable streaming for this session
self._edit_supported = False self._edit_supported = False
return False
except Exception as e: except Exception as e:
logger.error("Stream send/edit error: %s", e) logger.error("Stream send/edit error: %s", e)
return False