Compare commits

...

2 Commits

Author SHA1 Message Date
Teknium
47010a7df6 fix: support bare Telegram adapters in typing cooldown 2026-06-14 17:18:11 -07:00
Teknium
db87eb552a fix: cool down transient Telegram typing failures
Port from openclaw/openclaw#93020: add per-chat cooldown for transient sendChatAction failures so keep-typing refreshes do not hammer Telegram during network blips or rate limits.
2026-06-14 17:06:06 -07:00
2 changed files with 227 additions and 32 deletions

View File

@@ -429,6 +429,17 @@ class TelegramAdapter(BasePlatformAdapter):
# endpoint) so later sends skip the doomed rich attempt entirely.
self._rich_send_disabled: bool = False
self._rich_draft_disabled: bool = False
# Transient Telegram sendChatAction failures (network blips, 429/5xx)
# can happen on every keep-typing tick while the agent is waiting on a
# long model call. Back off per chat so a short Telegram-side outage
# does not spam the API/logs or burn the keep-typing budget.
self._telegram_typing_cooldown_until: Dict[str, float] = {}
self._telegram_typing_cooldown_seconds: float = self._coerce_float_extra(
"typing_cooldown_seconds",
30.0,
min_value=1.0,
max_value=300.0,
)
# Buffer rapid/album photo updates so Telegram image bursts are handled
# as a single MessageEvent instead of self-interrupting multiple turns.
self._media_batch_delay_seconds = float(os.getenv("HERMES_TELEGRAM_MEDIA_BATCH_DELAY_SECONDS", "0.8"))
@@ -912,6 +923,27 @@ class TelegramAdapter(BasePlatformAdapter):
return default
return bool(value)
def _coerce_float_extra(
self,
key: str,
default: float,
*,
min_value: Optional[float] = None,
max_value: Optional[float] = None,
) -> float:
value = self.config.extra.get(key) if getattr(self.config, "extra", None) else None
if value is None:
return default
try:
parsed = float(value)
except (TypeError, ValueError):
return default
if min_value is not None:
parsed = max(parsed, min_value)
if max_value is not None:
parsed = min(parsed, max_value)
return parsed
def _link_preview_kwargs(self) -> Dict[str, Any]:
if not getattr(self, "_disable_link_previews", False):
return {}
@@ -4808,40 +4840,90 @@ class TelegramAdapter(BasePlatformAdapter):
# Fallback: try as a regular photo
return await self.send_image(chat_id, animation_url, caption, reply_to, metadata=metadata)
@staticmethod
def _is_transient_typing_error(exc: Exception) -> bool:
"""Return True for Telegram typing errors worth cooling down."""
retry_after = getattr(exc, "retry_after", None)
if retry_after is not None:
return True
status_code = getattr(exc, "status_code", None) or getattr(exc, "code", None)
if isinstance(status_code, int) and (status_code == 429 or status_code >= 500):
return True
text = str(exc).lower()
if any(marker in text for marker in ("too many requests", "rate limit", "timed out", "timeout", "temporar")):
return True
if isinstance(exc, (OSError, TimeoutError, ConnectionError, asyncio.TimeoutError)):
return True
return False
def _record_typing_cooldown(self, chat_id: str, exc: Exception) -> None:
"""Suppress Telegram typing refreshes for this chat after transient failures."""
if not hasattr(self, "_telegram_typing_cooldown_until"):
self._telegram_typing_cooldown_until = {}
loop = asyncio.get_running_loop()
retry_after = getattr(exc, "retry_after", None)
try:
delay = float(retry_after) if retry_after is not None else self._telegram_typing_cooldown_seconds
except (TypeError, ValueError):
delay = self._telegram_typing_cooldown_seconds
delay = max(1.0, min(delay, 300.0))
self._telegram_typing_cooldown_until[str(chat_id)] = loop.time() + delay
def _typing_in_cooldown(self, chat_id: str) -> bool:
if not hasattr(self, "_telegram_typing_cooldown_until"):
self._telegram_typing_cooldown_until = {}
self._telegram_typing_cooldown_seconds = 30.0
until = self._telegram_typing_cooldown_until.get(str(chat_id))
if until is None:
return False
if asyncio.get_running_loop().time() < until:
return True
self._telegram_typing_cooldown_until.pop(str(chat_id), None)
return False
async def send_typing(self, chat_id: str, metadata: Optional[Dict[str, Any]] = None) -> None:
"""Send typing indicator."""
if self._bot:
_is_dm_topic: bool = False
message_thread_id: Optional[int] = None
try:
_typing_thread = self._metadata_thread_id(metadata)
_is_dm_topic = bool(metadata and metadata.get("telegram_dm_topic_reply_fallback"))
message_thread_id = self._message_thread_id_for_typing(_typing_thread)
await self._bot.send_chat_action(
chat_id=int(chat_id),
action="typing",
message_thread_id=message_thread_id,
)
except Exception as e:
# For DM topic lanes, Telegram may reject message_thread_id.
# Fall back to sending typing without thread_id so the typing
# indicator at least appears in the main DM view.
if _is_dm_topic and message_thread_id is not None:
try:
await self._bot.send_chat_action(
chat_id=int(chat_id),
action="typing",
)
return
except Exception:
pass
# Typing failures are non-fatal; log at debug level only.
logger.debug(
"[%s] Failed to send Telegram typing indicator: %s",
self.name,
e,
exc_info=True,
)
if not self._bot or self._typing_in_cooldown(chat_id):
return
_is_dm_topic: bool = False
message_thread_id: Optional[int] = None
try:
_typing_thread = self._metadata_thread_id(metadata)
_is_dm_topic = bool(metadata and metadata.get("telegram_dm_topic_reply_fallback"))
message_thread_id = self._message_thread_id_for_typing(_typing_thread)
await self._bot.send_chat_action(
chat_id=int(chat_id),
action="typing",
message_thread_id=message_thread_id,
)
self._telegram_typing_cooldown_until.pop(str(chat_id), None)
except Exception as e:
# For DM topic lanes, Telegram may reject message_thread_id.
# Fall back to sending typing without thread_id so the typing
# indicator at least appears in the main DM view.
if _is_dm_topic and message_thread_id is not None:
try:
await self._bot.send_chat_action(
chat_id=int(chat_id),
action="typing",
)
self._telegram_typing_cooldown_until.pop(str(chat_id), None)
return
except Exception as fallback_exc:
if self._is_transient_typing_error(fallback_exc):
self._record_typing_cooldown(chat_id, fallback_exc)
elif self._is_transient_typing_error(e):
self._record_typing_cooldown(chat_id, e)
# Typing failures are non-fatal; log at debug level only.
logger.debug(
"[%s] Failed to send Telegram typing indicator: %s",
self.name,
e,
exc_info=True,
)
async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
"""Get information about a Telegram chat."""

