mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-28 04:44:56 +08:00
Compare commits
2 Commits
chore/remo
...
openclaw-p
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
47010a7df6 | ||
|
|
db87eb552a |
@@ -429,6 +429,17 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
# endpoint) so later sends skip the doomed rich attempt entirely.
|
||||
self._rich_send_disabled: bool = False
|
||||
self._rich_draft_disabled: bool = False
|
||||
# Transient Telegram sendChatAction failures (network blips, 429/5xx)
|
||||
# can happen on every keep-typing tick while the agent is waiting on a
|
||||
# long model call. Back off per chat so a short Telegram-side outage
|
||||
# does not spam the API/logs or burn the keep-typing budget.
|
||||
self._telegram_typing_cooldown_until: Dict[str, float] = {}
|
||||
self._telegram_typing_cooldown_seconds: float = self._coerce_float_extra(
|
||||
"typing_cooldown_seconds",
|
||||
30.0,
|
||||
min_value=1.0,
|
||||
max_value=300.0,
|
||||
)
|
||||
# Buffer rapid/album photo updates so Telegram image bursts are handled
|
||||
# as a single MessageEvent instead of self-interrupting multiple turns.
|
||||
self._media_batch_delay_seconds = float(os.getenv("HERMES_TELEGRAM_MEDIA_BATCH_DELAY_SECONDS", "0.8"))
|
||||
@@ -912,6 +923,27 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
return default
|
||||
return bool(value)
|
||||
|
||||
def _coerce_float_extra(
|
||||
self,
|
||||
key: str,
|
||||
default: float,
|
||||
*,
|
||||
min_value: Optional[float] = None,
|
||||
max_value: Optional[float] = None,
|
||||
) -> float:
|
||||
value = self.config.extra.get(key) if getattr(self.config, "extra", None) else None
|
||||
if value is None:
|
||||
return default
|
||||
try:
|
||||
parsed = float(value)
|
||||
except (TypeError, ValueError):
|
||||
return default
|
||||
if min_value is not None:
|
||||
parsed = max(parsed, min_value)
|
||||
if max_value is not None:
|
||||
parsed = min(parsed, max_value)
|
||||
return parsed
|
||||
|
||||
def _link_preview_kwargs(self) -> Dict[str, Any]:
|
||||
if not getattr(self, "_disable_link_previews", False):
|
||||
return {}
|
||||
@@ -4808,40 +4840,90 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
# Fallback: try as a regular photo
|
||||
return await self.send_image(chat_id, animation_url, caption, reply_to, metadata=metadata)
|
||||
|
||||
@staticmethod
|
||||
def _is_transient_typing_error(exc: Exception) -> bool:
|
||||
"""Return True for Telegram typing errors worth cooling down."""
|
||||
retry_after = getattr(exc, "retry_after", None)
|
||||
if retry_after is not None:
|
||||
return True
|
||||
|
||||
status_code = getattr(exc, "status_code", None) or getattr(exc, "code", None)
|
||||
if isinstance(status_code, int) and (status_code == 429 or status_code >= 500):
|
||||
return True
|
||||
|
||||
text = str(exc).lower()
|
||||
if any(marker in text for marker in ("too many requests", "rate limit", "timed out", "timeout", "temporar")):
|
||||
return True
|
||||
if isinstance(exc, (OSError, TimeoutError, ConnectionError, asyncio.TimeoutError)):
|
||||
return True
|
||||
return False
|
||||
|
||||
def _record_typing_cooldown(self, chat_id: str, exc: Exception) -> None:
|
||||
"""Suppress Telegram typing refreshes for this chat after transient failures."""
|
||||
if not hasattr(self, "_telegram_typing_cooldown_until"):
|
||||
self._telegram_typing_cooldown_until = {}
|
||||
loop = asyncio.get_running_loop()
|
||||
retry_after = getattr(exc, "retry_after", None)
|
||||
try:
|
||||
delay = float(retry_after) if retry_after is not None else self._telegram_typing_cooldown_seconds
|
||||
except (TypeError, ValueError):
|
||||
delay = self._telegram_typing_cooldown_seconds
|
||||
delay = max(1.0, min(delay, 300.0))
|
||||
self._telegram_typing_cooldown_until[str(chat_id)] = loop.time() + delay
|
||||
|
||||
def _typing_in_cooldown(self, chat_id: str) -> bool:
|
||||
if not hasattr(self, "_telegram_typing_cooldown_until"):
|
||||
self._telegram_typing_cooldown_until = {}
|
||||
self._telegram_typing_cooldown_seconds = 30.0
|
||||
until = self._telegram_typing_cooldown_until.get(str(chat_id))
|
||||
if until is None:
|
||||
return False
|
||||
if asyncio.get_running_loop().time() < until:
|
||||
return True
|
||||
self._telegram_typing_cooldown_until.pop(str(chat_id), None)
|
||||
return False
|
||||
|
||||
async def send_typing(self, chat_id: str, metadata: Optional[Dict[str, Any]] = None) -> None:
|
||||
"""Send typing indicator."""
|
||||
if self._bot:
|
||||
_is_dm_topic: bool = False
|
||||
message_thread_id: Optional[int] = None
|
||||
try:
|
||||
_typing_thread = self._metadata_thread_id(metadata)
|
||||
_is_dm_topic = bool(metadata and metadata.get("telegram_dm_topic_reply_fallback"))
|
||||
message_thread_id = self._message_thread_id_for_typing(_typing_thread)
|
||||
await self._bot.send_chat_action(
|
||||
chat_id=int(chat_id),
|
||||
action="typing",
|
||||
message_thread_id=message_thread_id,
|
||||
)
|
||||
except Exception as e:
|
||||
# For DM topic lanes, Telegram may reject message_thread_id.
|
||||
# Fall back to sending typing without thread_id so the typing
|
||||
# indicator at least appears in the main DM view.
|
||||
if _is_dm_topic and message_thread_id is not None:
|
||||
try:
|
||||
await self._bot.send_chat_action(
|
||||
chat_id=int(chat_id),
|
||||
action="typing",
|
||||
)
|
||||
return
|
||||
except Exception:
|
||||
pass
|
||||
# Typing failures are non-fatal; log at debug level only.
|
||||
logger.debug(
|
||||
"[%s] Failed to send Telegram typing indicator: %s",
|
||||
self.name,
|
||||
e,
|
||||
exc_info=True,
|
||||
)
|
||||
if not self._bot or self._typing_in_cooldown(chat_id):
|
||||
return
|
||||
|
||||
_is_dm_topic: bool = False
|
||||
message_thread_id: Optional[int] = None
|
||||
try:
|
||||
_typing_thread = self._metadata_thread_id(metadata)
|
||||
_is_dm_topic = bool(metadata and metadata.get("telegram_dm_topic_reply_fallback"))
|
||||
message_thread_id = self._message_thread_id_for_typing(_typing_thread)
|
||||
await self._bot.send_chat_action(
|
||||
chat_id=int(chat_id),
|
||||
action="typing",
|
||||
message_thread_id=message_thread_id,
|
||||
)
|
||||
self._telegram_typing_cooldown_until.pop(str(chat_id), None)
|
||||
except Exception as e:
|
||||
# For DM topic lanes, Telegram may reject message_thread_id.
|
||||
# Fall back to sending typing without thread_id so the typing
|
||||
# indicator at least appears in the main DM view.
|
||||
if _is_dm_topic and message_thread_id is not None:
|
||||
try:
|
||||
await self._bot.send_chat_action(
|
||||
chat_id=int(chat_id),
|
||||
action="typing",
|
||||
)
|
||||
self._telegram_typing_cooldown_until.pop(str(chat_id), None)
|
||||
return
|
||||
except Exception as fallback_exc:
|
||||
if self._is_transient_typing_error(fallback_exc):
|
||||
self._record_typing_cooldown(chat_id, fallback_exc)
|
||||
elif self._is_transient_typing_error(e):
|
||||
self._record_typing_cooldown(chat_id, e)
|
||||
# Typing failures are non-fatal; log at debug level only.
|
||||
logger.debug(
|
||||
"[%s] Failed to send Telegram typing indicator: %s",
|
||||
self.name,
|
||||
e,
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
|
||||
"""Get information about a Telegram chat."""
|
||||
|
||||
113
tests/gateway/test_telegram_typing_backoff.py
Normal file
113
tests/gateway/test_telegram_typing_backoff.py
Normal file
@@ -0,0 +1,113 @@
|
||||
"""Telegram typing indicator transient backoff tests."""
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
_repo = str(Path(__file__).resolve().parents[2])
|
||||
if _repo not in sys.path:
|
||||
sys.path.insert(0, _repo)
|
||||
|
||||
|
||||
def _ensure_telegram_mock():
|
||||
if "telegram" in sys.modules and hasattr(sys.modules["telegram"], "__file__"):
|
||||
return
|
||||
|
||||
mod = MagicMock()
|
||||
mod.ext.ContextTypes.DEFAULT_TYPE = type(None)
|
||||
mod.constants.ParseMode.MARKDOWN = "Markdown"
|
||||
mod.constants.ParseMode.MARKDOWN_V2 = "MarkdownV2"
|
||||
mod.constants.ParseMode.HTML = "HTML"
|
||||
mod.constants.ChatType.PRIVATE = "private"
|
||||
mod.constants.ChatType.GROUP = "group"
|
||||
mod.constants.ChatType.SUPERGROUP = "supergroup"
|
||||
mod.constants.ChatType.CHANNEL = "channel"
|
||||
mod.error.NetworkError = type("NetworkError", (OSError,), {})
|
||||
mod.error.TimedOut = type("TimedOut", (OSError,), {})
|
||||
mod.error.RetryAfter = type("RetryAfter", (Exception,), {"__init__": lambda self, retry_after=1: setattr(self, "retry_after", retry_after)})
|
||||
mod.error.BadRequest = type("BadRequest", (Exception,), {})
|
||||
|
||||
for name in ("telegram", "telegram.ext", "telegram.constants", "telegram.request"):
|
||||
sys.modules.setdefault(name, mod)
|
||||
sys.modules.setdefault("telegram.error", mod.error)
|
||||
|
||||
|
||||
_ensure_telegram_mock()
|
||||
|
||||
from gateway.config import PlatformConfig
|
||||
from gateway.platforms.telegram import TelegramAdapter
|
||||
|
||||
|
||||
def _make_adapter():
|
||||
adapter = TelegramAdapter(PlatformConfig(enabled=True, token="test-token"))
|
||||
adapter._bot = AsyncMock()
|
||||
return adapter
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_typing_transient_failure_enters_cooldown(monkeypatch):
|
||||
adapter = _make_adapter()
|
||||
now = {"value": 1000.0}
|
||||
monkeypatch.setattr("gateway.platforms.telegram.asyncio.get_running_loop", lambda: type("Loop", (), {"time": lambda self: now["value"]})())
|
||||
monkeypatch.setattr(adapter, "_telegram_typing_cooldown_seconds", 30.0, raising=False)
|
||||
|
||||
async def fail_once(**kwargs):
|
||||
raise OSError("temporary telegram network failure")
|
||||
|
||||
adapter._bot.send_chat_action = AsyncMock(side_effect=fail_once)
|
||||
|
||||
await adapter.send_typing("123")
|
||||
await adapter.send_typing("123")
|
||||
|
||||
assert adapter._bot.send_chat_action.await_count == 1
|
||||
assert adapter._telegram_typing_cooldown_until["123"] == pytest.approx(1030.0)
|
||||
|
||||
now["value"] = 1031.0
|
||||
adapter._bot.send_chat_action = AsyncMock(return_value=None)
|
||||
await adapter.send_typing("123")
|
||||
|
||||
assert adapter._bot.send_chat_action.await_count == 1
|
||||
assert "123" not in adapter._telegram_typing_cooldown_until
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_typing_dm_topic_fallback_success_does_not_cool_down(monkeypatch):
|
||||
adapter = _make_adapter()
|
||||
monkeypatch.setattr("gateway.platforms.telegram.asyncio.get_running_loop", lambda: type("Loop", (), {"time": lambda self: 10.0})())
|
||||
|
||||
calls = []
|
||||
|
||||
async def send_chat_action(**kwargs):
|
||||
calls.append(kwargs)
|
||||
if "message_thread_id" in kwargs:
|
||||
raise RuntimeError("message thread not found")
|
||||
return None
|
||||
|
||||
adapter._bot.send_chat_action = AsyncMock(side_effect=send_chat_action)
|
||||
|
||||
await adapter.send_typing(
|
||||
"123",
|
||||
metadata={"thread_id": "99", "telegram_dm_topic_reply_fallback": True},
|
||||
)
|
||||
|
||||
assert len(calls) == 2
|
||||
assert "123" not in adapter._telegram_typing_cooldown_until
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_typing_bad_thread_failure_does_not_cool_down(monkeypatch):
|
||||
adapter = _make_adapter()
|
||||
monkeypatch.setattr("gateway.platforms.telegram.asyncio.get_running_loop", lambda: type("Loop", (), {"time": lambda self: 10.0})())
|
||||
|
||||
async def bad_request(**kwargs):
|
||||
raise ValueError("message thread not found")
|
||||
|
||||
adapter._bot.send_chat_action = AsyncMock(side_effect=bad_request)
|
||||
|
||||
await adapter.send_typing("123", metadata={"thread_id": "99"})
|
||||
await adapter.send_typing("123", metadata={"thread_id": "99"})
|
||||
|
||||
assert adapter._bot.send_chat_action.await_count == 2
|
||||
assert "123" not in adapter._telegram_typing_cooldown_until
|
||||
Reference in New Issue
Block a user