mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-28 15:01:34 +08:00
Compare commits
2 Commits
fix/dashbo
...
hermes/her
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
92dc9f67de | ||
|
|
3304e57aeb |
@@ -1230,6 +1230,32 @@ class BasePlatformAdapter(ABC):
|
||||
|
||||
return media, cleaned
|
||||
|
||||
def extract_inline_keyboard(
|
||||
self, text: str,
|
||||
) -> Tuple[Optional[list], str]:
|
||||
"""Extract inline keyboard directives from response text.
|
||||
|
||||
Returns:
|
||||
Tuple of (parsed keyboard data or None, cleaned text with tags removed).
|
||||
|
||||
The base implementation is a no-op — override in platform adapters
|
||||
that support inline keyboards (e.g. Telegram).
|
||||
"""
|
||||
return None, text
|
||||
|
||||
async def attach_inline_keyboard(
|
||||
self,
|
||||
chat_id: str,
|
||||
message_id: str,
|
||||
buttons: list,
|
||||
) -> "SendResult":
|
||||
"""Attach an inline keyboard to an existing message.
|
||||
|
||||
The base implementation is a no-op — override in platform adapters
|
||||
that support inline keyboards (e.g. Telegram).
|
||||
"""
|
||||
return SendResult(success=False, error="Not supported")
|
||||
|
||||
@staticmethod
|
||||
def extract_local_files(content: str) -> Tuple[List[str], str]:
|
||||
"""
|
||||
@@ -1635,6 +1661,19 @@ class BasePlatformAdapter(ABC):
|
||||
# Strip any remaining internal directives from message body (fixes #1561)
|
||||
text_content = text_content.replace("[[audio_as_voice]]", "").strip()
|
||||
text_content = re.sub(r"MEDIA:\s*\S+", "", text_content).strip()
|
||||
|
||||
# Extract inline keyboard tags (e.g. [KEYBOARD: ...] blocks)
|
||||
inline_keyboard = None
|
||||
if text_content:
|
||||
try:
|
||||
inline_keyboard, text_content = self.extract_inline_keyboard(text_content)
|
||||
except Exception as kb_err:
|
||||
logger.warning("[%s] Failed to extract inline keyboard: %s", self.name, kb_err)
|
||||
# If the response was only a keyboard block, provide minimal
|
||||
# placeholder text so there's a message to attach buttons to.
|
||||
if inline_keyboard and not text_content:
|
||||
text_content = "Please choose:"
|
||||
|
||||
if images:
|
||||
logger.info("[%s] extract_images found %d image(s) in response (%d chars)", self.name, len(images), len(response))
|
||||
|
||||
@@ -1691,6 +1730,23 @@ class BasePlatformAdapter(ABC):
|
||||
)
|
||||
_record_delivery(result)
|
||||
|
||||
# Attach inline keyboard to the last message chunk
|
||||
if inline_keyboard and result.success and result.message_id:
|
||||
target_id = result.message_id
|
||||
# For chunked messages, attach to the very last chunk
|
||||
if isinstance(getattr(result, "raw_response", None), dict):
|
||||
ids = result.raw_response.get("message_ids")
|
||||
if isinstance(ids, list) and ids:
|
||||
target_id = ids[-1]
|
||||
try:
|
||||
await self.attach_inline_keyboard(
|
||||
chat_id=event.source.chat_id,
|
||||
message_id=str(target_id),
|
||||
buttons=inline_keyboard,
|
||||
)
|
||||
except Exception as kb_err:
|
||||
logger.warning("[%s] Failed to attach inline keyboard: %s", self.name, kb_err)
|
||||
|
||||
# Human-like pacing delay between text and media
|
||||
human_delay = self._get_human_delay()
|
||||
|
||||
|
||||
@@ -12,7 +12,7 @@ import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
from typing import Dict, List, Optional, Any
|
||||
from typing import Dict, List, Optional, Any, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -1021,6 +1021,92 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
)
|
||||
return SendResult(success=False, error=str(e))
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Inline keyboard support (extract from agent output → attach to msg)
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def extract_inline_keyboard(
|
||||
self, text: str,
|
||||
) -> Tuple[Optional[List[List[Tuple[str, str]]]], str]:
|
||||
"""Extract a ``[KEYBOARD: ...]`` block from agent output.
|
||||
|
||||
Syntax produced by the agent::
|
||||
|
||||
[KEYBOARD:
|
||||
✅ Confirm=ck:confirm | ❌ Cancel=ck:cancel
|
||||
🔄 Later=ck:later
|
||||
]
|
||||
|
||||
Each line becomes one row; ``|`` separates buttons within a row.
|
||||
Only ``ck:``-prefixed callback data is accepted (custom-keyboard
|
||||
namespace). Callbacks exceeding Telegram's 64-byte limit are
|
||||
silently skipped.
|
||||
|
||||
Returns:
|
||||
``(buttons, cleaned_text)`` — *buttons* is ``None`` when no
|
||||
valid keyboard block was found.
|
||||
"""
|
||||
match = re.search(r'\[KEYBOARD:\s*\n(.*?)\]', text, re.DOTALL)
|
||||
if not match:
|
||||
return None, text
|
||||
|
||||
raw = match.group(1).strip()
|
||||
cleaned = (text[:match.start()] + text[match.end():]).strip()
|
||||
|
||||
buttons: List[List[Tuple[str, str]]] = []
|
||||
for line in raw.splitlines():
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
row: List[Tuple[str, str]] = []
|
||||
for item in line.split("|"):
|
||||
item = item.strip()
|
||||
if "=" not in item:
|
||||
continue
|
||||
label, callback = item.rsplit("=", 1)
|
||||
label, callback = label.strip(), callback.strip()
|
||||
if not label or not callback.startswith("ck:"):
|
||||
continue
|
||||
if len(callback.encode("utf-8")) > 64:
|
||||
logger.warning(
|
||||
"[%s] Skipping keyboard callback over 64 bytes: %s",
|
||||
self.name, callback,
|
||||
)
|
||||
continue
|
||||
row.append((label, callback))
|
||||
if row:
|
||||
buttons.append(row)
|
||||
|
||||
return (buttons or None), cleaned
|
||||
|
||||
async def attach_inline_keyboard(
|
||||
self,
|
||||
chat_id: str,
|
||||
message_id: str,
|
||||
buttons: List[List[Tuple[str, str]]],
|
||||
) -> SendResult:
|
||||
"""Attach an inline keyboard to an existing Telegram message."""
|
||||
if not self._bot:
|
||||
return SendResult(success=False, error="Not connected")
|
||||
try:
|
||||
keyboard = InlineKeyboardMarkup([
|
||||
[InlineKeyboardButton(label, callback_data=data)
|
||||
for label, data in row]
|
||||
for row in buttons
|
||||
])
|
||||
await self._bot.edit_message_reply_markup(
|
||||
chat_id=int(chat_id),
|
||||
message_id=int(message_id),
|
||||
reply_markup=keyboard,
|
||||
)
|
||||
return SendResult(success=True, message_id=message_id)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"[%s] Failed to attach inline keyboard to msg %s: %s",
|
||||
self.name, message_id, e, exc_info=True,
|
||||
)
|
||||
return SendResult(success=False, error=str(e))
|
||||
|
||||
async def send_update_prompt(
|
||||
self, chat_id: str, prompt: str, default: str = "",
|
||||
session_key: str = "",
|
||||
@@ -1411,6 +1497,76 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
# Catch-all (e.g. page counter button "mx:noop")
|
||||
await query.answer()
|
||||
|
||||
async def _handle_custom_keyboard_callback(
|
||||
self, query: Any, data: str,
|
||||
) -> None:
|
||||
"""Re-inject a ``ck:`` callback as a virtual user message.
|
||||
|
||||
When the user taps an inline keyboard button whose callback_data
|
||||
starts with ``ck:``, we acknowledge the tap, edit the message to
|
||||
show which option was selected (removing the keyboard), and feed
|
||||
the raw callback string back into the normal message handler so
|
||||
the agent can act on it.
|
||||
"""
|
||||
if not query or not query.message:
|
||||
return
|
||||
|
||||
try:
|
||||
await query.answer()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Find the label of the clicked button from the original keyboard
|
||||
clicked_label = data
|
||||
try:
|
||||
markup = query.message.reply_markup
|
||||
if markup and markup.inline_keyboard:
|
||||
for row in markup.inline_keyboard:
|
||||
for btn in row:
|
||||
if btn.callback_data == data:
|
||||
clicked_label = btn.text
|
||||
break
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Edit the message: keep original text, append selection, remove keyboard
|
||||
try:
|
||||
original_text = query.message.text or ""
|
||||
user_display = getattr(query.from_user, "first_name", "User")
|
||||
updated_text = f"{original_text}\n\n> {user_display} selected: {clicked_label}"
|
||||
await query.edit_message_text(
|
||||
text=updated_text,
|
||||
reply_markup=None,
|
||||
)
|
||||
except Exception:
|
||||
# Fallback: just remove the keyboard
|
||||
try:
|
||||
await query.edit_message_reply_markup(reply_markup=None)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Re-inject as a virtual user message.
|
||||
# We use _build_message_event(query.message) to inherit the correct
|
||||
# chat/thread context (the bot's reply lives in the same chat).
|
||||
# message_id is intentionally kept as the bot's message so the
|
||||
# agent's response threads off the keyboard message (better UX).
|
||||
from_user = getattr(query, "from_user", None)
|
||||
if not from_user:
|
||||
logger.warning("[%s] ck: callback has no from_user, cannot re-inject", self.name)
|
||||
return
|
||||
|
||||
try:
|
||||
event = self._build_message_event(query.message, MessageType.TEXT)
|
||||
event.text = data
|
||||
event.source.user_id = str(from_user.id)
|
||||
event.source.user_name = from_user.full_name
|
||||
await self.handle_message(event)
|
||||
except Exception as exc:
|
||||
logger.error(
|
||||
"[%s] Failed to re-inject keyboard callback %r: %s",
|
||||
self.name, data, exc, exc_info=True,
|
||||
)
|
||||
|
||||
async def _handle_callback_query(
|
||||
self, update: "Update", context: "ContextTypes.DEFAULT_TYPE"
|
||||
) -> None:
|
||||
@@ -1427,6 +1583,11 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
await self._handle_model_picker_callback(query, data, chat_id)
|
||||
return
|
||||
|
||||
# --- Custom inline keyboard callbacks (ck:...) ---
|
||||
if data.startswith("ck:"):
|
||||
await self._handle_custom_keyboard_callback(query, data)
|
||||
return
|
||||
|
||||
# --- Exec approval callbacks (ea:choice:id) ---
|
||||
if data.startswith("ea:"):
|
||||
parts = data.split(":", 2)
|
||||
|
||||
@@ -233,6 +233,19 @@ class GatewayStreamConsumer:
|
||||
self._last_edit_time = time.monotonic()
|
||||
|
||||
if got_done:
|
||||
# Extract inline keyboard tags before the final delivery
|
||||
# so the raw [KEYBOARD:...] block is never shown to users.
|
||||
inline_keyboard = None
|
||||
if self._accumulated:
|
||||
try:
|
||||
inline_keyboard, self._accumulated = self.adapter.extract_inline_keyboard(self._accumulated)
|
||||
except Exception as kb_err:
|
||||
logger.warning("Stream: failed to extract keyboard tag: %s", kb_err)
|
||||
# Keyboard-only response: provide placeholder so there's
|
||||
# a message to attach buttons to.
|
||||
if inline_keyboard and not self._accumulated:
|
||||
self._accumulated = "Please choose:"
|
||||
|
||||
# Final edit without cursor. If progressive editing failed
|
||||
# mid-stream, send a single continuation/fallback message
|
||||
# here instead of letting the base gateway path send the
|
||||
@@ -246,6 +259,20 @@ class GatewayStreamConsumer:
|
||||
self._final_response_sent = await self._send_or_edit(self._accumulated)
|
||||
elif not self._already_sent:
|
||||
self._final_response_sent = await self._send_or_edit(self._accumulated)
|
||||
|
||||
# Attach inline keyboard to the final streamed message
|
||||
if (inline_keyboard
|
||||
and self._message_id
|
||||
and self._message_id != "__no_edit__"):
|
||||
try:
|
||||
await self.adapter.attach_inline_keyboard(
|
||||
chat_id=self.chat_id,
|
||||
message_id=str(self._message_id),
|
||||
buttons=inline_keyboard,
|
||||
)
|
||||
except Exception as kb_err:
|
||||
logger.warning("Stream: failed to attach keyboard: %s", kb_err)
|
||||
|
||||
return
|
||||
|
||||
if commentary_text is not None:
|
||||
@@ -287,6 +314,8 @@ class GatewayStreamConsumer:
|
||||
# Matches the simple cleanup regex used by the non-streaming path in
|
||||
# gateway/platforms/base.py for post-processing.
|
||||
_MEDIA_RE = re.compile(r'''[`"']?MEDIA:\s*\S+[`"']?''')
|
||||
# Pattern to strip complete [KEYBOARD: ...] blocks from streaming display.
|
||||
_KEYBOARD_RE = re.compile(r'\[KEYBOARD:\s*\n.*?\]', re.DOTALL)
|
||||
|
||||
@staticmethod
|
||||
def _clean_for_display(text: str) -> str:
|
||||
@@ -299,10 +328,17 @@ class GatewayStreamConsumer:
|
||||
stream finishes — we just need to hide the raw directives from the
|
||||
user.
|
||||
"""
|
||||
if "MEDIA:" not in text and "[[audio_as_voice]]" not in text:
|
||||
if ("MEDIA:" not in text
|
||||
and "[[audio_as_voice]]" not in text
|
||||
and "[KEYBOARD:" not in text):
|
||||
return text
|
||||
cleaned = text.replace("[[audio_as_voice]]", "")
|
||||
cleaned = GatewayStreamConsumer._MEDIA_RE.sub("", cleaned)
|
||||
# Strip complete [KEYBOARD:...] blocks; if the closing ']' hasn't
|
||||
# arrived yet, hide everything from the opening tag onward.
|
||||
cleaned = GatewayStreamConsumer._KEYBOARD_RE.sub("", cleaned)
|
||||
if "[KEYBOARD:" in cleaned:
|
||||
cleaned = cleaned.split("[KEYBOARD:", 1)[0]
|
||||
# Collapse excessive blank lines left behind by removed tags
|
||||
cleaned = re.sub(r'\n{3,}', '\n\n', cleaned)
|
||||
# Strip trailing whitespace/newlines but preserve leading content
|
||||
|
||||
405
tests/gateway/test_telegram_inline_keyboard.py
Normal file
405
tests/gateway/test_telegram_inline_keyboard.py
Normal file
@@ -0,0 +1,405 @@
|
||||
"""Tests for Telegram inline keyboard support (PR #8899).
|
||||
|
||||
Covers:
|
||||
- extract_inline_keyboard parser (single/multi-row, edge cases)
|
||||
- attach_inline_keyboard (edit_message_reply_markup call)
|
||||
- _handle_custom_keyboard_callback (re-injection flow)
|
||||
- _clean_for_display keyboard stripping (streaming)
|
||||
- Base class no-ops
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Ensure repo root importable
|
||||
# ---------------------------------------------------------------------------
|
||||
_repo = str(Path(__file__).resolve().parents[2])
|
||||
if _repo not in sys.path:
|
||||
sys.path.insert(0, _repo)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Minimal Telegram mock
|
||||
# ---------------------------------------------------------------------------
|
||||
def _ensure_telegram_mock():
|
||||
if "telegram" in sys.modules and hasattr(sys.modules["telegram"], "__file__"):
|
||||
return
|
||||
|
||||
mod = MagicMock()
|
||||
mod.ext.ContextTypes.DEFAULT_TYPE = type(None)
|
||||
mod.constants.ParseMode.MARKDOWN = "Markdown"
|
||||
mod.constants.ParseMode.MARKDOWN_V2 = "MarkdownV2"
|
||||
mod.constants.ParseMode.HTML = "HTML"
|
||||
mod.constants.ChatType.PRIVATE = "private"
|
||||
mod.constants.ChatType.GROUP = "group"
|
||||
mod.constants.ChatType.SUPERGROUP = "supergroup"
|
||||
mod.constants.ChatType.CHANNEL = "channel"
|
||||
mod.error.NetworkError = type("NetworkError", (OSError,), {})
|
||||
mod.error.TimedOut = type("TimedOut", (OSError,), {})
|
||||
mod.error.BadRequest = type("BadRequest", (Exception,), {})
|
||||
|
||||
for name in ("telegram", "telegram.ext", "telegram.constants", "telegram.request"):
|
||||
sys.modules.setdefault(name, mod)
|
||||
sys.modules.setdefault("telegram.error", mod.error)
|
||||
|
||||
|
||||
_ensure_telegram_mock()
|
||||
|
||||
from gateway.platforms.telegram import TelegramAdapter
|
||||
from gateway.config import PlatformConfig
|
||||
from gateway.stream_consumer import GatewayStreamConsumer
|
||||
|
||||
|
||||
def _make_adapter():
|
||||
config = PlatformConfig(enabled=True, token="test-token")
|
||||
adapter = TelegramAdapter(config)
|
||||
adapter._bot = AsyncMock()
|
||||
adapter._app = MagicMock()
|
||||
return adapter
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# extract_inline_keyboard — parser tests
|
||||
# ===========================================================================
|
||||
|
||||
|
||||
class TestExtractInlineKeyboard:
|
||||
"""Test the [KEYBOARD: ...] parser on TelegramAdapter."""
|
||||
|
||||
def test_no_keyboard_block(self):
|
||||
"""Text without a keyboard block returns (None, original)."""
|
||||
adapter = _make_adapter()
|
||||
text = "Hello, how can I help?"
|
||||
buttons, cleaned = adapter.extract_inline_keyboard(text)
|
||||
assert buttons is None
|
||||
assert cleaned == text
|
||||
|
||||
def test_single_row(self):
|
||||
"""Single row with two buttons."""
|
||||
adapter = _make_adapter()
|
||||
text = "Choose:\n[KEYBOARD:\n✅ Yes=ck:yes | ❌ No=ck:no\n]"
|
||||
buttons, cleaned = adapter.extract_inline_keyboard(text)
|
||||
assert buttons is not None
|
||||
assert len(buttons) == 1 # one row
|
||||
assert len(buttons[0]) == 2 # two buttons
|
||||
assert buttons[0][0] == ("✅ Yes", "ck:yes")
|
||||
assert buttons[0][1] == ("❌ No", "ck:no")
|
||||
assert "KEYBOARD" not in cleaned
|
||||
assert "Choose:" in cleaned
|
||||
|
||||
def test_multi_row(self):
|
||||
"""Multiple rows (each line = one row)."""
|
||||
adapter = _make_adapter()
|
||||
text = "Pick:\n[KEYBOARD:\n✅ Confirm=ck:confirm | ❌ Cancel=ck:cancel\n🔄 Later=ck:later\n]"
|
||||
buttons, cleaned = adapter.extract_inline_keyboard(text)
|
||||
assert buttons is not None
|
||||
assert len(buttons) == 2
|
||||
assert len(buttons[0]) == 2
|
||||
assert len(buttons[1]) == 1
|
||||
assert buttons[1][0] == ("🔄 Later", "ck:later")
|
||||
|
||||
def test_empty_keyboard_block(self):
|
||||
"""Keyboard block with no valid buttons returns (None, cleaned)."""
|
||||
adapter = _make_adapter()
|
||||
text = "Hello\n[KEYBOARD:\n\n]"
|
||||
buttons, cleaned = adapter.extract_inline_keyboard(text)
|
||||
assert buttons is None
|
||||
assert "KEYBOARD" not in cleaned
|
||||
|
||||
def test_missing_ck_prefix_skipped(self):
|
||||
"""Buttons without the ck: prefix are silently skipped."""
|
||||
adapter = _make_adapter()
|
||||
text = "[KEYBOARD:\nOK=confirm | Cancel=ck:cancel\n]"
|
||||
buttons, cleaned = adapter.extract_inline_keyboard(text)
|
||||
assert buttons is not None
|
||||
assert len(buttons) == 1
|
||||
assert len(buttons[0]) == 1 # only the ck: one
|
||||
assert buttons[0][0] == ("Cancel", "ck:cancel")
|
||||
|
||||
def test_oversized_callback_skipped(self):
|
||||
"""Callbacks exceeding 64 bytes are skipped."""
|
||||
adapter = _make_adapter()
|
||||
long_data = "ck:" + "x" * 65 # 68 bytes > 64
|
||||
text = f"[KEYBOARD:\nBig={long_data} | Small=ck:ok\n]"
|
||||
buttons, cleaned = adapter.extract_inline_keyboard(text)
|
||||
assert buttons is not None
|
||||
assert len(buttons[0]) == 1
|
||||
assert buttons[0][0] == ("Small", "ck:ok")
|
||||
|
||||
def test_label_with_equals(self):
|
||||
"""Label containing '=' still parses (uses rsplit)."""
|
||||
adapter = _make_adapter()
|
||||
text = "[KEYBOARD:\n2+2=4=ck:math\n]"
|
||||
buttons, cleaned = adapter.extract_inline_keyboard(text)
|
||||
assert buttons is not None
|
||||
assert buttons[0][0] == ("2+2=4", "ck:math")
|
||||
|
||||
def test_text_preserved_around_block(self):
|
||||
"""Text before and after the keyboard block is kept."""
|
||||
adapter = _make_adapter()
|
||||
text = "Before text.\n[KEYBOARD:\nOK=ck:ok\n]\nAfter text."
|
||||
buttons, cleaned = adapter.extract_inline_keyboard(text)
|
||||
assert "Before text." in cleaned
|
||||
assert "After text." in cleaned
|
||||
assert "KEYBOARD" not in cleaned
|
||||
|
||||
def test_keyboard_only_response(self):
|
||||
"""Response with only a keyboard block → buttons present, text empty."""
|
||||
adapter = _make_adapter()
|
||||
text = "[KEYBOARD:\n✅ Yes=ck:yes\n]"
|
||||
buttons, cleaned = adapter.extract_inline_keyboard(text)
|
||||
assert buttons is not None
|
||||
assert cleaned.strip() == ""
|
||||
|
||||
def test_no_equals_in_item_skipped(self):
|
||||
"""Items without '=' are silently skipped."""
|
||||
adapter = _make_adapter()
|
||||
text = "[KEYBOARD:\nJust text | OK=ck:ok\n]"
|
||||
buttons, cleaned = adapter.extract_inline_keyboard(text)
|
||||
assert buttons is not None
|
||||
assert len(buttons[0]) == 1
|
||||
assert buttons[0][0] == ("OK", "ck:ok")
|
||||
|
||||
def test_whitespace_trimming(self):
|
||||
"""Leading/trailing whitespace in labels and callbacks is trimmed."""
|
||||
adapter = _make_adapter()
|
||||
text = "[KEYBOARD:\n Yes = ck:yes | No = ck:no \n]"
|
||||
buttons, cleaned = adapter.extract_inline_keyboard(text)
|
||||
assert buttons is not None
|
||||
assert buttons[0][0] == ("Yes", "ck:yes")
|
||||
assert buttons[0][1] == ("No", "ck:no")
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# attach_inline_keyboard
|
||||
# ===========================================================================
|
||||
|
||||
|
||||
class TestAttachInlineKeyboard:
|
||||
"""Test attaching an inline keyboard to an existing message."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_calls_edit_message_reply_markup(self):
|
||||
adapter = _make_adapter()
|
||||
adapter._bot.edit_message_reply_markup = AsyncMock()
|
||||
|
||||
buttons = [[("Yes", "ck:yes"), ("No", "ck:no")]]
|
||||
result = await adapter.attach_inline_keyboard("12345", "42", buttons)
|
||||
|
||||
assert result.success is True
|
||||
adapter._bot.edit_message_reply_markup.assert_awaited_once()
|
||||
call_kwargs = adapter._bot.edit_message_reply_markup.call_args[1]
|
||||
assert call_kwargs["chat_id"] == 12345
|
||||
assert call_kwargs["message_id"] == 42
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_not_connected(self):
|
||||
adapter = _make_adapter()
|
||||
adapter._bot = None
|
||||
result = await adapter.attach_inline_keyboard("12345", "42", [[("OK", "ck:ok")]])
|
||||
assert result.success is False
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_api_error_handled(self):
|
||||
adapter = _make_adapter()
|
||||
adapter._bot.edit_message_reply_markup = AsyncMock(side_effect=Exception("API error"))
|
||||
|
||||
result = await adapter.attach_inline_keyboard("12345", "42", [[("OK", "ck:ok")]])
|
||||
assert result.success is False
|
||||
assert "API error" in result.error
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# _handle_custom_keyboard_callback
|
||||
# ===========================================================================
|
||||
|
||||
|
||||
class TestCustomKeyboardCallback:
|
||||
"""Test the ck: callback handler and re-injection logic."""
|
||||
|
||||
def _make_query(self, data="ck:confirm", message_text="Choose:", label="✅ Confirm"):
|
||||
"""Build a mock callback query for ck: buttons."""
|
||||
btn = MagicMock()
|
||||
btn.callback_data = data
|
||||
btn.text = label
|
||||
|
||||
row = [btn]
|
||||
markup = MagicMock()
|
||||
markup.inline_keyboard = [row]
|
||||
|
||||
message = MagicMock()
|
||||
message.text = message_text
|
||||
message.message_id = 42
|
||||
message.chat.id = 12345
|
||||
message.chat.type = "private"
|
||||
message.chat.title = None
|
||||
message.chat.full_name = "Test Chat"
|
||||
message.from_user = MagicMock()
|
||||
message.from_user.id = 99 # bot
|
||||
message.from_user.full_name = "Bot"
|
||||
message.reply_markup = markup
|
||||
message.message_thread_id = None
|
||||
message.reply_to_message = None
|
||||
# provide forum_topic_created as None so _build_message_event doesn't trip
|
||||
message.forum_topic_created = None
|
||||
|
||||
query = MagicMock()
|
||||
query.data = data
|
||||
query.message = message
|
||||
query.from_user = MagicMock()
|
||||
query.from_user.id = 777 # real user
|
||||
query.from_user.full_name = "Alice"
|
||||
query.from_user.first_name = "Alice"
|
||||
query.answer = AsyncMock()
|
||||
query.edit_message_text = AsyncMock()
|
||||
query.edit_message_reply_markup = AsyncMock()
|
||||
return query
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_reinjects_with_correct_user(self):
|
||||
"""Callback re-injection uses the clicker's user_id, not the bot's."""
|
||||
adapter = _make_adapter()
|
||||
query = self._make_query()
|
||||
|
||||
captured_event = {}
|
||||
|
||||
async def capture_event(event):
|
||||
captured_event["text"] = event.text
|
||||
captured_event["user_id"] = event.source.user_id
|
||||
captured_event["user_name"] = event.source.user_name
|
||||
|
||||
adapter.handle_message = capture_event
|
||||
|
||||
await adapter._handle_custom_keyboard_callback(query, "ck:confirm")
|
||||
|
||||
assert captured_event["text"] == "ck:confirm"
|
||||
assert captured_event["user_id"] == "777"
|
||||
assert captured_event["user_name"] == "Alice"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_edits_message_with_selection(self):
|
||||
"""Callback edits the original message to show the user's selection."""
|
||||
adapter = _make_adapter()
|
||||
query = self._make_query()
|
||||
adapter.handle_message = AsyncMock()
|
||||
|
||||
await adapter._handle_custom_keyboard_callback(query, "ck:confirm")
|
||||
|
||||
query.edit_message_text.assert_awaited_once()
|
||||
call_kwargs = query.edit_message_text.call_args[1]
|
||||
assert "Alice selected:" in call_kwargs["text"]
|
||||
assert "✅ Confirm" in call_kwargs["text"]
|
||||
assert call_kwargs["reply_markup"] is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_acknowledges_query(self):
|
||||
"""Callback query is answered (acknowledged)."""
|
||||
adapter = _make_adapter()
|
||||
query = self._make_query()
|
||||
adapter.handle_message = AsyncMock()
|
||||
|
||||
await adapter._handle_custom_keyboard_callback(query, "ck:test")
|
||||
query.answer.assert_awaited_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_no_from_user_bails_early(self):
|
||||
"""If from_user is None, bail without re-injecting."""
|
||||
adapter = _make_adapter()
|
||||
query = self._make_query()
|
||||
query.from_user = None
|
||||
adapter.handle_message = AsyncMock()
|
||||
|
||||
await adapter._handle_custom_keyboard_callback(query, "ck:test")
|
||||
adapter.handle_message.assert_not_awaited()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_edit_failure_fallback(self):
|
||||
"""If edit_message_text fails, falls back to removing keyboard only."""
|
||||
adapter = _make_adapter()
|
||||
query = self._make_query()
|
||||
query.edit_message_text = AsyncMock(side_effect=Exception("edit failed"))
|
||||
adapter.handle_message = AsyncMock()
|
||||
|
||||
await adapter._handle_custom_keyboard_callback(query, "ck:test")
|
||||
# Fallback: just remove the keyboard
|
||||
query.edit_message_reply_markup.assert_awaited_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_callback_routing(self):
|
||||
"""ck: data in _handle_callback_query routes to the keyboard handler."""
|
||||
adapter = _make_adapter()
|
||||
query = self._make_query(data="ck:action")
|
||||
|
||||
update = MagicMock()
|
||||
update.callback_query = query
|
||||
context = MagicMock()
|
||||
|
||||
with patch.object(adapter, "_handle_custom_keyboard_callback", new_callable=AsyncMock) as mock_handler:
|
||||
await adapter._handle_callback_query(update, context)
|
||||
mock_handler.assert_awaited_once_with(query, "ck:action")
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# _clean_for_display — keyboard stripping during streaming
|
||||
# ===========================================================================
|
||||
|
||||
|
||||
class TestStreamingKeyboardStripping:
|
||||
"""Test [KEYBOARD:] block stripping in the streaming display path."""
|
||||
|
||||
def test_complete_block_stripped(self):
|
||||
"""A complete [KEYBOARD:...] block is removed from display text."""
|
||||
text = "Here is your choice:\n[KEYBOARD:\n✅ Yes=ck:yes | ❌ No=ck:no\n]"
|
||||
result = GatewayStreamConsumer._clean_for_display(text)
|
||||
assert "[KEYBOARD:" not in result
|
||||
assert "ck:" not in result
|
||||
assert "Here is your choice:" in result
|
||||
|
||||
def test_partial_block_stripped(self):
|
||||
"""An incomplete [KEYBOARD: block (mid-stream) hides from that point."""
|
||||
text = "Some text\n[KEYBOARD:\n✅ Yes=ck:yes"
|
||||
result = GatewayStreamConsumer._clean_for_display(text)
|
||||
assert "[KEYBOARD:" not in result
|
||||
assert "Some text" in result
|
||||
|
||||
def test_no_keyboard_passthrough(self):
|
||||
"""Text without KEYBOARD or MEDIA passes through unchanged."""
|
||||
text = "Normal response text."
|
||||
result = GatewayStreamConsumer._clean_for_display(text)
|
||||
assert result == text
|
||||
|
||||
def test_keyboard_with_media(self):
|
||||
"""Both KEYBOARD and MEDIA tags are stripped."""
|
||||
text = "Result\nMEDIA:/tmp/img.png\n[KEYBOARD:\nOK=ck:ok\n]"
|
||||
result = GatewayStreamConsumer._clean_for_display(text)
|
||||
assert "MEDIA:" not in result
|
||||
assert "[KEYBOARD:" not in result
|
||||
assert "Result" in result
|
||||
|
||||
|
||||
# ===========================================================================
|
||||
# Base class no-ops
|
||||
# ===========================================================================
|
||||
|
||||
|
||||
class TestBaseClassNoOps:
|
||||
"""Verify the base adapter's keyboard methods are no-ops."""
|
||||
|
||||
def test_extract_returns_none(self):
|
||||
"""Base extract_inline_keyboard returns (None, text)."""
|
||||
from gateway.platforms.base import BasePlatformAdapter
|
||||
|
||||
# We can't instantiate abstract BasePlatformAdapter directly, but we can
|
||||
# call the method on the TelegramAdapter to verify it doesn't crash
|
||||
# when no keyboard is present (falls through to base behavior).
|
||||
adapter = _make_adapter()
|
||||
text = "No keyboard here"
|
||||
buttons, cleaned = adapter.extract_inline_keyboard(text)
|
||||
assert buttons is None
|
||||
assert cleaned == text
|
||||
Reference in New Issue
Block a user