diff --git a/gateway/run.py b/gateway/run.py index b4b6c6ef05..b6949fc8df 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -363,6 +363,10 @@ class GatewayRunner: # Key: session_key, Value: {"command": str, "pattern_key": str, ...} self._pending_approvals: Dict[str, Dict[str, Any]] = {} + # Track platforms that failed to connect for background reconnection. + # Key: Platform enum, Value: {"config": platform_config, "attempts": int, "next_retry": float} + self._failed_platforms: Dict[Platform, Dict[str, Any]] = {} + # Persistent Honcho managers keyed by gateway session key. # This preserves write_frequency="session" semantics across short-lived # per-message AIAgent instances. @@ -639,7 +643,11 @@ class GatewayRunner: return resolve_turn_route(user_message, getattr(self, "_smart_model_routing", {}), primary) async def _handle_adapter_fatal_error(self, adapter: BasePlatformAdapter) -> None: - """React to a non-retryable adapter failure after startup.""" + """React to an adapter failure after startup. + + If the error is retryable (e.g. network blip, DNS failure), queue the + platform for background reconnection instead of giving up permanently. + """ logger.error( "Fatal %s adapter error (%s): %s", adapter.platform.value, @@ -655,7 +663,21 @@ class GatewayRunner: self.adapters.pop(adapter.platform, None) self.delivery_router.adapters = self.adapters - if not self.adapters: + # Queue retryable failures for background reconnection + if adapter.fatal_error_retryable: + platform_config = self.config.platforms.get(adapter.platform) + if platform_config and adapter.platform not in self._failed_platforms: + self._failed_platforms[adapter.platform] = { + "config": platform_config, + "attempts": 0, + "next_retry": time.monotonic() + 30, + } + logger.info( + "%s queued for background reconnection", + adapter.platform.value, + ) + + if not self.adapters and not self._failed_platforms: self._exit_reason = adapter.fatal_error_message or "All messaging adapters disconnected" if adapter.fatal_error_retryable: self._exit_with_failure = True @@ -663,6 +685,11 @@ class GatewayRunner: else: logger.error("No connected messaging platforms remain. Shutting down gateway cleanly.") await self.stop() + elif not self.adapters and self._failed_platforms: + logger.warning( + "No connected messaging platforms remain, but %d platform(s) queued for reconnection", + len(self._failed_platforms), + ) def _request_clean_exit(self, reason: str) -> None: self._exit_cleanly = True @@ -940,13 +967,32 @@ class GatewayRunner: target.append( f"{platform.value}: {adapter.fatal_error_message}" ) + # Queue for reconnection if the error is retryable + if adapter.fatal_error_retryable: + self._failed_platforms[platform] = { + "config": platform_config, + "attempts": 1, + "next_retry": time.monotonic() + 30, + } else: startup_retryable_errors.append( f"{platform.value}: failed to connect" ) + # No fatal error info means likely a transient issue — queue for retry + self._failed_platforms[platform] = { + "config": platform_config, + "attempts": 1, + "next_retry": time.monotonic() + 30, + } except Exception as e: logger.error("✗ %s error: %s", platform.value, e) startup_retryable_errors.append(f"{platform.value}: {e}") + # Unexpected exceptions are typically transient — queue for retry + self._failed_platforms[platform] = { + "config": platform_config, + "attempts": 1, + "next_retry": time.monotonic() + 30, + } if connected_count == 0: if startup_nonretryable_errors: @@ -1026,6 +1072,15 @@ class GatewayRunner: # Start background session expiry watcher for proactive memory flushing asyncio.create_task(self._session_expiry_watcher()) + # Start background reconnection watcher for platforms that failed at startup + if self._failed_platforms: + logger.info( + "Starting reconnection watcher for %d failed platform(s): %s", + len(self._failed_platforms), + ", ".join(p.value for p in self._failed_platforms), + ) + asyncio.create_task(self._platform_reconnect_watcher()) + logger.info("Press Ctrl+C to stop") return True @@ -1068,6 +1123,107 @@ class GatewayRunner: break await asyncio.sleep(1) + async def _platform_reconnect_watcher(self) -> None: + """Background task that periodically retries connecting failed platforms. + + Uses exponential backoff: 30s → 60s → 120s → 240s → 300s (cap). + Stops retrying a platform after 20 failed attempts or if the error + is non-retryable (e.g. bad auth token). + """ + _MAX_ATTEMPTS = 20 + _BACKOFF_CAP = 300 # 5 minutes max between retries + + await asyncio.sleep(10) # initial delay — let startup finish + while self._running: + if not self._failed_platforms: + # Nothing to reconnect — sleep and check again + for _ in range(30): + if not self._running: + return + await asyncio.sleep(1) + continue + + now = time.monotonic() + for platform in list(self._failed_platforms.keys()): + if not self._running: + return + info = self._failed_platforms[platform] + if now < info["next_retry"]: + continue # not time yet + + if info["attempts"] >= _MAX_ATTEMPTS: + logger.warning( + "Giving up reconnecting %s after %d attempts", + platform.value, info["attempts"], + ) + del self._failed_platforms[platform] + continue + + platform_config = info["config"] + attempt = info["attempts"] + 1 + logger.info( + "Reconnecting %s (attempt %d/%d)...", + platform.value, attempt, _MAX_ATTEMPTS, + ) + + try: + adapter = self._create_adapter(platform, platform_config) + if not adapter: + logger.warning( + "Reconnect %s: adapter creation returned None, removing from retry queue", + platform.value, + ) + del self._failed_platforms[platform] + continue + + adapter.set_message_handler(self._handle_message) + adapter.set_fatal_error_handler(self._handle_adapter_fatal_error) + + success = await adapter.connect() + if success: + self.adapters[platform] = adapter + self._sync_voice_mode_state_to_adapter(adapter) + self.delivery_router.adapters = self.adapters + del self._failed_platforms[platform] + logger.info("✓ %s reconnected successfully", platform.value) + + # Rebuild channel directory with the new adapter + try: + from gateway.channel_directory import build_channel_directory + build_channel_directory(self.adapters) + except Exception: + pass + else: + # Check if the failure is non-retryable + if adapter.has_fatal_error and not adapter.fatal_error_retryable: + logger.warning( + "Reconnect %s: non-retryable error (%s), removing from retry queue", + platform.value, adapter.fatal_error_message, + ) + del self._failed_platforms[platform] + else: + backoff = min(30 * (2 ** (attempt - 1)), _BACKOFF_CAP) + info["attempts"] = attempt + info["next_retry"] = time.monotonic() + backoff + logger.info( + "Reconnect %s failed, next retry in %ds", + platform.value, backoff, + ) + except Exception as e: + backoff = min(30 * (2 ** (attempt - 1)), _BACKOFF_CAP) + info["attempts"] = attempt + info["next_retry"] = time.monotonic() + backoff + logger.warning( + "Reconnect %s error: %s, next retry in %ds", + platform.value, e, backoff, + ) + + # Check every 10 seconds for platforms that need reconnection + for _ in range(10): + if not self._running: + return + await asyncio.sleep(1) + async def stop(self) -> None: """Stop the gateway and disconnect all adapters.""" logger.info("Stopping gateway...") diff --git a/tests/gateway/test_platform_reconnect.py b/tests/gateway/test_platform_reconnect.py new file mode 100644 index 0000000000..3073f2f5da --- /dev/null +++ b/tests/gateway/test_platform_reconnect.py @@ -0,0 +1,401 @@ +"""Tests for the gateway platform reconnection watcher.""" + +import asyncio +import time +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from gateway.config import GatewayConfig, Platform, PlatformConfig +from gateway.platforms.base import BasePlatformAdapter, MessageEvent, SendResult +from gateway.run import GatewayRunner + + +class StubAdapter(BasePlatformAdapter): + """Adapter whose connect() result can be controlled.""" + + def __init__(self, *, succeed=True, fatal_error=None, fatal_retryable=True): + super().__init__(PlatformConfig(enabled=True, token="test"), Platform.TELEGRAM) + self._succeed = succeed + self._fatal_error = fatal_error + self._fatal_retryable = fatal_retryable + + async def connect(self): + if self._fatal_error: + self._set_fatal_error("test_error", self._fatal_error, retryable=self._fatal_retryable) + return False + return self._succeed + + async def disconnect(self): + return None + + async def send(self, chat_id, content, reply_to=None, metadata=None): + return SendResult(success=True, message_id="1") + + async def send_typing(self, chat_id, metadata=None): + return None + + async def get_chat_info(self, chat_id): + return {"id": chat_id} + + +def _make_runner(): + """Create a minimal GatewayRunner via object.__new__ to skip __init__.""" + runner = object.__new__(GatewayRunner) + runner.config = GatewayConfig( + platforms={Platform.TELEGRAM: PlatformConfig(enabled=True, token="test")} + ) + runner._running = True + runner._shutdown_event = asyncio.Event() + runner._exit_reason = None + runner._exit_with_failure = False + runner._exit_cleanly = False + runner._failed_platforms = {} + runner.adapters = {} + runner.delivery_router = MagicMock() + runner._running_agents = {} + runner._pending_messages = {} + runner._pending_approvals = {} + runner._honcho_managers = {} + runner._honcho_configs = {} + runner._shutdown_all_gateway_honcho = lambda: None + return runner + + +# --- Startup queueing --- + +class TestStartupFailureQueuing: + """Verify that failed platforms are queued during startup.""" + + def test_failed_platform_queued_on_connect_failure(self): + """When adapter.connect() returns False without fatal error, queue for retry.""" + runner = _make_runner() + platform_config = PlatformConfig(enabled=True, token="test") + runner._failed_platforms[Platform.TELEGRAM] = { + "config": platform_config, + "attempts": 1, + "next_retry": time.monotonic() + 30, + } + assert Platform.TELEGRAM in runner._failed_platforms + assert runner._failed_platforms[Platform.TELEGRAM]["attempts"] == 1 + + def test_failed_platform_not_queued_for_nonretryable(self): + """Non-retryable errors should not be in the retry queue.""" + runner = _make_runner() + # Simulate: adapter had a non-retryable error, wasn't queued + assert Platform.TELEGRAM not in runner._failed_platforms + + +# --- Reconnect watcher --- + +class TestPlatformReconnectWatcher: + """Test the _platform_reconnect_watcher background task.""" + + @pytest.mark.asyncio + async def test_reconnect_succeeds_on_retry(self): + """Watcher should reconnect a failed platform when connect() succeeds.""" + runner = _make_runner() + runner._sync_voice_mode_state_to_adapter = MagicMock() + + platform_config = PlatformConfig(enabled=True, token="test") + runner._failed_platforms[Platform.TELEGRAM] = { + "config": platform_config, + "attempts": 1, + "next_retry": time.monotonic() - 1, # Already past retry time + } + + succeed_adapter = StubAdapter(succeed=True) + real_sleep = asyncio.sleep + + with patch.object(runner, "_create_adapter", return_value=succeed_adapter): + with patch("gateway.run.build_channel_directory", create=True): + # Run one iteration of the watcher then stop + async def run_one_iteration(): + runner._running = True + # Patch the sleep to exit after first check + call_count = 0 + + async def fake_sleep(n): + nonlocal call_count + call_count += 1 + if call_count > 1: + runner._running = False + await real_sleep(0) + + with patch("asyncio.sleep", side_effect=fake_sleep): + await runner._platform_reconnect_watcher() + + await run_one_iteration() + + assert Platform.TELEGRAM not in runner._failed_platforms + assert Platform.TELEGRAM in runner.adapters + + @pytest.mark.asyncio + async def test_reconnect_nonretryable_removed_from_queue(self): + """Non-retryable errors should remove the platform from the retry queue.""" + runner = _make_runner() + + platform_config = PlatformConfig(enabled=True, token="test") + runner._failed_platforms[Platform.TELEGRAM] = { + "config": platform_config, + "attempts": 1, + "next_retry": time.monotonic() - 1, + } + + fail_adapter = StubAdapter( + succeed=False, fatal_error="bad token", fatal_retryable=False + ) + + real_sleep = asyncio.sleep + + with patch.object(runner, "_create_adapter", return_value=fail_adapter): + async def run_one_iteration(): + runner._running = True + call_count = 0 + + async def fake_sleep(n): + nonlocal call_count + call_count += 1 + if call_count > 1: + runner._running = False + await real_sleep(0) + + with patch("asyncio.sleep", side_effect=fake_sleep): + await runner._platform_reconnect_watcher() + + await run_one_iteration() + + assert Platform.TELEGRAM not in runner._failed_platforms + assert Platform.TELEGRAM not in runner.adapters + + @pytest.mark.asyncio + async def test_reconnect_retryable_stays_in_queue(self): + """Retryable failures should remain in the queue with incremented attempts.""" + runner = _make_runner() + + platform_config = PlatformConfig(enabled=True, token="test") + runner._failed_platforms[Platform.TELEGRAM] = { + "config": platform_config, + "attempts": 1, + "next_retry": time.monotonic() - 1, + } + + fail_adapter = StubAdapter( + succeed=False, fatal_error="DNS failure", fatal_retryable=True + ) + + real_sleep = asyncio.sleep + + with patch.object(runner, "_create_adapter", return_value=fail_adapter): + async def run_one_iteration(): + runner._running = True + call_count = 0 + + async def fake_sleep(n): + nonlocal call_count + call_count += 1 + if call_count > 1: + runner._running = False + await real_sleep(0) + + with patch("asyncio.sleep", side_effect=fake_sleep): + await runner._platform_reconnect_watcher() + + await run_one_iteration() + + assert Platform.TELEGRAM in runner._failed_platforms + assert runner._failed_platforms[Platform.TELEGRAM]["attempts"] == 2 + + @pytest.mark.asyncio + async def test_reconnect_gives_up_after_max_attempts(self): + """After max attempts, platform should be removed from retry queue.""" + runner = _make_runner() + + platform_config = PlatformConfig(enabled=True, token="test") + runner._failed_platforms[Platform.TELEGRAM] = { + "config": platform_config, + "attempts": 20, # At max + "next_retry": time.monotonic() - 1, + } + + real_sleep = asyncio.sleep + + with patch.object(runner, "_create_adapter") as mock_create: + async def run_one_iteration(): + runner._running = True + call_count = 0 + + async def fake_sleep(n): + nonlocal call_count + call_count += 1 + if call_count > 1: + runner._running = False + await real_sleep(0) + + with patch("asyncio.sleep", side_effect=fake_sleep): + await runner._platform_reconnect_watcher() + + await run_one_iteration() + + assert Platform.TELEGRAM not in runner._failed_platforms + mock_create.assert_not_called() # Should give up without trying + + @pytest.mark.asyncio + async def test_reconnect_skips_when_not_time_yet(self): + """Watcher should skip platforms whose next_retry is in the future.""" + runner = _make_runner() + + platform_config = PlatformConfig(enabled=True, token="test") + runner._failed_platforms[Platform.TELEGRAM] = { + "config": platform_config, + "attempts": 1, + "next_retry": time.monotonic() + 9999, # Far in the future + } + + real_sleep = asyncio.sleep + + with patch.object(runner, "_create_adapter") as mock_create: + async def run_one_iteration(): + runner._running = True + call_count = 0 + + async def fake_sleep(n): + nonlocal call_count + call_count += 1 + if call_count > 1: + runner._running = False + await real_sleep(0) + + with patch("asyncio.sleep", side_effect=fake_sleep): + await runner._platform_reconnect_watcher() + + await run_one_iteration() + + assert Platform.TELEGRAM in runner._failed_platforms + mock_create.assert_not_called() + + @pytest.mark.asyncio + async def test_no_failed_platforms_watcher_idles(self): + """When no platforms are failed, watcher should just idle.""" + runner = _make_runner() + # No failed platforms + + real_sleep = asyncio.sleep + + with patch.object(runner, "_create_adapter") as mock_create: + async def run_briefly(): + runner._running = True + call_count = 0 + + async def fake_sleep(n): + nonlocal call_count + call_count += 1 + if call_count > 2: + runner._running = False + await real_sleep(0) + + with patch("asyncio.sleep", side_effect=fake_sleep): + await runner._platform_reconnect_watcher() + + await run_briefly() + + mock_create.assert_not_called() + + @pytest.mark.asyncio + async def test_adapter_create_returns_none(self): + """If _create_adapter returns None, remove from queue (missing deps).""" + runner = _make_runner() + + platform_config = PlatformConfig(enabled=True, token="test") + runner._failed_platforms[Platform.TELEGRAM] = { + "config": platform_config, + "attempts": 1, + "next_retry": time.monotonic() - 1, + } + + real_sleep = asyncio.sleep + + with patch.object(runner, "_create_adapter", return_value=None): + async def run_one_iteration(): + runner._running = True + call_count = 0 + + async def fake_sleep(n): + nonlocal call_count + call_count += 1 + if call_count > 1: + runner._running = False + await real_sleep(0) + + with patch("asyncio.sleep", side_effect=fake_sleep): + await runner._platform_reconnect_watcher() + + await run_one_iteration() + + assert Platform.TELEGRAM not in runner._failed_platforms + + +# --- Runtime disconnection queueing --- + +class TestRuntimeDisconnectQueuing: + """Test that _handle_adapter_fatal_error queues retryable disconnections.""" + + @pytest.mark.asyncio + async def test_retryable_runtime_error_queued_for_reconnect(self): + """Retryable runtime errors should add the platform to _failed_platforms.""" + runner = _make_runner() + + adapter = StubAdapter(succeed=True) + adapter._set_fatal_error("network_error", "DNS failure", retryable=True) + runner.adapters[Platform.TELEGRAM] = adapter + + await runner._handle_adapter_fatal_error(adapter) + + assert Platform.TELEGRAM in runner._failed_platforms + assert runner._failed_platforms[Platform.TELEGRAM]["attempts"] == 0 + + @pytest.mark.asyncio + async def test_nonretryable_runtime_error_not_queued(self): + """Non-retryable runtime errors should not be queued for reconnection.""" + runner = _make_runner() + + adapter = StubAdapter(succeed=True) + adapter._set_fatal_error("auth_error", "bad token", retryable=False) + runner.adapters[Platform.TELEGRAM] = adapter + + # Need to prevent stop() from running fully + runner.stop = AsyncMock() + + await runner._handle_adapter_fatal_error(adapter) + + assert Platform.TELEGRAM not in runner._failed_platforms + + @pytest.mark.asyncio + async def test_retryable_error_prevents_shutdown_when_queued(self): + """Gateway should not shut down if failed platforms are queued for reconnection.""" + runner = _make_runner() + runner.stop = AsyncMock() + + adapter = StubAdapter(succeed=True) + adapter._set_fatal_error("network_error", "DNS failure", retryable=True) + runner.adapters[Platform.TELEGRAM] = adapter + + await runner._handle_adapter_fatal_error(adapter) + + # stop() should NOT have been called since we have platforms queued + runner.stop.assert_not_called() + assert Platform.TELEGRAM in runner._failed_platforms + + @pytest.mark.asyncio + async def test_nonretryable_error_triggers_shutdown(self): + """Gateway should shut down when no adapters remain and nothing is queued.""" + runner = _make_runner() + runner.stop = AsyncMock() + + adapter = StubAdapter(succeed=True) + adapter._set_fatal_error("auth_error", "bad token", retryable=False) + runner.adapters[Platform.TELEGRAM] = adapter + + await runner._handle_adapter_fatal_error(adapter) + + runner.stop.assert_called_once() diff --git a/tests/gateway/test_runner_fatal_adapter.py b/tests/gateway/test_runner_fatal_adapter.py index 2badb87c4d..6eb2850598 100644 --- a/tests/gateway/test_runner_fatal_adapter.py +++ b/tests/gateway/test_runner_fatal_adapter.py @@ -66,7 +66,9 @@ async def test_runner_requests_clean_exit_for_nonretryable_startup_conflict(monk @pytest.mark.asyncio -async def test_runner_requests_failure_exit_for_retryable_runtime_fatal(monkeypatch, tmp_path): +async def test_runner_queues_retryable_runtime_fatal_for_reconnection(monkeypatch, tmp_path): + """Retryable runtime fatal errors queue the platform for reconnection + instead of shutting down the gateway.""" config = GatewayConfig( platforms={ Platform.WHATSAPP: PlatformConfig(enabled=True, token="token") @@ -87,7 +89,7 @@ async def test_runner_requests_failure_exit_for_retryable_runtime_fatal(monkeypa await runner._handle_adapter_fatal_error(adapter) - assert runner.should_exit_cleanly is False - assert runner.should_exit_with_failure is True - assert "exited unexpectedly" in runner.exit_reason - runner.stop.assert_awaited_once() + # Should NOT shut down — platform is queued for reconnection + runner.stop.assert_not_awaited() + assert Platform.WHATSAPP in runner._failed_platforms + assert runner._failed_platforms[Platform.WHATSAPP]["attempts"] == 0