View File

@@ -0,0 +1,113 @@
"""Telegram typing indicator transient backoff tests."""
import sys
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock
import pytest
_repo = str(Path(__file__).resolve().parents[2])
if _repo not in sys.path:
sys.path.insert(0, _repo)
def _ensure_telegram_mock():
if "telegram" in sys.modules and hasattr(sys.modules["telegram"], "__file__"):
return
mod = MagicMock()
mod.ext.ContextTypes.DEFAULT_TYPE = type(None)
mod.constants.ParseMode.MARKDOWN = "Markdown"
mod.constants.ParseMode.MARKDOWN_V2 = "MarkdownV2"
mod.constants.ParseMode.HTML = "HTML"
mod.constants.ChatType.PRIVATE = "private"
mod.constants.ChatType.GROUP = "group"
mod.constants.ChatType.SUPERGROUP = "supergroup"
mod.constants.ChatType.CHANNEL = "channel"
mod.error.NetworkError = type("NetworkError", (OSError,), {})
mod.error.TimedOut = type("TimedOut", (OSError,), {})
mod.error.RetryAfter = type("RetryAfter", (Exception,), {"__init__": lambda self, retry_after=1: setattr(self, "retry_after", retry_after)})
mod.error.BadRequest = type("BadRequest", (Exception,), {})
for name in ("telegram", "telegram.ext", "telegram.constants", "telegram.request"):
sys.modules.setdefault(name, mod)
sys.modules.setdefault("telegram.error", mod.error)
_ensure_telegram_mock()
from gateway.config import PlatformConfig
from gateway.platforms.telegram import TelegramAdapter
def _make_adapter():
adapter = TelegramAdapter(PlatformConfig(enabled=True, token="test-token"))
adapter._bot = AsyncMock()
return adapter
@pytest.mark.asyncio
async def test_typing_transient_failure_enters_cooldown(monkeypatch):
adapter = _make_adapter()
now = {"value": 1000.0}
monkeypatch.setattr("gateway.platforms.telegram.asyncio.get_running_loop", lambda: type("Loop", (), {"time": lambda self: now["value"]})())
monkeypatch.setattr(adapter, "_telegram_typing_cooldown_seconds", 30.0, raising=False)
async def fail_once(**kwargs):
raise OSError("temporary telegram network failure")
adapter._bot.send_chat_action = AsyncMock(side_effect=fail_once)
await adapter.send_typing("123")
await adapter.send_typing("123")
assert adapter._bot.send_chat_action.await_count == 1
assert adapter._telegram_typing_cooldown_until["123"] == pytest.approx(1030.0)
now["value"] = 1031.0
adapter._bot.send_chat_action = AsyncMock(return_value=None)
await adapter.send_typing("123")
assert adapter._bot.send_chat_action.await_count == 1
assert "123" not in adapter._telegram_typing_cooldown_until
@pytest.mark.asyncio
async def test_typing_dm_topic_fallback_success_does_not_cool_down(monkeypatch):
adapter = _make_adapter()
monkeypatch.setattr("gateway.platforms.telegram.asyncio.get_running_loop", lambda: type("Loop", (), {"time": lambda self: 10.0})())
calls = []
async def send_chat_action(**kwargs):
calls.append(kwargs)
if "message_thread_id" in kwargs:
raise RuntimeError("message thread not found")
return None
adapter._bot.send_chat_action = AsyncMock(side_effect=send_chat_action)
await adapter.send_typing(
"123",
metadata={"thread_id": "99", "telegram_dm_topic_reply_fallback": True},
)
assert len(calls) == 2
assert "123" not in adapter._telegram_typing_cooldown_until
@pytest.mark.asyncio
async def test_typing_bad_thread_failure_does_not_cool_down(monkeypatch):
adapter = _make_adapter()
monkeypatch.setattr("gateway.platforms.telegram.asyncio.get_running_loop", lambda: type("Loop", (), {"time": lambda self: 10.0})())
async def bad_request(**kwargs):
raise ValueError("message thread not found")
adapter._bot.send_chat_action = AsyncMock(side_effect=bad_request)
await adapter.send_typing("123", metadata={"thread_id": "99"})
await adapter.send_typing("123", metadata={"thread_id": "99"})
assert adapter._bot.send_chat_action.await_count == 2
assert "123" not in adapter._telegram_typing_cooldown_until