Files
hermes-agent/gateway/stream_consumer.py

586 lines
26 KiB
Python
Raw Normal View History

"""Gateway streaming consumer — bridges sync agent callbacks to async platform delivery.
The agent fires stream_delta_callback(text) synchronously from its worker thread.
GatewayStreamConsumer:
1. Receives deltas via on_delta() (thread-safe, sync)
2. Queues them to an asyncio task via queue.Queue
3. The async run() task buffers, rate-limits, and progressively edits
a single message on the target platform
Design: Uses the edit transport (send initial message, then editMessageText).
This is universally supported across Telegram, Discord, and Slack.
Credit: jobless0x (#774, #1312), OutThisLife (#798), clicksingh (#697).
"""
from __future__ import annotations
import asyncio
import logging
import queue
import re
import time
from dataclasses import dataclass
from typing import Any, Optional
logger = logging.getLogger("gateway.stream_consumer")
# Sentinel to signal the stream is complete
_DONE = object()
# Sentinel to signal a tool boundary — finalize current message and start a
# new one so that subsequent text appears below tool progress messages.
_NEW_SEGMENT = object()
# Queue marker for a completed assistant commentary message emitted between
# API/tool iterations (for example: "I'll inspect the repo first.").
_COMMENTARY = object()
@dataclass
class StreamConsumerConfig:
"""Runtime config for a single stream consumer instance."""
edit_interval: float = 1.0
buffer_threshold: int = 40
cursor: str = ""
class GatewayStreamConsumer:
"""Async consumer that progressively edits a platform message with streamed tokens.
Usage::
consumer = GatewayStreamConsumer(adapter, chat_id, config, metadata=metadata)
# Pass consumer.on_delta as stream_delta_callback to AIAgent
agent = AIAgent(..., stream_delta_callback=consumer.on_delta)
# Start the consumer as an asyncio task
task = asyncio.create_task(consumer.run())
# ... run agent in thread pool ...
consumer.finish() # signal completion
await task # wait for final edit
"""
# After this many consecutive flood-control failures, permanently disable
# progressive edits for the remainder of the stream.
_MAX_FLOOD_STRIKES = 3
def __init__(
self,
adapter: Any,
chat_id: str,
config: Optional[StreamConsumerConfig] = None,
metadata: Optional[dict] = None,
):
self.adapter = adapter
self.chat_id = chat_id
self.cfg = config or StreamConsumerConfig()
self.metadata = metadata
self._queue: queue.Queue = queue.Queue()
self._accumulated = ""
self._message_id: Optional[str] = None
self._already_sent = False
self._edit_supported = True # Disabled when progressive edits are no longer usable
self._last_edit_time = 0.0
fix: Telegram streaming — config bridge, not-modified, flood control (#1782) * fix: NameError in OpenCode provider setup (prompt_text -> prompt) The OpenCode Zen and OpenCode Go setup sections used prompt_text() which is undefined. All other providers correctly use the local prompt() function defined in setup.py. Fixes crash during 'hermes setup' when selecting either OpenCode provider. * fix: Telegram streaming — config bridge, not-modified, flood control Three fixes for gateway streaming: 1. Bridge streaming config from config.yaml into gateway runtime. load_gateway_config() now reads the 'streaming' key from config.yaml (same pattern as session_reset, stt, etc.), matching the docs. Previously only gateway.json was read. 2. Handle 'Message is not modified' in Telegram edit_message(). This Telegram API error fires when editing with identical content — a no-op, not a real failure. Previously it returned success=False which made the stream consumer disable streaming entirely. 3. Handle RetryAfter / flood control in Telegram edit_message(). Fast providers can hit Telegram rate limits during streaming. Now waits the requested retry_after duration and retries once, instead of treating it as a fatal edit failure. Also fixed double-edit on stream finish: the consumer now tracks last-sent text and skips redundant edits, preventing the not-modified error at the source. * refactor: make config.yaml the primary gateway config source Eliminates the per-key bridge pattern in load_gateway_config(). Previously gateway.json was the primary source and each config.yaml key needed an individual bridge — easy to forget (streaming was missing, causing garl4546's bug). Now config.yaml is read first and its keys are mapped directly into the GatewayConfig.from_dict() schema. gateway.json is kept as a legacy fallback layer (loaded first, then overwritten by config.yaml keys). If gateway.json exists, a log message suggests migrating. Also: - Removed dead save_gateway_config() (never called anywhere) - Updated CLI help text and send_message error to reference config.yaml instead of gateway.json --------- Co-authored-by: Test <test@test.com>
2026-03-17 10:51:54 -07:00
self._last_sent_text = "" # Track last-sent text to skip redundant edits
self._fallback_final_send = False
self._fallback_prefix = ""
self._flood_strikes = 0 # Consecutive flood-control edit failures
self._current_edit_interval = self.cfg.edit_interval # Adaptive backoff
self._final_response_sent = False
@property
def already_sent(self) -> bool:
"""True if at least one message was sent or edited during the run."""
return self._already_sent
@property
def final_response_sent(self) -> bool:
"""True when the stream consumer delivered the final assistant reply."""
return self._final_response_sent
def on_segment_break(self) -> None:
"""Finalize the current stream segment and start a fresh message."""
self._queue.put(_NEW_SEGMENT)
def on_commentary(self, text: str) -> None:
"""Queue a completed interim assistant commentary message."""
if text:
self._queue.put((_COMMENTARY, text))
def _reset_segment_state(self, *, preserve_no_edit: bool = False) -> None:
if preserve_no_edit and self._message_id == "__no_edit__":
return
self._message_id = None
self._accumulated = ""
self._last_sent_text = ""
self._fallback_final_send = False
self._fallback_prefix = ""
def on_delta(self, text: str) -> None:
"""Thread-safe callback — called from the agent's worker thread.
When *text* is ``None``, signals a tool boundary: the current message
is finalized and subsequent text will be sent as a new message so it
appears below any tool-progress messages the gateway sent in between.
"""
if text:
self._queue.put(text)
elif text is None:
self.on_segment_break()
def finish(self) -> None:
"""Signal that the stream is complete."""
self._queue.put(_DONE)
async def run(self) -> None:
"""Async task that drains the queue and edits the platform message."""
# Platform message length limit — leave room for cursor + formatting
_raw_limit = getattr(self.adapter, "MAX_MESSAGE_LENGTH", 4096)
_safe_limit = max(500, _raw_limit - len(self.cfg.cursor) - 100)
try:
while True:
# Drain all available items from the queue
got_done = False
got_segment_break = False
commentary_text = None
while True:
try:
item = self._queue.get_nowait()
if item is _DONE:
got_done = True
break
if item is _NEW_SEGMENT:
got_segment_break = True
break
if isinstance(item, tuple) and len(item) == 2 and item[0] is _COMMENTARY:
commentary_text = item[1]
break
self._accumulated += item
except queue.Empty:
break
# Decide whether to flush an edit
now = time.monotonic()
elapsed = now - self._last_edit_time
should_edit = (
got_done
or got_segment_break
or commentary_text is not None
or (elapsed >= self._current_edit_interval
refactor: codebase-wide lint cleanup — unused imports, dead code, and inefficient patterns (#5821) Comprehensive cleanup across 80 files based on automated (ruff, pyflakes, vulture) and manual analysis of the entire codebase. Changes by category: Unused imports removed (~95 across 55 files): - Removed genuinely unused imports from all major subsystems - agent/, hermes_cli/, tools/, gateway/, plugins/, cron/ - Includes imports in try/except blocks that were truly unused (vs availability checks which were left alone) Unused variables removed (~25): - Removed dead variables: connected, inner, channels, last_exc, source, new_server_names, verify, pconfig, default_terminal, result, pending_handled, temperature, loop - Dropped unused argparse subparser assignments in hermes_cli/main.py (12 instances of add_parser() where result was never used) Dead code removed: - run_agent.py: Removed dead ternary (None if False else None) and surrounding unreachable branch in identity fallback - run_agent.py: Removed write-only attribute _last_reported_tool - hermes_cli/providers.py: Removed dead @property decorator on module-level function (decorator has no effect outside a class) - gateway/run.py: Removed unused MCP config load before reconnect - gateway/platforms/slack.py: Removed dead SessionSource construction Undefined name bugs fixed (would cause NameError at runtime): - batch_runner.py: Added missing logger = logging.getLogger(__name__) - tools/environments/daytona.py: Added missing Dict and Path imports Unnecessary global statements removed (14): - tools/terminal_tool.py: 5 functions declared global for dicts they only mutated via .pop()/[key]=value (no rebinding) - tools/browser_tool.py: cleanup thread loop only reads flag - tools/rl_training_tool.py: 4 functions only do dict mutations - tools/mcp_oauth.py: only reads the global - hermes_time.py: only reads cached values Inefficient patterns fixed: - startswith/endswith tuple form: 15 instances of x.startswith('a') or x.startswith('b') consolidated to x.startswith(('a', 'b')) - len(x)==0 / len(x)>0: 13 instances replaced with pythonic truthiness checks (not x / bool(x)) - in dict.keys(): 5 instances simplified to in dict - Redefined unused name: removed duplicate _strip_mdv2 import in send_message_tool.py Other fixes: - hermes_cli/doctor.py: Replaced undefined logger.debug() with pass - hermes_cli/config.py: Consolidated chained .endswith() calls Test results: 3934 passed, 17 failed (all pre-existing on main), 19 skipped. Zero regressions.
2026-04-07 10:25:31 -07:00
and self._accumulated)
or len(self._accumulated) >= self.cfg.buffer_threshold
)
current_update_visible = False
if should_edit and self._accumulated:
# Split overflow: if accumulated text exceeds the platform
# limit, split into properly sized chunks.
if (
len(self._accumulated) > _safe_limit
and self._message_id is None
):
# No existing message to edit (first message or after a
# segment break). Use truncate_message — the same
# helper the non-streaming path uses — to split with
# proper word/code-fence boundaries and chunk
# indicators like "(1/2)".
chunks = self.adapter.truncate_message(
self._accumulated, _safe_limit
)
for chunk in chunks:
await self._send_new_chunk(chunk, self._message_id)
self._accumulated = ""
self._last_sent_text = ""
self._last_edit_time = time.monotonic()
if got_done:
self._final_response_sent = self._already_sent
return
if got_segment_break:
self._message_id = None
self._fallback_final_send = False
self._fallback_prefix = ""
continue
# Existing message: edit it with the first chunk, then
# start a new message for the overflow remainder.
while (
len(self._accumulated) > _safe_limit
and self._message_id is not None
and self._edit_supported
):
split_at = self._accumulated.rfind("\n", 0, _safe_limit)
if split_at < _safe_limit // 2:
split_at = _safe_limit
chunk = self._accumulated[:split_at]
ok = await self._send_or_edit(chunk)
if self._fallback_final_send or not ok:
# Edit failed (or backed off due to flood control)
# while attempting to split an oversized message.
# Keep the full accumulated text intact so the
# fallback final-send path can deliver the remaining
# continuation without dropping content.
break
self._accumulated = self._accumulated[split_at:].lstrip("\n")
self._message_id = None
self._last_sent_text = ""
display_text = self._accumulated
if not got_done and not got_segment_break and commentary_text is None:
display_text += self.cfg.cursor
current_update_visible = await self._send_or_edit(display_text)
self._last_edit_time = time.monotonic()
if got_done:
# Final edit without cursor. If progressive editing failed
# mid-stream, send a single continuation/fallback message
# here instead of letting the base gateway path send the
# full response again.
if self._accumulated:
if self._fallback_final_send:
await self._send_fallback_final(self._accumulated)
elif current_update_visible:
self._final_response_sent = True
elif self._message_id:
self._final_response_sent = await self._send_or_edit(self._accumulated)
elif not self._already_sent:
self._final_response_sent = await self._send_or_edit(self._accumulated)
return
if commentary_text is not None:
self._reset_segment_state()
await self._send_commentary(commentary_text)
self._last_edit_time = time.monotonic()
self._reset_segment_state()
# Tool boundary: reset message state so the next text chunk
# creates a fresh message below any tool-progress messages.
#
# Exception: when _message_id is "__no_edit__" the platform
# never returned a real message ID (e.g. Signal, webhook with
# github_comment delivery). Resetting to None would re-enter
# the "first send" path on every tool boundary and post one
# platform message per tool call — that is what caused 155
# comments under a single PR. Instead, preserve the sentinel
# so the full continuation is delivered once via
# _send_fallback_final.
# (When editing fails mid-stream due to flood control the id is
# a real string like "msg_1", not "__no_edit__", so that case
# still resets and creates a fresh segment as intended.)
if got_segment_break:
self._reset_segment_state(preserve_no_edit=True)
await asyncio.sleep(0.05) # Small yield to not busy-loop
except asyncio.CancelledError:
# Best-effort final edit on cancellation
if self._accumulated and self._message_id:
try:
await self._send_or_edit(self._accumulated)
except Exception:
pass
except Exception as e:
logger.error("Stream consumer error: %s", e)
# Pattern to strip MEDIA:<path> tags (including optional surrounding quotes).
# Matches the simple cleanup regex used by the non-streaming path in
# gateway/platforms/base.py for post-processing.
_MEDIA_RE = re.compile(r'''[`"']?MEDIA:\s*\S+[`"']?''')
@staticmethod
def _clean_for_display(text: str) -> str:
"""Strip MEDIA: directives and internal markers from text before display.
The streaming path delivers raw text chunks that may include
``MEDIA:<path>`` tags and ``[[audio_as_voice]]`` directives meant for
the platform adapter's post-processing. The actual media files are
delivered separately via ``_deliver_media_from_response()`` after the
stream finishes we just need to hide the raw directives from the
user.
"""
if "MEDIA:" not in text and "[[audio_as_voice]]" not in text:
return text
cleaned = text.replace("[[audio_as_voice]]", "")
cleaned = GatewayStreamConsumer._MEDIA_RE.sub("", cleaned)
# Collapse excessive blank lines left behind by removed tags
cleaned = re.sub(r'\n{3,}', '\n\n', cleaned)
# Strip trailing whitespace/newlines but preserve leading content
return cleaned.rstrip()
async def _send_new_chunk(self, text: str, reply_to_id: Optional[str]) -> Optional[str]:
"""Send a new message chunk, optionally threaded to a previous message.
Returns the message_id so callers can thread subsequent chunks.
"""
text = self._clean_for_display(text)
if not text.strip():
return reply_to_id
try:
meta = dict(self.metadata) if self.metadata else {}
result = await self.adapter.send(
chat_id=self.chat_id,
content=text,
reply_to=reply_to_id,
metadata=meta,
)
if result.success and result.message_id:
self._message_id = str(result.message_id)
self._already_sent = True
self._last_sent_text = text
return str(result.message_id)
else:
self._edit_supported = False
return reply_to_id
except Exception as e:
logger.error("Stream send chunk error: %s", e)
return reply_to_id
def _visible_prefix(self) -> str:
"""Return the visible text already shown in the streamed message."""
prefix = self._last_sent_text or ""
if self.cfg.cursor and prefix.endswith(self.cfg.cursor):
prefix = prefix[:-len(self.cfg.cursor)]
return self._clean_for_display(prefix)
def _continuation_text(self, final_text: str) -> str:
"""Return only the part of final_text the user has not already seen."""
prefix = self._fallback_prefix or self._visible_prefix()
if prefix and final_text.startswith(prefix):
return final_text[len(prefix):].lstrip()
return final_text
@staticmethod
def _split_text_chunks(text: str, limit: int) -> list[str]:
"""Split text into reasonably sized chunks for fallback sends."""
if len(text) <= limit:
return [text]
chunks: list[str] = []
remaining = text
while len(remaining) > limit:
split_at = remaining.rfind("\n", 0, limit)
if split_at < limit // 2:
split_at = limit
chunks.append(remaining[:split_at])
remaining = remaining[split_at:].lstrip("\n")
if remaining:
chunks.append(remaining)
return chunks
async def _send_fallback_final(self, text: str) -> None:
"""Send the final continuation after streaming edits stop working.
Retries each chunk once on flood-control failures with a short delay.
"""
final_text = self._clean_for_display(text)
continuation = self._continuation_text(final_text)
self._fallback_final_send = False
if not continuation.strip():
# Nothing new to send — the visible partial already matches final text.
self._already_sent = True
self._final_response_sent = True
return
raw_limit = getattr(self.adapter, "MAX_MESSAGE_LENGTH", 4096)
safe_limit = max(500, raw_limit - 100)
chunks = self._split_text_chunks(continuation, safe_limit)
last_message_id: Optional[str] = None
last_successful_chunk = ""
sent_any_chunk = False
for chunk in chunks:
# Try sending with one retry on flood-control errors.
result = None
for attempt in range(2):
result = await self.adapter.send(
chat_id=self.chat_id,
content=chunk,
metadata=self.metadata,
)
if result.success:
break
if attempt == 0 and self._is_flood_error(result):
logger.debug(
"Flood control on fallback send, retrying in 3s"
)
await asyncio.sleep(3.0)
else:
break # non-flood error or second attempt failed
if not result or not result.success:
if sent_any_chunk:
# Some continuation text already reached the user. Suppress
# the base gateway final-send path so we don't resend the
# full response and create another duplicate.
self._already_sent = True
self._final_response_sent = True
self._message_id = last_message_id
self._last_sent_text = last_successful_chunk
self._fallback_prefix = ""
return
# No fallback chunk reached the user — allow the normal gateway
# final-send path to try one more time.
self._already_sent = False
self._message_id = None
self._last_sent_text = ""
self._fallback_prefix = ""
return
sent_any_chunk = True
last_successful_chunk = chunk
last_message_id = result.message_id or last_message_id
self._message_id = last_message_id
self._already_sent = True
self._final_response_sent = True
self._last_sent_text = chunks[-1]
self._fallback_prefix = ""
def _is_flood_error(self, result) -> bool:
"""Check if a SendResult failure is due to flood control / rate limiting."""
err = getattr(result, "error", "") or ""
err_lower = err.lower()
return "flood" in err_lower or "retry after" in err_lower or "rate" in err_lower
async def _try_strip_cursor(self) -> None:
"""Best-effort edit to remove the cursor from the last visible message.
Called when entering fallback mode so the user doesn't see a stuck
cursor () in the partial message.
"""
if not self._message_id or self._message_id == "__no_edit__":
return
prefix = self._visible_prefix()
if not prefix or not prefix.strip():
return
try:
await self.adapter.edit_message(
chat_id=self.chat_id,
message_id=self._message_id,
content=prefix,
)
self._last_sent_text = prefix
except Exception:
pass # best-effort — don't let this block the fallback path
async def _send_commentary(self, text: str) -> bool:
"""Send a completed interim assistant commentary message."""
text = self._clean_for_display(text)
if not text.strip():
return False
try:
result = await self.adapter.send(
chat_id=self.chat_id,
content=text,
metadata=self.metadata,
)
if result.success:
self._already_sent = True
return True
except Exception as e:
logger.error("Commentary send error: %s", e)
return False
async def _send_or_edit(self, text: str) -> bool:
"""Send or edit the streaming message.
Returns True if the text was successfully delivered (sent or edited),
False otherwise. Callers like the overflow split loop use this to
decide whether to advance past the delivered chunk.
"""
# Strip MEDIA: directives so they don't appear as visible text.
# Media files are delivered as native attachments after the stream
# finishes (via _deliver_media_from_response in gateway/run.py).
text = self._clean_for_display(text)
if not text.strip():
return True # nothing to send is "success"
try:
if self._message_id is not None:
if self._edit_supported:
fix: Telegram streaming — config bridge, not-modified, flood control (#1782) * fix: NameError in OpenCode provider setup (prompt_text -> prompt) The OpenCode Zen and OpenCode Go setup sections used prompt_text() which is undefined. All other providers correctly use the local prompt() function defined in setup.py. Fixes crash during 'hermes setup' when selecting either OpenCode provider. * fix: Telegram streaming — config bridge, not-modified, flood control Three fixes for gateway streaming: 1. Bridge streaming config from config.yaml into gateway runtime. load_gateway_config() now reads the 'streaming' key from config.yaml (same pattern as session_reset, stt, etc.), matching the docs. Previously only gateway.json was read. 2. Handle 'Message is not modified' in Telegram edit_message(). This Telegram API error fires when editing with identical content — a no-op, not a real failure. Previously it returned success=False which made the stream consumer disable streaming entirely. 3. Handle RetryAfter / flood control in Telegram edit_message(). Fast providers can hit Telegram rate limits during streaming. Now waits the requested retry_after duration and retries once, instead of treating it as a fatal edit failure. Also fixed double-edit on stream finish: the consumer now tracks last-sent text and skips redundant edits, preventing the not-modified error at the source. * refactor: make config.yaml the primary gateway config source Eliminates the per-key bridge pattern in load_gateway_config(). Previously gateway.json was the primary source and each config.yaml key needed an individual bridge — easy to forget (streaming was missing, causing garl4546's bug). Now config.yaml is read first and its keys are mapped directly into the GatewayConfig.from_dict() schema. gateway.json is kept as a legacy fallback layer (loaded first, then overwritten by config.yaml keys). If gateway.json exists, a log message suggests migrating. Also: - Removed dead save_gateway_config() (never called anywhere) - Updated CLI help text and send_message error to reference config.yaml instead of gateway.json --------- Co-authored-by: Test <test@test.com>
2026-03-17 10:51:54 -07:00
# Skip if text is identical to what we last sent
if text == self._last_sent_text:
return True
# Edit existing message
result = await self.adapter.edit_message(
chat_id=self.chat_id,
message_id=self._message_id,
content=text,
)
if result.success:
self._already_sent = True
fix: Telegram streaming — config bridge, not-modified, flood control (#1782) * fix: NameError in OpenCode provider setup (prompt_text -> prompt) The OpenCode Zen and OpenCode Go setup sections used prompt_text() which is undefined. All other providers correctly use the local prompt() function defined in setup.py. Fixes crash during 'hermes setup' when selecting either OpenCode provider. * fix: Telegram streaming — config bridge, not-modified, flood control Three fixes for gateway streaming: 1. Bridge streaming config from config.yaml into gateway runtime. load_gateway_config() now reads the 'streaming' key from config.yaml (same pattern as session_reset, stt, etc.), matching the docs. Previously only gateway.json was read. 2. Handle 'Message is not modified' in Telegram edit_message(). This Telegram API error fires when editing with identical content — a no-op, not a real failure. Previously it returned success=False which made the stream consumer disable streaming entirely. 3. Handle RetryAfter / flood control in Telegram edit_message(). Fast providers can hit Telegram rate limits during streaming. Now waits the requested retry_after duration and retries once, instead of treating it as a fatal edit failure. Also fixed double-edit on stream finish: the consumer now tracks last-sent text and skips redundant edits, preventing the not-modified error at the source. * refactor: make config.yaml the primary gateway config source Eliminates the per-key bridge pattern in load_gateway_config(). Previously gateway.json was the primary source and each config.yaml key needed an individual bridge — easy to forget (streaming was missing, causing garl4546's bug). Now config.yaml is read first and its keys are mapped directly into the GatewayConfig.from_dict() schema. gateway.json is kept as a legacy fallback layer (loaded first, then overwritten by config.yaml keys). If gateway.json exists, a log message suggests migrating. Also: - Removed dead save_gateway_config() (never called anywhere) - Updated CLI help text and send_message error to reference config.yaml instead of gateway.json --------- Co-authored-by: Test <test@test.com>
2026-03-17 10:51:54 -07:00
self._last_sent_text = text
# Successful edit — reset flood strike counter
self._flood_strikes = 0
return True
else:
# Edit failed. If this looks like flood control / rate
# limiting, use adaptive backoff: double the edit interval
# and retry on the next cycle. Only permanently disable
# edits after _MAX_FLOOD_STRIKES consecutive failures.
if self._is_flood_error(result):
self._flood_strikes += 1
self._current_edit_interval = min(
self._current_edit_interval * 2, 10.0,
)
logger.debug(
"Flood control on edit (strike %d/%d), "
"backoff interval → %.1fs",
self._flood_strikes,
self._MAX_FLOOD_STRIKES,
self._current_edit_interval,
)
if self._flood_strikes < self._MAX_FLOOD_STRIKES:
# Don't disable edits yet — just slow down.
# Update _last_edit_time so the next edit
# respects the new interval.
self._last_edit_time = time.monotonic()
return False
# Non-flood error OR flood strikes exhausted: enter
# fallback mode — send only the missing tail once the
# final response is available.
logger.debug(
"Edit failed (strikes=%d), entering fallback mode",
self._flood_strikes,
)
self._fallback_prefix = self._visible_prefix()
self._fallback_final_send = True
self._edit_supported = False
self._already_sent = True
# Best-effort: strip the cursor from the last visible
# message so the user doesn't see a stuck ▉.
await self._try_strip_cursor()
return False
else:
# Editing not supported — skip intermediate updates.
# The final response will be sent by the fallback path.
return False
else:
# First message — send new
result = await self.adapter.send(
chat_id=self.chat_id,
content=text,
metadata=self.metadata,
)
if result.success:
if result.message_id:
self._message_id = result.message_id
else:
self._edit_supported = False
self._already_sent = True
fix: Telegram streaming — config bridge, not-modified, flood control (#1782) * fix: NameError in OpenCode provider setup (prompt_text -> prompt) The OpenCode Zen and OpenCode Go setup sections used prompt_text() which is undefined. All other providers correctly use the local prompt() function defined in setup.py. Fixes crash during 'hermes setup' when selecting either OpenCode provider. * fix: Telegram streaming — config bridge, not-modified, flood control Three fixes for gateway streaming: 1. Bridge streaming config from config.yaml into gateway runtime. load_gateway_config() now reads the 'streaming' key from config.yaml (same pattern as session_reset, stt, etc.), matching the docs. Previously only gateway.json was read. 2. Handle 'Message is not modified' in Telegram edit_message(). This Telegram API error fires when editing with identical content — a no-op, not a real failure. Previously it returned success=False which made the stream consumer disable streaming entirely. 3. Handle RetryAfter / flood control in Telegram edit_message(). Fast providers can hit Telegram rate limits during streaming. Now waits the requested retry_after duration and retries once, instead of treating it as a fatal edit failure. Also fixed double-edit on stream finish: the consumer now tracks last-sent text and skips redundant edits, preventing the not-modified error at the source. * refactor: make config.yaml the primary gateway config source Eliminates the per-key bridge pattern in load_gateway_config(). Previously gateway.json was the primary source and each config.yaml key needed an individual bridge — easy to forget (streaming was missing, causing garl4546's bug). Now config.yaml is read first and its keys are mapped directly into the GatewayConfig.from_dict() schema. gateway.json is kept as a legacy fallback layer (loaded first, then overwritten by config.yaml keys). If gateway.json exists, a log message suggests migrating. Also: - Removed dead save_gateway_config() (never called anywhere) - Updated CLI help text and send_message error to reference config.yaml instead of gateway.json --------- Co-authored-by: Test <test@test.com>
2026-03-17 10:51:54 -07:00
self._last_sent_text = text
if not result.message_id:
self._fallback_prefix = self._visible_prefix()
self._fallback_final_send = True
# Sentinel prevents re-entering the first-send path on
# every delta/tool boundary when platforms accept a
# message but do not return an editable message id.
self._message_id = "__no_edit__"
return True
else:
# Initial send failed — disable streaming for this session
self._edit_supported = False
return False
except Exception as e:
logger.error("Stream send/edit error: %s", e)
return False