Compare commits

...

1 Commits

Author SHA1 Message Date
Teknium
3144585f38 feat(telegram): observe-with-approval Telegram Business Mode (Secretary Bots)
Owners of a Telegram Business account can now connect this bot to draft
replies for their incoming customer messages. Drafts go to the owner's
DM with [✓ Send] [✎ Edit] [✕ Discard] inline buttons — nothing reaches
the customer until the owner taps Send.

The flow:
  1. Owner connects the bot via Telegram → Settings → Business → Chatbots.
  2. Customer messages a chat covered by the connection.
  3. Bot debounces typing bursts (default 8s) and produces one draft via
     the auxiliary LLM client.
  4. Bot DMs the draft to the owner's chat with [✓ Send] [✎ Edit]
     [✕ Discard] buttons.
  5. Send → delivers to the customer using business_connection_id.
     Edit → bot captures the owner's next text DM as the override.
     Discard → drops the draft, customer sees nothing.

Owner controls:
  /biz                 status dashboard
  /biz pause / resume  global drafting kill switch
  /biz off|on <chat>   per-customer-chat mute toggle

Implementation:
  - New module gateway/platforms/telegram_business.py owns the state
    machine: connection lifecycle, debounce buffer, draft generation,
    callback dispatch, edit capture, /biz command.
  - Two new state.db tables (telegram_business_connections,
    telegram_business_drafts) added via a lazy migration that only
    runs when the feature is enabled.
  - telegram.py wires three new PTB handlers (BusinessConnection +
    BUSINESS_MESSAGE + EDITED_BUSINESS_MESSAGE) plus a 'bd:' callback
    prefix on the existing CallbackQueryHandler.
  - Config gate: telegram.business_mode.enabled (off by default);
    requires Business Mode toggled in @BotFather too.

Safety properties baked in:
  - Drafting is always observe-with-approval — there is no auto-send
    setting in v1, even when can_reply is granted.
  - Only the connected owner can use the inline buttons (callback
    authorization checks owner_user_id against the connection row).
  - Drafts expire after 24h; stale buttons no-op with 'expired'.
  - Without can_reply permission, the Send button is hidden and the
    owner is told to copy/paste manually.

v1 limits documented in website/docs/user-guide/messaging/telegram.md:
text only (no media context), no conversation history (each draft is
single-message), no persona learning from edits. v2 will iterate.

Tests: 38 new in tests/gateway/test_telegram_business.py covering schema
CRUD, connection lifecycle, draft supersession, debounce coalescing,
all three button paths, edit capture, /biz subcommands, owner-scoped
callback authorization.
2026-05-21 14:31:41 -07:00
8 changed files with 2157 additions and 1 deletions

View File

@@ -1042,7 +1042,7 @@ def load_gateway_config() -> GatewayConfig:
if isinstance(group_allowed_chats, list):
group_allowed_chats = ",".join(str(v) for v in group_allowed_chats)
os.environ["TELEGRAM_GROUP_ALLOWED_CHATS"] = str(group_allowed_chats)
for _telegram_extra_key in ("guest_mode", "disable_link_previews"):
for _telegram_extra_key in ("guest_mode", "disable_link_previews", "business_mode"):
if _telegram_extra_key in telegram_cfg:
plat_data = platforms_data.setdefault(Platform.TELEGRAM.value, {})
if not isinstance(plat_data, dict):

View File

