From b59bb4e351c4cdca97e906accb4bbe9193c381b6 Mon Sep 17 00:00:00 2001 From: leprincep35700 <61830395+leprincep35700@users.noreply.github.com> Date: Fri, 1 May 2026 15:19:25 +0000 Subject: [PATCH] fix(gateway): preserve home-channel thread targets across restart notifications --- cron/scheduler.py | 17 +- gateway/config.py | 31 ++- gateway/run.py | 205 +++++++++++++++--- tests/cron/test_scheduler.py | 10 + tests/gateway/restart_test_helpers.py | 18 +- tests/gateway/test_home_target_env_var.py | 8 +- tests/gateway/test_restart_notification.py | 213 ++++++++++++++++++- tests/gateway/test_restart_resume_pending.py | 81 ++++++- 8 files changed, 544 insertions(+), 39 deletions(-) diff --git a/cron/scheduler.py b/cron/scheduler.py index 4672b24ba7..fafcbfab95 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -147,6 +147,19 @@ def _get_home_target_chat_id(platform_name: str) -> str: return value +def _get_home_target_thread_id(platform_name: str) -> Optional[str]: + """Return the optional thread/topic ID for a platform home target.""" + env_var = _HOME_TARGET_ENV_VARS.get(platform_name.lower()) + if not env_var: + return None + value = os.getenv(f"{env_var}_THREAD_ID", "").strip() + if not value: + legacy = _LEGACY_HOME_TARGET_ENV_VARS.get(env_var) + if legacy: + value = os.getenv(f"{legacy}_THREAD_ID", "").strip() + return value or None + + def _resolve_single_delivery_target(job: dict, deliver_value: str) -> Optional[dict]: """Resolve one concrete auto-delivery target for a cron job.""" @@ -175,7 +188,7 @@ def _resolve_single_delivery_target(job: dict, deliver_value: str) -> Optional[d return { "platform": platform_name, "chat_id": chat_id, - "thread_id": None, + "thread_id": _get_home_target_thread_id(platform_name), } return None @@ -229,7 +242,7 @@ def _resolve_single_delivery_target(job: dict, deliver_value: str) -> Optional[d return { "platform": platform_name, "chat_id": chat_id, - "thread_id": None, + "thread_id": _get_home_target_thread_id(platform_name), } diff --git a/gateway/config.py b/gateway/config.py index 6db8e55d84..6527accec4 100644 --- a/gateway/config.py +++ b/gateway/config.py @@ -186,18 +186,24 @@ class HomeChannel: Default destination for a platform. When a cron job specifies deliver="telegram" without a specific chat ID, - messages are sent to this home channel. + messages are sent to this home channel. Thread-aware platforms may also + store a thread/topic ID so the bare platform target routes to the exact + conversation where /sethome was run. """ platform: Platform chat_id: str name: str # Human-readable name for display + thread_id: Optional[str] = None def to_dict(self) -> Dict[str, Any]: - return { + result = { "platform": self.platform.value, "chat_id": self.chat_id, "name": self.name, } + if self.thread_id: + result["thread_id"] = self.thread_id + return result @classmethod def from_dict(cls, data: Dict[str, Any]) -> "HomeChannel": @@ -205,6 +211,7 @@ class HomeChannel: platform=Platform(data["platform"]), chat_id=str(data["chat_id"]), name=data.get("name", "Home"), + thread_id=str(data["thread_id"]) if data.get("thread_id") else None, ) @@ -1071,6 +1078,7 @@ def _apply_env_overrides(config: GatewayConfig) -> None: platform=Platform.TELEGRAM, chat_id=telegram_home, name=os.getenv("TELEGRAM_HOME_CHANNEL_NAME", "Home"), + thread_id=os.getenv("TELEGRAM_HOME_CHANNEL_THREAD_ID") or None, ) # Discord @@ -1087,6 +1095,7 @@ def _apply_env_overrides(config: GatewayConfig) -> None: platform=Platform.DISCORD, chat_id=discord_home, name=os.getenv("DISCORD_HOME_CHANNEL_NAME", "Home"), + thread_id=os.getenv("DISCORD_HOME_CHANNEL_THREAD_ID") or None, ) # Reply threading mode for Discord (off/first/all) @@ -1108,6 +1117,7 @@ def _apply_env_overrides(config: GatewayConfig) -> None: platform=Platform.WHATSAPP, chat_id=whatsapp_home, name=os.getenv("WHATSAPP_HOME_CHANNEL_NAME", "Home"), + thread_id=os.getenv("WHATSAPP_HOME_CHANNEL_THREAD_ID") or None, ) # Slack @@ -1135,6 +1145,7 @@ def _apply_env_overrides(config: GatewayConfig) -> None: platform=Platform.SLACK, chat_id=slack_home, name=os.getenv("SLACK_HOME_CHANNEL_NAME", ""), + thread_id=os.getenv("SLACK_HOME_CHANNEL_THREAD_ID") or None, ) # Signal @@ -1155,6 +1166,7 @@ def _apply_env_overrides(config: GatewayConfig) -> None: platform=Platform.SIGNAL, chat_id=signal_home, name=os.getenv("SIGNAL_HOME_CHANNEL_NAME", "Home"), + thread_id=os.getenv("SIGNAL_HOME_CHANNEL_THREAD_ID") or None, ) # Mattermost @@ -1174,6 +1186,7 @@ def _apply_env_overrides(config: GatewayConfig) -> None: platform=Platform.MATTERMOST, chat_id=mattermost_home, name=os.getenv("MATTERMOST_HOME_CHANNEL_NAME", "Home"), + thread_id=os.getenv("MATTERMOST_HOME_CHANNEL_THREAD_ID") or None, ) # Matrix @@ -1205,6 +1218,7 @@ def _apply_env_overrides(config: GatewayConfig) -> None: platform=Platform.MATRIX, chat_id=matrix_home, name=os.getenv("MATRIX_HOME_ROOM_NAME", "Home"), + thread_id=os.getenv("MATRIX_HOME_ROOM_THREAD_ID") or None, ) # Home Assistant @@ -1238,6 +1252,7 @@ def _apply_env_overrides(config: GatewayConfig) -> None: platform=Platform.EMAIL, chat_id=email_home, name=os.getenv("EMAIL_HOME_ADDRESS_NAME", "Home"), + thread_id=os.getenv("EMAIL_HOME_ADDRESS_THREAD_ID") or None, ) # SMS (Twilio) @@ -1253,6 +1268,7 @@ def _apply_env_overrides(config: GatewayConfig) -> None: platform=Platform.SMS, chat_id=sms_home, name=os.getenv("SMS_HOME_CHANNEL_NAME", "Home"), + thread_id=os.getenv("SMS_HOME_CHANNEL_THREAD_ID") or None, ) # API Server @@ -1315,6 +1331,7 @@ def _apply_env_overrides(config: GatewayConfig) -> None: platform=Platform.DINGTALK, chat_id=dingtalk_home, name=os.getenv("DINGTALK_HOME_CHANNEL_NAME", "Home"), + thread_id=os.getenv("DINGTALK_HOME_CHANNEL_THREAD_ID") or None, ) # Feishu / Lark @@ -1342,6 +1359,7 @@ def _apply_env_overrides(config: GatewayConfig) -> None: platform=Platform.FEISHU, chat_id=feishu_home, name=os.getenv("FEISHU_HOME_CHANNEL_NAME", "Home"), + thread_id=os.getenv("FEISHU_HOME_CHANNEL_THREAD_ID") or None, ) # WeCom (Enterprise WeChat) @@ -1364,6 +1382,7 @@ def _apply_env_overrides(config: GatewayConfig) -> None: platform=Platform.WECOM, chat_id=wecom_home, name=os.getenv("WECOM_HOME_CHANNEL_NAME", "Home"), + thread_id=os.getenv("WECOM_HOME_CHANNEL_THREAD_ID") or None, ) # WeCom callback mode (self-built apps) @@ -1422,6 +1441,7 @@ def _apply_env_overrides(config: GatewayConfig) -> None: platform=Platform.WEIXIN, chat_id=weixin_home, name=os.getenv("WEIXIN_HOME_CHANNEL_NAME", "Home"), + thread_id=os.getenv("WEIXIN_HOME_CHANNEL_THREAD_ID") or None, ) # BlueBubbles (iMessage) @@ -1445,6 +1465,7 @@ def _apply_env_overrides(config: GatewayConfig) -> None: platform=Platform.BLUEBUBBLES, chat_id=bluebubbles_home, name=os.getenv("BLUEBUBBLES_HOME_CHANNEL_NAME", "Home"), + thread_id=os.getenv("BLUEBUBBLES_HOME_CHANNEL_THREAD_ID") or None, ) # QQ (Official Bot API v2) @@ -1482,6 +1503,11 @@ def _apply_env_overrides(config: GatewayConfig) -> None: platform=Platform.QQBOT, chat_id=qq_home, name=os.getenv("QQBOT_HOME_CHANNEL_NAME") or os.getenv(qq_home_name_env, "Home"), + thread_id=( + os.getenv("QQBOT_HOME_CHANNEL_THREAD_ID") + or os.getenv("QQ_HOME_CHANNEL_THREAD_ID") + or None + ), ) # Yuanbao — YUANBAO_APP_ID preferred @@ -1512,6 +1538,7 @@ def _apply_env_overrides(config: GatewayConfig) -> None: platform=Platform.YUANBAO, chat_id=yuanbao_home, name=os.getenv("YUANBAO_HOME_CHANNEL_NAME", "Home"), + thread_id=os.getenv("YUANBAO_HOME_CHANNEL_THREAD_ID") or None, ) yuanbao_dm_policy = os.getenv("YUANBAO_DM_POLICY") if yuanbao_dm_policy: diff --git a/gateway/run.py b/gateway/run.py index 86076bf0bf..d604947e99 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -283,6 +283,16 @@ def _home_target_env_var(platform_name: str) -> str: ) +def _home_thread_env_var(platform_name: str) -> str: + """Return the optional thread/topic env var for a platform home target.""" + return f"{_home_target_env_var(platform_name)}_THREAD_ID" + + +def _restart_notification_pending() -> bool: + """Return True when a /restart completion marker is waiting to be delivered.""" + return (_hermes_home / ".restart_notify.json").exists() + + _ensure_ssl_certs() # Add parent directory to path @@ -507,6 +517,8 @@ from gateway.config import ( Platform, _BUILTIN_PLATFORM_VALUES, GatewayConfig, + HomeChannel, + PlatformConfig, load_gateway_config, ) from gateway.session import ( @@ -2257,15 +2269,13 @@ class GatewayRunner: logger.debug("Failed interrupting agent during shutdown: %s", e) async def _notify_active_sessions_of_shutdown(self) -> None: - """Send a notification to every chat with an active agent. + """Send shutdown/restart notifications to active chats and home channels. Called at the very start of stop() — adapters are still connected so - messages can be delivered. Best-effort: individual send failures are + messages can be delivered. Best-effort: individual send failures are logged and swallowed so they never block the shutdown sequence. """ active = self._snapshot_running_agents() - if not active: - return action = "restarting" if self._restart_requested else "shutting down" hint = ( @@ -2276,7 +2286,7 @@ class GatewayRunner: ) msg = f"⚠️ Gateway {action} — {hint}" - notified: set = set() + notified: set[tuple[str, str, Optional[str]]] = set() for session_key in active: source = None try: @@ -2293,7 +2303,7 @@ class GatewayRunner: if source is not None: platform_str = source.platform.value - chat_id = source.chat_id + chat_id = str(source.chat_id) thread_id = source.thread_id else: # Fall back to parsing the session key when no persisted @@ -2305,9 +2315,10 @@ class GatewayRunner: chat_id = _parsed["chat_id"] thread_id = _parsed.get("thread_id") - # Deduplicate: one notification per chat, even if multiple - # sessions (different users/threads) share the same chat. - dedup_key = (platform_str, chat_id) + # Deduplicate only identical delivery targets. Thread/topic-aware + # platforms can share a parent chat while still routing to distinct + # destinations via metadata. + dedup_key = (platform_str, chat_id, str(thread_id) if thread_id else None) if dedup_key in notified: continue @@ -2321,10 +2332,19 @@ class GatewayRunner: # correct forum topic / thread. metadata = {"thread_id": thread_id} if thread_id else None - await adapter.send(chat_id, msg, metadata=metadata) + result = await adapter.send(chat_id, msg, metadata=metadata) + if result is not None and getattr(result, "success", True) is False: + logger.debug( + "Failed to send shutdown notification to %s:%s: %s", + platform_str, + chat_id, + getattr(result, "error", "send returned success=False"), + ) + continue + notified.add(dedup_key) logger.info( - "Sent shutdown notification to %s:%s", + "Sent shutdown notification to active chat %s:%s", platform_str, chat_id, ) except Exception as e: @@ -2333,6 +2353,44 @@ class GatewayRunner: platform_str, chat_id, e, ) + for platform, adapter in self.adapters.items(): + home = self.config.get_home_channel(platform) + if not home or not home.chat_id: + continue + + dedup_key = (platform.value, str(home.chat_id), str(home.thread_id) if home.thread_id else None) + if dedup_key in notified: + continue + + try: + metadata = {"thread_id": home.thread_id} if home.thread_id else None + if metadata: + result = await adapter.send(str(home.chat_id), msg, metadata=metadata) + else: + result = await adapter.send(str(home.chat_id), msg) + if result is not None and getattr(result, "success", True) is False: + logger.debug( + "Failed to send shutdown notification to home channel %s:%s: %s", + platform.value, + home.chat_id, + getattr(result, "error", "send returned success=False"), + ) + continue + + notified.add(dedup_key) + logger.info( + "Sent shutdown notification to home channel %s:%s", + platform.value, + home.chat_id, + ) + except Exception as e: + logger.debug( + "Failed to send shutdown notification to home channel %s:%s: %s", + platform.value, + home.chat_id, + e, + ) + def _finalize_shutdown_agents(self, active_agents: Dict[str, Any]) -> None: for agent in active_agents.values(): try: @@ -2953,8 +3011,28 @@ class GatewayRunner: ): self._schedule_update_notification_watch() + # Give freshly connected platform adapters a brief moment to settle + # before sending restart/startup lifecycle messages. In practice this + # helps Discord thread deliveries right after reconnect. + if connected_count > 0: + await asyncio.sleep(1.0) + # Notify the chat that initiated /restart that the gateway is back. - await self._send_restart_notification() + restart_notification_pending = _restart_notification_pending() + delivered_restart_target = await self._send_restart_notification() + + # Broadcast a lightweight "gateway is back" message to configured + # home channels only when this startup is resuming from /restart. If a + # /restart requester already received a direct completion notice in the + # same chat, skip the generic broadcast there to avoid duplicates while + # still allowing a home-channel fallback when the direct send fails. + if restart_notification_pending or delivered_restart_target is not None: + skip_home_targets = ( + {delivered_restart_target} if delivered_restart_target else None + ) + await self._send_home_channel_startup_notifications( + skip_targets=skip_home_targets, + ) # Drain any recovered process watchers (from crash recovery checkpoint) try: @@ -7976,14 +8054,33 @@ class GatewayRunner: chat_name = source.chat_name or chat_id env_key = _home_target_env_var(platform_name) + thread_env_key = _home_thread_env_var(platform_name) + thread_id = source.thread_id # Save to .env so it persists across restarts try: from hermes_cli.config import save_env_value save_env_value(env_key, str(chat_id)) + # Keep thread/topic routing explicit and clear stale values when + # /sethome is run from the parent chat instead of a thread. + save_env_value(thread_env_key, str(thread_id or "")) except Exception as e: return f"Failed to save home channel: {e}" + # Keep the running gateway config in sync too. The pre-restart + # notification path reads self.config before the process reloads env. + if source.platform: + platform_config = self.config.platforms.setdefault( + source.platform, + PlatformConfig(enabled=True), + ) + platform_config.home_channel = HomeChannel( + platform=source.platform, + chat_id=str(chat_id), + name=chat_name, + thread_id=str(thread_id) if thread_id else None, + ) + return ( f"✅ Home channel set to **{chat_name}** (ID: {chat_id}).\n" f"Cron jobs and cross-platform messages will be delivered here." @@ -10467,11 +10564,11 @@ class GatewayRunner: return True - async def _send_restart_notification(self) -> None: + async def _send_restart_notification(self) -> Optional[tuple[str, str, Optional[str]]]: """Notify the chat that initiated /restart that the gateway is back.""" notify_path = _hermes_home / ".restart_notify.json" if not notify_path.exists(): - return + return None try: data = json.loads(notify_path.read_text()) @@ -10480,7 +10577,7 @@ class GatewayRunner: thread_id = data.get("thread_id") if not platform_str or not chat_id: - return + return None platform = Platform(platform_str) adapter = self.adapters.get(platform) @@ -10489,11 +10586,11 @@ class GatewayRunner: "Restart notification skipped: %s adapter not connected", platform_str, ) - return + return None metadata = {"thread_id": thread_id} if thread_id else None result = await adapter.send( - chat_id, + str(chat_id), "♻ Gateway restarted successfully. Your session continues.", metadata=metadata, ) @@ -10501,24 +10598,82 @@ class GatewayRunner: # and returns SendResult(success=False) rather than raising, so # we must inspect the result before claiming success — otherwise # the log line is misleading and hides real delivery failures. - if getattr(result, "success", False): - logger.info( - "Sent restart notification to %s:%s", - platform_str, - chat_id, - ) - else: + if result is not None and getattr(result, "success", True) is False: logger.warning( "Restart notification to %s:%s was not delivered: %s", platform_str, chat_id, - getattr(result, "error", "unknown error"), + getattr(result, "error", "send returned success=False"), ) + return None + + logger.info( + "Sent restart notification to %s:%s", + platform_str, + chat_id, + ) + return str(platform_str), str(chat_id), str(thread_id) if thread_id else None except Exception as e: logger.warning("Restart notification failed: %s", e) + return None finally: notify_path.unlink(missing_ok=True) + async def _send_home_channel_startup_notifications( + self, + *, + skip_targets: Optional[set[tuple[str, str, Optional[str]]]] = None, + ) -> set[tuple[str, str, Optional[str]]]: + """Notify configured home channels that the gateway is back online. + + The notification is best-effort and sent once per connected platform + home channel. ``skip_targets`` lets startup avoid duplicate messages + when a more specific restart notification is queued for the same chat. + """ + delivered: set[tuple[str, str, Optional[str]]] = set() + skipped = skip_targets or set() + message = "♻️ Gateway online — Hermes is back and ready." + + for platform, adapter in self.adapters.items(): + home = self.config.get_home_channel(platform) + if not home or not home.chat_id: + continue + + target = (platform.value, str(home.chat_id), str(home.thread_id) if home.thread_id else None) + if target in skipped or target in delivered: + continue + + try: + metadata = {"thread_id": home.thread_id} if home.thread_id else None + if metadata: + result = await adapter.send(str(home.chat_id), message, metadata=metadata) + else: + result = await adapter.send(str(home.chat_id), message) + if result is not None and getattr(result, "success", True) is False: + logger.warning( + "Home-channel startup notification failed for %s:%s: %s", + platform.value, + home.chat_id, + getattr(result, "error", "send returned success=False"), + ) + continue + + delivered.add(target) + logger.info( + "Sent home-channel startup notification to %s:%s", + platform.value, + home.chat_id, + ) + except Exception as exc: + logger.warning( + "Home-channel startup notification failed for %s:%s: %s", + platform.value, + home.chat_id, + exc, + ) + + return delivered + def _set_session_env(self, context: SessionContext) -> list: """Set session context variables for the current async task. diff --git a/tests/cron/test_scheduler.py b/tests/cron/test_scheduler.py index a5bcd4bf9b..8c204d9a51 100644 --- a/tests/cron/test_scheduler.py +++ b/tests/cron/test_scheduler.py @@ -118,6 +118,16 @@ class TestResolveDeliveryTarget: "thread_id": None, } + def test_bare_platform_delivery_preserves_home_thread_id(self, monkeypatch): + monkeypatch.setenv("DISCORD_HOME_CHANNEL", "parent-42") + monkeypatch.setenv("DISCORD_HOME_CHANNEL_THREAD_ID", "topic-7") + + assert _resolve_delivery_target({"deliver": "discord"}) == { + "platform": "discord", + "chat_id": "parent-42", + "thread_id": "topic-7", + } + def test_explicit_telegram_topic_target_with_thread_id(self): """deliver: 'telegram:chat_id:thread_id' parses correctly.""" job = { diff --git a/tests/gateway/restart_test_helpers.py b/tests/gateway/restart_test_helpers.py index 6332a194fe..4c5dab9960 100644 --- a/tests/gateway/restart_test_helpers.py +++ b/tests/gateway/restart_test_helpers.py @@ -12,6 +12,7 @@ class RestartTestAdapter(BasePlatformAdapter): def __init__(self): super().__init__(PlatformConfig(enabled=True, token="***"), Platform.TELEGRAM) self.sent: list[str] = [] + self.sent_calls: list[tuple[str, str, object]] = [] async def connect(self): return True @@ -21,6 +22,7 @@ class RestartTestAdapter(BasePlatformAdapter): async def send(self, chat_id, content, reply_to=None, metadata=None): self.sent.append(content) + self.sent_calls.append((chat_id, content, metadata)) return SendResult(success=True, message_id="1") async def send_typing(self, chat_id, metadata=None): @@ -30,12 +32,17 @@ class RestartTestAdapter(BasePlatformAdapter): return {"id": chat_id} -def make_restart_source(chat_id: str = "123456", chat_type: str = "dm") -> SessionSource: +def make_restart_source( + chat_id: str = "123456", + chat_type: str = "dm", + thread_id: str | None = None, +) -> SessionSource: return SessionSource( platform=Platform.TELEGRAM, chat_id=chat_id, chat_type=chat_type, user_id="u1", + thread_id=thread_id, ) @@ -81,6 +88,15 @@ def make_restart_runner( runner._handle_restart_command = GatewayRunner._handle_restart_command.__get__( runner, GatewayRunner ) + runner._handle_set_home_command = GatewayRunner._handle_set_home_command.__get__( + runner, GatewayRunner + ) + runner._send_restart_notification = GatewayRunner._send_restart_notification.__get__( + runner, GatewayRunner + ) + runner._send_home_channel_startup_notifications = ( + GatewayRunner._send_home_channel_startup_notifications.__get__(runner, GatewayRunner) + ) runner._status_action_label = GatewayRunner._status_action_label.__get__( runner, GatewayRunner ) diff --git a/tests/gateway/test_home_target_env_var.py b/tests/gateway/test_home_target_env_var.py index 27a7e8919b..2e0dee0c20 100644 --- a/tests/gateway/test_home_target_env_var.py +++ b/tests/gateway/test_home_target_env_var.py @@ -8,7 +8,7 @@ to env vars nothing read on startup — the home channel appeared to set successfully but was lost on every new gateway session. """ -from gateway.run import _home_target_env_var +from gateway.run import _home_target_env_var, _home_thread_env_var def test_matrix_home_target_env_var_uses_home_room(): @@ -34,3 +34,9 @@ def test_unknown_platform_home_target_env_var_falls_back_to_home_channel(): def test_case_insensitive_platform_name(): assert _home_target_env_var("MATRIX") == "MATRIX_HOME_ROOM" assert _home_target_env_var("Email") == "EMAIL_HOME_ADDRESS" + + +def test_home_thread_env_var_uses_home_target_name_plus_thread_id(): + assert _home_thread_env_var("discord") == "DISCORD_HOME_CHANNEL_THREAD_ID" + assert _home_thread_env_var("matrix") == "MATRIX_HOME_ROOM_THREAD_ID" + assert _home_thread_env_var("email") == "EMAIL_HOME_ADDRESS_THREAD_ID" diff --git a/tests/gateway/test_restart_notification.py b/tests/gateway/test_restart_notification.py index 254917897f..e97216072a 100644 --- a/tests/gateway/test_restart_notification.py +++ b/tests/gateway/test_restart_notification.py @@ -8,8 +8,8 @@ from unittest.mock import AsyncMock, MagicMock import pytest import gateway.run as gateway_run -from gateway.config import Platform -from gateway.platforms.base import MessageEvent, MessageType +from gateway.config import HomeChannel, Platform +from gateway.platforms.base import MessageEvent, MessageType, SendResult from gateway.session import build_session_key from tests.gateway.restart_test_helpers import ( make_restart_runner, @@ -17,6 +17,22 @@ from tests.gateway.restart_test_helpers import ( ) +# ── restart marker helpers ─────────────────────────────────────────────── + + +def test_restart_notification_pending_false_without_marker(tmp_path, monkeypatch): + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) + + assert gateway_run._restart_notification_pending() is False + + +def test_restart_notification_pending_true_with_marker(tmp_path, monkeypatch): + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) + (tmp_path / ".restart_notify.json").write_text("{}") + + assert gateway_run._restart_notification_pending() is True + + # ── _handle_restart_command writes .restart_notify.json ────────────────── @@ -143,6 +159,184 @@ async def test_restart_command_uses_atomic_json_writes_for_marker_files(tmp_path assert calls[1][1]["platform"] == "telegram" +@pytest.mark.asyncio +async def test_sethome_updates_running_config_for_same_process_restart(tmp_path, monkeypatch): + """/sethome persists to env and updates in-memory config before restart.""" + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) + + saved = {} + + def _fake_save_env_value(key, value): + saved[key] = value + + monkeypatch.setattr("hermes_cli.config.save_env_value", _fake_save_env_value) + + runner, _adapter = make_restart_runner() + source = make_restart_source(chat_id="home-42") + source.chat_name = "Ops Home" + event = MessageEvent( + text="/sethome", + message_type=MessageType.TEXT, + source=source, + message_id="m-home", + ) + + result = await runner._handle_set_home_command(event) + + home = runner.config.get_home_channel(Platform.TELEGRAM) + assert "Home channel set" in result + assert saved["TELEGRAM_HOME_CHANNEL"] == "home-42" + assert home is not None + assert home.chat_id == "home-42" + assert home.name == "Ops Home" + + +@pytest.mark.asyncio +async def test_sethome_preserves_thread_target_for_same_process_restart(tmp_path, monkeypatch): + """/sethome from a topic/thread stores the thread-aware home target.""" + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) + + saved = {} + + def _fake_save_env_value(key, value): + saved[key] = value + + monkeypatch.setattr("hermes_cli.config.save_env_value", _fake_save_env_value) + + runner, _adapter = make_restart_runner() + source = make_restart_source(chat_id="parent-42", thread_id="topic-7") + source.chat_name = "Ops Topic" + event = MessageEvent( + text="/sethome", + message_type=MessageType.TEXT, + source=source, + message_id="m-home-thread", + ) + + result = await runner._handle_set_home_command(event) + + home = runner.config.get_home_channel(Platform.TELEGRAM) + assert "Home channel set" in result + assert saved["TELEGRAM_HOME_CHANNEL"] == "parent-42" + assert saved["TELEGRAM_HOME_CHANNEL_THREAD_ID"] == "topic-7" + assert home is not None + assert home.chat_id == "parent-42" + assert home.thread_id == "topic-7" + + +# ── home-channel startup notifications ───────────────────────────────────── + + +@pytest.mark.asyncio +async def test_send_home_channel_startup_notification_to_configured_home(tmp_path, monkeypatch): + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) + + runner, adapter = make_restart_runner() + runner.config.platforms[Platform.TELEGRAM].home_channel = HomeChannel( + platform=Platform.TELEGRAM, + chat_id="home-42", + name="Ops Home", + ) + adapter.send = AsyncMock() + + delivered = await runner._send_home_channel_startup_notifications() + + assert delivered == {("telegram", "home-42", None)} + adapter.send.assert_called_once_with( + "home-42", + "♻️ Gateway online — Hermes is back and ready.", + ) + + +@pytest.mark.asyncio +async def test_send_home_channel_startup_notification_preserves_thread_metadata( + tmp_path, monkeypatch +): + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) + + runner, adapter = make_restart_runner() + runner.config.platforms[Platform.TELEGRAM].home_channel = HomeChannel( + platform=Platform.TELEGRAM, + chat_id="parent-42", + name="Ops Topic", + thread_id="topic-7", + ) + adapter.send = AsyncMock(return_value=SendResult(success=True, message_id="home")) + + delivered = await runner._send_home_channel_startup_notifications() + + assert delivered == {("telegram", "parent-42", "topic-7")} + adapter.send.assert_called_once_with( + "parent-42", + "♻️ Gateway online — Hermes is back and ready.", + metadata={"thread_id": "topic-7"}, + ) + + +@pytest.mark.asyncio +async def test_send_home_channel_startup_notification_skips_restart_target( + tmp_path, monkeypatch +): + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) + + runner, adapter = make_restart_runner() + runner.config.platforms[Platform.TELEGRAM].home_channel = HomeChannel( + platform=Platform.TELEGRAM, + chat_id="42", + name="Ops Home", + ) + adapter.send = AsyncMock() + + delivered = await runner._send_home_channel_startup_notifications( + skip_targets={("telegram", "42", None)} + ) + + assert delivered == set() + adapter.send.assert_not_called() + + +@pytest.mark.asyncio +async def test_send_home_channel_startup_notification_does_not_skip_different_thread( + tmp_path, monkeypatch +): + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) + + runner, adapter = make_restart_runner() + runner.config.platforms[Platform.TELEGRAM].home_channel = HomeChannel( + platform=Platform.TELEGRAM, + chat_id="42", + name="Ops Home", + ) + adapter.send = AsyncMock(return_value=SendResult(success=True, message_id="home")) + + delivered = await runner._send_home_channel_startup_notifications( + skip_targets={("telegram", "42", "topic-7")} + ) + + assert delivered == {("telegram", "42", None)} + adapter.send.assert_called_once() + + +@pytest.mark.asyncio +async def test_send_home_channel_startup_notification_ignores_false_send_result( + tmp_path, monkeypatch +): + monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path) + + runner, adapter = make_restart_runner() + runner.config.platforms[Platform.TELEGRAM].home_channel = HomeChannel( + platform=Platform.TELEGRAM, + chat_id="home-42", + name="Ops Home", + ) + adapter.send = AsyncMock(return_value=SendResult(success=False, error="network down")) + + delivered = await runner._send_home_channel_startup_notifications() + + assert delivered == set() + adapter.send.assert_called_once() + + # ── _send_restart_notification ─────────────────────────────────────────── @@ -160,8 +354,9 @@ async def test_send_restart_notification_delivers_and_cleans_up(tmp_path, monkey runner, adapter = make_restart_runner() adapter.send = AsyncMock() - await runner._send_restart_notification() + delivered_target = await runner._send_restart_notification() + assert delivered_target == ("telegram", "42", None) adapter.send.assert_called_once() call_args = adapter.send.call_args assert call_args[0][0] == "42" # chat_id @@ -185,8 +380,9 @@ async def test_send_restart_notification_with_thread(tmp_path, monkeypatch): runner, adapter = make_restart_runner() adapter.send = AsyncMock() - await runner._send_restart_notification() + delivered_target = await runner._send_restart_notification() + assert delivered_target == ("telegram", "99", "topic_7") call_args = adapter.send.call_args assert call_args[1]["metadata"] == {"thread_id": "topic_7"} assert not notify_path.exists() @@ -240,9 +436,10 @@ async def test_send_restart_notification_cleans_up_on_send_failure( runner, adapter = make_restart_runner() adapter.send = AsyncMock(side_effect=RuntimeError("network down")) - await runner._send_restart_notification() + delivered_target = await runner._send_restart_notification() # File cleaned up even though send raised. + assert delivered_target is None assert not notify_path.exists() @@ -274,7 +471,7 @@ async def test_send_restart_notification_logs_warning_on_sendresult_failure( ) with caplog.at_level("DEBUG", logger="gateway.run"): - await runner._send_restart_notification() + delivered_target = await runner._send_restart_notification() success_lines = [ r for r in caplog.records @@ -286,6 +483,7 @@ async def test_send_restart_notification_logs_warning_on_sendresult_failure( and "was not delivered" in r.getMessage() and "Chat not found" in r.getMessage() ] + assert delivered_target is None assert not success_lines, ( "Expected no INFO 'Sent restart notification' line when send failed, " f"got: {[r.getMessage() for r in success_lines]}" @@ -317,12 +515,13 @@ async def test_send_restart_notification_logs_info_on_sendresult_success( adapter.send = AsyncMock(return_value=SendResult(success=True, message_id="m-1")) with caplog.at_level("DEBUG", logger="gateway.run"): - await runner._send_restart_notification() + delivered_target = await runner._send_restart_notification() success_lines = [ r for r in caplog.records if r.levelname == "INFO" and "Sent restart notification" in r.getMessage() ] + assert delivered_target == ("telegram", "42", None) assert success_lines, ( "Expected INFO 'Sent restart notification' when send succeeded; " f"got records: {[(r.levelname, r.getMessage()) for r in caplog.records]}" diff --git a/tests/gateway/test_restart_resume_pending.py b/tests/gateway/test_restart_resume_pending.py index bda6c7a412..0b9e7c894d 100644 --- a/tests/gateway/test_restart_resume_pending.py +++ b/tests/gateway/test_restart_resume_pending.py @@ -32,7 +32,8 @@ from unittest.mock import AsyncMock, MagicMock, patch import pytest -from gateway.config import GatewayConfig, Platform, PlatformConfig +from gateway.config import GatewayConfig, HomeChannel, Platform, PlatformConfig +from gateway.platforms.base import SendResult from gateway.run import ( _auto_continue_freshness_window, _coerce_gateway_timestamp, @@ -931,6 +932,84 @@ async def test_restart_banner_uses_try_to_resume_wording(): assert "try to resume" in msg +@pytest.mark.asyncio +async def test_restart_notifies_home_channel_even_without_active_sessions(): + runner, adapter = make_restart_runner() + runner._restart_requested = True + runner.config.platforms[Platform.TELEGRAM].home_channel = HomeChannel( + platform=Platform.TELEGRAM, + chat_id="home-42", + name="Ops Home", + ) + + await runner._notify_active_sessions_of_shutdown() + + assert adapter.sent == [ + "⚠️ Gateway restarting — Your current task will be interrupted. " + "Send any message after restart and I'll try to resume where you left off." + ] + + +@pytest.mark.asyncio +async def test_restart_home_channel_notification_dedupes_active_chat(): + runner, adapter = make_restart_runner() + runner._restart_requested = True + runner._running_agents["agent:main:telegram:dm:999"] = MagicMock() + runner.config.platforms[Platform.TELEGRAM].home_channel = HomeChannel( + platform=Platform.TELEGRAM, + chat_id="999", + name="Ops Home", + ) + + await runner._notify_active_sessions_of_shutdown() + + assert len(adapter.sent) == 1 + + +@pytest.mark.asyncio +async def test_restart_home_channel_notification_not_deduped_across_threads(): + runner, adapter = make_restart_runner() + runner._restart_requested = True + session_key = "agent:main:telegram:group:999" + runner.session_store._entries[session_key] = MagicMock( + origin=SessionSource( + platform=Platform.TELEGRAM, + chat_id="999", + chat_type="group", + user_id="u1", + thread_id="topic-7", + ) + ) + runner._running_agents[session_key] = MagicMock() + runner.config.platforms[Platform.TELEGRAM].home_channel = HomeChannel( + platform=Platform.TELEGRAM, + chat_id="999", + name="Ops Home", + ) + + await runner._notify_active_sessions_of_shutdown() + + assert len(adapter.sent) == 2 + assert adapter.sent_calls[0][2] == {"thread_id": "topic-7"} + assert adapter.sent_calls[1][2] is None + + +@pytest.mark.asyncio +async def test_restart_home_channel_notification_ignores_false_send_result(): + runner, adapter = make_restart_runner() + runner._restart_requested = True + runner.config.platforms[Platform.TELEGRAM].home_channel = HomeChannel( + platform=Platform.TELEGRAM, + chat_id="home-42", + name="Ops Home", + ) + adapter.send = AsyncMock(return_value=SendResult(success=False, error="network down")) + + await runner._notify_active_sessions_of_shutdown() + + adapter.send.assert_called_once() + + # --------------------------------------------------------------------------- # Stuck-loop escalation integration # ---------------------------------------------------------------------------