Mirror of https://github.com/NousResearch/hermes-agent.git
Synced 2026-04-29 15:31:38 +08:00

Comparing fix/plugin ... hermes/her (7 commits):
1802bd5e38, 0b3b1af1f5, b9fd4d898d, e15db23111, 740e87138f, 2cac7520d6, afcecc734c
@@ -422,6 +422,7 @@ class DiscordAdapter(BasePlatformAdapter):
 
     # Discord message limits
     MAX_MESSAGE_LENGTH = 2000
+    _SPLIT_THRESHOLD = 1900  # near the 2000-char split point
 
     # Auto-disconnect from voice channel after this many seconds of inactivity
     VOICE_TIMEOUT = 300

@@ -433,6 +434,11 @@ class DiscordAdapter(BasePlatformAdapter):
         self._allowed_user_ids: set = set()  # For button approval authorization
         # Voice channel state (per-guild)
         self._voice_clients: Dict[int, Any] = {}  # guild_id -> VoiceClient
+        # Text batching: merge rapid successive messages (Telegram-style)
+        self._text_batch_delay_seconds = float(os.getenv("HERMES_DISCORD_TEXT_BATCH_DELAY_SECONDS", "0.6"))
+        self._text_batch_split_delay_seconds = float(os.getenv("HERMES_DISCORD_TEXT_BATCH_SPLIT_DELAY_SECONDS", "2.0"))
+        self._pending_text_batches: Dict[str, MessageEvent] = {}
+        self._pending_text_batch_tasks: Dict[str, asyncio.Task] = {}
         self._voice_text_channels: Dict[int, int] = {}  # guild_id -> text_channel_id
         self._voice_timeout_tasks: Dict[int, asyncio.Task] = {}  # guild_id -> timeout task
         # Phase 2: voice listening

@@ -2466,7 +2472,80 @@ class DiscordAdapter(BasePlatformAdapter):
         if thread_id:
             self._track_thread(thread_id)
 
-        await self.handle_message(event)
+        # Only batch plain text messages — commands, media, etc. dispatch
+        # immediately since they won't be split by the Discord client.
+        if msg_type == MessageType.TEXT:
+            self._enqueue_text_event(event)
+        else:
+            await self.handle_message(event)
+
+    # ------------------------------------------------------------------
+    # Text message aggregation (handles Discord client-side splits)
+    # ------------------------------------------------------------------
+
+    def _text_batch_key(self, event: MessageEvent) -> str:
+        """Session-scoped key for text message batching."""
+        from gateway.session import build_session_key
+        return build_session_key(
+            event.source,
+            group_sessions_per_user=self.config.extra.get("group_sessions_per_user", True),
+            thread_sessions_per_user=self.config.extra.get("thread_sessions_per_user", False),
+        )
+
+    def _enqueue_text_event(self, event: MessageEvent) -> None:
+        """Buffer a text event and reset the flush timer.
+
+        When Discord splits a long user message at 2000 chars, the chunks
+        arrive within a few hundred milliseconds. This merges them into
+        a single event before dispatching.
+        """
+        key = self._text_batch_key(event)
+        existing = self._pending_text_batches.get(key)
+        chunk_len = len(event.text or "")
+        if existing is None:
+            event._last_chunk_len = chunk_len  # type: ignore[attr-defined]
+            self._pending_text_batches[key] = event
+        else:
+            if event.text:
+                existing.text = f"{existing.text}\n{event.text}" if existing.text else event.text
+            existing._last_chunk_len = chunk_len  # type: ignore[attr-defined]
+            if event.media_urls:
+                existing.media_urls.extend(event.media_urls)
+                existing.media_types.extend(event.media_types)
+
+        prior_task = self._pending_text_batch_tasks.get(key)
+        if prior_task and not prior_task.done():
+            prior_task.cancel()
+        self._pending_text_batch_tasks[key] = asyncio.create_task(
+            self._flush_text_batch(key)
+        )
+
+    async def _flush_text_batch(self, key: str) -> None:
+        """Wait for the quiet period then dispatch the aggregated text.
+
+        Uses a longer delay when the latest chunk is near Discord's 2000-char
+        split point, since a continuation chunk is almost certain.
+        """
+        current_task = asyncio.current_task()
+        try:
+            pending = self._pending_text_batches.get(key)
+            last_len = getattr(pending, "_last_chunk_len", 0) if pending else 0
+            if last_len >= self._SPLIT_THRESHOLD:
+                delay = self._text_batch_split_delay_seconds
+            else:
+                delay = self._text_batch_delay_seconds
+            await asyncio.sleep(delay)
+            event = self._pending_text_batches.pop(key, None)
+            if not event:
+                return
+            logger.info(
+                "[Discord] Flushing text batch %s (%d chars)",
+                key, len(event.text or ""),
+            )
+            await self.handle_message(event)
+        finally:
+            if self._pending_text_batch_tasks.get(key) is current_task:
+                self._pending_text_batch_tasks.pop(key, None)
 
 # ---------------------------------------------------------------------------
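
Note: the three methods added above form a per-session debounce. Every incoming chunk cancels the pending flush task and schedules a new one, so the batch flushes only after a quiet period, and the quiet period stretches when the last chunk looks like a client-side split. A minimal standalone sketch of the same cancel-and-reschedule pattern, independent of the adapters (Debouncer, feed, and main are illustrative names, not part of the codebase):

import asyncio
from typing import Dict, List

class Debouncer:
    """Cancel-and-reschedule debounce, the shape used by the adapters above."""

    def __init__(self, delay: float, split_delay: float, threshold: int):
        self.delay = delay              # quiet period for ordinary chunks
        self.split_delay = split_delay  # longer wait after a near-limit chunk
        self.threshold = threshold      # chunk length that signals a client split
        self._buffers: Dict[str, List[str]] = {}
        self._tasks: Dict[str, asyncio.Task] = {}

    def feed(self, key: str, chunk: str) -> None:
        self._buffers.setdefault(key, []).append(chunk)
        prior = self._tasks.get(key)
        if prior and not prior.done():
            prior.cancel()              # every new chunk restarts the timer
        self._tasks[key] = asyncio.create_task(self._flush(key, len(chunk)))

    async def _flush(self, key: str, last_len: int) -> None:
        # A near-limit chunk almost certainly has a continuation: wait longer.
        await asyncio.sleep(self.split_delay if last_len >= self.threshold else self.delay)
        merged = "\n".join(self._buffers.pop(key, []))
        print(f"{key}: dispatched {len(merged)} chars")
        self._tasks.pop(key, None)

async def main() -> None:
    d = Debouncer(delay=0.6, split_delay=2.0, threshold=1900)
    d.feed("chat:1", "x" * 1950)  # near Discord's 2000-char limit: long wait
    d.feed("chat:1", "the tail")  # short continuation: back to the normal wait
    await asyncio.sleep(1.0)      # prints one merged dispatch, not two

asyncio.run(main())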

@@ -264,6 +264,7 @@ class FeishuAdapterSettings:
     bot_name: str
     dedup_cache_size: int
     text_batch_delay_seconds: float
+    text_batch_split_delay_seconds: float
     text_batch_max_messages: int
     text_batch_max_chars: int
     media_batch_delay_seconds: float

@@ -1014,6 +1015,10 @@ class FeishuAdapter(BasePlatformAdapter):
     """Feishu/Lark bot adapter."""
 
     MAX_MESSAGE_LENGTH = 8000
+    # Threshold for detecting Feishu client-side message splits.
+    # When a chunk is near the ~4096-char practical limit, a continuation
+    # is almost certain.
+    _SPLIT_THRESHOLD = 4000
 
     # =========================================================================
     # Lifecycle — init / settings / connect / disconnect

@@ -1105,6 +1110,9 @@ class FeishuAdapter(BasePlatformAdapter):
             text_batch_delay_seconds=float(
                 os.getenv("HERMES_FEISHU_TEXT_BATCH_DELAY_SECONDS", str(_DEFAULT_TEXT_BATCH_DELAY_SECONDS))
             ),
+            text_batch_split_delay_seconds=float(
+                os.getenv("HERMES_FEISHU_TEXT_BATCH_SPLIT_DELAY_SECONDS", "2.0")
+            ),
             text_batch_max_messages=max(
                 1,
                 int(os.getenv("HERMES_FEISHU_TEXT_BATCH_MAX_MESSAGES", str(_DEFAULT_TEXT_BATCH_MAX_MESSAGES))),

@@ -1152,6 +1160,7 @@ class FeishuAdapter(BasePlatformAdapter):
         self._bot_name = settings.bot_name
         self._dedup_cache_size = settings.dedup_cache_size
         self._text_batch_delay_seconds = settings.text_batch_delay_seconds
+        self._text_batch_split_delay_seconds = settings.text_batch_split_delay_seconds
         self._text_batch_max_messages = settings.text_batch_max_messages
         self._text_batch_max_chars = settings.text_batch_max_chars
         self._media_batch_delay_seconds = settings.media_batch_delay_seconds

@@ -2478,8 +2487,10 @@ class FeishuAdapter(BasePlatformAdapter):
     async def _enqueue_text_event(self, event: MessageEvent) -> None:
         """Debounce rapid Feishu text bursts into a single MessageEvent."""
         key = self._text_batch_key(event)
+        chunk_len = len(event.text or "")
         existing = self._pending_text_batches.get(key)
         if existing is None:
+            event._last_chunk_len = chunk_len  # type: ignore[attr-defined]
             self._pending_text_batches[key] = event
             self._pending_text_batch_counts[key] = 1
             self._schedule_text_batch_flush(key)

@@ -2504,6 +2515,7 @@ class FeishuAdapter(BasePlatformAdapter):
             return
 
         existing.text = next_text
+        existing._last_chunk_len = chunk_len  # type: ignore[attr-defined]
         existing.timestamp = event.timestamp
         if event.message_id:
             existing.message_id = event.message_id

@@ -2530,10 +2542,22 @@ class FeishuAdapter(BasePlatformAdapter):
         task_map[key] = asyncio.create_task(flush_fn(key))
 
     async def _flush_text_batch(self, key: str) -> None:
-        """Flush a pending text batch after the quiet period."""
+        """Flush a pending text batch after the quiet period.
+
+        Uses a longer delay when the latest chunk is near Feishu's ~4096-char
+        split point, since a continuation chunk is almost certain.
+        """
         current_task = asyncio.current_task()
         try:
-            await asyncio.sleep(self._text_batch_delay_seconds)
+            # Adaptive delay: if the latest chunk is near the split threshold,
+            # a continuation is almost certain — wait longer.
+            pending = self._pending_text_batches.get(key)
+            last_len = getattr(pending, "_last_chunk_len", 0) if pending else 0
+            if last_len >= self._SPLIT_THRESHOLD:
+                delay = self._text_batch_split_delay_seconds
+            else:
+                delay = self._text_batch_delay_seconds
+            await asyncio.sleep(delay)
             await self._flush_text_batch_now(key)
         finally:
             if self._pending_text_batch_tasks.get(key) is current_task:
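
Note: Feishu's API limit above is 8000 chars, but _SPLIT_THRESHOLD is 4000 because the client splits near a ~4096-char practical limit, so the threshold tracks where splits actually happen rather than the API maximum. The delay choice itself reduces to one comparison; a hedged sketch with the values from this diff (pick_delay is an illustrative helper, not adapter code):

def pick_delay(last_chunk_len: int, normal: float = 0.6, split: float = 2.0,
               threshold: int = 4000) -> float:
    # Near-threshold chunks wait longer because a continuation is expected.
    return split if last_chunk_len >= threshold else normal

assert pick_delay(120) == 0.6    # ordinary short message: normal quiet period
assert pick_delay(4050) == 2.0   # near the ~4096-char split point: wait longer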

@@ -120,6 +120,11 @@ def check_matrix_requirements() -> bool:
 class MatrixAdapter(BasePlatformAdapter):
     """Gateway adapter for Matrix (any homeserver)."""
 
+    # Threshold for detecting Matrix client-side message splits.
+    # When a chunk is near the ~4000-char practical limit, a continuation
+    # is almost certain.
+    _SPLIT_THRESHOLD = 3900
+
     def __init__(self, config: PlatformConfig):
         super().__init__(config, Platform.MATRIX)

@@ -172,6 +177,13 @@ class MatrixAdapter(BasePlatformAdapter):
             "MATRIX_REACTIONS", "true"
         ).lower() not in ("false", "0", "no")
 
+        # Text batching: merge rapid successive messages (Telegram-style).
+        # Matrix clients split long messages around 4000 chars.
+        self._text_batch_delay_seconds = float(os.getenv("HERMES_MATRIX_TEXT_BATCH_DELAY_SECONDS", "0.6"))
+        self._text_batch_split_delay_seconds = float(os.getenv("HERMES_MATRIX_TEXT_BATCH_SPLIT_DELAY_SECONDS", "2.0"))
+        self._pending_text_batches: Dict[str, MessageEvent] = {}
+        self._pending_text_batch_tasks: Dict[str, asyncio.Task] = {}
+
     def _is_duplicate_event(self, event_id) -> bool:
         """Return True if this event was already processed. Tracks the ID otherwise."""
         if not event_id:

@@ -1088,7 +1100,81 @@ class MatrixAdapter(BasePlatformAdapter):
         # Acknowledge receipt so the room shows as read (fire-and-forget).
         self._background_read_receipt(room.room_id, event.event_id)
 
-        await self.handle_message(msg_event)
+        # Only batch plain text messages — commands dispatch immediately.
+        if msg_type == MessageType.TEXT:
+            self._enqueue_text_event(msg_event)
+        else:
+            await self.handle_message(msg_event)
+
+    # ------------------------------------------------------------------
+    # Text message aggregation (handles Matrix client-side splits)
+    # ------------------------------------------------------------------
+
+    def _text_batch_key(self, event: MessageEvent) -> str:
+        """Session-scoped key for text message batching."""
+        from gateway.session import build_session_key
+        return build_session_key(
+            event.source,
+            group_sessions_per_user=self.config.extra.get("group_sessions_per_user", True),
+            thread_sessions_per_user=self.config.extra.get("thread_sessions_per_user", False),
+        )
+
+    def _enqueue_text_event(self, event: MessageEvent) -> None:
+        """Buffer a text event and reset the flush timer.
+
+        When a Matrix client splits a long message, the chunks arrive within
+        a few hundred milliseconds. This merges them into a single event
+        before dispatching.
+        """
+        key = self._text_batch_key(event)
+        existing = self._pending_text_batches.get(key)
+        chunk_len = len(event.text or "")
+        if existing is None:
+            event._last_chunk_len = chunk_len  # type: ignore[attr-defined]
+            self._pending_text_batches[key] = event
+        else:
+            if event.text:
+                existing.text = f"{existing.text}\n{event.text}" if existing.text else event.text
+            existing._last_chunk_len = chunk_len  # type: ignore[attr-defined]
+            # Merge any media that might be attached
+            if event.media_urls:
+                existing.media_urls.extend(event.media_urls)
+                existing.media_types.extend(event.media_types)
+
+        # Cancel any pending flush and restart the timer
+        prior_task = self._pending_text_batch_tasks.get(key)
+        if prior_task and not prior_task.done():
+            prior_task.cancel()
+        self._pending_text_batch_tasks[key] = asyncio.create_task(
+            self._flush_text_batch(key)
+        )
+
+    async def _flush_text_batch(self, key: str) -> None:
+        """Wait for the quiet period then dispatch the aggregated text.
+
+        Uses a longer delay when the latest chunk is near Matrix's ~4000-char
+        split point, since a continuation chunk is almost certain.
+        """
+        current_task = asyncio.current_task()
+        try:
+            pending = self._pending_text_batches.get(key)
+            last_len = getattr(pending, "_last_chunk_len", 0) if pending else 0
+            if last_len >= self._SPLIT_THRESHOLD:
+                delay = self._text_batch_split_delay_seconds
+            else:
+                delay = self._text_batch_delay_seconds
+            await asyncio.sleep(delay)
+            event = self._pending_text_batches.pop(key, None)
+            if not event:
+                return
+            logger.info(
+                "[Matrix] Flushing text batch %s (%d chars)",
+                key, len(event.text or ""),
+            )
+            await self.handle_message(event)
+        finally:
+            if self._pending_text_batch_tasks.get(key) is current_task:
+                self._pending_text_batch_tasks.pop(key, None)
 
     async def _on_room_message_media(self, room: Any, event: Any) -> None:
         """Handle incoming media messages (images, audio, video, files)."""
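
Note: the merge step in _enqueue_text_event joins chunk texts with a newline and extends the media lists in place, so the first event of a burst becomes the aggregate. A self-contained sketch of just that merge semantics (Chunk and merge are illustrative stand-ins for MessageEvent and the else-branch above):

from dataclasses import dataclass, field
from typing import List

@dataclass
class Chunk:  # illustrative stand-in for MessageEvent
    text: str = ""
    media_urls: List[str] = field(default_factory=list)
    media_types: List[str] = field(default_factory=list)

def merge(existing: Chunk, new: Chunk) -> Chunk:
    # Mirrors the else-branch above: newline-join text, extend media in place.
    if new.text:
        existing.text = f"{existing.text}\n{new.text}" if existing.text else new.text
    if new.media_urls:
        existing.media_urls.extend(new.media_urls)
        existing.media_types.extend(new.media_types)
    return existing

merged = merge(Chunk(text="part one"), Chunk(text="part two"))
assert merged.text == "part one\npart two"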

@@ -121,6 +121,9 @@ class TelegramAdapter(BasePlatformAdapter):
 
     # Telegram message limits
     MAX_MESSAGE_LENGTH = 4096
+    # Threshold for detecting Telegram client-side message splits.
+    # When a chunk is near this limit, a continuation is almost certain.
+    _SPLIT_THRESHOLD = 4000
     MEDIA_GROUP_WAIT_SECONDS = 0.8
 
     def __init__(self, config: PlatformConfig):

@@ -140,6 +143,7 @@ class TelegramAdapter(BasePlatformAdapter):
         # Buffer rapid text messages so Telegram client-side splits of long
         # messages are aggregated into a single MessageEvent.
         self._text_batch_delay_seconds = float(os.getenv("HERMES_TELEGRAM_TEXT_BATCH_DELAY_SECONDS", "0.6"))
+        self._text_batch_split_delay_seconds = float(os.getenv("HERMES_TELEGRAM_TEXT_BATCH_SPLIT_DELAY_SECONDS", "2.0"))
         self._pending_text_batches: Dict[str, MessageEvent] = {}
         self._pending_text_batch_tasks: Dict[str, asyncio.Task] = {}
         self._token_lock_identity: Optional[str] = None

@@ -2160,12 +2164,15 @@ class TelegramAdapter(BasePlatformAdapter):
         """
         key = self._text_batch_key(event)
         existing = self._pending_text_batches.get(key)
+        chunk_len = len(event.text or "")
         if existing is None:
+            event._last_chunk_len = chunk_len  # type: ignore[attr-defined]
             self._pending_text_batches[key] = event
         else:
             # Append text from the follow-up chunk
             if event.text:
                 existing.text = f"{existing.text}\n{event.text}" if existing.text else event.text
+            existing._last_chunk_len = chunk_len  # type: ignore[attr-defined]
             # Merge any media that might be attached
             if event.media_urls:
                 existing.media_urls.extend(event.media_urls)

@@ -2180,10 +2187,22 @@ class TelegramAdapter(BasePlatformAdapter):
         )
 
     async def _flush_text_batch(self, key: str) -> None:
-        """Wait for the quiet period then dispatch the aggregated text."""
+        """Wait for the quiet period then dispatch the aggregated text.
+
+        Uses a longer delay when the latest chunk is near Telegram's 4096-char
+        split point, since a continuation chunk is almost certain.
+        """
         current_task = asyncio.current_task()
         try:
-            await asyncio.sleep(self._text_batch_delay_seconds)
+            # Adaptive delay: if the latest chunk is near Telegram's 4096-char
+            # split point, a continuation is almost certain — wait longer.
+            pending = self._pending_text_batches.get(key)
+            last_len = getattr(pending, "_last_chunk_len", 0) if pending else 0
+            if last_len >= self._SPLIT_THRESHOLD:
+                delay = self._text_batch_split_delay_seconds
+            else:
+                delay = self._text_batch_delay_seconds
+            await asyncio.sleep(delay)
             event = self._pending_text_batches.pop(key, None)
             if not event:
                 return
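
Note on the "# type: ignore[attr-defined]" comments throughout these hunks: _last_chunk_len is stashed dynamically on the event object rather than declared as a field on MessageEvent, which is why writes are type-ignored and reads go through getattr with a default. An illustrative sketch of the same trick (Event is a stand-in class, not the real type):

class Event:  # illustrative stand-in for MessageEvent
    def __init__(self, text: str):
        self.text = text

e = Event("x" * 4050)
e._last_chunk_len = len(e.text)  # dynamic attribute, invisible to the type checker
assert getattr(e, "_last_chunk_len", 0) >= 4000  # defensive read with a default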

@@ -143,6 +143,9 @@ class WeComAdapter(BasePlatformAdapter):
     """WeCom AI Bot adapter backed by a persistent WebSocket connection."""
 
     MAX_MESSAGE_LENGTH = MAX_MESSAGE_LENGTH
+    # Threshold for detecting WeCom client-side message splits.
+    # When a chunk is near the 4000-char limit, a continuation is almost certain.
+    _SPLIT_THRESHOLD = 3900
 
     def __init__(self, config: PlatformConfig):
         super().__init__(config, Platform.WECOM)

@@ -172,6 +175,13 @@ class WeComAdapter(BasePlatformAdapter):
         self._seen_messages: Dict[str, float] = {}
         self._reply_req_ids: Dict[str, str] = {}
 
+        # Text batching: merge rapid successive messages (Telegram-style).
+        # WeCom clients split long messages around 4000 chars.
+        self._text_batch_delay_seconds = float(os.getenv("HERMES_WECOM_TEXT_BATCH_DELAY_SECONDS", "0.6"))
+        self._text_batch_split_delay_seconds = float(os.getenv("HERMES_WECOM_TEXT_BATCH_SPLIT_DELAY_SECONDS", "2.0"))
+        self._pending_text_batches: Dict[str, MessageEvent] = {}
+        self._pending_text_batch_tasks: Dict[str, asyncio.Task] = {}
+
     # ------------------------------------------------------------------
     # Connection lifecycle
     # ------------------------------------------------------------------

@@ -519,7 +529,82 @@ class WeComAdapter(BasePlatformAdapter):
             timestamp=datetime.now(tz=timezone.utc),
         )
 
-        await self.handle_message(event)
+        # Only batch plain text messages — commands, media, etc. dispatch
+        # immediately since they won't be split by the WeCom client.
+        if message_type == MessageType.TEXT:
+            self._enqueue_text_event(event)
+        else:
+            await self.handle_message(event)
+
+    # ------------------------------------------------------------------
+    # Text message aggregation (handles WeCom client-side splits)
+    # ------------------------------------------------------------------
+
+    def _text_batch_key(self, event: MessageEvent) -> str:
+        """Session-scoped key for text message batching."""
+        from gateway.session import build_session_key
+        return build_session_key(
+            event.source,
+            group_sessions_per_user=self.config.extra.get("group_sessions_per_user", True),
+            thread_sessions_per_user=self.config.extra.get("thread_sessions_per_user", False),
+        )
+
+    def _enqueue_text_event(self, event: MessageEvent) -> None:
+        """Buffer a text event and reset the flush timer.
+
+        When WeCom splits a long user message at 4000 chars, the chunks
+        arrive within a few hundred milliseconds. This merges them into
+        a single event before dispatching.
+        """
+        key = self._text_batch_key(event)
+        existing = self._pending_text_batches.get(key)
+        chunk_len = len(event.text or "")
+        if existing is None:
+            event._last_chunk_len = chunk_len  # type: ignore[attr-defined]
+            self._pending_text_batches[key] = event
+        else:
+            if event.text:
+                existing.text = f"{existing.text}\n{event.text}" if existing.text else event.text
+            existing._last_chunk_len = chunk_len  # type: ignore[attr-defined]
+            # Merge any media that might be attached
+            if event.media_urls:
+                existing.media_urls.extend(event.media_urls)
+                existing.media_types.extend(event.media_types)
+
+        # Cancel any pending flush and restart the timer
+        prior_task = self._pending_text_batch_tasks.get(key)
+        if prior_task and not prior_task.done():
+            prior_task.cancel()
+        self._pending_text_batch_tasks[key] = asyncio.create_task(
+            self._flush_text_batch(key)
+        )
+
+    async def _flush_text_batch(self, key: str) -> None:
+        """Wait for the quiet period then dispatch the aggregated text.
+
+        Uses a longer delay when the latest chunk is near WeCom's 4000-char
+        split point, since a continuation chunk is almost certain.
+        """
+        current_task = asyncio.current_task()
+        try:
+            pending = self._pending_text_batches.get(key)
+            last_len = getattr(pending, "_last_chunk_len", 0) if pending else 0
+            if last_len >= self._SPLIT_THRESHOLD:
+                delay = self._text_batch_split_delay_seconds
+            else:
+                delay = self._text_batch_delay_seconds
+            await asyncio.sleep(delay)
+            event = self._pending_text_batches.pop(key, None)
+            if not event:
+                return
+            logger.info(
+                "[WeCom] Flushing text batch %s (%d chars)",
+                key, len(event.text or ""),
+            )
+            await self.handle_message(event)
+        finally:
+            if self._pending_text_batch_tasks.get(key) is current_task:
+                self._pending_text_batch_tasks.pop(key, None)
 
     @staticmethod
     def _extract_text(body: Dict[str, Any]) -> Tuple[str, Optional[str]]:
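
Note: every _flush_text_batch above compares the registered task against asyncio.current_task() before popping it in the finally block. The identity check matters because a newer chunk may already have cancelled this task and registered a fresh one under the same key; popping unconditionally would remove the wrong entry. A minimal sketch of the pattern (names are illustrative, not adapter code):

import asyncio

tasks: dict[str, asyncio.Task] = {}

async def flush(key: str) -> None:
    me = asyncio.current_task()
    try:
        await asyncio.sleep(0.6)  # quiet period
    finally:
        # Pop the registry entry only if it still points at *this* task;
        # a newer chunk may already have registered a replacement.
        if tasks.get(key) is me:
            tasks.pop(key, None)

def schedule(key: str) -> None:
    prior = tasks.get(key)
    if prior and not prior.done():
        prior.cancel()  # restart the timer for this key
    tasks[key] = asyncio.create_task(flush(key))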

@@ -81,6 +81,7 @@ def adapter(monkeypatch):
     config = PlatformConfig(enabled=True, token="fake-token")
     adapter = DiscordAdapter(config)
     adapter._client = SimpleNamespace(user=SimpleNamespace(id=999))
+    adapter._text_batch_delay_seconds = 0  # disable batching for tests
     adapter.handle_message = AsyncMock()
     return adapter

@@ -91,6 +91,7 @@ def adapter(monkeypatch):
     config = PlatformConfig(enabled=True, token="fake-token")
     adapter = DiscordAdapter(config)
     adapter._client = SimpleNamespace(user=SimpleNamespace(id=999))
+    adapter._text_batch_delay_seconds = 0  # disable batching for tests
     adapter.handle_message = AsyncMock()
     return adapter

@@ -62,6 +62,7 @@ def adapter():
         fetch_channel=AsyncMock(),
         user=SimpleNamespace(id=99999, name="HermesBot"),
     )
+    adapter._text_batch_delay_seconds = 0  # disable batching for tests
     return adapter

@@ -44,6 +44,7 @@ def _make_adapter(tmp_path=None):
         },
     )
     adapter = MatrixAdapter(config)
+    adapter._text_batch_delay_seconds = 0  # disable batching for tests
     adapter.handle_message = AsyncMock()
     adapter._startup_ts = time.time() - 10  # avoid startup grace filter
     return adapter

tests/gateway/test_text_batching.py (new file, 448 lines)
@@ -0,0 +1,448 @@
"""Tests for text message batching across all gateway adapters.

When a user sends a long message, the messaging client splits it at the
platform's character limit. Each adapter should buffer rapid successive
text messages from the same session and aggregate them before dispatching.

Covers: Discord, Matrix, WeCom, and the adaptive delay logic for
Telegram and Feishu.
"""

import asyncio
import os
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import MessageEvent, MessageType, SessionSource


# =====================================================================
# Helpers
# =====================================================================

def _make_event(
    text: str,
    platform: Platform,
    chat_id: str = "12345",
    msg_type: MessageType = MessageType.TEXT,
) -> MessageEvent:
    return MessageEvent(
        text=text,
        message_type=msg_type,
        source=SessionSource(platform=platform, chat_id=chat_id, chat_type="dm"),
    )


# =====================================================================
# Discord text batching
# =====================================================================

def _make_discord_adapter():
    """Create a minimal DiscordAdapter for testing text batching."""
    from gateway.platforms.discord import DiscordAdapter

    config = PlatformConfig(enabled=True, token="test-token")
    adapter = object.__new__(DiscordAdapter)
    adapter._platform = Platform.DISCORD
    adapter.config = config
    adapter._pending_text_batches = {}
    adapter._pending_text_batch_tasks = {}
    adapter._text_batch_delay_seconds = 0.1  # fast for tests
    adapter._text_batch_split_delay_seconds = 0.3  # fast for tests
    adapter._active_sessions = {}
    adapter._pending_messages = {}
    adapter._message_handler = AsyncMock()
    adapter.handle_message = AsyncMock()
    return adapter


class TestDiscordTextBatching:
    @pytest.mark.asyncio
    async def test_single_message_dispatched_after_delay(self):
        adapter = _make_discord_adapter()
        event = _make_event("hello world", Platform.DISCORD)

        adapter._enqueue_text_event(event)

        # Not dispatched yet
        adapter.handle_message.assert_not_called()

        # Wait for flush
        await asyncio.sleep(0.2)

        adapter.handle_message.assert_called_once()
        dispatched = adapter.handle_message.call_args[0][0]
        assert dispatched.text == "hello world"

    @pytest.mark.asyncio
    async def test_split_messages_aggregated(self):
        """Two rapid messages from the same chat should be merged."""
        adapter = _make_discord_adapter()

        adapter._enqueue_text_event(_make_event("Part one of a long", Platform.DISCORD))
        await asyncio.sleep(0.02)
        adapter._enqueue_text_event(_make_event("message that was split.", Platform.DISCORD))

        adapter.handle_message.assert_not_called()

        await asyncio.sleep(0.2)

        adapter.handle_message.assert_called_once()
        text = adapter.handle_message.call_args[0][0].text
        assert "Part one" in text
        assert "split" in text

    @pytest.mark.asyncio
    async def test_three_way_split_aggregated(self):
        adapter = _make_discord_adapter()

        adapter._enqueue_text_event(_make_event("chunk 1", Platform.DISCORD))
        await asyncio.sleep(0.02)
        adapter._enqueue_text_event(_make_event("chunk 2", Platform.DISCORD))
        await asyncio.sleep(0.02)
        adapter._enqueue_text_event(_make_event("chunk 3", Platform.DISCORD))

        await asyncio.sleep(0.2)

        adapter.handle_message.assert_called_once()
        text = adapter.handle_message.call_args[0][0].text
        assert "chunk 1" in text
        assert "chunk 2" in text
        assert "chunk 3" in text

    @pytest.mark.asyncio
    async def test_different_chats_not_merged(self):
        adapter = _make_discord_adapter()

        adapter._enqueue_text_event(_make_event("from A", Platform.DISCORD, chat_id="111"))
        adapter._enqueue_text_event(_make_event("from B", Platform.DISCORD, chat_id="222"))

        await asyncio.sleep(0.2)

        assert adapter.handle_message.call_count == 2

    @pytest.mark.asyncio
    async def test_batch_cleans_up_after_flush(self):
        adapter = _make_discord_adapter()

        adapter._enqueue_text_event(_make_event("test", Platform.DISCORD))
        await asyncio.sleep(0.2)

        assert len(adapter._pending_text_batches) == 0

    @pytest.mark.asyncio
    async def test_adaptive_delay_for_near_limit_chunk(self):
        """Chunks near the 2000-char limit should trigger longer delay."""
        adapter = _make_discord_adapter()
        # Simulate a chunk near Discord's 2000-char split point
        long_text = "x" * 1950
        adapter._enqueue_text_event(_make_event(long_text, Platform.DISCORD))

        # After the short delay (0.1s), should NOT have flushed yet (split delay is 0.3s)
        await asyncio.sleep(0.15)
        adapter.handle_message.assert_not_called()

        # After the split delay, should be flushed
        await asyncio.sleep(0.25)
        adapter.handle_message.assert_called_once()


# =====================================================================
# Matrix text batching
# =====================================================================

def _make_matrix_adapter():
    """Create a minimal MatrixAdapter for testing text batching."""
    from gateway.platforms.matrix import MatrixAdapter

    config = PlatformConfig(enabled=True, token="test-token")
    adapter = object.__new__(MatrixAdapter)
    adapter._platform = Platform.MATRIX
    adapter.config = config
    adapter._pending_text_batches = {}
    adapter._pending_text_batch_tasks = {}
    adapter._text_batch_delay_seconds = 0.1
    adapter._text_batch_split_delay_seconds = 0.3
    adapter._active_sessions = {}
    adapter._pending_messages = {}
    adapter._message_handler = AsyncMock()
    adapter.handle_message = AsyncMock()
    return adapter


class TestMatrixTextBatching:
    @pytest.mark.asyncio
    async def test_single_message_dispatched_after_delay(self):
        adapter = _make_matrix_adapter()
        event = _make_event("hello world", Platform.MATRIX)

        adapter._enqueue_text_event(event)

        adapter.handle_message.assert_not_called()
        await asyncio.sleep(0.2)

        adapter.handle_message.assert_called_once()
        assert adapter.handle_message.call_args[0][0].text == "hello world"

    @pytest.mark.asyncio
    async def test_split_messages_aggregated(self):
        adapter = _make_matrix_adapter()

        adapter._enqueue_text_event(_make_event("first part", Platform.MATRIX))
        await asyncio.sleep(0.02)
        adapter._enqueue_text_event(_make_event("second part", Platform.MATRIX))

        adapter.handle_message.assert_not_called()
        await asyncio.sleep(0.2)

        adapter.handle_message.assert_called_once()
        text = adapter.handle_message.call_args[0][0].text
        assert "first part" in text
        assert "second part" in text

    @pytest.mark.asyncio
    async def test_different_rooms_not_merged(self):
        adapter = _make_matrix_adapter()

        adapter._enqueue_text_event(_make_event("room A", Platform.MATRIX, chat_id="!aaa:matrix.org"))
        adapter._enqueue_text_event(_make_event("room B", Platform.MATRIX, chat_id="!bbb:matrix.org"))

        await asyncio.sleep(0.2)

        assert adapter.handle_message.call_count == 2

    @pytest.mark.asyncio
    async def test_adaptive_delay_for_near_limit_chunk(self):
        """Chunks near the 4000-char limit should trigger longer delay."""
        adapter = _make_matrix_adapter()
        long_text = "x" * 3950
        adapter._enqueue_text_event(_make_event(long_text, Platform.MATRIX))

        await asyncio.sleep(0.15)
        adapter.handle_message.assert_not_called()

        await asyncio.sleep(0.25)
        adapter.handle_message.assert_called_once()

    @pytest.mark.asyncio
    async def test_batch_cleans_up_after_flush(self):
        adapter = _make_matrix_adapter()
        adapter._enqueue_text_event(_make_event("test", Platform.MATRIX))
        await asyncio.sleep(0.2)
        assert len(adapter._pending_text_batches) == 0


# =====================================================================
# WeCom text batching
# =====================================================================

def _make_wecom_adapter():
    """Create a minimal WeComAdapter for testing text batching."""
    from gateway.platforms.wecom import WeComAdapter

    config = PlatformConfig(enabled=True, token="test-token")
    adapter = object.__new__(WeComAdapter)
    adapter._platform = Platform.WECOM
    adapter.config = config
    adapter._pending_text_batches = {}
    adapter._pending_text_batch_tasks = {}
    adapter._text_batch_delay_seconds = 0.1
    adapter._text_batch_split_delay_seconds = 0.3
    adapter._active_sessions = {}
    adapter._pending_messages = {}
    adapter._message_handler = AsyncMock()
    adapter.handle_message = AsyncMock()
    return adapter


class TestWeComTextBatching:
    @pytest.mark.asyncio
    async def test_single_message_dispatched_after_delay(self):
        adapter = _make_wecom_adapter()
        event = _make_event("hello world", Platform.WECOM)

        adapter._enqueue_text_event(event)

        adapter.handle_message.assert_not_called()
        await asyncio.sleep(0.2)

        adapter.handle_message.assert_called_once()
        assert adapter.handle_message.call_args[0][0].text == "hello world"

    @pytest.mark.asyncio
    async def test_split_messages_aggregated(self):
        adapter = _make_wecom_adapter()

        adapter._enqueue_text_event(_make_event("first part", Platform.WECOM))
        await asyncio.sleep(0.02)
        adapter._enqueue_text_event(_make_event("second part", Platform.WECOM))

        adapter.handle_message.assert_not_called()
        await asyncio.sleep(0.2)

        adapter.handle_message.assert_called_once()
        text = adapter.handle_message.call_args[0][0].text
        assert "first part" in text
        assert "second part" in text

    @pytest.mark.asyncio
    async def test_different_chats_not_merged(self):
        adapter = _make_wecom_adapter()

        adapter._enqueue_text_event(_make_event("chat A", Platform.WECOM, chat_id="chat_a"))
        adapter._enqueue_text_event(_make_event("chat B", Platform.WECOM, chat_id="chat_b"))

        await asyncio.sleep(0.2)

        assert adapter.handle_message.call_count == 2

    @pytest.mark.asyncio
    async def test_adaptive_delay_for_near_limit_chunk(self):
        """Chunks near the 4000-char limit should trigger longer delay."""
        adapter = _make_wecom_adapter()
        long_text = "x" * 3950
        adapter._enqueue_text_event(_make_event(long_text, Platform.WECOM))

        await asyncio.sleep(0.15)
        adapter.handle_message.assert_not_called()

        await asyncio.sleep(0.25)
        adapter.handle_message.assert_called_once()

    @pytest.mark.asyncio
    async def test_batch_cleans_up_after_flush(self):
        adapter = _make_wecom_adapter()
        adapter._enqueue_text_event(_make_event("test", Platform.WECOM))
        await asyncio.sleep(0.2)
        assert len(adapter._pending_text_batches) == 0


# =====================================================================
# Telegram adaptive delay (PR #6891)
# =====================================================================

def _make_telegram_adapter():
    """Create a minimal TelegramAdapter for testing adaptive delay."""
    from gateway.platforms.telegram import TelegramAdapter

    config = PlatformConfig(enabled=True, token="test-token")
    adapter = object.__new__(TelegramAdapter)
    adapter._platform = Platform.TELEGRAM
    adapter.config = config
    adapter._pending_text_batches = {}
    adapter._pending_text_batch_tasks = {}
    adapter._text_batch_delay_seconds = 0.1
    adapter._text_batch_split_delay_seconds = 0.3
    adapter._active_sessions = {}
    adapter._pending_messages = {}
    adapter._message_handler = AsyncMock()
    adapter.handle_message = AsyncMock()
    return adapter


class TestTelegramAdaptiveDelay:
    @pytest.mark.asyncio
    async def test_short_chunk_uses_normal_delay(self):
        adapter = _make_telegram_adapter()
        adapter._enqueue_text_event(_make_event("short msg", Platform.TELEGRAM))

        # Should flush after the normal 0.1s delay
        await asyncio.sleep(0.15)
        adapter.handle_message.assert_called_once()

    @pytest.mark.asyncio
    async def test_near_limit_chunk_uses_split_delay(self):
        """A chunk near the 4096-char limit should trigger longer delay."""
        adapter = _make_telegram_adapter()
        long_text = "x" * 4050  # near the 4096 limit
        adapter._enqueue_text_event(_make_event(long_text, Platform.TELEGRAM))

        # After the short delay, should NOT have flushed yet
        await asyncio.sleep(0.15)
        adapter.handle_message.assert_not_called()

        # After the split delay, should be flushed
        await asyncio.sleep(0.25)
        adapter.handle_message.assert_called_once()

    @pytest.mark.asyncio
    async def test_split_continuation_merged(self):
        """Two near-limit chunks should both be merged."""
        adapter = _make_telegram_adapter()

        adapter._enqueue_text_event(_make_event("x" * 4050, Platform.TELEGRAM))
        await asyncio.sleep(0.05)
        adapter._enqueue_text_event(_make_event("continuation text", Platform.TELEGRAM))

        # Short chunk arrived → should use normal delay now
        await asyncio.sleep(0.15)
        adapter.handle_message.assert_called_once()
        text = adapter.handle_message.call_args[0][0].text
        assert "continuation text" in text


# =====================================================================
# Feishu adaptive delay
# =====================================================================

def _make_feishu_adapter():
    """Create a minimal FeishuAdapter for testing adaptive delay."""
    from gateway.platforms.feishu import FeishuAdapter, FeishuBatchState

    config = PlatformConfig(enabled=True, token="test-token")
    adapter = object.__new__(FeishuAdapter)
    adapter._platform = Platform.FEISHU
    adapter.config = config
    batch_state = FeishuBatchState()
    adapter._pending_text_batches = batch_state.events
    adapter._pending_text_batch_tasks = batch_state.tasks
    adapter._pending_text_batch_counts = batch_state.counts
    adapter._text_batch_delay_seconds = 0.1
    adapter._text_batch_split_delay_seconds = 0.3
    adapter._text_batch_max_messages = 20
    adapter._text_batch_max_chars = 50000
    adapter._active_sessions = {}
    adapter._pending_messages = {}
    adapter._message_handler = AsyncMock()
    adapter._handle_message_with_guards = AsyncMock()
    return adapter


class TestFeishuAdaptiveDelay:
    @pytest.mark.asyncio
    async def test_short_chunk_uses_normal_delay(self):
        adapter = _make_feishu_adapter()
        event = _make_event("short msg", Platform.FEISHU)
        await adapter._enqueue_text_event(event)

        await asyncio.sleep(0.15)
        adapter._handle_message_with_guards.assert_called_once()

    @pytest.mark.asyncio
    async def test_near_limit_chunk_uses_split_delay(self):
        """A chunk near the 4096-char limit should trigger longer delay."""
        adapter = _make_feishu_adapter()
        long_text = "x" * 4050
        event = _make_event(long_text, Platform.FEISHU)
        await adapter._enqueue_text_event(event)

        await asyncio.sleep(0.15)
        adapter._handle_message_with_guards.assert_not_called()

        await asyncio.sleep(0.25)
        adapter._handle_message_with_guards.assert_called_once()

    @pytest.mark.asyncio
    async def test_split_continuation_merged(self):
        adapter = _make_feishu_adapter()

        await adapter._enqueue_text_event(_make_event("x" * 4050, Platform.FEISHU))
        await asyncio.sleep(0.05)
        await adapter._enqueue_text_event(_make_event("continuation text", Platform.FEISHU))

        await asyncio.sleep(0.15)
        adapter._handle_message_with_guards.assert_called_once()
        text = adapter._handle_message_with_guards.call_args[0][0].text
        assert "continuation text" in text
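
Note: these fixtures deliberately bypass __init__ via object.__new__, so each test sets only the batching attributes it exercises, and handle_message (or _handle_message_with_guards for Feishu) is mocked so assertions can check exactly when a flush fired. Assuming the standard setup implied by the @pytest.mark.asyncio markers, the file would be run with something like `pytest tests/gateway/test_text_batching.py` plus the pytest-asyncio plugin.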

@@ -508,6 +508,7 @@ class TestInboundMessages:
         from gateway.platforms.wecom import WeComAdapter
 
         adapter = WeComAdapter(PlatformConfig(enabled=True))
+        adapter._text_batch_delay_seconds = 0  # disable batching for tests
         adapter.handle_message = AsyncMock()
         adapter._extract_media = AsyncMock(return_value=(["/tmp/test.png"], ["image/png"]))

@@ -539,6 +540,7 @@ class TestInboundMessages:
         from gateway.platforms.wecom import WeComAdapter
 
         adapter = WeComAdapter(PlatformConfig(enabled=True))
+        adapter._text_batch_delay_seconds = 0  # disable batching for tests
         adapter.handle_message = AsyncMock()
         adapter._extract_media = AsyncMock(return_value=([], []))