@@ -447,6 +447,16 @@ class TelegramAdapter(BasePlatformAdapter):
# behavior; opt-in via display.platforms.telegram.notifications).
self._notifications_mode: str = "important"
# Business Mode (Secretary Bots) — owner-approved drafting against
# incoming customer messages. Initialized lazily in ``connect()``
# once the bot is available, only when the feature is enabled.
# See ``telegram_business.py`` for the manager + state machine.
self._business_manager: Optional[Any] = None
self._business_mode_enabled: bool = bool(
self.config.extra.get("business_mode", {}).get("enabled", False)
if isinstance(self.config.extra.get("business_mode"), dict) else False
)
def _notification_kwargs(
self, metadata: Optional[Dict[str, Any]]
) -> Dict[str, Any]:
@@ -1292,6 +1302,43 @@ class TelegramAdapter(BasePlatformAdapter):
))
# Handle inline keyboard button callbacks (update prompts)
self._app.add_handler(CallbackQueryHandler(self._handle_callback_query))
# Telegram Business Mode (Secretary Bots) handlers — gated on
# the ``telegram.business_mode.enabled`` config key. Registers
# three update types: connection lifecycle, customer messages,
# and edits to customer messages. Deletes are ignored for v1.
if self._business_mode_enabled:
try:
self._init_business_manager()
# Connection lifecycle (established / edited / ended)
try:
from telegram.ext import BusinessConnectionHandler
self._app.add_handler(
BusinessConnectionHandler(self._handle_business_connection)
)
except ImportError:
logger.warning(
"[%s] BusinessConnectionHandler unavailable — Business Mode requires python-telegram-bot >= 21.1",
self.name,
)
raise
# Incoming customer messages (TEXT only for v1)
self._app.add_handler(TelegramMessageHandler(
filters.UpdateType.BUSINESS_MESSAGE & filters.TEXT,
self._handle_business_message,
))
# Edited customer messages — informational; treated like a fresh draft.
self._app.add_handler(TelegramMessageHandler(
filters.UpdateType.EDITED_BUSINESS_MESSAGE & filters.TEXT,
self._handle_business_message,
))
logger.info("[%s] Telegram Business Mode handlers registered", self.name)
except Exception as biz_err:
logger.warning(
"[%s] Could not initialize Business Mode (%s) — feature disabled",
self.name, biz_err,
)
self._business_manager = None
# Start polling — retry initialize() for transient TLS resets
try:
@@ -2897,6 +2944,26 @@ class TelegramAdapter(BasePlatformAdapter):
)
return
# --- Business Mode draft callbacks (bd:choice:draft_id) ---
if data.startswith("bd:") and self._business_manager is not None:
caller_id = str(getattr(query.from_user, "id", ""))
try:
await self._business_manager.handle_callback(
data=data,
caller_user_id=caller_id or None,
answer=query.answer,
edit_message_text=query.edit_message_text,
)
except Exception as exc:
logger.exception(
"[%s] Business draft callback failed: %s", self.name, exc,
)
try:
await query.answer(text="⚠️ Action failed.")
except Exception:
pass
return
# --- Update prompt callbacks ---
if not data.startswith("update_prompt:"):
return
@@ -3985,6 +4052,146 @@ class TelegramAdapter(BasePlatformAdapter):
return True
return self._message_matches_mention_patterns(message)
# ------------------------------------------------------------------
# Telegram Business Mode (Secretary Bots)
# ------------------------------------------------------------------
# Only wired when ``telegram.business_mode.enabled`` is True in
# config.yaml. See ``telegram_business.py`` for the manager and
# state machine; the methods below are just thin PTB-update glue.
# ------------------------------------------------------------------
def _init_business_manager(self) -> None:
"""Construct the BusinessModeManager lazily, once the bot is up."""
if self._business_manager is not None:
return
from gateway.platforms.telegram_business import BusinessModeManager
biz_cfg = self.config.extra.get("business_mode", {}) or {}
debounce = float(biz_cfg.get("debounce_seconds", 8.0))
draft_ttl_hours = float(biz_cfg.get("draft_ttl_hours", 24.0))
max_chars = int(biz_cfg.get("max_customer_text_chars", 4000))
async def _send(**kwargs):
# Strip Markdown formatting from owner DMs — the draft body has
# raw customer text that could contain Telegram-special chars.
# The business-mode renderer emits plain text only.
kwargs.setdefault("parse_mode", None)
kwargs.setdefault("disable_web_page_preview", True)
if self._bot is None:
raise RuntimeError("Telegram bot is not connected")
return await self._bot.send_message(**kwargs)
async def _draft(customer_text: str, customer_chat_id: str) -> str:
return await self._business_draft_via_auxiliary(customer_text, customer_chat_id)
from hermes_state import SessionDB
from hermes_constants import get_hermes_home
from pathlib import Path
# Lazily acquire the shared state DB. ``hermes_state`` singleton
# is created elsewhere in the gateway runner; if we can't grab
# it from there, fall back to opening our own handle against the
# canonical state.db path.
runner = getattr(getattr(self, "_message_handler", None), "__self__", None)
db = getattr(runner, "_session_db", None) if runner else None
if db is None:
try:
db = SessionDB(Path(get_hermes_home()) / "state.db")
except Exception as exc:
raise RuntimeError(f"Failed to open state.db for Business Mode: {exc}")
self._business_manager = BusinessModeManager(
session_db=db,
send_message=_send,
draft_generator=_draft,
debounce_seconds=debounce,
draft_ttl_hours=draft_ttl_hours,
max_customer_text_chars=max_chars,
)
async def _business_draft_via_auxiliary(
self, customer_text: str, customer_chat_id: str
) -> str:
"""Generate a draft reply via the auxiliary LLM client.
Synchronous ``call_llm`` is offloaded to a thread so it doesn't
block the asyncio loop. System prompt is plain — v1 doesn't
carry any prior history; v2 will plumb a per-customer session.
"""
from agent.auxiliary_client import call_llm
biz_cfg = self.config.extra.get("business_mode", {}) or {}
owner_persona = (
biz_cfg.get("owner_persona")
or "You are drafting a short, friendly reply on behalf of the "
"account owner. Match the tone of a personal message — "
"warm, direct, and concise. Do not introduce yourself as "
"an assistant or AI. Reply in the same language the "
"customer used."
)
system_prompt = (
f"{owner_persona}\n\n"
"Output only the message text. No preamble, no quotes, no "
"markdown headers. Keep it under 4 sentences unless the "
"customer asked something that genuinely needs a longer reply."
)
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": customer_text},
]
loop = asyncio.get_running_loop()
response = await loop.run_in_executor(
None,
lambda: call_llm(
task="title_generation", # reuse light auxiliary task slot
messages=messages,
temperature=0.7,
max_tokens=500,
timeout=60.0,
),
)
try:
content = response.choices[0].message.content or ""
except (AttributeError, IndexError, TypeError):
content = ""
return str(content).strip()
async def _handle_business_connection(
self, update: "Update", context: "ContextTypes.DEFAULT_TYPE"
) -> None:
"""PTB callback: BusinessConnection updates."""
if self._business_manager is None:
return
conn = getattr(update, "business_connection", None)
if conn is None:
return
try:
await self._business_manager.handle_connection_update(conn)
except Exception as exc:
logger.exception(
"[%s] Business connection handler failed: %s", self.name, exc,
)
async def _handle_business_message(
self, update: "Update", context: "ContextTypes.DEFAULT_TYPE"
) -> None:
"""PTB callback: business_message and edited_business_message updates."""
if self._business_manager is None:
return
message = (
getattr(update, "business_message", None)
or getattr(update, "edited_business_message", None)
)
if message is None:
return
try:
await self._business_manager.handle_business_message(message)
except Exception as exc:
logger.exception(
"[%s] Business message handler failed: %s", self.name, exc,
)
async def _handle_text_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Handle incoming text messages.
@@ -3994,6 +4201,25 @@ class TelegramAdapter(BasePlatformAdapter):
"""
if not update.message or not update.message.text:
return
# Business Mode: if this text DM is the owner replying after they
# tapped Edit on a draft, consume it as the edit override and stop.
if self._business_manager is not None and update.message.chat is not None:
chat = update.message.chat
if getattr(chat, "type", None) == ChatType.PRIVATE:
try:
consumed = await self._business_manager.maybe_handle_edit_capture(
owner_chat_id=str(chat.id),
text=update.message.text or "",
)
except Exception as exc:
logger.exception(
"[%s] Business edit-capture failed: %s", self.name, exc,
)
consumed = False
if consumed:
return
if not self._should_process_message(update.message):
return
@@ -4005,6 +4231,41 @@ class TelegramAdapter(BasePlatformAdapter):
"""Handle incoming command messages."""
if not update.message or not update.message.text:
return
# Business Mode: /biz is handled entirely at the adapter layer
# since it manipulates per-owner business state with no agent loop
# involvement. Routed before _should_process_message so it works
# in private DMs regardless of mention requirements.
if self._business_manager is not None and update.message.chat is not None:
text = (update.message.text or "").strip()
parts = text.split()
head = parts[0] if parts else ""
if head.startswith("/"):
cmd = head[1:].split("@", 1)[0].lower()
if cmd == "biz":
args = parts[1:]
user = update.message.from_user
chat = update.message.chat
try:
reply = await self._business_manager.handle_biz_command(
owner_user_id=str(getattr(user, "id", "")),
owner_chat_id=str(chat.id),
args=args,
)
except Exception as exc:
logger.exception("[%s] /biz handler failed: %s", self.name, exc)
reply = f"⚠️ /biz failed: {exc}"
try:
if self._bot is not None:
await self._bot.send_message(
chat_id=chat.id,
text=reply,
disable_web_page_preview=True,
)
except Exception:
logger.debug("Failed to deliver /biz reply", exc_info=True)
return
if not self._should_process_message(update.message, is_command=True):
return

View File

@@ -0,0 +1,760 @@
"""Telegram Business Mode (Secretary Bots) — owner-approved drafting.
When a Telegram Business account owner connects this bot via BotFather's
Business Mode, the bot starts receiving messages addressed to the owner
in chats the owner has whitelisted. This module turns those incoming
customer messages into drafted replies that the owner approves before
they go out — no auto-send, ever.
User flow (the simplest path that works end-to-end):
1. Owner enables Business Mode in BotFather and connects this bot.
→ bot receives ``BusinessConnection`` update, persists it, DMs the
owner an onboarding message with a quick how-it-works summary.
2. Customer messages the owner in a chat covered by the connection.
→ bot debounces ``debounce_seconds`` (default 8s) to coalesce
typing bursts, then drafts a single reply using the most-recent
customer text.
→ bot DMs the draft to the *owner's* chat with this bot, with
inline buttons: [✓ Send] [✎ Edit] [✕ Discard].
3. Owner taps:
Send → bot sends the draft to the customer chat using
``business_connection_id``. Owner gets a "✓ Sent" confirmation.
Edit → bot replies "send me the text you want delivered" and
captures the owner's next text DM as the outgoing reply.
Discard → draft dropped, nothing goes to the customer.
The owner is always in control: ``can_reply`` from Telegram is required
for the Send button to appear, ``/biz pause`` globally suspends drafting,
and per-chat pauses live in ``telegram_business_connections.paused_chats``.
State lives in two SQLite tables (``telegram_business_connections`` and
``telegram_business_drafts``) created lazily on first use by
``SessionDB.apply_telegram_business_migration()``. See ``hermes_state.py``
for the schema.
This module is glued onto ``gateway/platforms/telegram.py`` via three
PTB update handlers plus an inline-button callback prefix (``bd:`` for
"business draft"). The host adapter owns all Telegram I/O — this module
just calls back into it for sending.
"""
from __future__ import annotations
import asyncio
import logging
import time
from typing import Any, Awaitable, Callable, Dict, List, Optional
logger = logging.getLogger(__name__)
# Callback-data prefix for the inline buttons. Keep it short — Telegram
# caps callback_data at 64 bytes. Format: "bd:<choice>:<draft_id>".
CALLBACK_PREFIX = "bd:"
# Choice values rendered on the inline keyboard.
CHOICE_SEND = "send"
CHOICE_EDIT = "edit"
CHOICE_DISCARD = "discard"
# Onboarding text the bot DMs the owner the first time a BusinessConnection
# arrives. Plain text (Telegram MarkdownV2 escaping is fragile, and this
# message is content-stable enough to hand-author).
ONBOARDING_MESSAGE = (
"🤝 You've connected me as your Telegram Business assistant.\n\n"
"Here's how it works:\n"
"• When someone messages you in a chat I have access to, I'll draft "
"a reply and send it to you here.\n"
"• You'll see [✓ Send] [✎ Edit] [✕ Discard] buttons. Nothing goes "
"to your contact until you tap Send.\n"
"• Tap Edit to override the draft with your own wording.\n"
"• Tap Discard to drop the draft entirely.\n\n"
"Useful commands:\n"
" /biz — show status and active connections\n"
" /biz pause — pause drafting (you still see the messages)\n"
" /biz resume — resume drafting\n"
" /biz off — disable drafting for one chat (reply to that customer's draft)\n\n"
"I never auto-send. Every reply is yours to approve."
)
# ---------------------------------------------------------------------------
# Type aliases for the adapter callbacks the manager depends on.
# Defined as Callables so this module stays import-light and decoupled
# from the rest of the telegram.py module.
# ---------------------------------------------------------------------------
# Generates a draft reply text for a given customer message.
# Returns the draft text, or raises on failure.
DraftGenerator = Callable[[str, str], Awaitable[str]] # (customer_text, customer_chat_id) -> draft
# Sends a message to a chat. For owner DMs, business_connection_id is None.
# For customer chats reached via business mode, business_connection_id is set.
SendMessage = Callable[..., Awaitable[Any]] # delegates to bot.send_message kwargs
# ---------------------------------------------------------------------------
# Manager
# ---------------------------------------------------------------------------
class BusinessModeManager:
"""Owns Business Mode state and orchestrates drafting + approval.
One instance per Telegram adapter. The adapter wires up handlers in
``connect()`` and calls into this manager for every business update.
"""
def __init__(
self,
*,
session_db: Any,
send_message: SendMessage,
draft_generator: DraftGenerator,
debounce_seconds: float = 8.0,
draft_ttl_hours: float = 24.0,
max_customer_text_chars: int = 4000,
) -> None:
self._db = session_db
self._send = send_message
self._draft_generator = draft_generator
self._debounce_seconds = max(0.0, float(debounce_seconds))
self._draft_ttl_seconds = max(60.0, float(draft_ttl_hours) * 3600.0)
self._max_customer_text_chars = int(max_customer_text_chars)
# In-flight debounce tasks, keyed by (connection_id, customer_chat_id).
# New customer messages reset the timer so a typing burst yields one draft.
self._debounce_tasks: Dict[str, asyncio.Task] = {}
self._debounce_buffers: Dict[str, Dict[str, Any]] = {}
# Owner DMs that are in "next message = edited reply" mode.
# Keyed by owner_chat_id → draft_id awaiting the override text.
self._edit_capture: Dict[str, int] = {}
# ------------------------------------------------------------------
# BusinessConnection updates (established / edited / ended).
# ------------------------------------------------------------------
async def handle_connection_update(self, business_connection: Any) -> None:
"""Persist or remove a connection row.
``business_connection`` is a PTB ``BusinessConnection`` object.
On Telegram Bot API 9.0+, ``rights`` is an object with ``can_reply``
among other flags; on older versions a ``can_reply`` bool sits on
the connection itself. We tolerate both shapes.
"""
conn_id = getattr(business_connection, "id", None)
user = getattr(business_connection, "user", None)
owner_user_id = getattr(user, "id", None)
owner_chat_id = getattr(business_connection, "user_chat_id", None)
is_enabled = bool(getattr(business_connection, "is_enabled", False))
if conn_id is None or owner_user_id is None or owner_chat_id is None:
logger.warning(
"BusinessConnection update missing required fields: id=%s, user.id=%s, user_chat_id=%s",
conn_id, owner_user_id, owner_chat_id,
)
return
# ``can_reply`` location moved in API 9.0 from connection root to
# ``rights.can_reply``. Probe rights first.
can_reply = False
rights = getattr(business_connection, "rights", None)
if rights is not None:
can_reply = bool(getattr(rights, "can_reply", False))
else:
can_reply = bool(getattr(business_connection, "can_reply", False))
previous = self._db.get_telegram_business_connection(str(conn_id))
self._db.upsert_telegram_business_connection(
connection_id=str(conn_id),
owner_user_id=str(owner_user_id),
owner_chat_id=str(owner_chat_id),
can_reply=can_reply,
is_enabled=is_enabled,
)
# First-time onboarding DM.
if previous is None and is_enabled:
try:
await self._send(
chat_id=int(owner_chat_id),
text=ONBOARDING_MESSAGE,
disable_notification=False,
)
except Exception as exc:
logger.warning(
"Failed to deliver business-mode onboarding DM to %s: %s",
owner_chat_id, exc,
)
# Connection ended → DM the owner a confirmation.
if previous is not None and previous.get("is_enabled") and not is_enabled:
try:
await self._send(
chat_id=int(owner_chat_id),
text="🔌 Business connection ended. I won't draft any more replies.",
disable_notification=True,
)
except Exception as exc:
logger.debug("Disconnection notice send failed (%s): %s", owner_chat_id, exc)
# can_reply changed → tell the owner.
if previous is not None and previous.get("is_enabled") and is_enabled:
if previous.get("can_reply") != can_reply:
msg = (
"✅ Send-on-your-behalf permission enabled — Send buttons are now live."
if can_reply else
"⚠️ Send-on-your-behalf permission is OFF in your Telegram Business "
"settings. I'll still draft replies but you'll need to copy and "
"send them yourself."
)
try:
await self._send(
chat_id=int(owner_chat_id), text=msg, disable_notification=True,
)
except Exception:
pass
# ------------------------------------------------------------------
# business_message updates (customer talks to owner).
# ------------------------------------------------------------------
async def handle_business_message(self, message: Any) -> None:
"""Schedule a debounced draft for an incoming customer message.
``message`` is a PTB ``Message`` from a business_message update.
It has ``business_connection_id``, ``chat`` (the customer chat),
``from_user`` (the customer), and ``text``/``caption``.
"""
conn_id = getattr(message, "business_connection_id", None)
if not conn_id:
return
conn = self._db.get_telegram_business_connection(str(conn_id))
if not conn:
logger.debug("Received business_message for unknown connection %s", conn_id)
return
if not conn.get("is_enabled"):
return
if not conn.get("auto_draft", True):
logger.debug("Business connection %s has auto_draft=False; skipping draft", conn_id)
return
chat = getattr(message, "chat", None)
customer_chat_id = getattr(chat, "id", None)
if customer_chat_id is None:
return
if str(customer_chat_id) in (conn.get("paused_chats") or []):
logger.debug("Customer chat %s paused; skipping draft", customer_chat_id)
return
# Pull text. Captions on media count too — they're often the only
# part the customer typed.
text = (getattr(message, "text", None) or getattr(message, "caption", None) or "").strip()
if not text:
# Pure media without caption — out of scope for v1 (no vision pipeline).
return
if len(text) > self._max_customer_text_chars:
text = text[: self._max_customer_text_chars]
key = f"{conn_id}:{customer_chat_id}"
# Cancel any in-flight debounce for this (connection, customer chat) so
# rapid bursts coalesce into a single draft against the latest text.
prior = self._debounce_tasks.pop(key, None)
if prior is not None and not prior.done():
prior.cancel()
# Buffer the latest message info — the timer will read this at fire time.
self._debounce_buffers[key] = {
"conn_id": str(conn_id),
"owner_chat_id": str(conn.get("owner_chat_id")),
"customer_chat_id": str(customer_chat_id),
"customer_msg_id": str(getattr(message, "message_id", "") or ""),
"customer_text": text,
"customer_name": _customer_display_name(message),
}
# Schedule the actual draft. If debounce is zero (test mode), fire
# immediately so the test doesn't have to wait.
if self._debounce_seconds <= 0:
await self._run_draft(key)
else:
self._debounce_tasks[key] = asyncio.create_task(
self._debounce_then_draft(key)
)
async def _debounce_then_draft(self, key: str) -> None:
try:
await asyncio.sleep(self._debounce_seconds)
except asyncio.CancelledError:
return
try:
await self._run_draft(key)
finally:
self._debounce_tasks.pop(key, None)
async def _run_draft(self, key: str) -> None:
buf = self._debounce_buffers.pop(key, None)
if not buf:
return
# Re-check the connection state — owner may have hit /biz pause
# during the debounce window.
conn = self._db.get_telegram_business_connection(buf["conn_id"])
if not conn or not conn.get("is_enabled") or not conn.get("auto_draft", True):
return
if buf["customer_chat_id"] in (conn.get("paused_chats") or []):
return
try:
draft_text = await self._draft_generator(
buf["customer_text"], buf["customer_chat_id"]
)
except Exception as exc:
logger.exception("Business-mode draft generator failed: %s", exc)
try:
await self._send(
chat_id=int(buf["owner_chat_id"]),
text=(
"⚠️ I couldn't draft a reply to "
f"{buf['customer_name']}: {exc}.\n\n"
f"Their message was:\n\n{buf['customer_text']}"
),
disable_notification=True,
)
except Exception:
pass
return
draft_text = (draft_text or "").strip()
if not draft_text:
logger.debug("Empty draft for %s — skipping", key)
return
draft_id = self._db.create_telegram_business_draft(
connection_id=buf["conn_id"],
owner_chat_id=buf["owner_chat_id"],
customer_chat_id=buf["customer_chat_id"],
customer_msg_id=buf["customer_msg_id"] or None,
customer_text=buf["customer_text"],
draft_text=draft_text,
ttl_seconds=self._draft_ttl_seconds,
)
owner_message = self._render_draft_owner_message(
customer_name=buf["customer_name"],
customer_text=buf["customer_text"],
draft_text=draft_text,
)
keyboard = self._build_draft_keyboard(draft_id, can_reply=bool(conn.get("can_reply")))
try:
sent = await self._send(
chat_id=int(buf["owner_chat_id"]),
text=owner_message,
reply_markup=keyboard,
disable_notification=False,
)
except Exception as exc:
logger.warning("Failed to deliver business-mode draft to owner %s: %s",
buf["owner_chat_id"], exc)
# Mark the draft expired so we don't leave an unactionable row.
self._db.resolve_telegram_business_draft(draft_id, status="expired")
return
owner_msg_id = getattr(sent, "message_id", None)
if owner_msg_id is not None:
self._db.set_telegram_business_draft_owner_message(draft_id, str(owner_msg_id))
# ------------------------------------------------------------------
# Inline-button callback dispatch (bd:choice:draft_id)
# ------------------------------------------------------------------
async def handle_callback(
self,
*,
data: str,
caller_user_id: Optional[str],
answer: Callable[..., Awaitable[Any]],
edit_message_text: Callable[..., Awaitable[Any]],
) -> bool:
"""Handle a callback_query whose data starts with ``bd:``.
Returns True if the callback was dispatched (caller should stop
further handling), False if it wasn't ours.
"""
if not data.startswith(CALLBACK_PREFIX):
return False
parts = data.split(":", 2)
if len(parts) != 3:
await answer(text="Invalid draft action.")
return True
choice = parts[1]
try:
draft_id = int(parts[2])
except ValueError:
await answer(text="Invalid draft action.")
return True
draft = self._db.get_telegram_business_draft(draft_id)
if draft is None:
await answer(text="That draft has expired.")
return True
if draft.get("status") != "pending":
await answer(text="That draft has already been resolved.")
return True
# Only the owner of this connection may act on the buttons.
conn = self._db.get_telegram_business_connection(draft["connection_id"])
if not conn:
await answer(text="Connection no longer exists.")
return True
if caller_user_id and str(caller_user_id) != str(conn.get("owner_user_id")):
await answer(text="⛔ Only the connected account owner can use these buttons.")
return True
if choice == CHOICE_DISCARD:
self._db.resolve_telegram_business_draft(draft_id, status="discarded")
await answer(text="✕ Discarded")
try:
await edit_message_text(
text=self._render_resolved_message(draft, status="discarded"),
reply_markup=None,
)
except Exception:
pass
return True
if choice == CHOICE_EDIT:
self._edit_capture[str(conn["owner_chat_id"])] = draft_id
await answer(text="✎ Send me the text to deliver")
try:
await edit_message_text(
text=(
self._render_resolved_message(draft, status="awaiting_edit")
+ "\n\n✎ Reply to this DM with the text you want delivered."
),
reply_markup=None,
)
except Exception:
pass
return True
if choice == CHOICE_SEND:
if not conn.get("can_reply"):
await answer(
text=(
"⚠️ Send-on-your-behalf is OFF — enable it in Telegram → "
"Business → Chatbots, then try again."
)
)
return True
try:
await self._send(
chat_id=int(draft["customer_chat_id"]),
text=draft["draft_text"],
business_connection_id=draft["connection_id"],
)
except Exception as exc:
logger.warning("Business send failed for draft %s: %s", draft_id, exc)
await answer(text=f"⚠️ Send failed: {exc}")
return True
self._db.resolve_telegram_business_draft(
draft_id, status="sent", final_sent_text=draft["draft_text"],
)
await answer(text="✓ Sent")
try:
await edit_message_text(
text=self._render_resolved_message(draft, status="sent"),
reply_markup=None,
)
except Exception:
pass
return True
await answer(text="Unknown action.")
return True
# ------------------------------------------------------------------
# Owner-side "next message after Edit = the actual reply text" capture.
# ------------------------------------------------------------------
async def maybe_handle_edit_capture(
self,
*,
owner_chat_id: str,
text: str,
) -> bool:
"""If the owner just tapped Edit, treat their next DM as the override.
Returns True if the message was consumed by the edit-capture flow
(so the caller shouldn't dispatch it to the normal command path).
"""
draft_id = self._edit_capture.pop(str(owner_chat_id), None)
if draft_id is None:
return False
override = (text or "").strip()
if not override:
# Empty edit attempt → restore capture and let the user retry.
self._edit_capture[str(owner_chat_id)] = draft_id
try:
await self._send(
chat_id=int(owner_chat_id),
text="✎ Edit cancelled (empty text). Tap Edit again if you want to try.",
disable_notification=True,
)
except Exception:
pass
return True
draft = self._db.get_telegram_business_draft(draft_id)
if not draft or draft.get("status") not in {"pending", "awaiting_edit"}:
try:
await self._send(
chat_id=int(owner_chat_id),
text="That draft has expired or was already resolved.",
disable_notification=True,
)
except Exception:
pass
return True
conn = self._db.get_telegram_business_connection(draft["connection_id"])
if not conn:
return True
if not conn.get("can_reply"):
try:
await self._send(
chat_id=int(owner_chat_id),
text=(
"⚠️ Send-on-your-behalf is OFF in Telegram → Business → "
"Chatbots. I can't deliver this — copy the text and send it "
"manually."
),
disable_notification=False,
)
except Exception:
pass
return True
try:
await self._send(
chat_id=int(draft["customer_chat_id"]),
text=override,
business_connection_id=draft["connection_id"],
)
except Exception as exc:
logger.warning("Business edit-send failed for draft %s: %s", draft_id, exc)
try:
await self._send(
chat_id=int(owner_chat_id),
text=f"⚠️ Send failed: {exc}",
disable_notification=False,
)
except Exception:
pass
return True
self._db.resolve_telegram_business_draft(
draft_id, status="edited", final_sent_text=override,
)
try:
await self._send(
chat_id=int(owner_chat_id),
text=f"✓ Sent (edited):\n\n{override}",
disable_notification=True,
)
except Exception:
pass
return True
# ------------------------------------------------------------------
# /biz slash command (owner-only).
# ------------------------------------------------------------------
async def handle_biz_command(
self,
*,
owner_user_id: str,
owner_chat_id: str,
args: List[str],
) -> str:
"""Process /biz subcommands. Returns text the adapter should send back.
Supported:
/biz — status dashboard
/biz pause — set auto_draft=False on all this user's connections
/biz resume — set auto_draft=True on all this user's connections
/biz off <chat_id> — add a customer chat to the paused list
/biz on <chat_id> — remove a customer chat from the paused list
"""
connections = self._db.list_telegram_business_connections(
owner_user_id=str(owner_user_id), enabled_only=False,
)
active = [c for c in connections if c.get("is_enabled")]
if not args:
return self._render_status(connections=connections, owner_chat_id=owner_chat_id)
sub = args[0].lower()
if sub in {"pause", "resume"}:
target = (sub == "resume")
if not active:
return "You don't have any active business connections."
for conn in active:
self._db.set_telegram_business_auto_draft(
conn["connection_id"], auto_draft=target,
)
verb = "▶ Resumed drafting." if target else "⏸ Paused drafting."
return f"{verb} ({len(active)} connection{'s' if len(active) != 1 else ''})"
if sub in {"off", "on"} and len(args) >= 2:
chat_arg = args[1].strip()
if not active:
return "You don't have any active business connections."
updated = 0
for conn in active:
paused = list(conn.get("paused_chats") or [])
if sub == "off" and chat_arg not in paused:
paused.append(chat_arg)
self._db.set_telegram_business_paused_chats(
conn["connection_id"], paused,
)
updated += 1
elif sub == "on" and chat_arg in paused:
paused.remove(chat_arg)
self._db.set_telegram_business_paused_chats(
conn["connection_id"], paused,
)
updated += 1
if updated:
verb = "muted in" if sub == "off" else "re-enabled for"
return f"✓ Chat {chat_arg} {verb} {updated} connection(s)."
return f"No change — {chat_arg} was already in that state."
return (
"Usage:\n"
" /biz — show status\n"
" /biz pause — pause drafting (still see messages)\n"
" /biz resume — resume drafting\n"
" /biz off <id> — mute drafting for one customer chat\n"
" /biz on <id> — re-enable drafting for one customer chat"
)
# ------------------------------------------------------------------
# Rendering helpers
# ------------------------------------------------------------------
@staticmethod
def _render_draft_owner_message(
*, customer_name: str, customer_text: str, draft_text: str
) -> str:
"""Build the plain-text owner-DM body that carries one draft."""
return (
f"💬 {customer_name} wrote:\n"
f"{_quote_block(customer_text)}\n\n"
f"📝 Suggested reply:\n"
f"{draft_text}"
)
@staticmethod
def _render_resolved_message(draft: Dict[str, Any], *, status: str) -> str:
"""Re-render the owner DM after a button decision."""
header = {
"sent": "✓ Sent",
"edited": "✓ Sent (edited)",
"discarded": "✕ Discarded",
"expired": "⏰ Expired",
"awaiting_edit": "✎ Edit",
}.get(status, status)
return (
f"{header}\n\n"
f"💬 Customer wrote:\n{_quote_block(draft.get('customer_text', ''))}\n\n"
f"📝 Draft was:\n{draft.get('draft_text', '')}"
)
@staticmethod
def _render_status(*, connections: List[Dict[str, Any]], owner_chat_id: str) -> str:
if not connections:
return (
"You haven't connected this bot to any Telegram Business account yet.\n\n"
"Open Telegram → Settings → Business → Chatbots, paste my @username, "
"and pick which chats I can see."
)
lines = ["📋 Business Mode status\n"]
for c in connections:
state = "🟢 active" if c.get("is_enabled") else "⚪ ended"
auto = "drafting ON" if c.get("auto_draft") else "drafting PAUSED"
send_perm = "send ON" if c.get("can_reply") else "send OFF"
paused = c.get("paused_chats") or []
paused_note = f" — muted chats: {', '.join(paused)}" if paused else ""
lines.append(
f"{state} · {auto} · {send_perm}{paused_note}"
)
lines.append("")
lines.append(
"Commands: /biz pause · /biz resume · /biz off <chat_id> · /biz on <chat_id>"
)
return "\n".join(lines)
@staticmethod
def _build_draft_keyboard(draft_id: int, *, can_reply: bool):
"""Construct the inline keyboard for one draft.
Imported lazily so the module is importable when python-telegram-bot
is missing (matches the lazy-deps pattern used elsewhere in the
adapter).
"""
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
row = []
if can_reply:
row.append(InlineKeyboardButton(
"✓ Send", callback_data=f"{CALLBACK_PREFIX}{CHOICE_SEND}:{draft_id}",
))
row.append(InlineKeyboardButton(
"✎ Edit", callback_data=f"{CALLBACK_PREFIX}{CHOICE_EDIT}:{draft_id}",
))
row.append(InlineKeyboardButton(
"✕ Discard", callback_data=f"{CALLBACK_PREFIX}{CHOICE_DISCARD}:{draft_id}",
))
return InlineKeyboardMarkup([row])
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _customer_display_name(message: Any) -> str:
"""Best-effort human name for the customer who sent ``message``."""
user = getattr(message, "from_user", None)
if user is not None:
for attr in ("full_name", "first_name", "username"):
val = getattr(user, attr, None)
if val:
return str(val)
chat = getattr(message, "chat", None)
if chat is not None:
for attr in ("full_name", "title", "username"):
val = getattr(chat, attr, None)
if val:
return str(val)
return "Customer"
def _quote_block(text: str, *, max_len: int = 600) -> str:
"""Render a customer message as a quoted block for the owner DM."""
if not text:
return " (no text)"
if len(text) > max_len:
text = text[: max_len - 1].rstrip() + ""
return "\n".join(f" > {line}" for line in text.splitlines())

View File

@@ -67,6 +67,8 @@ COMMAND_REGISTRY: list[CommandDef] = [
aliases=("reset",), args_hint="[name]"),
CommandDef("topic", "Enable or inspect Telegram DM topic sessions", "Session",
gateway_only=True, args_hint="[off|help|session-id]"),
CommandDef("biz", "Manage Telegram Business Mode (Secretary Bots) drafting", "Session",
gateway_only=True, args_hint="[pause|resume|off <chat>|on <chat>]"),
CommandDef("clear", "Clear screen and start a new session", "Session",
cli_only=True),
CommandDef("redraw", "Force a full UI repaint (recovers from terminal drift)", "Session",

View File

@@ -1321,6 +1321,19 @@ DEFAULT_CONFIG = {
"reactions": False, # Add 👀/✅/❌ reactions to messages during processing
"channel_prompts": {}, # Per-chat/topic ephemeral system prompts (topics inherit from parent group)
"allowed_chats": "", # If set, bot ONLY responds in these group/supergroup chat IDs (whitelist)
# Business Mode (Secretary Bots) — owner-approved drafting for
# Telegram Business accounts. When enabled, customer messages
# routed via Business Mode are turned into drafts delivered to
# the owner with Send / Edit / Discard buttons. Off by default;
# also requires Business Mode toggled in @BotFather.
# See website/docs/user-guide/messaging/telegram.md#business-mode.
"business_mode": {
"enabled": False,
"debounce_seconds": 8, # Coalesce typing-burst customer messages into one draft
"draft_ttl_hours": 24, # Pending drafts expire after this many hours
"max_customer_text_chars": 4000, # Truncate very long customer messages before drafting
"owner_persona": "", # Optional override; empty uses a sensible default persona
},
},
# Mattermost platform settings (gateway mode)

View File

@@ -2484,6 +2484,364 @@ class SessionDB:
)
self._execute_write(_do)
# ------------------------------------------------------------------
# Telegram Business Mode (Secretary Bots)
# ------------------------------------------------------------------
# Business Mode lets the owner of a Telegram Business account connect
# this bot to their personal account so it can observe customer
# conversations and draft replies for owner approval. Two tables:
#
# telegram_business_connections — one row per account that linked
# this bot via BotFather Business Mode. Updated by
# BusinessConnection updates (established / edited / ended).
#
# telegram_business_drafts — pending owner-approval drafts. A
# draft is created when a customer messages a connected chat
# and the manager produces a candidate reply; it's deleted when
# the owner taps Send / Edit / Discard or when it expires.
#
# Like the topic-mode tables, this migration is lazy: it runs only
# when business mode is actually enabled, keeping the core schema
# clean for users who never use the feature.
# ------------------------------------------------------------------
def apply_telegram_business_migration(self) -> None:
"""Create Telegram Business Mode tables on first use.
Idempotent — safe to call from every business handler.
"""
def _do(conn):
conn.executescript(
"""
CREATE TABLE IF NOT EXISTS telegram_business_connections (
connection_id TEXT PRIMARY KEY,
owner_user_id TEXT NOT NULL,
owner_chat_id TEXT NOT NULL,
can_reply INTEGER NOT NULL DEFAULT 0,
is_enabled INTEGER NOT NULL DEFAULT 1,
auto_draft INTEGER NOT NULL DEFAULT 1,
paused_chats TEXT NOT NULL DEFAULT '[]',
created_at REAL NOT NULL,
updated_at REAL NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_tg_biz_conn_owner
ON telegram_business_connections(owner_user_id);
CREATE TABLE IF NOT EXISTS telegram_business_drafts (
draft_id INTEGER PRIMARY KEY AUTOINCREMENT,
connection_id TEXT NOT NULL,
owner_chat_id TEXT NOT NULL,
customer_chat_id TEXT NOT NULL,
customer_msg_id TEXT,
customer_text TEXT NOT NULL,
draft_text TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
owner_message_id TEXT,
final_sent_text TEXT,
created_at REAL NOT NULL,
updated_at REAL NOT NULL,
expires_at REAL NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_tg_biz_drafts_conn_customer
ON telegram_business_drafts(connection_id, customer_chat_id, status);
CREATE INDEX IF NOT EXISTS idx_tg_biz_drafts_owner_msg
ON telegram_business_drafts(owner_chat_id, owner_message_id)
WHERE owner_message_id IS NOT NULL;
"""
)
conn.execute(
"INSERT INTO state_meta (key, value) VALUES (?, ?) "
"ON CONFLICT(key) DO UPDATE SET value = excluded.value",
("telegram_business_schema_version", "1"),
)
self._execute_write(_do)
def upsert_telegram_business_connection(
self,
*,
connection_id: str,
owner_user_id: str,
owner_chat_id: str,
can_reply: bool,
is_enabled: bool,
) -> None:
"""Insert or update a business connection row.
Called from BusinessConnection update handlers. Preserves
``auto_draft`` and ``paused_chats`` across updates so the owner's
preferences survive Telegram re-issuing the connection.
"""
self.apply_telegram_business_migration()
now = time.time()
def _do(conn):
conn.execute(
"""
INSERT INTO telegram_business_connections (
connection_id, owner_user_id, owner_chat_id,
can_reply, is_enabled, auto_draft, paused_chats,
created_at, updated_at
) VALUES (?, ?, ?, ?, ?, 1, '[]', ?, ?)
ON CONFLICT(connection_id) DO UPDATE SET
owner_user_id = excluded.owner_user_id,
owner_chat_id = excluded.owner_chat_id,
can_reply = excluded.can_reply,
is_enabled = excluded.is_enabled,
updated_at = excluded.updated_at
""",
(
str(connection_id),
str(owner_user_id),
str(owner_chat_id),
1 if can_reply else 0,
1 if is_enabled else 0,
now,
now,
),
)
self._execute_write(_do)
def get_telegram_business_connection(
self, connection_id: str
) -> Optional[Dict[str, Any]]:
"""Return the connection row as a dict, or None."""
with self._lock:
try:
row = self._conn.execute(
"SELECT * FROM telegram_business_connections WHERE connection_id = ?",
(str(connection_id),),
).fetchone()
except sqlite3.OperationalError:
return None
if row is None:
return None
d = dict(row) if isinstance(row, sqlite3.Row) else dict(zip(
["connection_id", "owner_user_id", "owner_chat_id", "can_reply",
"is_enabled", "auto_draft", "paused_chats", "created_at",
"updated_at"],
row,
))
try:
d["paused_chats"] = json.loads(d.get("paused_chats") or "[]")
except (TypeError, ValueError):
d["paused_chats"] = []
d["can_reply"] = bool(d.get("can_reply"))
d["is_enabled"] = bool(d.get("is_enabled"))
d["auto_draft"] = bool(d.get("auto_draft", 1))
return d
def list_telegram_business_connections(
self, *, owner_user_id: Optional[str] = None, enabled_only: bool = True
) -> List[Dict[str, Any]]:
"""List connections, optionally filtered by owner_user_id."""
sql = "SELECT * FROM telegram_business_connections"
clauses: List[str] = []
params: List[Any] = []
if owner_user_id is not None:
clauses.append("owner_user_id = ?")
params.append(str(owner_user_id))
if enabled_only:
clauses.append("is_enabled = 1")
if clauses:
sql += " WHERE " + " AND ".join(clauses)
sql += " ORDER BY updated_at DESC"
with self._lock:
try:
rows = self._conn.execute(sql, params).fetchall()
except sqlite3.OperationalError:
return []
out: List[Dict[str, Any]] = []
for row in rows:
d = dict(row) if isinstance(row, sqlite3.Row) else dict(zip(
["connection_id", "owner_user_id", "owner_chat_id", "can_reply",
"is_enabled", "auto_draft", "paused_chats", "created_at",
"updated_at"],
row,
))
try:
d["paused_chats"] = json.loads(d.get("paused_chats") or "[]")
except (TypeError, ValueError):
d["paused_chats"] = []
d["can_reply"] = bool(d.get("can_reply"))
d["is_enabled"] = bool(d.get("is_enabled"))
d["auto_draft"] = bool(d.get("auto_draft", 1))
out.append(d)
return out
def set_telegram_business_auto_draft(
self, connection_id: str, *, auto_draft: bool
) -> None:
"""Toggle auto-drafting globally for one connection (the /biz pause switch)."""
self.apply_telegram_business_migration()
def _do(conn):
conn.execute(
"UPDATE telegram_business_connections SET auto_draft = ?, "
"updated_at = ? WHERE connection_id = ?",
(1 if auto_draft else 0, time.time(), str(connection_id)),
)
self._execute_write(_do)
def set_telegram_business_paused_chats(
self, connection_id: str, paused_chats: List[str]
) -> None:
"""Replace the per-chat pause list for one connection."""
self.apply_telegram_business_migration()
payload = json.dumps([str(c) for c in (paused_chats or [])])
def _do(conn):
conn.execute(
"UPDATE telegram_business_connections SET paused_chats = ?, "
"updated_at = ? WHERE connection_id = ?",
(payload, time.time(), str(connection_id)),
)
self._execute_write(_do)
def create_telegram_business_draft(
self,
*,
connection_id: str,
owner_chat_id: str,
customer_chat_id: str,
customer_msg_id: Optional[str],
customer_text: str,
draft_text: str,
ttl_seconds: float = 86400.0,
) -> int:
"""Insert a pending draft row and return its draft_id.
Any prior pending drafts for the same (connection, customer_chat)
are marked superseded so only one Send button is ever live per
conversation.
"""
self.apply_telegram_business_migration()
now = time.time()
def _do(conn):
conn.execute(
"UPDATE telegram_business_drafts SET status = 'superseded', "
"updated_at = ? WHERE connection_id = ? AND customer_chat_id = ? "
"AND status = 'pending'",
(now, str(connection_id), str(customer_chat_id)),
)
cur = conn.execute(
"""
INSERT INTO telegram_business_drafts (
connection_id, owner_chat_id, customer_chat_id,
customer_msg_id, customer_text, draft_text,
status, created_at, updated_at, expires_at
) VALUES (?, ?, ?, ?, ?, ?, 'pending', ?, ?, ?)
""",
(
str(connection_id),
str(owner_chat_id),
str(customer_chat_id),
str(customer_msg_id) if customer_msg_id is not None else None,
customer_text,
draft_text,
now, now, now + max(60.0, float(ttl_seconds)),
),
)
return cur.lastrowid
return self._execute_write(_do)
def get_telegram_business_draft(self, draft_id: int) -> Optional[Dict[str, Any]]:
"""Fetch one draft row by id."""
with self._lock:
try:
row = self._conn.execute(
"SELECT * FROM telegram_business_drafts WHERE draft_id = ?",
(int(draft_id),),
).fetchone()
except sqlite3.OperationalError:
return None
if row is None:
return None
return dict(row) if isinstance(row, sqlite3.Row) else None
def set_telegram_business_draft_owner_message(
self, draft_id: int, owner_message_id: str
) -> None:
"""Record the Telegram message_id of the owner-DM that carries the draft."""
self.apply_telegram_business_migration()
def _do(conn):
conn.execute(
"UPDATE telegram_business_drafts SET owner_message_id = ?, "
"updated_at = ? WHERE draft_id = ?",
(str(owner_message_id), time.time(), int(draft_id)),
)
self._execute_write(_do)
def resolve_telegram_business_draft(
self,
draft_id: int,
*,
status: str,
final_sent_text: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
"""Atomically mark a draft sent / edited / discarded and return its prior row.
Returns None if the draft no longer exists or was already resolved
(so callbacks for stale buttons no-op safely).
"""
self.apply_telegram_business_migration()
if status not in {"sent", "edited", "discarded", "expired"}:
raise ValueError(f"invalid business draft status: {status}")
now = time.time()
def _do(conn):
row = conn.execute(
"SELECT * FROM telegram_business_drafts WHERE draft_id = ? AND status = 'pending'",
(int(draft_id),),
).fetchone()
if row is None:
return None
conn.execute(
"UPDATE telegram_business_drafts SET status = ?, "
"final_sent_text = COALESCE(?, final_sent_text), "
"updated_at = ? WHERE draft_id = ?",
(status, final_sent_text, now, int(draft_id)),
)
return dict(row) if isinstance(row, sqlite3.Row) else None
return self._execute_write(_do)
def get_pending_telegram_business_drafts_for_owner(
self, owner_chat_id: str
) -> List[Dict[str, Any]]:
"""List all pending drafts addressed to one owner DM (for /biz status)."""
with self._lock:
try:
rows = self._conn.execute(
"SELECT * FROM telegram_business_drafts WHERE owner_chat_id = ? "
"AND status = 'pending' ORDER BY created_at ASC",
(str(owner_chat_id),),
).fetchall()
except sqlite3.OperationalError:
return []
return [dict(r) if isinstance(r, sqlite3.Row) else r for r in rows]
def expire_telegram_business_drafts(self, *, now: Optional[float] = None) -> int:
"""Mark drafts past their expiry as expired. Returns count affected."""
self.apply_telegram_business_migration()
cutoff = now if now is not None else time.time()
def _do(conn):
cur = conn.execute(
"UPDATE telegram_business_drafts SET status = 'expired', "
"updated_at = ? WHERE status = 'pending' AND expires_at < ?",
(cutoff, cutoff),
)
return cur.rowcount
return self._execute_write(_do)
def enable_telegram_topic_mode(
self,
*,

View File

@@ -0,0 +1,683 @@
"""Tests for Telegram Business Mode (Secretary Bots).
Covers two layers:
- ``hermes_state.SessionDB`` business-connection + draft CRUD
- ``gateway.platforms.telegram_business.BusinessModeManager`` orchestration
The manager tests use lightweight async stubs for the adapter callbacks
(``send_message``, ``draft_generator``) so the agent loop / network are
never touched — pure unit-level coverage of the state machine.
"""
from __future__ import annotations
import asyncio
import time
from types import SimpleNamespace
from typing import Any, Dict, List
import pytest
from hermes_state import SessionDB
@pytest.fixture()
def db(tmp_path):
"""Fresh SessionDB with the business migration already applied."""
db_path = tmp_path / "biz_state.db"
sdb = SessionDB(db_path=db_path)
sdb.apply_telegram_business_migration()
yield sdb
sdb.close()
# =========================================================================
# Schema / CRUD
# =========================================================================
class TestBusinessConnectionPersistence:
def test_upsert_and_get(self, db):
db.upsert_telegram_business_connection(
connection_id="c1", owner_user_id="42",
owner_chat_id="100", can_reply=True, is_enabled=True,
)
row = db.get_telegram_business_connection("c1")
assert row is not None
assert row["connection_id"] == "c1"
assert row["owner_user_id"] == "42"
assert row["owner_chat_id"] == "100"
assert row["can_reply"] is True
assert row["is_enabled"] is True
assert row["auto_draft"] is True
assert row["paused_chats"] == []
def test_upsert_preserves_auto_draft_and_paused(self, db):
db.upsert_telegram_business_connection(
connection_id="c1", owner_user_id="42",
owner_chat_id="100", can_reply=True, is_enabled=True,
)
db.set_telegram_business_auto_draft("c1", auto_draft=False)
db.set_telegram_business_paused_chats("c1", ["200", "300"])
# Simulate Telegram re-sending the BusinessConnection
db.upsert_telegram_business_connection(
connection_id="c1", owner_user_id="42",
owner_chat_id="100", can_reply=False, is_enabled=True,
)
row = db.get_telegram_business_connection("c1")
# can_reply updated, but owner preferences kept
assert row["can_reply"] is False
assert row["auto_draft"] is False
assert sorted(row["paused_chats"]) == ["200", "300"]
def test_list_enabled_filter(self, db):
db.upsert_telegram_business_connection(
connection_id="c1", owner_user_id="42",
owner_chat_id="100", can_reply=True, is_enabled=True,
)
db.upsert_telegram_business_connection(
connection_id="c2", owner_user_id="42",
owner_chat_id="100", can_reply=False, is_enabled=False,
)
active = db.list_telegram_business_connections(
owner_user_id="42", enabled_only=True,
)
all_rows = db.list_telegram_business_connections(
owner_user_id="42", enabled_only=False,
)
assert len(active) == 1 and active[0]["connection_id"] == "c1"
assert len(all_rows) == 2
def test_get_returns_none_for_unknown(self, db):
assert db.get_telegram_business_connection("missing") is None
def test_migration_idempotent(self, db):
# Calling twice must not raise; subsequent CRUD still works.
db.apply_telegram_business_migration()
db.apply_telegram_business_migration()
db.upsert_telegram_business_connection(
connection_id="c1", owner_user_id="42",
owner_chat_id="100", can_reply=True, is_enabled=True,
)
assert db.get_telegram_business_connection("c1") is not None
class TestBusinessDraftLifecycle:
@pytest.fixture(autouse=True)
def _conn(self, db):
db.upsert_telegram_business_connection(
connection_id="c1", owner_user_id="42",
owner_chat_id="100", can_reply=True, is_enabled=True,
)
def test_create_and_get(self, db):
did = db.create_telegram_business_draft(
connection_id="c1", owner_chat_id="100", customer_chat_id="200",
customer_msg_id="m1", customer_text="hi", draft_text="hello",
)
row = db.get_telegram_business_draft(did)
assert row is not None
assert row["customer_text"] == "hi"
assert row["draft_text"] == "hello"
assert row["status"] == "pending"
assert row["owner_message_id"] is None
assert row["final_sent_text"] is None
assert row["expires_at"] > row["created_at"]
def test_new_draft_supersedes_prior_pending(self, db):
d1 = db.create_telegram_business_draft(
connection_id="c1", owner_chat_id="100", customer_chat_id="200",
customer_msg_id="m1", customer_text="hi", draft_text="hello",
)
d2 = db.create_telegram_business_draft(
connection_id="c1", owner_chat_id="100", customer_chat_id="200",
customer_msg_id="m2", customer_text="hi again", draft_text="hello again",
)
assert d1 != d2
# Prior draft should be marked superseded.
prior = db.get_telegram_business_draft(d1)
new = db.get_telegram_business_draft(d2)
assert prior["status"] == "superseded"
assert new["status"] == "pending"
def test_supersede_only_affects_same_customer_chat(self, db):
d1 = db.create_telegram_business_draft(
connection_id="c1", owner_chat_id="100", customer_chat_id="200",
customer_msg_id="m1", customer_text="A", draft_text="a",
)
d2 = db.create_telegram_business_draft(
connection_id="c1", owner_chat_id="100", customer_chat_id="999",
customer_msg_id="m2", customer_text="B", draft_text="b",
)
# Different customer chats — d1 should still be pending.
assert db.get_telegram_business_draft(d1)["status"] == "pending"
assert db.get_telegram_business_draft(d2)["status"] == "pending"
def test_resolve_sent_returns_prior_and_blocks_double_resolve(self, db):
did = db.create_telegram_business_draft(
connection_id="c1", owner_chat_id="100", customer_chat_id="200",
customer_msg_id="m1", customer_text="hi", draft_text="hello",
)
first = db.resolve_telegram_business_draft(
did, status="sent", final_sent_text="hello",
)
assert first is not None
# Status now committed
row = db.get_telegram_business_draft(did)
assert row["status"] == "sent"
assert row["final_sent_text"] == "hello"
# Second resolution must be a no-op.
second = db.resolve_telegram_business_draft(did, status="discarded")
assert second is None
assert db.get_telegram_business_draft(did)["status"] == "sent"
def test_resolve_invalid_status_raises(self, db):
did = db.create_telegram_business_draft(
connection_id="c1", owner_chat_id="100", customer_chat_id="200",
customer_msg_id="m1", customer_text="hi", draft_text="hello",
)
with pytest.raises(ValueError):
db.resolve_telegram_business_draft(did, status="bogus")
def test_owner_message_id_round_trip(self, db):
did = db.create_telegram_business_draft(
connection_id="c1", owner_chat_id="100", customer_chat_id="200",
customer_msg_id="m1", customer_text="hi", draft_text="hello",
)
db.set_telegram_business_draft_owner_message(did, "owner_msg_42")
row = db.get_telegram_business_draft(did)
assert row["owner_message_id"] == "owner_msg_42"
def test_expire_only_affects_overdue_pending(self, db):
d_old = db.create_telegram_business_draft(
connection_id="c1", owner_chat_id="100", customer_chat_id="200",
customer_msg_id="m_old", customer_text="old", draft_text="old reply",
ttl_seconds=60.0,
)
# Different customer chat so it doesn't supersede d_old.
d_new = db.create_telegram_business_draft(
connection_id="c1", owner_chat_id="100", customer_chat_id="201",
customer_msg_id="m_new", customer_text="new", draft_text="new reply",
ttl_seconds=999_999.0,
)
# Run expiry just past d_old's expiry but well before d_new's.
d_old_row = db.get_telegram_business_draft(d_old)
affected = db.expire_telegram_business_drafts(now=d_old_row["expires_at"] + 1.0)
assert affected == 1
assert db.get_telegram_business_draft(d_old)["status"] == "expired"
assert db.get_telegram_business_draft(d_new)["status"] == "pending"
def test_get_pending_for_owner(self, db):
did = db.create_telegram_business_draft(
connection_id="c1", owner_chat_id="100", customer_chat_id="200",
customer_msg_id="m1", customer_text="hi", draft_text="hello",
)
# Resolved draft must not appear.
d2 = db.create_telegram_business_draft(
connection_id="c1", owner_chat_id="100", customer_chat_id="999",
customer_msg_id="m2", customer_text="hi2", draft_text="hello2",
)
db.resolve_telegram_business_draft(d2, status="discarded")
pending = db.get_pending_telegram_business_drafts_for_owner("100")
ids = {row["draft_id"] for row in pending}
assert did in ids
assert d2 not in ids
# =========================================================================
# Manager orchestration
# =========================================================================
def _fake_business_connection(
*, conn_id="conn1", owner_id=42, owner_chat=100,
is_enabled=True, can_reply=True,
) -> Any:
"""Build a duck-typed BusinessConnection good enough for the manager."""
rights = SimpleNamespace(can_reply=can_reply) if can_reply is not None else None
return SimpleNamespace(
id=conn_id,
user=SimpleNamespace(id=owner_id, full_name="Owner"),
user_chat_id=owner_chat,
is_enabled=is_enabled,
rights=rights,
)
def _fake_business_message(
*, conn_id="conn1", customer_chat_id=200, customer_id=999,
text="Hi there", msg_id=42,
) -> Any:
"""Build a duck-typed business_message-style PTB Message."""
return SimpleNamespace(
business_connection_id=conn_id,
chat=SimpleNamespace(id=customer_chat_id, type="private"),
from_user=SimpleNamespace(
id=customer_id, full_name="Customer Carol",
first_name="Carol", username="carol",
),
text=text,
caption=None,
message_id=msg_id,
)
class _SentRecorder:
"""Capture all send_message kwargs the manager produces."""
def __init__(self, *, fail: bool = False):
self.calls: List[Dict[str, Any]] = []
self.fail = fail
self._next_id = 1000
async def __call__(self, **kwargs):
self.calls.append(kwargs)
if self.fail:
raise RuntimeError("simulated send failure")
sent = SimpleNamespace(message_id=self._next_id)
self._next_id += 1
return sent
def _make_manager(db, *, draft_text="Draft reply!", draft_fails=False,
send_recorder=None, debounce=0.0):
from gateway.platforms.telegram_business import BusinessModeManager
async def _draft(customer_text: str, customer_chat_id: str) -> str:
if draft_fails:
raise RuntimeError("model boom")
return draft_text
sender = send_recorder if send_recorder is not None else _SentRecorder()
mgr = BusinessModeManager(
session_db=db,
send_message=sender,
draft_generator=_draft,
debounce_seconds=debounce,
)
return mgr, sender
class TestConnectionLifecycle:
@pytest.mark.asyncio
async def test_first_connection_persists_and_sends_onboarding(self, db):
mgr, sender = _make_manager(db)
await mgr.handle_connection_update(_fake_business_connection())
row = db.get_telegram_business_connection("conn1")
assert row is not None
assert row["is_enabled"] is True
assert row["can_reply"] is True
# Owner got the onboarding DM
assert len(sender.calls) == 1
assert sender.calls[0]["chat_id"] == 100
assert "Business assistant" in sender.calls[0]["text"]
@pytest.mark.asyncio
async def test_repeat_connection_does_not_resend_onboarding(self, db):
mgr, sender = _make_manager(db)
await mgr.handle_connection_update(_fake_business_connection())
sender.calls.clear()
await mgr.handle_connection_update(_fake_business_connection())
assert sender.calls == []
@pytest.mark.asyncio
async def test_disconnection_dms_owner(self, db):
mgr, sender = _make_manager(db)
await mgr.handle_connection_update(_fake_business_connection())
sender.calls.clear()
await mgr.handle_connection_update(
_fake_business_connection(is_enabled=False)
)
assert any("ended" in c["text"].lower() for c in sender.calls)
@pytest.mark.asyncio
async def test_can_reply_toggle_notifies_owner(self, db):
mgr, sender = _make_manager(db)
await mgr.handle_connection_update(_fake_business_connection(can_reply=True))
sender.calls.clear()
await mgr.handle_connection_update(_fake_business_connection(can_reply=False))
assert sender.calls, "owner should be told send permission flipped"
assert "OFF" in sender.calls[0]["text"]
class TestBusinessMessageDraftFlow:
@pytest.mark.asyncio
async def test_incoming_customer_message_drafts_and_dms_owner(self, db):
mgr, sender = _make_manager(db, draft_text="Hey, thanks for reaching out!")
await mgr.handle_connection_update(_fake_business_connection())
sender.calls.clear()
await mgr.handle_business_message(_fake_business_message(text="Hi!"))
assert sender.calls, "draft should have been DM'd to the owner"
owner_msg = sender.calls[0]
assert owner_msg["chat_id"] == 100
assert "Hey, thanks for reaching out!" in owner_msg["text"]
assert "Hi!" in owner_msg["text"]
# Inline keyboard rendered (under the test telegram mock,
# InlineKeyboardMarkup is a MagicMock — we just verify a keyboard
# was attached and that three callback_datas were produced).
assert owner_msg.get("reply_markup") is not None
# One draft row recorded
drafts = db.get_pending_telegram_business_drafts_for_owner("100")
assert len(drafts) == 1
assert drafts[0]["draft_text"] == "Hey, thanks for reaching out!"
@pytest.mark.asyncio
async def test_no_can_reply_drops_send_button(self, db, monkeypatch):
# Capture callback_data passed to InlineKeyboardButton to verify
# the Send button is omitted when can_reply is False. We wrap
# _build_draft_keyboard so we can inspect the choices it produced
# — the underlying telegram mock makes the resulting keyboard
# object opaque.
from gateway.platforms import telegram_business as biz_mod
original = biz_mod.BusinessModeManager._build_draft_keyboard
captured: List[str] = []
def _spy(draft_id, *, can_reply):
if can_reply:
captured.append("send")
captured.append("edit")
captured.append("discard")
return original(draft_id, can_reply=can_reply)
monkeypatch.setattr(
biz_mod.BusinessModeManager,
"_build_draft_keyboard",
staticmethod(_spy),
)
mgr, sender = _make_manager(db)
await mgr.handle_connection_update(_fake_business_connection(can_reply=False))
sender.calls.clear()
await mgr.handle_business_message(_fake_business_message())
assert captured == ["edit", "discard"]
@pytest.mark.asyncio
async def test_unknown_connection_silently_ignored(self, db):
mgr, sender = _make_manager(db)
# No prior handle_connection_update — should silently skip.
await mgr.handle_business_message(_fake_business_message())
assert sender.calls == []
assert db.get_pending_telegram_business_drafts_for_owner("100") == []
@pytest.mark.asyncio
async def test_auto_draft_paused_skips_drafting(self, db):
mgr, sender = _make_manager(db)
await mgr.handle_connection_update(_fake_business_connection())
db.set_telegram_business_auto_draft("conn1", auto_draft=False)
sender.calls.clear()
await mgr.handle_business_message(_fake_business_message())
assert sender.calls == []
assert db.get_pending_telegram_business_drafts_for_owner("100") == []
@pytest.mark.asyncio
async def test_paused_customer_chat_skipped(self, db):
mgr, sender = _make_manager(db)
await mgr.handle_connection_update(_fake_business_connection())
db.set_telegram_business_paused_chats("conn1", ["200"])
sender.calls.clear()
await mgr.handle_business_message(_fake_business_message(customer_chat_id=200))
assert sender.calls == []
# Other chat still drafts.
await mgr.handle_business_message(_fake_business_message(customer_chat_id=300))
assert sender.calls
@pytest.mark.asyncio
async def test_empty_text_skipped(self, db):
mgr, sender = _make_manager(db)
await mgr.handle_connection_update(_fake_business_connection())
sender.calls.clear()
await mgr.handle_business_message(_fake_business_message(text=""))
assert sender.calls == []
@pytest.mark.asyncio
async def test_draft_failure_reports_to_owner(self, db):
mgr, sender = _make_manager(db, draft_fails=True)
await mgr.handle_connection_update(_fake_business_connection())
sender.calls.clear()
await mgr.handle_business_message(_fake_business_message(text="Hi"))
assert sender.calls
assert "couldn't draft" in sender.calls[0]["text"]
@pytest.mark.asyncio
async def test_debounce_coalesces_burst(self, db):
# Use a real debounce window and fire 3 messages in quick succession.
mgr, sender = _make_manager(db, debounce=0.05,
draft_text="single draft")
await mgr.handle_connection_update(_fake_business_connection())
sender.calls.clear()
for i in range(3):
await mgr.handle_business_message(
_fake_business_message(text=f"part {i}", msg_id=i)
)
# Let the debounce fire.
await asyncio.sleep(0.2)
# Only one draft should have been generated and one owner DM sent.
owner_dms = [c for c in sender.calls if c.get("chat_id") == 100]
assert len(owner_dms) == 1
class TestCallbackDispatch:
@pytest.mark.asyncio
async def test_send_button_delivers_to_customer_chat(self, db):
mgr, sender = _make_manager(db, draft_text="hello there")
await mgr.handle_connection_update(_fake_business_connection())
await mgr.handle_business_message(_fake_business_message())
sender.calls.clear()
draft = db.get_pending_telegram_business_drafts_for_owner("100")[0]
did = draft["draft_id"]
# Fake the inline-button click
answered: List[Dict[str, Any]] = []
edited: List[Dict[str, Any]] = []
async def _answer(**kw): answered.append(kw)
async def _edit(**kw): edited.append(kw)
dispatched = await mgr.handle_callback(
data=f"bd:send:{did}", caller_user_id="42",
answer=_answer, edit_message_text=_edit,
)
assert dispatched is True
# Sent to customer chat with business_connection_id
sends_to_customer = [
c for c in sender.calls if c.get("chat_id") == 200
]
assert sends_to_customer
assert sends_to_customer[0]["business_connection_id"] == "conn1"
assert sends_to_customer[0]["text"] == "hello there"
# Draft now sent
row = db.get_telegram_business_draft(did)
assert row["status"] == "sent"
assert row["final_sent_text"] == "hello there"
# Owner DM was edited to show resolution
assert edited and "Sent" in edited[0]["text"]
@pytest.mark.asyncio
async def test_discard_marks_status(self, db):
mgr, sender = _make_manager(db)
await mgr.handle_connection_update(_fake_business_connection())
await mgr.handle_business_message(_fake_business_message())
draft = db.get_pending_telegram_business_drafts_for_owner("100")[0]
did = draft["draft_id"]
async def _answer(**kw): pass
async def _edit(**kw): pass
await mgr.handle_callback(
data=f"bd:discard:{did}", caller_user_id="42",
answer=_answer, edit_message_text=_edit,
)
assert db.get_telegram_business_draft(did)["status"] == "discarded"
@pytest.mark.asyncio
async def test_callback_for_unknown_draft_no_ops(self, db):
mgr, sender = _make_manager(db)
await mgr.handle_connection_update(_fake_business_connection())
sender.calls.clear()
answered: List[Dict[str, Any]] = []
async def _answer(**kw): answered.append(kw)
async def _edit(**kw): pass
await mgr.handle_callback(
data="bd:send:999999", caller_user_id="42",
answer=_answer, edit_message_text=_edit,
)
assert answered and "expired" in answered[0]["text"].lower()
# No customer-chat sends.
assert not any(c.get("chat_id") == 200 for c in sender.calls)
@pytest.mark.asyncio
async def test_callback_rejects_non_owner(self, db):
mgr, sender = _make_manager(db)
await mgr.handle_connection_update(_fake_business_connection())
await mgr.handle_business_message(_fake_business_message())
draft = db.get_pending_telegram_business_drafts_for_owner("100")[0]
did = draft["draft_id"]
answered: List[Dict[str, Any]] = []
async def _answer(**kw): answered.append(kw)
async def _edit(**kw): pass
# Different user — should be rejected.
await mgr.handle_callback(
data=f"bd:send:{did}", caller_user_id="9999",
answer=_answer, edit_message_text=_edit,
)
assert answered and "Only the connected account owner" in answered[0]["text"]
assert db.get_telegram_business_draft(did)["status"] == "pending"
@pytest.mark.asyncio
async def test_send_blocked_when_can_reply_false(self, db):
mgr, sender = _make_manager(db)
await mgr.handle_connection_update(_fake_business_connection(can_reply=False))
await mgr.handle_business_message(_fake_business_message())
draft = db.get_pending_telegram_business_drafts_for_owner("100")[0]
did = draft["draft_id"]
answered: List[Dict[str, Any]] = []
async def _answer(**kw): answered.append(kw)
async def _edit(**kw): pass
await mgr.handle_callback(
data=f"bd:send:{did}", caller_user_id="42",
answer=_answer, edit_message_text=_edit,
)
# Send-on-your-behalf is OFF → reject with explanation, no customer send.
assert any("send-on-your-behalf is off" in (a.get("text") or "").lower()
for a in answered)
assert not any(c.get("chat_id") == 200 for c in sender.calls)
assert db.get_telegram_business_draft(did)["status"] == "pending"
@pytest.mark.asyncio
async def test_non_bd_callback_returns_false(self, db):
mgr, _ = _make_manager(db)
async def _noop(**kw):
pass
dispatched = await mgr.handle_callback(
data="ea:once:1", caller_user_id="42",
answer=_noop, edit_message_text=_noop,
)
assert dispatched is False
class TestEditCapture:
@pytest.mark.asyncio
async def test_edit_then_text_sends_override(self, db):
mgr, sender = _make_manager(db, draft_text="original draft")
await mgr.handle_connection_update(_fake_business_connection())
await mgr.handle_business_message(_fake_business_message())
draft = db.get_pending_telegram_business_drafts_for_owner("100")[0]
did = draft["draft_id"]
# Tap Edit
async def _answer(**kw): pass
async def _edit(**kw): pass
await mgr.handle_callback(
data=f"bd:edit:{did}", caller_user_id="42",
answer=_answer, edit_message_text=_edit,
)
sender.calls.clear()
# Owner sends override text
consumed = await mgr.maybe_handle_edit_capture(
owner_chat_id="100", text="my custom reply",
)
assert consumed is True
# Customer received the override
customer_sends = [c for c in sender.calls if c.get("chat_id") == 200]
assert customer_sends
assert customer_sends[0]["text"] == "my custom reply"
assert customer_sends[0]["business_connection_id"] == "conn1"
# Draft now resolved as edited
row = db.get_telegram_business_draft(did)
assert row["status"] == "edited"
assert row["final_sent_text"] == "my custom reply"
@pytest.mark.asyncio
async def test_edit_capture_idle_returns_false(self, db):
mgr, _ = _make_manager(db)
consumed = await mgr.maybe_handle_edit_capture(
owner_chat_id="100", text="hello",
)
assert consumed is False
class TestBizCommand:
@pytest.mark.asyncio
async def test_status_with_no_connections(self, db):
mgr, _ = _make_manager(db)
reply = await mgr.handle_biz_command(
owner_user_id="42", owner_chat_id="100", args=[],
)
assert "haven't connected" in reply.lower()
@pytest.mark.asyncio
async def test_status_with_connection(self, db):
mgr, _ = _make_manager(db)
await mgr.handle_connection_update(_fake_business_connection())
reply = await mgr.handle_biz_command(
owner_user_id="42", owner_chat_id="100", args=[],
)
assert "active" in reply
assert "drafting ON" in reply
@pytest.mark.asyncio
async def test_pause_and_resume(self, db):
mgr, _ = _make_manager(db)
await mgr.handle_connection_update(_fake_business_connection())
reply = await mgr.handle_biz_command(
owner_user_id="42", owner_chat_id="100", args=["pause"],
)
assert "Paused" in reply
assert db.get_telegram_business_connection("conn1")["auto_draft"] is False
reply = await mgr.handle_biz_command(
owner_user_id="42", owner_chat_id="100", args=["resume"],
)
assert "Resumed" in reply
assert db.get_telegram_business_connection("conn1")["auto_draft"] is True
@pytest.mark.asyncio
async def test_per_chat_off_and_on(self, db):
mgr, _ = _make_manager(db)
await mgr.handle_connection_update(_fake_business_connection())
reply = await mgr.handle_biz_command(
owner_user_id="42", owner_chat_id="100", args=["off", "200"],
)
assert "200" in reply
assert "200" in db.get_telegram_business_connection("conn1")["paused_chats"]
reply = await mgr.handle_biz_command(
owner_user_id="42", owner_chat_id="100", args=["on", "200"],
)
assert "200" in reply
assert "200" not in db.get_telegram_business_connection("conn1")["paused_chats"]
@pytest.mark.asyncio
async def test_unknown_subcommand_returns_help(self, db):
mgr, _ = _make_manager(db)
reply = await mgr.handle_biz_command(
owner_user_id="42", owner_chat_id="100", args=["unknown"],
)
assert "Usage" in reply

View File

@@ -920,6 +920,85 @@ Tap a button to answer, or tap **Other** to type a free-form response (the next
Configure the response timeout via `agent.clarify_timeout` in `~/.hermes/config.yaml` (default `600` seconds). If you don't respond within the timeout, the agent unblocks with a sentinel message and adapts rather than hanging.
## Business Mode (Secretary Bots)
Telegram Business Mode lets a Telegram Business account owner connect this bot to their personal account so it can **draft replies** to incoming customer messages. Replies are **never sent automatically** — every draft is delivered to the owner's DM with a [✓ Send] [✎ Edit] [✕ Discard] inline keyboard, and nothing reaches the customer until the owner taps Send.
### How the flow works
1. The owner connects the bot via Telegram → Settings → Business → Chatbots.
2. A customer messages the owner in a chat covered by the connection.
3. The bot debounces typing bursts (default 8s) and produces a single draft.
4. The bot DMs the draft to the **owner's own chat** with this bot:
```
💬 Carol wrote:
> Hey, are you free Friday afternoon?
📝 Suggested reply:
Friday afternoon works — anywhere between 2 and 5pm?
[✓ Send] [✎ Edit] [✕ Discard]
```
5. The owner taps:
- **Send** — the draft is delivered to the customer using the connection's `business_connection_id`, appearing as if the owner sent it.
- **Edit** — the bot replies "send me the text you want delivered" and uses the owner's next DM as the final message.
- **Discard** — the draft is dropped, the customer sees nothing.
### Requirements
- A **Telegram Business** subscription on the owner's personal account.
- The bot's **Business Mode** toggle enabled in [@BotFather](https://t.me/botfather): `/mybots` → pick your bot → Bot Settings → Business Mode.
- For the **Send** button to be active, the owner must grant `Reply to Messages` permission when configuring the bot in their Telegram Business chatbot settings. Without it, the bot can still draft replies but the Send button is hidden — the owner has to copy and send manually.
### Configuration
In `~/.hermes/config.yaml`:
```yaml
telegram:
business_mode:
enabled: true # off by default
debounce_seconds: 8 # coalesce typing-burst messages into one draft
draft_ttl_hours: 24 # pending drafts older than this expire
max_customer_text_chars: 4000
owner_persona: "" # optional system-prompt override
```
The `owner_persona` field lets you customize the voice of drafts — for example, `"You are drafting on behalf of a freelance illustrator. Sound friendly and professional; never quote prices without consulting me first."` When empty, a sensible default persona is used (warm, direct, concise; matches the customer's language).
Restart the gateway after changing these.
### `/biz` slash command
The owner can manage drafting via `/biz` in their DM with the bot:
| Command | Effect |
|---|---|
| `/biz` | Show status of all connections and active customer chats |
| `/biz pause` | Globally stop producing drafts (customers' messages still arrive — owner just sees them, no drafts) |
| `/biz resume` | Resume drafting |
| `/biz off <chat_id>` | Mute drafting for one customer chat |
| `/biz on <chat_id>` | Re-enable drafting for one customer chat |
### What stays on by default
- **Drafting is always observe-with-approval.** There is no auto-send setting in v1. Even with `can_reply` enabled, every reply requires an explicit owner tap.
- **Only the connected owner can use the inline buttons.** Telegram callback queries are user-scoped — if someone else taps Send (e.g. another user with bot DM access), the bot rejects with `⛔ Only the connected account owner can use these buttons.`
- **Drafts expire after 24h** (configurable). Stale buttons become no-ops with `That draft has expired.`
- **Telegram never sees a draft.** Drafts live in `~/.hermes/state.db` under `telegram_business_drafts`; only the owner's DM ever sees the suggested text.
### Limits in v1
- **Text only.** Customer media (photos, voice, documents) is skipped — only text messages and captions trigger drafts. v2 will add media context via vision.
- **No conversation history.** Each customer message is drafted in isolation; the bot doesn't remember the previous exchange. If your customer is mid-conversation, the draft may need editing — that's what the Edit button is for.
- **No per-customer persona learning.** Owner edits override the draft for that one reply but don't train future drafts.
### Disabling
To turn the feature off entirely, set `enabled: false` and restart the gateway. To revoke individual connections, the owner can disconnect this bot from their Telegram Business settings — the bot will receive a `BusinessConnection` "ended" update and stop drafting for that account immediately.
## Security
:::warning