mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-11 04:38:43 +08:00
Compare commits
1 Commits
fix/dev-de
...
hermes/her
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3144585f38 |
@@ -1042,7 +1042,7 @@ def load_gateway_config() -> GatewayConfig:
|
||||
if isinstance(group_allowed_chats, list):
|
||||
group_allowed_chats = ",".join(str(v) for v in group_allowed_chats)
|
||||
os.environ["TELEGRAM_GROUP_ALLOWED_CHATS"] = str(group_allowed_chats)
|
||||
for _telegram_extra_key in ("guest_mode", "disable_link_previews"):
|
||||
for _telegram_extra_key in ("guest_mode", "disable_link_previews", "business_mode"):
|
||||
if _telegram_extra_key in telegram_cfg:
|
||||
plat_data = platforms_data.setdefault(Platform.TELEGRAM.value, {})
|
||||
if not isinstance(plat_data, dict):
|
||||
|
||||
@@ -447,6 +447,16 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
# behavior; opt-in via display.platforms.telegram.notifications).
|
||||
self._notifications_mode: str = "important"
|
||||
|
||||
# Business Mode (Secretary Bots) — owner-approved drafting against
|
||||
# incoming customer messages. Initialized lazily in ``connect()``
|
||||
# once the bot is available, only when the feature is enabled.
|
||||
# See ``telegram_business.py`` for the manager + state machine.
|
||||
self._business_manager: Optional[Any] = None
|
||||
self._business_mode_enabled: bool = bool(
|
||||
self.config.extra.get("business_mode", {}).get("enabled", False)
|
||||
if isinstance(self.config.extra.get("business_mode"), dict) else False
|
||||
)
|
||||
|
||||
def _notification_kwargs(
|
||||
self, metadata: Optional[Dict[str, Any]]
|
||||
) -> Dict[str, Any]:
|
||||
@@ -1292,6 +1302,43 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
))
|
||||
# Handle inline keyboard button callbacks (update prompts)
|
||||
self._app.add_handler(CallbackQueryHandler(self._handle_callback_query))
|
||||
|
||||
# Telegram Business Mode (Secretary Bots) handlers — gated on
|
||||
# the ``telegram.business_mode.enabled`` config key. Registers
|
||||
# three update types: connection lifecycle, customer messages,
|
||||
# and edits to customer messages. Deletes are ignored for v1.
|
||||
if self._business_mode_enabled:
|
||||
try:
|
||||
self._init_business_manager()
|
||||
# Connection lifecycle (established / edited / ended)
|
||||
try:
|
||||
from telegram.ext import BusinessConnectionHandler
|
||||
self._app.add_handler(
|
||||
BusinessConnectionHandler(self._handle_business_connection)
|
||||
)
|
||||
except ImportError:
|
||||
logger.warning(
|
||||
"[%s] BusinessConnectionHandler unavailable — Business Mode requires python-telegram-bot >= 21.1",
|
||||
self.name,
|
||||
)
|
||||
raise
|
||||
# Incoming customer messages (TEXT only for v1)
|
||||
self._app.add_handler(TelegramMessageHandler(
|
||||
filters.UpdateType.BUSINESS_MESSAGE & filters.TEXT,
|
||||
self._handle_business_message,
|
||||
))
|
||||
# Edited customer messages — informational; treated like a fresh draft.
|
||||
self._app.add_handler(TelegramMessageHandler(
|
||||
filters.UpdateType.EDITED_BUSINESS_MESSAGE & filters.TEXT,
|
||||
self._handle_business_message,
|
||||
))
|
||||
logger.info("[%s] Telegram Business Mode handlers registered", self.name)
|
||||
except Exception as biz_err:
|
||||
logger.warning(
|
||||
"[%s] Could not initialize Business Mode (%s) — feature disabled",
|
||||
self.name, biz_err,
|
||||
)
|
||||
self._business_manager = None
|
||||
|
||||
# Start polling — retry initialize() for transient TLS resets
|
||||
try:
|
||||
@@ -2897,6 +2944,26 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
)
|
||||
return
|
||||
|
||||
# --- Business Mode draft callbacks (bd:choice:draft_id) ---
|
||||
if data.startswith("bd:") and self._business_manager is not None:
|
||||
caller_id = str(getattr(query.from_user, "id", ""))
|
||||
try:
|
||||
await self._business_manager.handle_callback(
|
||||
data=data,
|
||||
caller_user_id=caller_id or None,
|
||||
answer=query.answer,
|
||||
edit_message_text=query.edit_message_text,
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.exception(
|
||||
"[%s] Business draft callback failed: %s", self.name, exc,
|
||||
)
|
||||
try:
|
||||
await query.answer(text="⚠️ Action failed.")
|
||||
except Exception:
|
||||
pass
|
||||
return
|
||||
|
||||
# --- Update prompt callbacks ---
|
||||
if not data.startswith("update_prompt:"):
|
||||
return
|
||||
@@ -3985,6 +4052,146 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
return True
|
||||
return self._message_matches_mention_patterns(message)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Telegram Business Mode (Secretary Bots)
|
||||
# ------------------------------------------------------------------
|
||||
# Only wired when ``telegram.business_mode.enabled`` is True in
|
||||
# config.yaml. See ``telegram_business.py`` for the manager and
|
||||
# state machine; the methods below are just thin PTB-update glue.
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _init_business_manager(self) -> None:
|
||||
"""Construct the BusinessModeManager lazily, once the bot is up."""
|
||||
if self._business_manager is not None:
|
||||
return
|
||||
from gateway.platforms.telegram_business import BusinessModeManager
|
||||
|
||||
biz_cfg = self.config.extra.get("business_mode", {}) or {}
|
||||
debounce = float(biz_cfg.get("debounce_seconds", 8.0))
|
||||
draft_ttl_hours = float(biz_cfg.get("draft_ttl_hours", 24.0))
|
||||
max_chars = int(biz_cfg.get("max_customer_text_chars", 4000))
|
||||
|
||||
async def _send(**kwargs):
|
||||
# Strip Markdown formatting from owner DMs — the draft body has
|
||||
# raw customer text that could contain Telegram-special chars.
|
||||
# The business-mode renderer emits plain text only.
|
||||
kwargs.setdefault("parse_mode", None)
|
||||
kwargs.setdefault("disable_web_page_preview", True)
|
||||
if self._bot is None:
|
||||
raise RuntimeError("Telegram bot is not connected")
|
||||
return await self._bot.send_message(**kwargs)
|
||||
|
||||
async def _draft(customer_text: str, customer_chat_id: str) -> str:
|
||||
return await self._business_draft_via_auxiliary(customer_text, customer_chat_id)
|
||||
|
||||
from hermes_state import SessionDB
|
||||
from hermes_constants import get_hermes_home
|
||||
from pathlib import Path
|
||||
|
||||
# Lazily acquire the shared state DB. ``hermes_state`` singleton
|
||||
# is created elsewhere in the gateway runner; if we can't grab
|
||||
# it from there, fall back to opening our own handle against the
|
||||
# canonical state.db path.
|
||||
runner = getattr(getattr(self, "_message_handler", None), "__self__", None)
|
||||
db = getattr(runner, "_session_db", None) if runner else None
|
||||
if db is None:
|
||||
try:
|
||||
db = SessionDB(Path(get_hermes_home()) / "state.db")
|
||||
except Exception as exc:
|
||||
raise RuntimeError(f"Failed to open state.db for Business Mode: {exc}")
|
||||
|
||||
self._business_manager = BusinessModeManager(
|
||||
session_db=db,
|
||||
send_message=_send,
|
||||
draft_generator=_draft,
|
||||
debounce_seconds=debounce,
|
||||
draft_ttl_hours=draft_ttl_hours,
|
||||
max_customer_text_chars=max_chars,
|
||||
)
|
||||
|
||||
async def _business_draft_via_auxiliary(
|
||||
self, customer_text: str, customer_chat_id: str
|
||||
) -> str:
|
||||
"""Generate a draft reply via the auxiliary LLM client.
|
||||
|
||||
Synchronous ``call_llm`` is offloaded to a thread so it doesn't
|
||||
block the asyncio loop. System prompt is plain — v1 doesn't
|
||||
carry any prior history; v2 will plumb a per-customer session.
|
||||
"""
|
||||
from agent.auxiliary_client import call_llm
|
||||
|
||||
biz_cfg = self.config.extra.get("business_mode", {}) or {}
|
||||
owner_persona = (
|
||||
biz_cfg.get("owner_persona")
|
||||
or "You are drafting a short, friendly reply on behalf of the "
|
||||
"account owner. Match the tone of a personal message — "
|
||||
"warm, direct, and concise. Do not introduce yourself as "
|
||||
"an assistant or AI. Reply in the same language the "
|
||||
"customer used."
|
||||
)
|
||||
system_prompt = (
|
||||
f"{owner_persona}\n\n"
|
||||
"Output only the message text. No preamble, no quotes, no "
|
||||
"markdown headers. Keep it under 4 sentences unless the "
|
||||
"customer asked something that genuinely needs a longer reply."
|
||||
)
|
||||
messages = [
|
||||
{"role": "system", "content": system_prompt},
|
||||
{"role": "user", "content": customer_text},
|
||||
]
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
response = await loop.run_in_executor(
|
||||
None,
|
||||
lambda: call_llm(
|
||||
task="title_generation", # reuse light auxiliary task slot
|
||||
messages=messages,
|
||||
temperature=0.7,
|
||||
max_tokens=500,
|
||||
timeout=60.0,
|
||||
),
|
||||
)
|
||||
try:
|
||||
content = response.choices[0].message.content or ""
|
||||
except (AttributeError, IndexError, TypeError):
|
||||
content = ""
|
||||
return str(content).strip()
|
||||
|
||||
async def _handle_business_connection(
|
||||
self, update: "Update", context: "ContextTypes.DEFAULT_TYPE"
|
||||
) -> None:
|
||||
"""PTB callback: BusinessConnection updates."""
|
||||
if self._business_manager is None:
|
||||
return
|
||||
conn = getattr(update, "business_connection", None)
|
||||
if conn is None:
|
||||
return
|
||||
try:
|
||||
await self._business_manager.handle_connection_update(conn)
|
||||
except Exception as exc:
|
||||
logger.exception(
|
||||
"[%s] Business connection handler failed: %s", self.name, exc,
|
||||
)
|
||||
|
||||
async def _handle_business_message(
|
||||
self, update: "Update", context: "ContextTypes.DEFAULT_TYPE"
|
||||
) -> None:
|
||||
"""PTB callback: business_message and edited_business_message updates."""
|
||||
if self._business_manager is None:
|
||||
return
|
||||
message = (
|
||||
getattr(update, "business_message", None)
|
||||
or getattr(update, "edited_business_message", None)
|
||||
)
|
||||
if message is None:
|
||||
return
|
||||
try:
|
||||
await self._business_manager.handle_business_message(message)
|
||||
except Exception as exc:
|
||||
logger.exception(
|
||||
"[%s] Business message handler failed: %s", self.name, exc,
|
||||
)
|
||||
|
||||
async def _handle_text_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||
"""Handle incoming text messages.
|
||||
|
||||
@@ -3994,6 +4201,25 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
"""
|
||||
if not update.message or not update.message.text:
|
||||
return
|
||||
|
||||
# Business Mode: if this text DM is the owner replying after they
|
||||
# tapped Edit on a draft, consume it as the edit override and stop.
|
||||
if self._business_manager is not None and update.message.chat is not None:
|
||||
chat = update.message.chat
|
||||
if getattr(chat, "type", None) == ChatType.PRIVATE:
|
||||
try:
|
||||
consumed = await self._business_manager.maybe_handle_edit_capture(
|
||||
owner_chat_id=str(chat.id),
|
||||
text=update.message.text or "",
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.exception(
|
||||
"[%s] Business edit-capture failed: %s", self.name, exc,
|
||||
)
|
||||
consumed = False
|
||||
if consumed:
|
||||
return
|
||||
|
||||
if not self._should_process_message(update.message):
|
||||
return
|
||||
|
||||
@@ -4005,6 +4231,41 @@ class TelegramAdapter(BasePlatformAdapter):
|
||||
"""Handle incoming command messages."""
|
||||
if not update.message or not update.message.text:
|
||||
return
|
||||
|
||||
# Business Mode: /biz is handled entirely at the adapter layer
|
||||
# since it manipulates per-owner business state with no agent loop
|
||||
# involvement. Routed before _should_process_message so it works
|
||||
# in private DMs regardless of mention requirements.
|
||||
if self._business_manager is not None and update.message.chat is not None:
|
||||
text = (update.message.text or "").strip()
|
||||
parts = text.split()
|
||||
head = parts[0] if parts else ""
|
||||
if head.startswith("/"):
|
||||
cmd = head[1:].split("@", 1)[0].lower()
|
||||
if cmd == "biz":
|
||||
args = parts[1:]
|
||||
user = update.message.from_user
|
||||
chat = update.message.chat
|
||||
try:
|
||||
reply = await self._business_manager.handle_biz_command(
|
||||
owner_user_id=str(getattr(user, "id", "")),
|
||||
owner_chat_id=str(chat.id),
|
||||
args=args,
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.exception("[%s] /biz handler failed: %s", self.name, exc)
|
||||
reply = f"⚠️ /biz failed: {exc}"
|
||||
try:
|
||||
if self._bot is not None:
|
||||
await self._bot.send_message(
|
||||
chat_id=chat.id,
|
||||
text=reply,
|
||||
disable_web_page_preview=True,
|
||||
)
|
||||
except Exception:
|
||||
logger.debug("Failed to deliver /biz reply", exc_info=True)
|
||||
return
|
||||
|
||||
if not self._should_process_message(update.message, is_command=True):
|
||||
return
|
||||
|
||||
|
||||
760
gateway/platforms/telegram_business.py
Normal file
760
gateway/platforms/telegram_business.py
Normal file
@@ -0,0 +1,760 @@
|
||||
"""Telegram Business Mode (Secretary Bots) — owner-approved drafting.
|
||||
|
||||
When a Telegram Business account owner connects this bot via BotFather's
|
||||
Business Mode, the bot starts receiving messages addressed to the owner
|
||||
in chats the owner has whitelisted. This module turns those incoming
|
||||
customer messages into drafted replies that the owner approves before
|
||||
they go out — no auto-send, ever.
|
||||
|
||||
User flow (the simplest path that works end-to-end):
|
||||
|
||||
1. Owner enables Business Mode in BotFather and connects this bot.
|
||||
→ bot receives ``BusinessConnection`` update, persists it, DMs the
|
||||
owner an onboarding message with a quick how-it-works summary.
|
||||
|
||||
2. Customer messages the owner in a chat covered by the connection.
|
||||
→ bot debounces ``debounce_seconds`` (default 8s) to coalesce
|
||||
typing bursts, then drafts a single reply using the most-recent
|
||||
customer text.
|
||||
→ bot DMs the draft to the *owner's* chat with this bot, with
|
||||
inline buttons: [✓ Send] [✎ Edit] [✕ Discard].
|
||||
|
||||
3. Owner taps:
|
||||
Send → bot sends the draft to the customer chat using
|
||||
``business_connection_id``. Owner gets a "✓ Sent" confirmation.
|
||||
Edit → bot replies "send me the text you want delivered" and
|
||||
captures the owner's next text DM as the outgoing reply.
|
||||
Discard → draft dropped, nothing goes to the customer.
|
||||
|
||||
The owner is always in control: ``can_reply`` from Telegram is required
|
||||
for the Send button to appear, ``/biz pause`` globally suspends drafting,
|
||||
and per-chat pauses live in ``telegram_business_connections.paused_chats``.
|
||||
|
||||
State lives in two SQLite tables (``telegram_business_connections`` and
|
||||
``telegram_business_drafts``) created lazily on first use by
|
||||
``SessionDB.apply_telegram_business_migration()``. See ``hermes_state.py``
|
||||
for the schema.
|
||||
|
||||
This module is glued onto ``gateway/platforms/telegram.py`` via three
|
||||
PTB update handlers plus an inline-button callback prefix (``bd:`` for
|
||||
"business draft"). The host adapter owns all Telegram I/O — this module
|
||||
just calls back into it for sending.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from typing import Any, Awaitable, Callable, Dict, List, Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Callback-data prefix for the inline buttons. Keep it short — Telegram
|
||||
# caps callback_data at 64 bytes. Format: "bd:<choice>:<draft_id>".
|
||||
CALLBACK_PREFIX = "bd:"
|
||||
|
||||
|
||||
# Choice values rendered on the inline keyboard.
|
||||
CHOICE_SEND = "send"
|
||||
CHOICE_EDIT = "edit"
|
||||
CHOICE_DISCARD = "discard"
|
||||
|
||||
|
||||
# Onboarding text the bot DMs the owner the first time a BusinessConnection
|
||||
# arrives. Plain text (Telegram MarkdownV2 escaping is fragile, and this
|
||||
# message is content-stable enough to hand-author).
|
||||
ONBOARDING_MESSAGE = (
|
||||
"🤝 You've connected me as your Telegram Business assistant.\n\n"
|
||||
"Here's how it works:\n"
|
||||
"• When someone messages you in a chat I have access to, I'll draft "
|
||||
"a reply and send it to you here.\n"
|
||||
"• You'll see [✓ Send] [✎ Edit] [✕ Discard] buttons. Nothing goes "
|
||||
"to your contact until you tap Send.\n"
|
||||
"• Tap Edit to override the draft with your own wording.\n"
|
||||
"• Tap Discard to drop the draft entirely.\n\n"
|
||||
"Useful commands:\n"
|
||||
" /biz — show status and active connections\n"
|
||||
" /biz pause — pause drafting (you still see the messages)\n"
|
||||
" /biz resume — resume drafting\n"
|
||||
" /biz off — disable drafting for one chat (reply to that customer's draft)\n\n"
|
||||
"I never auto-send. Every reply is yours to approve."
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Type aliases for the adapter callbacks the manager depends on.
|
||||
# Defined as Callables so this module stays import-light and decoupled
|
||||
# from the rest of the telegram.py module.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Generates a draft reply text for a given customer message.
|
||||
# Returns the draft text, or raises on failure.
|
||||
DraftGenerator = Callable[[str, str], Awaitable[str]] # (customer_text, customer_chat_id) -> draft
|
||||
|
||||
# Sends a message to a chat. For owner DMs, business_connection_id is None.
|
||||
# For customer chats reached via business mode, business_connection_id is set.
|
||||
SendMessage = Callable[..., Awaitable[Any]] # delegates to bot.send_message kwargs
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Manager
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class BusinessModeManager:
|
||||
"""Owns Business Mode state and orchestrates drafting + approval.
|
||||
|
||||
One instance per Telegram adapter. The adapter wires up handlers in
|
||||
``connect()`` and calls into this manager for every business update.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
session_db: Any,
|
||||
send_message: SendMessage,
|
||||
draft_generator: DraftGenerator,
|
||||
debounce_seconds: float = 8.0,
|
||||
draft_ttl_hours: float = 24.0,
|
||||
max_customer_text_chars: int = 4000,
|
||||
) -> None:
|
||||
self._db = session_db
|
||||
self._send = send_message
|
||||
self._draft_generator = draft_generator
|
||||
self._debounce_seconds = max(0.0, float(debounce_seconds))
|
||||
self._draft_ttl_seconds = max(60.0, float(draft_ttl_hours) * 3600.0)
|
||||
self._max_customer_text_chars = int(max_customer_text_chars)
|
||||
|
||||
# In-flight debounce tasks, keyed by (connection_id, customer_chat_id).
|
||||
# New customer messages reset the timer so a typing burst yields one draft.
|
||||
self._debounce_tasks: Dict[str, asyncio.Task] = {}
|
||||
self._debounce_buffers: Dict[str, Dict[str, Any]] = {}
|
||||
|
||||
# Owner DMs that are in "next message = edited reply" mode.
|
||||
# Keyed by owner_chat_id → draft_id awaiting the override text.
|
||||
self._edit_capture: Dict[str, int] = {}
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# BusinessConnection updates (established / edited / ended).
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def handle_connection_update(self, business_connection: Any) -> None:
|
||||
"""Persist or remove a connection row.
|
||||
|
||||
``business_connection`` is a PTB ``BusinessConnection`` object.
|
||||
On Telegram Bot API 9.0+, ``rights`` is an object with ``can_reply``
|
||||
among other flags; on older versions a ``can_reply`` bool sits on
|
||||
the connection itself. We tolerate both shapes.
|
||||
"""
|
||||
conn_id = getattr(business_connection, "id", None)
|
||||
user = getattr(business_connection, "user", None)
|
||||
owner_user_id = getattr(user, "id", None)
|
||||
owner_chat_id = getattr(business_connection, "user_chat_id", None)
|
||||
is_enabled = bool(getattr(business_connection, "is_enabled", False))
|
||||
if conn_id is None or owner_user_id is None or owner_chat_id is None:
|
||||
logger.warning(
|
||||
"BusinessConnection update missing required fields: id=%s, user.id=%s, user_chat_id=%s",
|
||||
conn_id, owner_user_id, owner_chat_id,
|
||||
)
|
||||
return
|
||||
|
||||
# ``can_reply`` location moved in API 9.0 from connection root to
|
||||
# ``rights.can_reply``. Probe rights first.
|
||||
can_reply = False
|
||||
rights = getattr(business_connection, "rights", None)
|
||||
if rights is not None:
|
||||
can_reply = bool(getattr(rights, "can_reply", False))
|
||||
else:
|
||||
can_reply = bool(getattr(business_connection, "can_reply", False))
|
||||
|
||||
previous = self._db.get_telegram_business_connection(str(conn_id))
|
||||
self._db.upsert_telegram_business_connection(
|
||||
connection_id=str(conn_id),
|
||||
owner_user_id=str(owner_user_id),
|
||||
owner_chat_id=str(owner_chat_id),
|
||||
can_reply=can_reply,
|
||||
is_enabled=is_enabled,
|
||||
)
|
||||
|
||||
# First-time onboarding DM.
|
||||
if previous is None and is_enabled:
|
||||
try:
|
||||
await self._send(
|
||||
chat_id=int(owner_chat_id),
|
||||
text=ONBOARDING_MESSAGE,
|
||||
disable_notification=False,
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
"Failed to deliver business-mode onboarding DM to %s: %s",
|
||||
owner_chat_id, exc,
|
||||
)
|
||||
|
||||
# Connection ended → DM the owner a confirmation.
|
||||
if previous is not None and previous.get("is_enabled") and not is_enabled:
|
||||
try:
|
||||
await self._send(
|
||||
chat_id=int(owner_chat_id),
|
||||
text="🔌 Business connection ended. I won't draft any more replies.",
|
||||
disable_notification=True,
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.debug("Disconnection notice send failed (%s): %s", owner_chat_id, exc)
|
||||
|
||||
# can_reply changed → tell the owner.
|
||||
if previous is not None and previous.get("is_enabled") and is_enabled:
|
||||
if previous.get("can_reply") != can_reply:
|
||||
msg = (
|
||||
"✅ Send-on-your-behalf permission enabled — Send buttons are now live."
|
||||
if can_reply else
|
||||
"⚠️ Send-on-your-behalf permission is OFF in your Telegram Business "
|
||||
"settings. I'll still draft replies but you'll need to copy and "
|
||||
"send them yourself."
|
||||
)
|
||||
try:
|
||||
await self._send(
|
||||
chat_id=int(owner_chat_id), text=msg, disable_notification=True,
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# business_message updates (customer talks to owner).
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def handle_business_message(self, message: Any) -> None:
|
||||
"""Schedule a debounced draft for an incoming customer message.
|
||||
|
||||
``message`` is a PTB ``Message`` from a business_message update.
|
||||
It has ``business_connection_id``, ``chat`` (the customer chat),
|
||||
``from_user`` (the customer), and ``text``/``caption``.
|
||||
"""
|
||||
conn_id = getattr(message, "business_connection_id", None)
|
||||
if not conn_id:
|
||||
return
|
||||
|
||||
conn = self._db.get_telegram_business_connection(str(conn_id))
|
||||
if not conn:
|
||||
logger.debug("Received business_message for unknown connection %s", conn_id)
|
||||
return
|
||||
if not conn.get("is_enabled"):
|
||||
return
|
||||
if not conn.get("auto_draft", True):
|
||||
logger.debug("Business connection %s has auto_draft=False; skipping draft", conn_id)
|
||||
return
|
||||
|
||||
chat = getattr(message, "chat", None)
|
||||
customer_chat_id = getattr(chat, "id", None)
|
||||
if customer_chat_id is None:
|
||||
return
|
||||
if str(customer_chat_id) in (conn.get("paused_chats") or []):
|
||||
logger.debug("Customer chat %s paused; skipping draft", customer_chat_id)
|
||||
return
|
||||
|
||||
# Pull text. Captions on media count too — they're often the only
|
||||
# part the customer typed.
|
||||
text = (getattr(message, "text", None) or getattr(message, "caption", None) or "").strip()
|
||||
if not text:
|
||||
# Pure media without caption — out of scope for v1 (no vision pipeline).
|
||||
return
|
||||
if len(text) > self._max_customer_text_chars:
|
||||
text = text[: self._max_customer_text_chars]
|
||||
|
||||
key = f"{conn_id}:{customer_chat_id}"
|
||||
|
||||
# Cancel any in-flight debounce for this (connection, customer chat) so
|
||||
# rapid bursts coalesce into a single draft against the latest text.
|
||||
prior = self._debounce_tasks.pop(key, None)
|
||||
if prior is not None and not prior.done():
|
||||
prior.cancel()
|
||||
|
||||
# Buffer the latest message info — the timer will read this at fire time.
|
||||
self._debounce_buffers[key] = {
|
||||
"conn_id": str(conn_id),
|
||||
"owner_chat_id": str(conn.get("owner_chat_id")),
|
||||
"customer_chat_id": str(customer_chat_id),
|
||||
"customer_msg_id": str(getattr(message, "message_id", "") or ""),
|
||||
"customer_text": text,
|
||||
"customer_name": _customer_display_name(message),
|
||||
}
|
||||
|
||||
# Schedule the actual draft. If debounce is zero (test mode), fire
|
||||
# immediately so the test doesn't have to wait.
|
||||
if self._debounce_seconds <= 0:
|
||||
await self._run_draft(key)
|
||||
else:
|
||||
self._debounce_tasks[key] = asyncio.create_task(
|
||||
self._debounce_then_draft(key)
|
||||
)
|
||||
|
||||
async def _debounce_then_draft(self, key: str) -> None:
|
||||
try:
|
||||
await asyncio.sleep(self._debounce_seconds)
|
||||
except asyncio.CancelledError:
|
||||
return
|
||||
try:
|
||||
await self._run_draft(key)
|
||||
finally:
|
||||
self._debounce_tasks.pop(key, None)
|
||||
|
||||
async def _run_draft(self, key: str) -> None:
|
||||
buf = self._debounce_buffers.pop(key, None)
|
||||
if not buf:
|
||||
return
|
||||
|
||||
# Re-check the connection state — owner may have hit /biz pause
|
||||
# during the debounce window.
|
||||
conn = self._db.get_telegram_business_connection(buf["conn_id"])
|
||||
if not conn or not conn.get("is_enabled") or not conn.get("auto_draft", True):
|
||||
return
|
||||
if buf["customer_chat_id"] in (conn.get("paused_chats") or []):
|
||||
return
|
||||
|
||||
try:
|
||||
draft_text = await self._draft_generator(
|
||||
buf["customer_text"], buf["customer_chat_id"]
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.exception("Business-mode draft generator failed: %s", exc)
|
||||
try:
|
||||
await self._send(
|
||||
chat_id=int(buf["owner_chat_id"]),
|
||||
text=(
|
||||
"⚠️ I couldn't draft a reply to "
|
||||
f"{buf['customer_name']}: {exc}.\n\n"
|
||||
f"Their message was:\n\n{buf['customer_text']}"
|
||||
),
|
||||
disable_notification=True,
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
return
|
||||
|
||||
draft_text = (draft_text or "").strip()
|
||||
if not draft_text:
|
||||
logger.debug("Empty draft for %s — skipping", key)
|
||||
return
|
||||
|
||||
draft_id = self._db.create_telegram_business_draft(
|
||||
connection_id=buf["conn_id"],
|
||||
owner_chat_id=buf["owner_chat_id"],
|
||||
customer_chat_id=buf["customer_chat_id"],
|
||||
customer_msg_id=buf["customer_msg_id"] or None,
|
||||
customer_text=buf["customer_text"],
|
||||
draft_text=draft_text,
|
||||
ttl_seconds=self._draft_ttl_seconds,
|
||||
)
|
||||
|
||||
owner_message = self._render_draft_owner_message(
|
||||
customer_name=buf["customer_name"],
|
||||
customer_text=buf["customer_text"],
|
||||
draft_text=draft_text,
|
||||
)
|
||||
keyboard = self._build_draft_keyboard(draft_id, can_reply=bool(conn.get("can_reply")))
|
||||
|
||||
try:
|
||||
sent = await self._send(
|
||||
chat_id=int(buf["owner_chat_id"]),
|
||||
text=owner_message,
|
||||
reply_markup=keyboard,
|
||||
disable_notification=False,
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to deliver business-mode draft to owner %s: %s",
|
||||
buf["owner_chat_id"], exc)
|
||||
# Mark the draft expired so we don't leave an unactionable row.
|
||||
self._db.resolve_telegram_business_draft(draft_id, status="expired")
|
||||
return
|
||||
|
||||
owner_msg_id = getattr(sent, "message_id", None)
|
||||
if owner_msg_id is not None:
|
||||
self._db.set_telegram_business_draft_owner_message(draft_id, str(owner_msg_id))
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Inline-button callback dispatch (bd:choice:draft_id)
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def handle_callback(
|
||||
self,
|
||||
*,
|
||||
data: str,
|
||||
caller_user_id: Optional[str],
|
||||
answer: Callable[..., Awaitable[Any]],
|
||||
edit_message_text: Callable[..., Awaitable[Any]],
|
||||
) -> bool:
|
||||
"""Handle a callback_query whose data starts with ``bd:``.
|
||||
|
||||
Returns True if the callback was dispatched (caller should stop
|
||||
further handling), False if it wasn't ours.
|
||||
"""
|
||||
if not data.startswith(CALLBACK_PREFIX):
|
||||
return False
|
||||
|
||||
parts = data.split(":", 2)
|
||||
if len(parts) != 3:
|
||||
await answer(text="Invalid draft action.")
|
||||
return True
|
||||
choice = parts[1]
|
||||
try:
|
||||
draft_id = int(parts[2])
|
||||
except ValueError:
|
||||
await answer(text="Invalid draft action.")
|
||||
return True
|
||||
|
||||
draft = self._db.get_telegram_business_draft(draft_id)
|
||||
if draft is None:
|
||||
await answer(text="That draft has expired.")
|
||||
return True
|
||||
if draft.get("status") != "pending":
|
||||
await answer(text="That draft has already been resolved.")
|
||||
return True
|
||||
|
||||
# Only the owner of this connection may act on the buttons.
|
||||
conn = self._db.get_telegram_business_connection(draft["connection_id"])
|
||||
if not conn:
|
||||
await answer(text="Connection no longer exists.")
|
||||
return True
|
||||
if caller_user_id and str(caller_user_id) != str(conn.get("owner_user_id")):
|
||||
await answer(text="⛔ Only the connected account owner can use these buttons.")
|
||||
return True
|
||||
|
||||
if choice == CHOICE_DISCARD:
|
||||
self._db.resolve_telegram_business_draft(draft_id, status="discarded")
|
||||
await answer(text="✕ Discarded")
|
||||
try:
|
||||
await edit_message_text(
|
||||
text=self._render_resolved_message(draft, status="discarded"),
|
||||
reply_markup=None,
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
return True
|
||||
|
||||
if choice == CHOICE_EDIT:
|
||||
self._edit_capture[str(conn["owner_chat_id"])] = draft_id
|
||||
await answer(text="✎ Send me the text to deliver")
|
||||
try:
|
||||
await edit_message_text(
|
||||
text=(
|
||||
self._render_resolved_message(draft, status="awaiting_edit")
|
||||
+ "\n\n✎ Reply to this DM with the text you want delivered."
|
||||
),
|
||||
reply_markup=None,
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
return True
|
||||
|
||||
if choice == CHOICE_SEND:
|
||||
if not conn.get("can_reply"):
|
||||
await answer(
|
||||
text=(
|
||||
"⚠️ Send-on-your-behalf is OFF — enable it in Telegram → "
|
||||
"Business → Chatbots, then try again."
|
||||
)
|
||||
)
|
||||
return True
|
||||
try:
|
||||
await self._send(
|
||||
chat_id=int(draft["customer_chat_id"]),
|
||||
text=draft["draft_text"],
|
||||
business_connection_id=draft["connection_id"],
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning("Business send failed for draft %s: %s", draft_id, exc)
|
||||
await answer(text=f"⚠️ Send failed: {exc}")
|
||||
return True
|
||||
|
||||
self._db.resolve_telegram_business_draft(
|
||||
draft_id, status="sent", final_sent_text=draft["draft_text"],
|
||||
)
|
||||
await answer(text="✓ Sent")
|
||||
try:
|
||||
await edit_message_text(
|
||||
text=self._render_resolved_message(draft, status="sent"),
|
||||
reply_markup=None,
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
return True
|
||||
|
||||
await answer(text="Unknown action.")
|
||||
return True
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Owner-side "next message after Edit = the actual reply text" capture.
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def maybe_handle_edit_capture(
|
||||
self,
|
||||
*,
|
||||
owner_chat_id: str,
|
||||
text: str,
|
||||
) -> bool:
|
||||
"""If the owner just tapped Edit, treat their next DM as the override.
|
||||
|
||||
Returns True if the message was consumed by the edit-capture flow
|
||||
(so the caller shouldn't dispatch it to the normal command path).
|
||||
"""
|
||||
draft_id = self._edit_capture.pop(str(owner_chat_id), None)
|
||||
if draft_id is None:
|
||||
return False
|
||||
|
||||
override = (text or "").strip()
|
||||
if not override:
|
||||
# Empty edit attempt → restore capture and let the user retry.
|
||||
self._edit_capture[str(owner_chat_id)] = draft_id
|
||||
try:
|
||||
await self._send(
|
||||
chat_id=int(owner_chat_id),
|
||||
text="✎ Edit cancelled (empty text). Tap Edit again if you want to try.",
|
||||
disable_notification=True,
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
return True
|
||||
|
||||
draft = self._db.get_telegram_business_draft(draft_id)
|
||||
if not draft or draft.get("status") not in {"pending", "awaiting_edit"}:
|
||||
try:
|
||||
await self._send(
|
||||
chat_id=int(owner_chat_id),
|
||||
text="That draft has expired or was already resolved.",
|
||||
disable_notification=True,
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
return True
|
||||
|
||||
conn = self._db.get_telegram_business_connection(draft["connection_id"])
|
||||
if not conn:
|
||||
return True
|
||||
if not conn.get("can_reply"):
|
||||
try:
|
||||
await self._send(
|
||||
chat_id=int(owner_chat_id),
|
||||
text=(
|
||||
"⚠️ Send-on-your-behalf is OFF in Telegram → Business → "
|
||||
"Chatbots. I can't deliver this — copy the text and send it "
|
||||
"manually."
|
||||
),
|
||||
disable_notification=False,
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
return True
|
||||
|
||||
try:
|
||||
await self._send(
|
||||
chat_id=int(draft["customer_chat_id"]),
|
||||
text=override,
|
||||
business_connection_id=draft["connection_id"],
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning("Business edit-send failed for draft %s: %s", draft_id, exc)
|
||||
try:
|
||||
await self._send(
|
||||
chat_id=int(owner_chat_id),
|
||||
text=f"⚠️ Send failed: {exc}",
|
||||
disable_notification=False,
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
return True
|
||||
|
||||
self._db.resolve_telegram_business_draft(
|
||||
draft_id, status="edited", final_sent_text=override,
|
||||
)
|
||||
try:
|
||||
await self._send(
|
||||
chat_id=int(owner_chat_id),
|
||||
text=f"✓ Sent (edited):\n\n{override}",
|
||||
disable_notification=True,
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
return True
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# /biz slash command (owner-only).
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def handle_biz_command(
|
||||
self,
|
||||
*,
|
||||
owner_user_id: str,
|
||||
owner_chat_id: str,
|
||||
args: List[str],
|
||||
) -> str:
|
||||
"""Process /biz subcommands. Returns text the adapter should send back.
|
||||
|
||||
Supported:
|
||||
/biz — status dashboard
|
||||
/biz pause — set auto_draft=False on all this user's connections
|
||||
/biz resume — set auto_draft=True on all this user's connections
|
||||
/biz off <chat_id> — add a customer chat to the paused list
|
||||
/biz on <chat_id> — remove a customer chat from the paused list
|
||||
"""
|
||||
connections = self._db.list_telegram_business_connections(
|
||||
owner_user_id=str(owner_user_id), enabled_only=False,
|
||||
)
|
||||
active = [c for c in connections if c.get("is_enabled")]
|
||||
|
||||
if not args:
|
||||
return self._render_status(connections=connections, owner_chat_id=owner_chat_id)
|
||||
|
||||
sub = args[0].lower()
|
||||
if sub in {"pause", "resume"}:
|
||||
target = (sub == "resume")
|
||||
if not active:
|
||||
return "You don't have any active business connections."
|
||||
for conn in active:
|
||||
self._db.set_telegram_business_auto_draft(
|
||||
conn["connection_id"], auto_draft=target,
|
||||
)
|
||||
verb = "▶ Resumed drafting." if target else "⏸ Paused drafting."
|
||||
return f"{verb} ({len(active)} connection{'s' if len(active) != 1 else ''})"
|
||||
|
||||
if sub in {"off", "on"} and len(args) >= 2:
|
||||
chat_arg = args[1].strip()
|
||||
if not active:
|
||||
return "You don't have any active business connections."
|
||||
updated = 0
|
||||
for conn in active:
|
||||
paused = list(conn.get("paused_chats") or [])
|
||||
if sub == "off" and chat_arg not in paused:
|
||||
paused.append(chat_arg)
|
||||
self._db.set_telegram_business_paused_chats(
|
||||
conn["connection_id"], paused,
|
||||
)
|
||||
updated += 1
|
||||
elif sub == "on" and chat_arg in paused:
|
||||
paused.remove(chat_arg)
|
||||
self._db.set_telegram_business_paused_chats(
|
||||
conn["connection_id"], paused,
|
||||
)
|
||||
updated += 1
|
||||
if updated:
|
||||
verb = "muted in" if sub == "off" else "re-enabled for"
|
||||
return f"✓ Chat {chat_arg} {verb} {updated} connection(s)."
|
||||
return f"No change — {chat_arg} was already in that state."
|
||||
|
||||
return (
|
||||
"Usage:\n"
|
||||
" /biz — show status\n"
|
||||
" /biz pause — pause drafting (still see messages)\n"
|
||||
" /biz resume — resume drafting\n"
|
||||
" /biz off <id> — mute drafting for one customer chat\n"
|
||||
" /biz on <id> — re-enable drafting for one customer chat"
|
||||
)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Rendering helpers
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
@staticmethod
|
||||
def _render_draft_owner_message(
|
||||
*, customer_name: str, customer_text: str, draft_text: str
|
||||
) -> str:
|
||||
"""Build the plain-text owner-DM body that carries one draft."""
|
||||
return (
|
||||
f"💬 {customer_name} wrote:\n"
|
||||
f"{_quote_block(customer_text)}\n\n"
|
||||
f"📝 Suggested reply:\n"
|
||||
f"{draft_text}"
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _render_resolved_message(draft: Dict[str, Any], *, status: str) -> str:
|
||||
"""Re-render the owner DM after a button decision."""
|
||||
header = {
|
||||
"sent": "✓ Sent",
|
||||
"edited": "✓ Sent (edited)",
|
||||
"discarded": "✕ Discarded",
|
||||
"expired": "⏰ Expired",
|
||||
"awaiting_edit": "✎ Edit",
|
||||
}.get(status, status)
|
||||
return (
|
||||
f"{header}\n\n"
|
||||
f"💬 Customer wrote:\n{_quote_block(draft.get('customer_text', ''))}\n\n"
|
||||
f"📝 Draft was:\n{draft.get('draft_text', '')}"
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _render_status(*, connections: List[Dict[str, Any]], owner_chat_id: str) -> str:
|
||||
if not connections:
|
||||
return (
|
||||
"You haven't connected this bot to any Telegram Business account yet.\n\n"
|
||||
"Open Telegram → Settings → Business → Chatbots, paste my @username, "
|
||||
"and pick which chats I can see."
|
||||
)
|
||||
lines = ["📋 Business Mode status\n"]
|
||||
for c in connections:
|
||||
state = "🟢 active" if c.get("is_enabled") else "⚪ ended"
|
||||
auto = "drafting ON" if c.get("auto_draft") else "drafting PAUSED"
|
||||
send_perm = "send ON" if c.get("can_reply") else "send OFF"
|
||||
paused = c.get("paused_chats") or []
|
||||
paused_note = f" — muted chats: {', '.join(paused)}" if paused else ""
|
||||
lines.append(
|
||||
f"• {state} · {auto} · {send_perm}{paused_note}"
|
||||
)
|
||||
lines.append("")
|
||||
lines.append(
|
||||
"Commands: /biz pause · /biz resume · /biz off <chat_id> · /biz on <chat_id>"
|
||||
)
|
||||
return "\n".join(lines)
|
||||
|
||||
@staticmethod
|
||||
def _build_draft_keyboard(draft_id: int, *, can_reply: bool):
|
||||
"""Construct the inline keyboard for one draft.
|
||||
|
||||
Imported lazily so the module is importable when python-telegram-bot
|
||||
is missing (matches the lazy-deps pattern used elsewhere in the
|
||||
adapter).
|
||||
"""
|
||||
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
|
||||
row = []
|
||||
if can_reply:
|
||||
row.append(InlineKeyboardButton(
|
||||
"✓ Send", callback_data=f"{CALLBACK_PREFIX}{CHOICE_SEND}:{draft_id}",
|
||||
))
|
||||
row.append(InlineKeyboardButton(
|
||||
"✎ Edit", callback_data=f"{CALLBACK_PREFIX}{CHOICE_EDIT}:{draft_id}",
|
||||
))
|
||||
row.append(InlineKeyboardButton(
|
||||
"✕ Discard", callback_data=f"{CALLBACK_PREFIX}{CHOICE_DISCARD}:{draft_id}",
|
||||
))
|
||||
return InlineKeyboardMarkup([row])
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _customer_display_name(message: Any) -> str:
|
||||
"""Best-effort human name for the customer who sent ``message``."""
|
||||
user = getattr(message, "from_user", None)
|
||||
if user is not None:
|
||||
for attr in ("full_name", "first_name", "username"):
|
||||
val = getattr(user, attr, None)
|
||||
if val:
|
||||
return str(val)
|
||||
chat = getattr(message, "chat", None)
|
||||
if chat is not None:
|
||||
for attr in ("full_name", "title", "username"):
|
||||
val = getattr(chat, attr, None)
|
||||
if val:
|
||||
return str(val)
|
||||
return "Customer"
|
||||
|
||||
|
||||
def _quote_block(text: str, *, max_len: int = 600) -> str:
|
||||
"""Render a customer message as a quoted block for the owner DM."""
|
||||
if not text:
|
||||
return " (no text)"
|
||||
if len(text) > max_len:
|
||||
text = text[: max_len - 1].rstrip() + "…"
|
||||
return "\n".join(f" > {line}" for line in text.splitlines())
|
||||
@@ -67,6 +67,8 @@ COMMAND_REGISTRY: list[CommandDef] = [
|
||||
aliases=("reset",), args_hint="[name]"),
|
||||
CommandDef("topic", "Enable or inspect Telegram DM topic sessions", "Session",
|
||||
gateway_only=True, args_hint="[off|help|session-id]"),
|
||||
CommandDef("biz", "Manage Telegram Business Mode (Secretary Bots) drafting", "Session",
|
||||
gateway_only=True, args_hint="[pause|resume|off <chat>|on <chat>]"),
|
||||
CommandDef("clear", "Clear screen and start a new session", "Session",
|
||||
cli_only=True),
|
||||
CommandDef("redraw", "Force a full UI repaint (recovers from terminal drift)", "Session",
|
||||
|
||||
@@ -1321,6 +1321,19 @@ DEFAULT_CONFIG = {
|
||||
"reactions": False, # Add 👀/✅/❌ reactions to messages during processing
|
||||
"channel_prompts": {}, # Per-chat/topic ephemeral system prompts (topics inherit from parent group)
|
||||
"allowed_chats": "", # If set, bot ONLY responds in these group/supergroup chat IDs (whitelist)
|
||||
# Business Mode (Secretary Bots) — owner-approved drafting for
|
||||
# Telegram Business accounts. When enabled, customer messages
|
||||
# routed via Business Mode are turned into drafts delivered to
|
||||
# the owner with Send / Edit / Discard buttons. Off by default;
|
||||
# also requires Business Mode toggled in @BotFather.
|
||||
# See website/docs/user-guide/messaging/telegram.md#business-mode.
|
||||
"business_mode": {
|
||||
"enabled": False,
|
||||
"debounce_seconds": 8, # Coalesce typing-burst customer messages into one draft
|
||||
"draft_ttl_hours": 24, # Pending drafts expire after this many hours
|
||||
"max_customer_text_chars": 4000, # Truncate very long customer messages before drafting
|
||||
"owner_persona": "", # Optional override; empty uses a sensible default persona
|
||||
},
|
||||
},
|
||||
|
||||
# Mattermost platform settings (gateway mode)
|
||||
|
||||
358
hermes_state.py
358
hermes_state.py
@@ -2484,6 +2484,364 @@ class SessionDB:
|
||||
)
|
||||
self._execute_write(_do)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Telegram Business Mode (Secretary Bots)
|
||||
# ------------------------------------------------------------------
|
||||
# Business Mode lets the owner of a Telegram Business account connect
|
||||
# this bot to their personal account so it can observe customer
|
||||
# conversations and draft replies for owner approval. Two tables:
|
||||
#
|
||||
# telegram_business_connections — one row per account that linked
|
||||
# this bot via BotFather Business Mode. Updated by
|
||||
# BusinessConnection updates (established / edited / ended).
|
||||
#
|
||||
# telegram_business_drafts — pending owner-approval drafts. A
|
||||
# draft is created when a customer messages a connected chat
|
||||
# and the manager produces a candidate reply; it's deleted when
|
||||
# the owner taps Send / Edit / Discard or when it expires.
|
||||
#
|
||||
# Like the topic-mode tables, this migration is lazy: it runs only
|
||||
# when business mode is actually enabled, keeping the core schema
|
||||
# clean for users who never use the feature.
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def apply_telegram_business_migration(self) -> None:
|
||||
"""Create Telegram Business Mode tables on first use.
|
||||
|
||||
Idempotent — safe to call from every business handler.
|
||||
"""
|
||||
def _do(conn):
|
||||
conn.executescript(
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS telegram_business_connections (
|
||||
connection_id TEXT PRIMARY KEY,
|
||||
owner_user_id TEXT NOT NULL,
|
||||
owner_chat_id TEXT NOT NULL,
|
||||
can_reply INTEGER NOT NULL DEFAULT 0,
|
||||
is_enabled INTEGER NOT NULL DEFAULT 1,
|
||||
auto_draft INTEGER NOT NULL DEFAULT 1,
|
||||
paused_chats TEXT NOT NULL DEFAULT '[]',
|
||||
created_at REAL NOT NULL,
|
||||
updated_at REAL NOT NULL
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_tg_biz_conn_owner
|
||||
ON telegram_business_connections(owner_user_id);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS telegram_business_drafts (
|
||||
draft_id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
connection_id TEXT NOT NULL,
|
||||
owner_chat_id TEXT NOT NULL,
|
||||
customer_chat_id TEXT NOT NULL,
|
||||
customer_msg_id TEXT,
|
||||
customer_text TEXT NOT NULL,
|
||||
draft_text TEXT NOT NULL,
|
||||
status TEXT NOT NULL DEFAULT 'pending',
|
||||
owner_message_id TEXT,
|
||||
final_sent_text TEXT,
|
||||
created_at REAL NOT NULL,
|
||||
updated_at REAL NOT NULL,
|
||||
expires_at REAL NOT NULL
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_tg_biz_drafts_conn_customer
|
||||
ON telegram_business_drafts(connection_id, customer_chat_id, status);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_tg_biz_drafts_owner_msg
|
||||
ON telegram_business_drafts(owner_chat_id, owner_message_id)
|
||||
WHERE owner_message_id IS NOT NULL;
|
||||
"""
|
||||
)
|
||||
|
||||
conn.execute(
|
||||
"INSERT INTO state_meta (key, value) VALUES (?, ?) "
|
||||
"ON CONFLICT(key) DO UPDATE SET value = excluded.value",
|
||||
("telegram_business_schema_version", "1"),
|
||||
)
|
||||
self._execute_write(_do)
|
||||
|
||||
def upsert_telegram_business_connection(
|
||||
self,
|
||||
*,
|
||||
connection_id: str,
|
||||
owner_user_id: str,
|
||||
owner_chat_id: str,
|
||||
can_reply: bool,
|
||||
is_enabled: bool,
|
||||
) -> None:
|
||||
"""Insert or update a business connection row.
|
||||
|
||||
Called from BusinessConnection update handlers. Preserves
|
||||
``auto_draft`` and ``paused_chats`` across updates so the owner's
|
||||
preferences survive Telegram re-issuing the connection.
|
||||
"""
|
||||
self.apply_telegram_business_migration()
|
||||
now = time.time()
|
||||
|
||||
def _do(conn):
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO telegram_business_connections (
|
||||
connection_id, owner_user_id, owner_chat_id,
|
||||
can_reply, is_enabled, auto_draft, paused_chats,
|
||||
created_at, updated_at
|
||||
) VALUES (?, ?, ?, ?, ?, 1, '[]', ?, ?)
|
||||
ON CONFLICT(connection_id) DO UPDATE SET
|
||||
owner_user_id = excluded.owner_user_id,
|
||||
owner_chat_id = excluded.owner_chat_id,
|
||||
can_reply = excluded.can_reply,
|
||||
is_enabled = excluded.is_enabled,
|
||||
updated_at = excluded.updated_at
|
||||
""",
|
||||
(
|
||||
str(connection_id),
|
||||
str(owner_user_id),
|
||||
str(owner_chat_id),
|
||||
1 if can_reply else 0,
|
||||
1 if is_enabled else 0,
|
||||
now,
|
||||
now,
|
||||
),
|
||||
)
|
||||
self._execute_write(_do)
|
||||
|
||||
def get_telegram_business_connection(
|
||||
self, connection_id: str
|
||||
) -> Optional[Dict[str, Any]]:
|
||||
"""Return the connection row as a dict, or None."""
|
||||
with self._lock:
|
||||
try:
|
||||
row = self._conn.execute(
|
||||
"SELECT * FROM telegram_business_connections WHERE connection_id = ?",
|
||||
(str(connection_id),),
|
||||
).fetchone()
|
||||
except sqlite3.OperationalError:
|
||||
return None
|
||||
if row is None:
|
||||
return None
|
||||
d = dict(row) if isinstance(row, sqlite3.Row) else dict(zip(
|
||||
["connection_id", "owner_user_id", "owner_chat_id", "can_reply",
|
||||
"is_enabled", "auto_draft", "paused_chats", "created_at",
|
||||
"updated_at"],
|
||||
row,
|
||||
))
|
||||
try:
|
||||
d["paused_chats"] = json.loads(d.get("paused_chats") or "[]")
|
||||
except (TypeError, ValueError):
|
||||
d["paused_chats"] = []
|
||||
d["can_reply"] = bool(d.get("can_reply"))
|
||||
d["is_enabled"] = bool(d.get("is_enabled"))
|
||||
d["auto_draft"] = bool(d.get("auto_draft", 1))
|
||||
return d
|
||||
|
||||
def list_telegram_business_connections(
|
||||
self, *, owner_user_id: Optional[str] = None, enabled_only: bool = True
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""List connections, optionally filtered by owner_user_id."""
|
||||
sql = "SELECT * FROM telegram_business_connections"
|
||||
clauses: List[str] = []
|
||||
params: List[Any] = []
|
||||
if owner_user_id is not None:
|
||||
clauses.append("owner_user_id = ?")
|
||||
params.append(str(owner_user_id))
|
||||
if enabled_only:
|
||||
clauses.append("is_enabled = 1")
|
||||
if clauses:
|
||||
sql += " WHERE " + " AND ".join(clauses)
|
||||
sql += " ORDER BY updated_at DESC"
|
||||
with self._lock:
|
||||
try:
|
||||
rows = self._conn.execute(sql, params).fetchall()
|
||||
except sqlite3.OperationalError:
|
||||
return []
|
||||
out: List[Dict[str, Any]] = []
|
||||
for row in rows:
|
||||
d = dict(row) if isinstance(row, sqlite3.Row) else dict(zip(
|
||||
["connection_id", "owner_user_id", "owner_chat_id", "can_reply",
|
||||
"is_enabled", "auto_draft", "paused_chats", "created_at",
|
||||
"updated_at"],
|
||||
row,
|
||||
))
|
||||
try:
|
||||
d["paused_chats"] = json.loads(d.get("paused_chats") or "[]")
|
||||
except (TypeError, ValueError):
|
||||
d["paused_chats"] = []
|
||||
d["can_reply"] = bool(d.get("can_reply"))
|
||||
d["is_enabled"] = bool(d.get("is_enabled"))
|
||||
d["auto_draft"] = bool(d.get("auto_draft", 1))
|
||||
out.append(d)
|
||||
return out
|
||||
|
||||
def set_telegram_business_auto_draft(
|
||||
self, connection_id: str, *, auto_draft: bool
|
||||
) -> None:
|
||||
"""Toggle auto-drafting globally for one connection (the /biz pause switch)."""
|
||||
self.apply_telegram_business_migration()
|
||||
|
||||
def _do(conn):
|
||||
conn.execute(
|
||||
"UPDATE telegram_business_connections SET auto_draft = ?, "
|
||||
"updated_at = ? WHERE connection_id = ?",
|
||||
(1 if auto_draft else 0, time.time(), str(connection_id)),
|
||||
)
|
||||
self._execute_write(_do)
|
||||
|
||||
def set_telegram_business_paused_chats(
|
||||
self, connection_id: str, paused_chats: List[str]
|
||||
) -> None:
|
||||
"""Replace the per-chat pause list for one connection."""
|
||||
self.apply_telegram_business_migration()
|
||||
payload = json.dumps([str(c) for c in (paused_chats or [])])
|
||||
|
||||
def _do(conn):
|
||||
conn.execute(
|
||||
"UPDATE telegram_business_connections SET paused_chats = ?, "
|
||||
"updated_at = ? WHERE connection_id = ?",
|
||||
(payload, time.time(), str(connection_id)),
|
||||
)
|
||||
self._execute_write(_do)
|
||||
|
||||
def create_telegram_business_draft(
|
||||
self,
|
||||
*,
|
||||
connection_id: str,
|
||||
owner_chat_id: str,
|
||||
customer_chat_id: str,
|
||||
customer_msg_id: Optional[str],
|
||||
customer_text: str,
|
||||
draft_text: str,
|
||||
ttl_seconds: float = 86400.0,
|
||||
) -> int:
|
||||
"""Insert a pending draft row and return its draft_id.
|
||||
|
||||
Any prior pending drafts for the same (connection, customer_chat)
|
||||
are marked superseded so only one Send button is ever live per
|
||||
conversation.
|
||||
"""
|
||||
self.apply_telegram_business_migration()
|
||||
now = time.time()
|
||||
|
||||
def _do(conn):
|
||||
conn.execute(
|
||||
"UPDATE telegram_business_drafts SET status = 'superseded', "
|
||||
"updated_at = ? WHERE connection_id = ? AND customer_chat_id = ? "
|
||||
"AND status = 'pending'",
|
||||
(now, str(connection_id), str(customer_chat_id)),
|
||||
)
|
||||
cur = conn.execute(
|
||||
"""
|
||||
INSERT INTO telegram_business_drafts (
|
||||
connection_id, owner_chat_id, customer_chat_id,
|
||||
customer_msg_id, customer_text, draft_text,
|
||||
status, created_at, updated_at, expires_at
|
||||
) VALUES (?, ?, ?, ?, ?, ?, 'pending', ?, ?, ?)
|
||||
""",
|
||||
(
|
||||
str(connection_id),
|
||||
str(owner_chat_id),
|
||||
str(customer_chat_id),
|
||||
str(customer_msg_id) if customer_msg_id is not None else None,
|
||||
customer_text,
|
||||
draft_text,
|
||||
now, now, now + max(60.0, float(ttl_seconds)),
|
||||
),
|
||||
)
|
||||
return cur.lastrowid
|
||||
|
||||
return self._execute_write(_do)
|
||||
|
||||
def get_telegram_business_draft(self, draft_id: int) -> Optional[Dict[str, Any]]:
|
||||
"""Fetch one draft row by id."""
|
||||
with self._lock:
|
||||
try:
|
||||
row = self._conn.execute(
|
||||
"SELECT * FROM telegram_business_drafts WHERE draft_id = ?",
|
||||
(int(draft_id),),
|
||||
).fetchone()
|
||||
except sqlite3.OperationalError:
|
||||
return None
|
||||
if row is None:
|
||||
return None
|
||||
return dict(row) if isinstance(row, sqlite3.Row) else None
|
||||
|
||||
def set_telegram_business_draft_owner_message(
|
||||
self, draft_id: int, owner_message_id: str
|
||||
) -> None:
|
||||
"""Record the Telegram message_id of the owner-DM that carries the draft."""
|
||||
self.apply_telegram_business_migration()
|
||||
|
||||
def _do(conn):
|
||||
conn.execute(
|
||||
"UPDATE telegram_business_drafts SET owner_message_id = ?, "
|
||||
"updated_at = ? WHERE draft_id = ?",
|
||||
(str(owner_message_id), time.time(), int(draft_id)),
|
||||
)
|
||||
self._execute_write(_do)
|
||||
|
||||
def resolve_telegram_business_draft(
|
||||
self,
|
||||
draft_id: int,
|
||||
*,
|
||||
status: str,
|
||||
final_sent_text: Optional[str] = None,
|
||||
) -> Optional[Dict[str, Any]]:
|
||||
"""Atomically mark a draft sent / edited / discarded and return its prior row.
|
||||
|
||||
Returns None if the draft no longer exists or was already resolved
|
||||
(so callbacks for stale buttons no-op safely).
|
||||
"""
|
||||
self.apply_telegram_business_migration()
|
||||
if status not in {"sent", "edited", "discarded", "expired"}:
|
||||
raise ValueError(f"invalid business draft status: {status}")
|
||||
now = time.time()
|
||||
|
||||
def _do(conn):
|
||||
row = conn.execute(
|
||||
"SELECT * FROM telegram_business_drafts WHERE draft_id = ? AND status = 'pending'",
|
||||
(int(draft_id),),
|
||||
).fetchone()
|
||||
if row is None:
|
||||
return None
|
||||
conn.execute(
|
||||
"UPDATE telegram_business_drafts SET status = ?, "
|
||||
"final_sent_text = COALESCE(?, final_sent_text), "
|
||||
"updated_at = ? WHERE draft_id = ?",
|
||||
(status, final_sent_text, now, int(draft_id)),
|
||||
)
|
||||
return dict(row) if isinstance(row, sqlite3.Row) else None
|
||||
|
||||
return self._execute_write(_do)
|
||||
|
||||
def get_pending_telegram_business_drafts_for_owner(
|
||||
self, owner_chat_id: str
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""List all pending drafts addressed to one owner DM (for /biz status)."""
|
||||
with self._lock:
|
||||
try:
|
||||
rows = self._conn.execute(
|
||||
"SELECT * FROM telegram_business_drafts WHERE owner_chat_id = ? "
|
||||
"AND status = 'pending' ORDER BY created_at ASC",
|
||||
(str(owner_chat_id),),
|
||||
).fetchall()
|
||||
except sqlite3.OperationalError:
|
||||
return []
|
||||
return [dict(r) if isinstance(r, sqlite3.Row) else r for r in rows]
|
||||
|
||||
def expire_telegram_business_drafts(self, *, now: Optional[float] = None) -> int:
|
||||
"""Mark drafts past their expiry as expired. Returns count affected."""
|
||||
self.apply_telegram_business_migration()
|
||||
cutoff = now if now is not None else time.time()
|
||||
|
||||
def _do(conn):
|
||||
cur = conn.execute(
|
||||
"UPDATE telegram_business_drafts SET status = 'expired', "
|
||||
"updated_at = ? WHERE status = 'pending' AND expires_at < ?",
|
||||
(cutoff, cutoff),
|
||||
)
|
||||
return cur.rowcount
|
||||
|
||||
return self._execute_write(_do)
|
||||
|
||||
def enable_telegram_topic_mode(
|
||||
self,
|
||||
*,
|
||||
|
||||
683
tests/gateway/test_telegram_business.py
Normal file
683
tests/gateway/test_telegram_business.py
Normal file
@@ -0,0 +1,683 @@
|
||||
"""Tests for Telegram Business Mode (Secretary Bots).
|
||||
|
||||
Covers two layers:
|
||||
- ``hermes_state.SessionDB`` business-connection + draft CRUD
|
||||
- ``gateway.platforms.telegram_business.BusinessModeManager`` orchestration
|
||||
|
||||
The manager tests use lightweight async stubs for the adapter callbacks
|
||||
(``send_message``, ``draft_generator``) so the agent loop / network are
|
||||
never touched — pure unit-level coverage of the state machine.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
from types import SimpleNamespace
|
||||
from typing import Any, Dict, List
|
||||
|
||||
import pytest
|
||||
|
||||
from hermes_state import SessionDB
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def db(tmp_path):
|
||||
"""Fresh SessionDB with the business migration already applied."""
|
||||
db_path = tmp_path / "biz_state.db"
|
||||
sdb = SessionDB(db_path=db_path)
|
||||
sdb.apply_telegram_business_migration()
|
||||
yield sdb
|
||||
sdb.close()
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Schema / CRUD
|
||||
# =========================================================================
|
||||
|
||||
|
||||
class TestBusinessConnectionPersistence:
|
||||
def test_upsert_and_get(self, db):
|
||||
db.upsert_telegram_business_connection(
|
||||
connection_id="c1", owner_user_id="42",
|
||||
owner_chat_id="100", can_reply=True, is_enabled=True,
|
||||
)
|
||||
row = db.get_telegram_business_connection("c1")
|
||||
assert row is not None
|
||||
assert row["connection_id"] == "c1"
|
||||
assert row["owner_user_id"] == "42"
|
||||
assert row["owner_chat_id"] == "100"
|
||||
assert row["can_reply"] is True
|
||||
assert row["is_enabled"] is True
|
||||
assert row["auto_draft"] is True
|
||||
assert row["paused_chats"] == []
|
||||
|
||||
def test_upsert_preserves_auto_draft_and_paused(self, db):
|
||||
db.upsert_telegram_business_connection(
|
||||
connection_id="c1", owner_user_id="42",
|
||||
owner_chat_id="100", can_reply=True, is_enabled=True,
|
||||
)
|
||||
db.set_telegram_business_auto_draft("c1", auto_draft=False)
|
||||
db.set_telegram_business_paused_chats("c1", ["200", "300"])
|
||||
# Simulate Telegram re-sending the BusinessConnection
|
||||
db.upsert_telegram_business_connection(
|
||||
connection_id="c1", owner_user_id="42",
|
||||
owner_chat_id="100", can_reply=False, is_enabled=True,
|
||||
)
|
||||
row = db.get_telegram_business_connection("c1")
|
||||
# can_reply updated, but owner preferences kept
|
||||
assert row["can_reply"] is False
|
||||
assert row["auto_draft"] is False
|
||||
assert sorted(row["paused_chats"]) == ["200", "300"]
|
||||
|
||||
def test_list_enabled_filter(self, db):
|
||||
db.upsert_telegram_business_connection(
|
||||
connection_id="c1", owner_user_id="42",
|
||||
owner_chat_id="100", can_reply=True, is_enabled=True,
|
||||
)
|
||||
db.upsert_telegram_business_connection(
|
||||
connection_id="c2", owner_user_id="42",
|
||||
owner_chat_id="100", can_reply=False, is_enabled=False,
|
||||
)
|
||||
active = db.list_telegram_business_connections(
|
||||
owner_user_id="42", enabled_only=True,
|
||||
)
|
||||
all_rows = db.list_telegram_business_connections(
|
||||
owner_user_id="42", enabled_only=False,
|
||||
)
|
||||
assert len(active) == 1 and active[0]["connection_id"] == "c1"
|
||||
assert len(all_rows) == 2
|
||||
|
||||
def test_get_returns_none_for_unknown(self, db):
|
||||
assert db.get_telegram_business_connection("missing") is None
|
||||
|
||||
def test_migration_idempotent(self, db):
|
||||
# Calling twice must not raise; subsequent CRUD still works.
|
||||
db.apply_telegram_business_migration()
|
||||
db.apply_telegram_business_migration()
|
||||
db.upsert_telegram_business_connection(
|
||||
connection_id="c1", owner_user_id="42",
|
||||
owner_chat_id="100", can_reply=True, is_enabled=True,
|
||||
)
|
||||
assert db.get_telegram_business_connection("c1") is not None
|
||||
|
||||
|
||||
class TestBusinessDraftLifecycle:
|
||||
@pytest.fixture(autouse=True)
|
||||
def _conn(self, db):
|
||||
db.upsert_telegram_business_connection(
|
||||
connection_id="c1", owner_user_id="42",
|
||||
owner_chat_id="100", can_reply=True, is_enabled=True,
|
||||
)
|
||||
|
||||
def test_create_and_get(self, db):
|
||||
did = db.create_telegram_business_draft(
|
||||
connection_id="c1", owner_chat_id="100", customer_chat_id="200",
|
||||
customer_msg_id="m1", customer_text="hi", draft_text="hello",
|
||||
)
|
||||
row = db.get_telegram_business_draft(did)
|
||||
assert row is not None
|
||||
assert row["customer_text"] == "hi"
|
||||
assert row["draft_text"] == "hello"
|
||||
assert row["status"] == "pending"
|
||||
assert row["owner_message_id"] is None
|
||||
assert row["final_sent_text"] is None
|
||||
assert row["expires_at"] > row["created_at"]
|
||||
|
||||
def test_new_draft_supersedes_prior_pending(self, db):
|
||||
d1 = db.create_telegram_business_draft(
|
||||
connection_id="c1", owner_chat_id="100", customer_chat_id="200",
|
||||
customer_msg_id="m1", customer_text="hi", draft_text="hello",
|
||||
)
|
||||
d2 = db.create_telegram_business_draft(
|
||||
connection_id="c1", owner_chat_id="100", customer_chat_id="200",
|
||||
customer_msg_id="m2", customer_text="hi again", draft_text="hello again",
|
||||
)
|
||||
assert d1 != d2
|
||||
# Prior draft should be marked superseded.
|
||||
prior = db.get_telegram_business_draft(d1)
|
||||
new = db.get_telegram_business_draft(d2)
|
||||
assert prior["status"] == "superseded"
|
||||
assert new["status"] == "pending"
|
||||
|
||||
def test_supersede_only_affects_same_customer_chat(self, db):
|
||||
d1 = db.create_telegram_business_draft(
|
||||
connection_id="c1", owner_chat_id="100", customer_chat_id="200",
|
||||
customer_msg_id="m1", customer_text="A", draft_text="a",
|
||||
)
|
||||
d2 = db.create_telegram_business_draft(
|
||||
connection_id="c1", owner_chat_id="100", customer_chat_id="999",
|
||||
customer_msg_id="m2", customer_text="B", draft_text="b",
|
||||
)
|
||||
# Different customer chats — d1 should still be pending.
|
||||
assert db.get_telegram_business_draft(d1)["status"] == "pending"
|
||||
assert db.get_telegram_business_draft(d2)["status"] == "pending"
|
||||
|
||||
def test_resolve_sent_returns_prior_and_blocks_double_resolve(self, db):
|
||||
did = db.create_telegram_business_draft(
|
||||
connection_id="c1", owner_chat_id="100", customer_chat_id="200",
|
||||
customer_msg_id="m1", customer_text="hi", draft_text="hello",
|
||||
)
|
||||
first = db.resolve_telegram_business_draft(
|
||||
did, status="sent", final_sent_text="hello",
|
||||
)
|
||||
assert first is not None
|
||||
# Status now committed
|
||||
row = db.get_telegram_business_draft(did)
|
||||
assert row["status"] == "sent"
|
||||
assert row["final_sent_text"] == "hello"
|
||||
# Second resolution must be a no-op.
|
||||
second = db.resolve_telegram_business_draft(did, status="discarded")
|
||||
assert second is None
|
||||
assert db.get_telegram_business_draft(did)["status"] == "sent"
|
||||
|
||||
def test_resolve_invalid_status_raises(self, db):
|
||||
did = db.create_telegram_business_draft(
|
||||
connection_id="c1", owner_chat_id="100", customer_chat_id="200",
|
||||
customer_msg_id="m1", customer_text="hi", draft_text="hello",
|
||||
)
|
||||
with pytest.raises(ValueError):
|
||||
db.resolve_telegram_business_draft(did, status="bogus")
|
||||
|
||||
def test_owner_message_id_round_trip(self, db):
|
||||
did = db.create_telegram_business_draft(
|
||||
connection_id="c1", owner_chat_id="100", customer_chat_id="200",
|
||||
customer_msg_id="m1", customer_text="hi", draft_text="hello",
|
||||
)
|
||||
db.set_telegram_business_draft_owner_message(did, "owner_msg_42")
|
||||
row = db.get_telegram_business_draft(did)
|
||||
assert row["owner_message_id"] == "owner_msg_42"
|
||||
|
||||
def test_expire_only_affects_overdue_pending(self, db):
|
||||
d_old = db.create_telegram_business_draft(
|
||||
connection_id="c1", owner_chat_id="100", customer_chat_id="200",
|
||||
customer_msg_id="m_old", customer_text="old", draft_text="old reply",
|
||||
ttl_seconds=60.0,
|
||||
)
|
||||
# Different customer chat so it doesn't supersede d_old.
|
||||
d_new = db.create_telegram_business_draft(
|
||||
connection_id="c1", owner_chat_id="100", customer_chat_id="201",
|
||||
customer_msg_id="m_new", customer_text="new", draft_text="new reply",
|
||||
ttl_seconds=999_999.0,
|
||||
)
|
||||
# Run expiry just past d_old's expiry but well before d_new's.
|
||||
d_old_row = db.get_telegram_business_draft(d_old)
|
||||
affected = db.expire_telegram_business_drafts(now=d_old_row["expires_at"] + 1.0)
|
||||
assert affected == 1
|
||||
assert db.get_telegram_business_draft(d_old)["status"] == "expired"
|
||||
assert db.get_telegram_business_draft(d_new)["status"] == "pending"
|
||||
|
||||
def test_get_pending_for_owner(self, db):
|
||||
did = db.create_telegram_business_draft(
|
||||
connection_id="c1", owner_chat_id="100", customer_chat_id="200",
|
||||
customer_msg_id="m1", customer_text="hi", draft_text="hello",
|
||||
)
|
||||
# Resolved draft must not appear.
|
||||
d2 = db.create_telegram_business_draft(
|
||||
connection_id="c1", owner_chat_id="100", customer_chat_id="999",
|
||||
customer_msg_id="m2", customer_text="hi2", draft_text="hello2",
|
||||
)
|
||||
db.resolve_telegram_business_draft(d2, status="discarded")
|
||||
pending = db.get_pending_telegram_business_drafts_for_owner("100")
|
||||
ids = {row["draft_id"] for row in pending}
|
||||
assert did in ids
|
||||
assert d2 not in ids
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Manager orchestration
|
||||
# =========================================================================
|
||||
|
||||
|
||||
def _fake_business_connection(
|
||||
*, conn_id="conn1", owner_id=42, owner_chat=100,
|
||||
is_enabled=True, can_reply=True,
|
||||
) -> Any:
|
||||
"""Build a duck-typed BusinessConnection good enough for the manager."""
|
||||
rights = SimpleNamespace(can_reply=can_reply) if can_reply is not None else None
|
||||
return SimpleNamespace(
|
||||
id=conn_id,
|
||||
user=SimpleNamespace(id=owner_id, full_name="Owner"),
|
||||
user_chat_id=owner_chat,
|
||||
is_enabled=is_enabled,
|
||||
rights=rights,
|
||||
)
|
||||
|
||||
|
||||
def _fake_business_message(
|
||||
*, conn_id="conn1", customer_chat_id=200, customer_id=999,
|
||||
text="Hi there", msg_id=42,
|
||||
) -> Any:
|
||||
"""Build a duck-typed business_message-style PTB Message."""
|
||||
return SimpleNamespace(
|
||||
business_connection_id=conn_id,
|
||||
chat=SimpleNamespace(id=customer_chat_id, type="private"),
|
||||
from_user=SimpleNamespace(
|
||||
id=customer_id, full_name="Customer Carol",
|
||||
first_name="Carol", username="carol",
|
||||
),
|
||||
text=text,
|
||||
caption=None,
|
||||
message_id=msg_id,
|
||||
)
|
||||
|
||||
|
||||
class _SentRecorder:
|
||||
"""Capture all send_message kwargs the manager produces."""
|
||||
|
||||
def __init__(self, *, fail: bool = False):
|
||||
self.calls: List[Dict[str, Any]] = []
|
||||
self.fail = fail
|
||||
self._next_id = 1000
|
||||
|
||||
async def __call__(self, **kwargs):
|
||||
self.calls.append(kwargs)
|
||||
if self.fail:
|
||||
raise RuntimeError("simulated send failure")
|
||||
sent = SimpleNamespace(message_id=self._next_id)
|
||||
self._next_id += 1
|
||||
return sent
|
||||
|
||||
|
||||
def _make_manager(db, *, draft_text="Draft reply!", draft_fails=False,
|
||||
send_recorder=None, debounce=0.0):
|
||||
from gateway.platforms.telegram_business import BusinessModeManager
|
||||
|
||||
async def _draft(customer_text: str, customer_chat_id: str) -> str:
|
||||
if draft_fails:
|
||||
raise RuntimeError("model boom")
|
||||
return draft_text
|
||||
|
||||
sender = send_recorder if send_recorder is not None else _SentRecorder()
|
||||
mgr = BusinessModeManager(
|
||||
session_db=db,
|
||||
send_message=sender,
|
||||
draft_generator=_draft,
|
||||
debounce_seconds=debounce,
|
||||
)
|
||||
return mgr, sender
|
||||
|
||||
|
||||
class TestConnectionLifecycle:
|
||||
@pytest.mark.asyncio
|
||||
async def test_first_connection_persists_and_sends_onboarding(self, db):
|
||||
mgr, sender = _make_manager(db)
|
||||
await mgr.handle_connection_update(_fake_business_connection())
|
||||
row = db.get_telegram_business_connection("conn1")
|
||||
assert row is not None
|
||||
assert row["is_enabled"] is True
|
||||
assert row["can_reply"] is True
|
||||
# Owner got the onboarding DM
|
||||
assert len(sender.calls) == 1
|
||||
assert sender.calls[0]["chat_id"] == 100
|
||||
assert "Business assistant" in sender.calls[0]["text"]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_repeat_connection_does_not_resend_onboarding(self, db):
|
||||
mgr, sender = _make_manager(db)
|
||||
await mgr.handle_connection_update(_fake_business_connection())
|
||||
sender.calls.clear()
|
||||
await mgr.handle_connection_update(_fake_business_connection())
|
||||
assert sender.calls == []
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_disconnection_dms_owner(self, db):
|
||||
mgr, sender = _make_manager(db)
|
||||
await mgr.handle_connection_update(_fake_business_connection())
|
||||
sender.calls.clear()
|
||||
await mgr.handle_connection_update(
|
||||
_fake_business_connection(is_enabled=False)
|
||||
)
|
||||
assert any("ended" in c["text"].lower() for c in sender.calls)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_can_reply_toggle_notifies_owner(self, db):
|
||||
mgr, sender = _make_manager(db)
|
||||
await mgr.handle_connection_update(_fake_business_connection(can_reply=True))
|
||||
sender.calls.clear()
|
||||
await mgr.handle_connection_update(_fake_business_connection(can_reply=False))
|
||||
assert sender.calls, "owner should be told send permission flipped"
|
||||
assert "OFF" in sender.calls[0]["text"]
|
||||
|
||||
|
||||
class TestBusinessMessageDraftFlow:
|
||||
@pytest.mark.asyncio
|
||||
async def test_incoming_customer_message_drafts_and_dms_owner(self, db):
|
||||
mgr, sender = _make_manager(db, draft_text="Hey, thanks for reaching out!")
|
||||
await mgr.handle_connection_update(_fake_business_connection())
|
||||
sender.calls.clear()
|
||||
await mgr.handle_business_message(_fake_business_message(text="Hi!"))
|
||||
assert sender.calls, "draft should have been DM'd to the owner"
|
||||
owner_msg = sender.calls[0]
|
||||
assert owner_msg["chat_id"] == 100
|
||||
assert "Hey, thanks for reaching out!" in owner_msg["text"]
|
||||
assert "Hi!" in owner_msg["text"]
|
||||
# Inline keyboard rendered (under the test telegram mock,
|
||||
# InlineKeyboardMarkup is a MagicMock — we just verify a keyboard
|
||||
# was attached and that three callback_datas were produced).
|
||||
assert owner_msg.get("reply_markup") is not None
|
||||
# One draft row recorded
|
||||
drafts = db.get_pending_telegram_business_drafts_for_owner("100")
|
||||
assert len(drafts) == 1
|
||||
assert drafts[0]["draft_text"] == "Hey, thanks for reaching out!"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_no_can_reply_drops_send_button(self, db, monkeypatch):
|
||||
# Capture callback_data passed to InlineKeyboardButton to verify
|
||||
# the Send button is omitted when can_reply is False. We wrap
|
||||
# _build_draft_keyboard so we can inspect the choices it produced
|
||||
# — the underlying telegram mock makes the resulting keyboard
|
||||
# object opaque.
|
||||
from gateway.platforms import telegram_business as biz_mod
|
||||
|
||||
original = biz_mod.BusinessModeManager._build_draft_keyboard
|
||||
captured: List[str] = []
|
||||
|
||||
def _spy(draft_id, *, can_reply):
|
||||
if can_reply:
|
||||
captured.append("send")
|
||||
captured.append("edit")
|
||||
captured.append("discard")
|
||||
return original(draft_id, can_reply=can_reply)
|
||||
|
||||
monkeypatch.setattr(
|
||||
biz_mod.BusinessModeManager,
|
||||
"_build_draft_keyboard",
|
||||
staticmethod(_spy),
|
||||
)
|
||||
|
||||
mgr, sender = _make_manager(db)
|
||||
await mgr.handle_connection_update(_fake_business_connection(can_reply=False))
|
||||
sender.calls.clear()
|
||||
await mgr.handle_business_message(_fake_business_message())
|
||||
assert captured == ["edit", "discard"]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_unknown_connection_silently_ignored(self, db):
|
||||
mgr, sender = _make_manager(db)
|
||||
# No prior handle_connection_update — should silently skip.
|
||||
await mgr.handle_business_message(_fake_business_message())
|
||||
assert sender.calls == []
|
||||
assert db.get_pending_telegram_business_drafts_for_owner("100") == []
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_auto_draft_paused_skips_drafting(self, db):
|
||||
mgr, sender = _make_manager(db)
|
||||
await mgr.handle_connection_update(_fake_business_connection())
|
||||
db.set_telegram_business_auto_draft("conn1", auto_draft=False)
|
||||
sender.calls.clear()
|
||||
await mgr.handle_business_message(_fake_business_message())
|
||||
assert sender.calls == []
|
||||
assert db.get_pending_telegram_business_drafts_for_owner("100") == []
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_paused_customer_chat_skipped(self, db):
|
||||
mgr, sender = _make_manager(db)
|
||||
await mgr.handle_connection_update(_fake_business_connection())
|
||||
db.set_telegram_business_paused_chats("conn1", ["200"])
|
||||
sender.calls.clear()
|
||||
await mgr.handle_business_message(_fake_business_message(customer_chat_id=200))
|
||||
assert sender.calls == []
|
||||
# Other chat still drafts.
|
||||
await mgr.handle_business_message(_fake_business_message(customer_chat_id=300))
|
||||
assert sender.calls
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_empty_text_skipped(self, db):
|
||||
mgr, sender = _make_manager(db)
|
||||
await mgr.handle_connection_update(_fake_business_connection())
|
||||
sender.calls.clear()
|
||||
await mgr.handle_business_message(_fake_business_message(text=""))
|
||||
assert sender.calls == []
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_draft_failure_reports_to_owner(self, db):
|
||||
mgr, sender = _make_manager(db, draft_fails=True)
|
||||
await mgr.handle_connection_update(_fake_business_connection())
|
||||
sender.calls.clear()
|
||||
await mgr.handle_business_message(_fake_business_message(text="Hi"))
|
||||
assert sender.calls
|
||||
assert "couldn't draft" in sender.calls[0]["text"]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_debounce_coalesces_burst(self, db):
|
||||
# Use a real debounce window and fire 3 messages in quick succession.
|
||||
mgr, sender = _make_manager(db, debounce=0.05,
|
||||
draft_text="single draft")
|
||||
await mgr.handle_connection_update(_fake_business_connection())
|
||||
sender.calls.clear()
|
||||
for i in range(3):
|
||||
await mgr.handle_business_message(
|
||||
_fake_business_message(text=f"part {i}", msg_id=i)
|
||||
)
|
||||
# Let the debounce fire.
|
||||
await asyncio.sleep(0.2)
|
||||
# Only one draft should have been generated and one owner DM sent.
|
||||
owner_dms = [c for c in sender.calls if c.get("chat_id") == 100]
|
||||
assert len(owner_dms) == 1
|
||||
|
||||
|
||||
class TestCallbackDispatch:
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_button_delivers_to_customer_chat(self, db):
|
||||
mgr, sender = _make_manager(db, draft_text="hello there")
|
||||
await mgr.handle_connection_update(_fake_business_connection())
|
||||
await mgr.handle_business_message(_fake_business_message())
|
||||
sender.calls.clear()
|
||||
draft = db.get_pending_telegram_business_drafts_for_owner("100")[0]
|
||||
did = draft["draft_id"]
|
||||
# Fake the inline-button click
|
||||
answered: List[Dict[str, Any]] = []
|
||||
edited: List[Dict[str, Any]] = []
|
||||
|
||||
async def _answer(**kw): answered.append(kw)
|
||||
async def _edit(**kw): edited.append(kw)
|
||||
|
||||
dispatched = await mgr.handle_callback(
|
||||
data=f"bd:send:{did}", caller_user_id="42",
|
||||
answer=_answer, edit_message_text=_edit,
|
||||
)
|
||||
assert dispatched is True
|
||||
# Sent to customer chat with business_connection_id
|
||||
sends_to_customer = [
|
||||
c for c in sender.calls if c.get("chat_id") == 200
|
||||
]
|
||||
assert sends_to_customer
|
||||
assert sends_to_customer[0]["business_connection_id"] == "conn1"
|
||||
assert sends_to_customer[0]["text"] == "hello there"
|
||||
# Draft now sent
|
||||
row = db.get_telegram_business_draft(did)
|
||||
assert row["status"] == "sent"
|
||||
assert row["final_sent_text"] == "hello there"
|
||||
# Owner DM was edited to show resolution
|
||||
assert edited and "Sent" in edited[0]["text"]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_discard_marks_status(self, db):
|
||||
mgr, sender = _make_manager(db)
|
||||
await mgr.handle_connection_update(_fake_business_connection())
|
||||
await mgr.handle_business_message(_fake_business_message())
|
||||
draft = db.get_pending_telegram_business_drafts_for_owner("100")[0]
|
||||
did = draft["draft_id"]
|
||||
|
||||
async def _answer(**kw): pass
|
||||
async def _edit(**kw): pass
|
||||
|
||||
await mgr.handle_callback(
|
||||
data=f"bd:discard:{did}", caller_user_id="42",
|
||||
answer=_answer, edit_message_text=_edit,
|
||||
)
|
||||
assert db.get_telegram_business_draft(did)["status"] == "discarded"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_callback_for_unknown_draft_no_ops(self, db):
|
||||
mgr, sender = _make_manager(db)
|
||||
await mgr.handle_connection_update(_fake_business_connection())
|
||||
sender.calls.clear()
|
||||
|
||||
answered: List[Dict[str, Any]] = []
|
||||
|
||||
async def _answer(**kw): answered.append(kw)
|
||||
async def _edit(**kw): pass
|
||||
|
||||
await mgr.handle_callback(
|
||||
data="bd:send:999999", caller_user_id="42",
|
||||
answer=_answer, edit_message_text=_edit,
|
||||
)
|
||||
assert answered and "expired" in answered[0]["text"].lower()
|
||||
# No customer-chat sends.
|
||||
assert not any(c.get("chat_id") == 200 for c in sender.calls)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_callback_rejects_non_owner(self, db):
|
||||
mgr, sender = _make_manager(db)
|
||||
await mgr.handle_connection_update(_fake_business_connection())
|
||||
await mgr.handle_business_message(_fake_business_message())
|
||||
draft = db.get_pending_telegram_business_drafts_for_owner("100")[0]
|
||||
did = draft["draft_id"]
|
||||
answered: List[Dict[str, Any]] = []
|
||||
|
||||
async def _answer(**kw): answered.append(kw)
|
||||
async def _edit(**kw): pass
|
||||
|
||||
# Different user — should be rejected.
|
||||
await mgr.handle_callback(
|
||||
data=f"bd:send:{did}", caller_user_id="9999",
|
||||
answer=_answer, edit_message_text=_edit,
|
||||
)
|
||||
assert answered and "Only the connected account owner" in answered[0]["text"]
|
||||
assert db.get_telegram_business_draft(did)["status"] == "pending"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_blocked_when_can_reply_false(self, db):
|
||||
mgr, sender = _make_manager(db)
|
||||
await mgr.handle_connection_update(_fake_business_connection(can_reply=False))
|
||||
await mgr.handle_business_message(_fake_business_message())
|
||||
draft = db.get_pending_telegram_business_drafts_for_owner("100")[0]
|
||||
did = draft["draft_id"]
|
||||
answered: List[Dict[str, Any]] = []
|
||||
|
||||
async def _answer(**kw): answered.append(kw)
|
||||
async def _edit(**kw): pass
|
||||
|
||||
await mgr.handle_callback(
|
||||
data=f"bd:send:{did}", caller_user_id="42",
|
||||
answer=_answer, edit_message_text=_edit,
|
||||
)
|
||||
# Send-on-your-behalf is OFF → reject with explanation, no customer send.
|
||||
assert any("send-on-your-behalf is off" in (a.get("text") or "").lower()
|
||||
for a in answered)
|
||||
assert not any(c.get("chat_id") == 200 for c in sender.calls)
|
||||
assert db.get_telegram_business_draft(did)["status"] == "pending"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_non_bd_callback_returns_false(self, db):
|
||||
mgr, _ = _make_manager(db)
|
||||
|
||||
async def _noop(**kw):
|
||||
pass
|
||||
|
||||
dispatched = await mgr.handle_callback(
|
||||
data="ea:once:1", caller_user_id="42",
|
||||
answer=_noop, edit_message_text=_noop,
|
||||
)
|
||||
assert dispatched is False
|
||||
|
||||
|
||||
class TestEditCapture:
|
||||
@pytest.mark.asyncio
|
||||
async def test_edit_then_text_sends_override(self, db):
|
||||
mgr, sender = _make_manager(db, draft_text="original draft")
|
||||
await mgr.handle_connection_update(_fake_business_connection())
|
||||
await mgr.handle_business_message(_fake_business_message())
|
||||
draft = db.get_pending_telegram_business_drafts_for_owner("100")[0]
|
||||
did = draft["draft_id"]
|
||||
# Tap Edit
|
||||
async def _answer(**kw): pass
|
||||
async def _edit(**kw): pass
|
||||
|
||||
await mgr.handle_callback(
|
||||
data=f"bd:edit:{did}", caller_user_id="42",
|
||||
answer=_answer, edit_message_text=_edit,
|
||||
)
|
||||
sender.calls.clear()
|
||||
# Owner sends override text
|
||||
consumed = await mgr.maybe_handle_edit_capture(
|
||||
owner_chat_id="100", text="my custom reply",
|
||||
)
|
||||
assert consumed is True
|
||||
# Customer received the override
|
||||
customer_sends = [c for c in sender.calls if c.get("chat_id") == 200]
|
||||
assert customer_sends
|
||||
assert customer_sends[0]["text"] == "my custom reply"
|
||||
assert customer_sends[0]["business_connection_id"] == "conn1"
|
||||
# Draft now resolved as edited
|
||||
row = db.get_telegram_business_draft(did)
|
||||
assert row["status"] == "edited"
|
||||
assert row["final_sent_text"] == "my custom reply"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_edit_capture_idle_returns_false(self, db):
|
||||
mgr, _ = _make_manager(db)
|
||||
consumed = await mgr.maybe_handle_edit_capture(
|
||||
owner_chat_id="100", text="hello",
|
||||
)
|
||||
assert consumed is False
|
||||
|
||||
|
||||
class TestBizCommand:
|
||||
@pytest.mark.asyncio
|
||||
async def test_status_with_no_connections(self, db):
|
||||
mgr, _ = _make_manager(db)
|
||||
reply = await mgr.handle_biz_command(
|
||||
owner_user_id="42", owner_chat_id="100", args=[],
|
||||
)
|
||||
assert "haven't connected" in reply.lower()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_status_with_connection(self, db):
|
||||
mgr, _ = _make_manager(db)
|
||||
await mgr.handle_connection_update(_fake_business_connection())
|
||||
reply = await mgr.handle_biz_command(
|
||||
owner_user_id="42", owner_chat_id="100", args=[],
|
||||
)
|
||||
assert "active" in reply
|
||||
assert "drafting ON" in reply
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_pause_and_resume(self, db):
|
||||
mgr, _ = _make_manager(db)
|
||||
await mgr.handle_connection_update(_fake_business_connection())
|
||||
reply = await mgr.handle_biz_command(
|
||||
owner_user_id="42", owner_chat_id="100", args=["pause"],
|
||||
)
|
||||
assert "Paused" in reply
|
||||
assert db.get_telegram_business_connection("conn1")["auto_draft"] is False
|
||||
reply = await mgr.handle_biz_command(
|
||||
owner_user_id="42", owner_chat_id="100", args=["resume"],
|
||||
)
|
||||
assert "Resumed" in reply
|
||||
assert db.get_telegram_business_connection("conn1")["auto_draft"] is True
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_per_chat_off_and_on(self, db):
|
||||
mgr, _ = _make_manager(db)
|
||||
await mgr.handle_connection_update(_fake_business_connection())
|
||||
reply = await mgr.handle_biz_command(
|
||||
owner_user_id="42", owner_chat_id="100", args=["off", "200"],
|
||||
)
|
||||
assert "200" in reply
|
||||
assert "200" in db.get_telegram_business_connection("conn1")["paused_chats"]
|
||||
reply = await mgr.handle_biz_command(
|
||||
owner_user_id="42", owner_chat_id="100", args=["on", "200"],
|
||||
)
|
||||
assert "200" in reply
|
||||
assert "200" not in db.get_telegram_business_connection("conn1")["paused_chats"]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_unknown_subcommand_returns_help(self, db):
|
||||
mgr, _ = _make_manager(db)
|
||||
reply = await mgr.handle_biz_command(
|
||||
owner_user_id="42", owner_chat_id="100", args=["unknown"],
|
||||
)
|
||||
assert "Usage" in reply
|
||||
@@ -920,6 +920,85 @@ Tap a button to answer, or tap **Other** to type a free-form response (the next
|
||||
|
||||
Configure the response timeout via `agent.clarify_timeout` in `~/.hermes/config.yaml` (default `600` seconds). If you don't respond within the timeout, the agent unblocks with a sentinel message and adapts rather than hanging.
|
||||
|
||||
## Business Mode (Secretary Bots)
|
||||
|
||||
Telegram Business Mode lets a Telegram Business account owner connect this bot to their personal account so it can **draft replies** to incoming customer messages. Replies are **never sent automatically** — every draft is delivered to the owner's DM with a [✓ Send] [✎ Edit] [✕ Discard] inline keyboard, and nothing reaches the customer until the owner taps Send.
|
||||
|
||||
### How the flow works
|
||||
|
||||
1. The owner connects the bot via Telegram → Settings → Business → Chatbots.
|
||||
2. A customer messages the owner in a chat covered by the connection.
|
||||
3. The bot debounces typing bursts (default 8s) and produces a single draft.
|
||||
4. The bot DMs the draft to the **owner's own chat** with this bot:
|
||||
|
||||
```
|
||||
💬 Carol wrote:
|
||||
> Hey, are you free Friday afternoon?
|
||||
|
||||
📝 Suggested reply:
|
||||
Friday afternoon works — anywhere between 2 and 5pm?
|
||||
|
||||
[✓ Send] [✎ Edit] [✕ Discard]
|
||||
```
|
||||
|
||||
5. The owner taps:
|
||||
- **Send** — the draft is delivered to the customer using the connection's `business_connection_id`, appearing as if the owner sent it.
|
||||
- **Edit** — the bot replies "send me the text you want delivered" and uses the owner's next DM as the final message.
|
||||
- **Discard** — the draft is dropped, the customer sees nothing.
|
||||
|
||||
### Requirements
|
||||
|
||||
- A **Telegram Business** subscription on the owner's personal account.
|
||||
- The bot's **Business Mode** toggle enabled in [@BotFather](https://t.me/botfather): `/mybots` → pick your bot → Bot Settings → Business Mode.
|
||||
- For the **Send** button to be active, the owner must grant `Reply to Messages` permission when configuring the bot in their Telegram Business chatbot settings. Without it, the bot can still draft replies but the Send button is hidden — the owner has to copy and send manually.
|
||||
|
||||
### Configuration
|
||||
|
||||
In `~/.hermes/config.yaml`:
|
||||
|
||||
```yaml
|
||||
telegram:
|
||||
business_mode:
|
||||
enabled: true # off by default
|
||||
debounce_seconds: 8 # coalesce typing-burst messages into one draft
|
||||
draft_ttl_hours: 24 # pending drafts older than this expire
|
||||
max_customer_text_chars: 4000
|
||||
owner_persona: "" # optional system-prompt override
|
||||
```
|
||||
|
||||
The `owner_persona` field lets you customize the voice of drafts — for example, `"You are drafting on behalf of a freelance illustrator. Sound friendly and professional; never quote prices without consulting me first."` When empty, a sensible default persona is used (warm, direct, concise; matches the customer's language).
|
||||
|
||||
Restart the gateway after changing these.
|
||||
|
||||
### `/biz` slash command
|
||||
|
||||
The owner can manage drafting via `/biz` in their DM with the bot:
|
||||
|
||||
| Command | Effect |
|
||||
|---|---|
|
||||
| `/biz` | Show status of all connections and active customer chats |
|
||||
| `/biz pause` | Globally stop producing drafts (customers' messages still arrive — owner just sees them, no drafts) |
|
||||
| `/biz resume` | Resume drafting |
|
||||
| `/biz off <chat_id>` | Mute drafting for one customer chat |
|
||||
| `/biz on <chat_id>` | Re-enable drafting for one customer chat |
|
||||
|
||||
### What stays on by default
|
||||
|
||||
- **Drafting is always observe-with-approval.** There is no auto-send setting in v1. Even with `can_reply` enabled, every reply requires an explicit owner tap.
|
||||
- **Only the connected owner can use the inline buttons.** Telegram callback queries are user-scoped — if someone else taps Send (e.g. another user with bot DM access), the bot rejects with `⛔ Only the connected account owner can use these buttons.`
|
||||
- **Drafts expire after 24h** (configurable). Stale buttons become no-ops with `That draft has expired.`
|
||||
- **Telegram never sees a draft.** Drafts live in `~/.hermes/state.db` under `telegram_business_drafts`; only the owner's DM ever sees the suggested text.
|
||||
|
||||
### Limits in v1
|
||||
|
||||
- **Text only.** Customer media (photos, voice, documents) is skipped — only text messages and captions trigger drafts. v2 will add media context via vision.
|
||||
- **No conversation history.** Each customer message is drafted in isolation; the bot doesn't remember the previous exchange. If your customer is mid-conversation, the draft may need editing — that's what the Edit button is for.
|
||||
- **No per-customer persona learning.** Owner edits override the draft for that one reply but don't train future drafts.
|
||||
|
||||
### Disabling
|
||||
|
||||
To turn the feature off entirely, set `enabled: false` and restart the gateway. To revoke individual connections, the owner can disconnect this bot from their Telegram Business settings — the bot will receive a `BusinessConnection` "ended" update and stop drafting for that account immediately.
|
||||
|
||||
## Security
|
||||
|
||||
:::warning
|
||||
|
||||
Reference in New Issue
Block a user