diff --git a/gateway/platforms/telegram.py b/gateway/platforms/telegram.py index 1960eca9d1..b64ac2b3a5 100644 --- a/gateway/platforms/telegram.py +++ b/gateway/platforms/telegram.py @@ -133,6 +133,10 @@ class TelegramAdapter(BasePlatformAdapter): self._polling_conflict_count: int = 0 self._polling_network_error_count: int = 0 self._polling_error_callback_ref = None + # DM Topics: map of topic_name -> message_thread_id (populated at startup) + self._dm_topics: Dict[str, int] = {} + # DM Topics config from extra.dm_topics + self._dm_topics_config: List[Dict[str, Any]] = self.config.extra.get("dm_topics", []) @staticmethod def _looks_like_polling_conflict(error: Exception) -> bool: @@ -273,6 +277,162 @@ class TelegramAdapter(BasePlatformAdapter): logger.warning("[%s] Failed stopping Telegram polling after conflict: %s", self.name, stop_error, exc_info=True) await self._notify_fatal_error() + async def _create_dm_topic( + self, + chat_id: int, + name: str, + icon_color: Optional[int] = None, + icon_custom_emoji_id: Optional[str] = None, + ) -> Optional[int]: + """Create a forum topic in a private (DM) chat. + + Uses Bot API 9.4's createForumTopic which now works for 1-on-1 chats. + Returns the message_thread_id on success, None on failure. + """ + if not self._bot: + return None + try: + kwargs: Dict[str, Any] = {"chat_id": chat_id, "name": name} + if icon_color is not None: + kwargs["icon_color"] = icon_color + if icon_custom_emoji_id: + kwargs["icon_custom_emoji_id"] = icon_custom_emoji_id + + topic = await self._bot.create_forum_topic(**kwargs) + thread_id = topic.message_thread_id + logger.info( + "[%s] Created DM topic '%s' in chat %s -> thread_id=%s", + self.name, name, chat_id, thread_id, + ) + return thread_id + except Exception as e: + error_text = str(e).lower() + # If topic already exists, try to find it via getForumTopicIconStickers + # or we just log and skip — Telegram doesn't provide a "list topics" API + if "topic_name_duplicate" in error_text or "already" in error_text: + logger.info( + "[%s] DM topic '%s' already exists in chat %s (will be mapped from incoming messages)", + self.name, name, chat_id, + ) + else: + logger.warning( + "[%s] Failed to create DM topic '%s' in chat %s: %s", + self.name, name, chat_id, e, + ) + return None + + def _persist_dm_topic_thread_id(self, chat_id: int, topic_name: str, thread_id: int) -> None: + """Save a newly created thread_id back into config.yaml so it persists across restarts.""" + try: + config_path = _Path.home() / ".hermes" / "config.yaml" + if not config_path.exists(): + logger.warning("[%s] Config file not found at %s, cannot persist thread_id", self.name, config_path) + return + + import yaml as _yaml + with open(config_path, "r") as f: + config = _yaml.safe_load(f) or {} + + # Navigate to platforms.telegram.extra.dm_topics + dm_topics = ( + config.get("platforms", {}) + .get("telegram", {}) + .get("extra", {}) + .get("dm_topics", []) + ) + if not dm_topics: + return + + changed = False + for chat_entry in dm_topics: + if int(chat_entry.get("chat_id", 0)) != int(chat_id): + continue + for t in chat_entry.get("topics", []): + if t.get("name") == topic_name and not t.get("thread_id"): + t["thread_id"] = thread_id + changed = True + break + + if changed: + with open(config_path, "w") as f: + _yaml.dump(config, f, default_flow_style=False, sort_keys=False) + logger.info( + "[%s] Persisted thread_id=%s for topic '%s' in config.yaml", + self.name, thread_id, topic_name, + ) + except Exception as e: + logger.warning("[%s] Failed to persist thread_id to config: %s", self.name, e, exc_info=True) + + async def _setup_dm_topics(self) -> None: + """Load or create configured DM topics for specified chats. + + Reads config.extra['dm_topics'] — a list of dicts: + [ + { + "chat_id": 123456789, + "topics": [ + {"name": "General", "icon_color": 7322096, "thread_id": 100}, + {"name": "Accessibility Auditor", "icon_color": 9367192, "skill": "accessibility-auditor"} + ] + } + ] + + If a topic already has a thread_id in the config (persisted from a previous + creation), it is loaded into the cache without calling createForumTopic. + Only topics without a thread_id are created via the API, and their thread_id + is then saved back to config.yaml for future restarts. + """ + if not self._dm_topics_config: + return + + for chat_entry in self._dm_topics_config: + chat_id = chat_entry.get("chat_id") + topics = chat_entry.get("topics", []) + if not chat_id or not topics: + continue + + logger.info( + "[%s] Setting up %d DM topic(s) for chat %s", + self.name, len(topics), chat_id, + ) + + for topic_conf in topics: + topic_name = topic_conf.get("name") + if not topic_name: + continue + + cache_key = f"{chat_id}:{topic_name}" + + # If thread_id is already persisted in config, just load into cache + existing_thread_id = topic_conf.get("thread_id") + if existing_thread_id: + self._dm_topics[cache_key] = int(existing_thread_id) + logger.info( + "[%s] DM topic loaded from config: %s -> thread_id=%s", + self.name, cache_key, existing_thread_id, + ) + continue + + # No persisted thread_id — create the topic via API + icon_color = topic_conf.get("icon_color") + icon_emoji = topic_conf.get("icon_custom_emoji_id") + + thread_id = await self._create_dm_topic( + chat_id=int(chat_id), + name=topic_name, + icon_color=icon_color, + icon_custom_emoji_id=icon_emoji, + ) + + if thread_id: + self._dm_topics[cache_key] = thread_id + logger.info( + "[%s] DM topic cached: %s -> thread_id=%s", + self.name, cache_key, thread_id, + ) + # Persist thread_id to config so we don't recreate on next restart + self._persist_dm_topic_thread_id(int(chat_id), topic_name, thread_id) + async def connect(self) -> bool: """Connect to Telegram and start polling for updates.""" if not TELEGRAM_AVAILABLE: @@ -390,6 +550,18 @@ class TelegramAdapter(BasePlatformAdapter): self._mark_connected() logger.info("[%s] Connected and polling for Telegram updates", self.name) + + # Set up DM topics (Bot API 9.4 — Private Chat Topics) + # Runs after connection is established so the bot can call createForumTopic. + # Failures here are non-fatal — the bot works fine without topics. + try: + await self._setup_dm_topics() + except Exception as topics_err: + logger.warning( + "[%s] DM topics setup failed (non-fatal): %s", + self.name, topics_err, exc_info=True, + ) + return True except Exception as e: @@ -1514,6 +1686,99 @@ class TelegramAdapter(BasePlatformAdapter): emoji, set_name, ) + def _reload_dm_topics_from_config(self) -> None: + """Re-read dm_topics from config.yaml and load any new thread_ids into cache. + + This allows topics created externally (e.g. by the agent via API) to be + recognized without a gateway restart. + """ + try: + config_path = _Path.home() / ".hermes" / "config.yaml" + if not config_path.exists(): + return + + import yaml as _yaml + with open(config_path, "r") as f: + config = _yaml.safe_load(f) or {} + + dm_topics = ( + config.get("platforms", {}) + .get("telegram", {}) + .get("extra", {}) + .get("dm_topics", []) + ) + if not dm_topics: + return + + # Update in-memory config and cache any new thread_ids + self._dm_topics_config = dm_topics + for chat_entry in dm_topics: + cid = chat_entry.get("chat_id") + if not cid: + continue + for t in chat_entry.get("topics", []): + tid = t.get("thread_id") + name = t.get("name") + if tid and name: + cache_key = f"{cid}:{name}" + if cache_key not in self._dm_topics: + self._dm_topics[cache_key] = int(tid) + logger.info( + "[%s] Hot-loaded DM topic from config: %s -> thread_id=%s", + self.name, cache_key, tid, + ) + except Exception as e: + logger.debug("[%s] Failed to reload dm_topics from config: %s", self.name, e) + + def _get_dm_topic_info(self, chat_id: str, thread_id: Optional[str]) -> Optional[Dict[str, Any]]: + """Look up DM topic config by chat_id and thread_id. + + Returns the topic config dict (name, skill, etc.) if this thread_id + matches a known DM topic, or None. + """ + if not thread_id: + return None + + thread_id_int = int(thread_id) + + # Check cached topics first (created by us or loaded at startup) + for key, cached_tid in self._dm_topics.items(): + if cached_tid == thread_id_int and key.startswith(f"{chat_id}:"): + topic_name = key.split(":", 1)[1] + # Find the full config for this topic + for chat_entry in self._dm_topics_config: + if str(chat_entry.get("chat_id")) == chat_id: + for t in chat_entry.get("topics", []): + if t.get("name") == topic_name: + return t + return {"name": topic_name} + + # Not in cache — hot-reload config in case topics were added externally + self._reload_dm_topics_from_config() + + # Check cache again after reload + for key, cached_tid in self._dm_topics.items(): + if cached_tid == thread_id_int and key.startswith(f"{chat_id}:"): + topic_name = key.split(":", 1)[1] + for chat_entry in self._dm_topics_config: + if str(chat_entry.get("chat_id")) == chat_id: + for t in chat_entry.get("topics", []): + if t.get("name") == topic_name: + return t + return {"name": topic_name} + + return None + + def _cache_dm_topic_from_message(self, chat_id: str, thread_id: str, topic_name: str) -> None: + """Cache a thread_id -> topic_name mapping discovered from an incoming message.""" + cache_key = f"{chat_id}:{topic_name}" + if cache_key not in self._dm_topics: + self._dm_topics[cache_key] = int(thread_id) + logger.info( + "[%s] Cached DM topic from message: %s -> thread_id=%s", + self.name, cache_key, thread_id, + ) + def _build_message_event(self, message: Message, msg_type: MessageType) -> MessageEvent: """Build a MessageEvent from a Telegram message.""" chat = message.chat @@ -1525,7 +1790,29 @@ class TelegramAdapter(BasePlatformAdapter): chat_type = "group" elif chat.type == ChatType.CHANNEL: chat_type = "channel" - + + # Resolve DM topic name for chat_topic context injection + thread_id_raw = message.message_thread_id + thread_id_str = str(thread_id_raw) if thread_id_raw else None + chat_topic = None + + if chat_type == "dm" and thread_id_str: + topic_info = self._get_dm_topic_info(str(chat.id), thread_id_str) + if topic_info: + chat_topic = topic_info.get("name") + # If topic has a skill, inject it as context hint + topic_skill = topic_info.get("skill") + if topic_skill: + chat_topic = f"{chat_topic} [skill: {topic_skill}]" + + # Also check forum_topic_created service message for topic discovery + if hasattr(message, "forum_topic_created") and message.forum_topic_created: + created_name = message.forum_topic_created.name + if created_name: + self._cache_dm_topic_from_message(str(chat.id), thread_id_str, created_name) + if not chat_topic: + chat_topic = created_name + # Build source source = self.build_source( chat_id=str(chat.id), @@ -1533,7 +1820,8 @@ class TelegramAdapter(BasePlatformAdapter): chat_type=chat_type, user_id=str(user.id) if user else None, user_name=user.full_name if user else None, - thread_id=str(message.message_thread_id) if message.message_thread_id else None, + thread_id=thread_id_str, + chat_topic=chat_topic, ) # Extract reply context if this message is a reply diff --git a/tests/gateway/test_dm_topics.py b/tests/gateway/test_dm_topics.py new file mode 100644 index 0000000000..733acbebe6 --- /dev/null +++ b/tests/gateway/test_dm_topics.py @@ -0,0 +1,397 @@ +"""Tests for Telegram DM Private Chat Topics (Bot API 9.4). + +Covers: +- _setup_dm_topics: loading persisted thread_ids from config +- _setup_dm_topics: creating new topics via API when no thread_id +- _persist_dm_topic_thread_id: saving thread_id back to config.yaml +- _get_dm_topic_info: looking up topic config by thread_id +- _cache_dm_topic_from_message: caching thread_ids from incoming messages +- _build_message_event: DM topic resolution in message events +""" + +import asyncio +import sys +from pathlib import Path +from types import SimpleNamespace +from unittest.mock import AsyncMock, MagicMock, patch, mock_open + +import pytest + +from gateway.config import PlatformConfig + + +def _ensure_telegram_mock(): + if "telegram" in sys.modules and hasattr(sys.modules["telegram"], "__file__"): + return + + telegram_mod = MagicMock() + telegram_mod.ext.ContextTypes.DEFAULT_TYPE = type(None) + telegram_mod.constants.ParseMode.MARKDOWN_V2 = "MarkdownV2" + telegram_mod.constants.ChatType.GROUP = "group" + telegram_mod.constants.ChatType.SUPERGROUP = "supergroup" + telegram_mod.constants.ChatType.CHANNEL = "channel" + telegram_mod.constants.ChatType.PRIVATE = "private" + + for name in ("telegram", "telegram.ext", "telegram.constants"): + sys.modules.setdefault(name, telegram_mod) + + +_ensure_telegram_mock() + +from gateway.platforms.telegram import TelegramAdapter # noqa: E402 + + +def _make_adapter(dm_topics_config=None): + """Create a TelegramAdapter with optional DM topics config.""" + extra = {} + if dm_topics_config is not None: + extra["dm_topics"] = dm_topics_config + config = PlatformConfig(enabled=True, token="***", extra=extra) + adapter = TelegramAdapter(config) + return adapter + + +# ── _setup_dm_topics: load persisted thread_ids ── + + +@pytest.mark.asyncio +async def test_setup_dm_topics_loads_persisted_thread_ids(): + """Topics with thread_id in config should be loaded into cache, not created.""" + adapter = _make_adapter([ + { + "chat_id": 111, + "topics": [ + {"name": "General", "thread_id": 100}, + {"name": "Work", "thread_id": 200}, + ], + } + ]) + adapter._bot = AsyncMock() + + await adapter._setup_dm_topics() + + # Both should be in cache + assert adapter._dm_topics["111:General"] == 100 + assert adapter._dm_topics["111:Work"] == 200 + # create_forum_topic should NOT have been called + adapter._bot.create_forum_topic.assert_not_called() + + +@pytest.mark.asyncio +async def test_setup_dm_topics_creates_when_no_thread_id(): + """Topics without thread_id should be created via API.""" + adapter = _make_adapter([ + { + "chat_id": 222, + "topics": [ + {"name": "NewTopic", "icon_color": 7322096}, + ], + } + ]) + adapter._bot = AsyncMock() + mock_topic = SimpleNamespace(message_thread_id=999) + adapter._bot.create_forum_topic.return_value = mock_topic + + # Mock the persist method so it doesn't touch the filesystem + adapter._persist_dm_topic_thread_id = MagicMock() + + await adapter._setup_dm_topics() + + # Should have been created + adapter._bot.create_forum_topic.assert_called_once_with( + chat_id=222, name="NewTopic", icon_color=7322096, + ) + # Should be in cache + assert adapter._dm_topics["222:NewTopic"] == 999 + # Should persist + adapter._persist_dm_topic_thread_id.assert_called_once_with(222, "NewTopic", 999) + + +@pytest.mark.asyncio +async def test_setup_dm_topics_mixed_persisted_and_new(): + """Mix of persisted and new topics should work correctly.""" + adapter = _make_adapter([ + { + "chat_id": 333, + "topics": [ + {"name": "Existing", "thread_id": 50}, + {"name": "New", "icon_color": 123}, + ], + } + ]) + adapter._bot = AsyncMock() + mock_topic = SimpleNamespace(message_thread_id=777) + adapter._bot.create_forum_topic.return_value = mock_topic + adapter._persist_dm_topic_thread_id = MagicMock() + + await adapter._setup_dm_topics() + + # Existing loaded from config + assert adapter._dm_topics["333:Existing"] == 50 + # New created via API + assert adapter._dm_topics["333:New"] == 777 + # Only one API call (for "New") + adapter._bot.create_forum_topic.assert_called_once() + + +@pytest.mark.asyncio +async def test_setup_dm_topics_skips_empty_config(): + """Empty dm_topics config should be a no-op.""" + adapter = _make_adapter([]) + adapter._bot = AsyncMock() + + await adapter._setup_dm_topics() + + adapter._bot.create_forum_topic.assert_not_called() + assert adapter._dm_topics == {} + + +@pytest.mark.asyncio +async def test_setup_dm_topics_no_config(): + """No dm_topics in config at all should be a no-op.""" + adapter = _make_adapter() + adapter._bot = AsyncMock() + + await adapter._setup_dm_topics() + + adapter._bot.create_forum_topic.assert_not_called() + + +# ── _create_dm_topic: error handling ── + + +@pytest.mark.asyncio +async def test_create_dm_topic_handles_duplicate_error(): + """Duplicate topic error should return None gracefully.""" + adapter = _make_adapter() + adapter._bot = AsyncMock() + adapter._bot.create_forum_topic.side_effect = Exception("topic_name_duplicate") + + result = await adapter._create_dm_topic(chat_id=111, name="General") + + assert result is None + + +@pytest.mark.asyncio +async def test_create_dm_topic_handles_generic_error(): + """Generic error should return None with warning.""" + adapter = _make_adapter() + adapter._bot = AsyncMock() + adapter._bot.create_forum_topic.side_effect = Exception("some random error") + + result = await adapter._create_dm_topic(chat_id=111, name="General") + + assert result is None + + +@pytest.mark.asyncio +async def test_create_dm_topic_returns_none_without_bot(): + """No bot instance should return None.""" + adapter = _make_adapter() + adapter._bot = None + + result = await adapter._create_dm_topic(chat_id=111, name="General") + + assert result is None + + +# ── _persist_dm_topic_thread_id ── + + +def test_persist_dm_topic_thread_id_writes_config(tmp_path): + """Should write thread_id into the correct topic in config.yaml.""" + import yaml + + config_data = { + "platforms": { + "telegram": { + "extra": { + "dm_topics": [ + { + "chat_id": 111, + "topics": [ + {"name": "General", "icon_color": 123}, + {"name": "Work", "icon_color": 456}, + ], + } + ] + } + } + } + } + + config_file = tmp_path / ".hermes" / "config.yaml" + config_file.parent.mkdir(parents=True) + with open(config_file, "w") as f: + yaml.dump(config_data, f) + + adapter = _make_adapter() + + with patch.object(Path, "home", return_value=tmp_path): + adapter._persist_dm_topic_thread_id(111, "General", 999) + + with open(config_file) as f: + result = yaml.safe_load(f) + + topics = result["platforms"]["telegram"]["extra"]["dm_topics"][0]["topics"] + assert topics[0]["thread_id"] == 999 + assert "thread_id" not in topics[1] # "Work" should be untouched + + +def test_persist_dm_topic_thread_id_skips_if_already_set(tmp_path): + """Should not overwrite an existing thread_id.""" + import yaml + + config_data = { + "platforms": { + "telegram": { + "extra": { + "dm_topics": [ + { + "chat_id": 111, + "topics": [ + {"name": "General", "icon_color": 123, "thread_id": 500}, + ], + } + ] + } + } + } + } + + config_file = tmp_path / ".hermes" / "config.yaml" + config_file.parent.mkdir(parents=True) + with open(config_file, "w") as f: + yaml.dump(config_data, f) + + adapter = _make_adapter() + + with patch.object(Path, "home", return_value=tmp_path): + adapter._persist_dm_topic_thread_id(111, "General", 999) + + with open(config_file) as f: + result = yaml.safe_load(f) + + topics = result["platforms"]["telegram"]["extra"]["dm_topics"][0]["topics"] + assert topics[0]["thread_id"] == 500 # unchanged + + +# ── _get_dm_topic_info ── + + +def test_get_dm_topic_info_finds_cached_topic(): + """Should return topic config when thread_id is in cache.""" + adapter = _make_adapter([ + { + "chat_id": 111, + "topics": [ + {"name": "General", "skill": "my-skill"}, + ], + } + ]) + adapter._dm_topics["111:General"] = 100 + + result = adapter._get_dm_topic_info("111", "100") + + assert result is not None + assert result["name"] == "General" + assert result["skill"] == "my-skill" + + +def test_get_dm_topic_info_returns_none_for_unknown(): + """Should return None for unknown thread_id.""" + adapter = _make_adapter([ + { + "chat_id": 111, + "topics": [{"name": "General"}], + } + ]) + # Mock reload to avoid filesystem access + adapter._reload_dm_topics_from_config = lambda: None + + result = adapter._get_dm_topic_info("111", "999") + + assert result is None + + +def test_get_dm_topic_info_returns_none_without_config(): + """Should return None if no dm_topics config.""" + adapter = _make_adapter() + adapter._reload_dm_topics_from_config = lambda: None + + result = adapter._get_dm_topic_info("111", "100") + + assert result is None + + +def test_get_dm_topic_info_returns_none_for_none_thread(): + """Should return None if thread_id is None.""" + adapter = _make_adapter([ + {"chat_id": 111, "topics": [{"name": "General"}]} + ]) + + result = adapter._get_dm_topic_info("111", None) + + assert result is None + + +def test_get_dm_topic_info_hot_reloads_from_config(tmp_path): + """Should find a topic added to config after startup (hot-reload).""" + import yaml + + # Start with empty topics + adapter = _make_adapter([ + {"chat_id": 111, "topics": []} + ]) + + # Write config with a new topic + thread_id + config_data = { + "platforms": { + "telegram": { + "extra": { + "dm_topics": [ + { + "chat_id": 111, + "topics": [ + {"name": "NewProject", "thread_id": 555}, + ], + } + ] + } + } + } + } + config_file = tmp_path / ".hermes" / "config.yaml" + config_file.parent.mkdir(parents=True) + with open(config_file, "w") as f: + yaml.dump(config_data, f) + + with patch.object(Path, "home", return_value=tmp_path): + result = adapter._get_dm_topic_info("111", "555") + + assert result is not None + assert result["name"] == "NewProject" + # Should now be cached + assert adapter._dm_topics["111:NewProject"] == 555 + + +# ── _cache_dm_topic_from_message ── + + +def test_cache_dm_topic_from_message(): + """Should cache a new topic mapping.""" + adapter = _make_adapter() + + adapter._cache_dm_topic_from_message("111", "100", "General") + + assert adapter._dm_topics["111:General"] == 100 + + +def test_cache_dm_topic_from_message_no_overwrite(): + """Should not overwrite an existing cached topic.""" + adapter = _make_adapter() + adapter._dm_topics["111:General"] = 100 + + adapter._cache_dm_topic_from_message("111", "999", "General") + + assert adapter._dm_topics["111:General"] == 100 # unchanged