Mirror of https://github.com/NousResearch/hermes-agent.git
Synced 2026-05-02 08:47:26 +08:00

Compare commits: 1 commit (fix/plugin ... hermes/her)
Commit SHA1: e4a2765153

gateway/run.py (160 changed lines)
@@ -363,6 +363,10 @@ class GatewayRunner:
         # Key: session_key, Value: {"command": str, "pattern_key": str, ...}
         self._pending_approvals: Dict[str, Dict[str, Any]] = {}
 
+        # Track platforms that failed to connect for background reconnection.
+        # Key: Platform enum, Value: {"config": platform_config, "attempts": int, "next_retry": float}
+        self._failed_platforms: Dict[Platform, Dict[str, Any]] = {}
+
         # Persistent Honcho managers keyed by gateway session key.
         # This preserves write_frequency="session" semantics across short-lived
         # per-message AIAgent instances.
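Review note: the new `_failed_platforms` map is the contract shared by everything below. A minimal sketch of one entry and the due-check the watcher later performs on it (values illustrative; only the key names come from the comment in the hunk above):

    import time

    platform_config = {"enabled": True, "token": "test"}  # stand-in for a real PlatformConfig

    entry = {
        "config": platform_config,            # config to reconnect with
        "attempts": 0,                        # retries made so far
        "next_retry": time.monotonic() + 30,  # monotonic deadline for the next try
    }

    # The watcher leaves an entry alone until its deadline has passed:
    if time.monotonic() >= entry["next_retry"]:
        print("due for retry")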
@@ -639,7 +643,11 @@ class GatewayRunner:
         return resolve_turn_route(user_message, getattr(self, "_smart_model_routing", {}), primary)
 
     async def _handle_adapter_fatal_error(self, adapter: BasePlatformAdapter) -> None:
-        """React to a non-retryable adapter failure after startup."""
+        """React to an adapter failure after startup.
+
+        If the error is retryable (e.g. network blip, DNS failure), queue the
+        platform for background reconnection instead of giving up permanently.
+        """
         logger.error(
             "Fatal %s adapter error (%s): %s",
             adapter.platform.value,
@@ -655,7 +663,21 @@ class GatewayRunner:
         self.adapters.pop(adapter.platform, None)
         self.delivery_router.adapters = self.adapters
 
-        if not self.adapters:
+        # Queue retryable failures for background reconnection
+        if adapter.fatal_error_retryable:
+            platform_config = self.config.platforms.get(adapter.platform)
+            if platform_config and adapter.platform not in self._failed_platforms:
+                self._failed_platforms[adapter.platform] = {
+                    "config": platform_config,
+                    "attempts": 0,
+                    "next_retry": time.monotonic() + 30,
+                }
+                logger.info(
+                    "%s queued for background reconnection",
+                    adapter.platform.value,
+                )
+
+        if not self.adapters and not self._failed_platforms:
             self._exit_reason = adapter.fatal_error_message or "All messaging adapters disconnected"
             if adapter.fatal_error_retryable:
                 self._exit_with_failure = True
@@ -663,6 +685,11 @@ class GatewayRunner:
             else:
                 logger.error("No connected messaging platforms remain. Shutting down gateway cleanly.")
             await self.stop()
+        elif not self.adapters and self._failed_platforms:
+            logger.warning(
+                "No connected messaging platforms remain, but %d platform(s) queued for reconnection",
+                len(self._failed_platforms),
+            )
 
     def _request_clean_exit(self, reason: str) -> None:
         self._exit_cleanly = True
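Review note: taken together, the two hunks above reduce the handler's shutdown decision to a small truth table. A standalone sketch, distilled from the diff (the function name is hypothetical, not in the code):

    def should_shut_down(adapters_remaining: bool, queued_for_retry: bool) -> bool:
        # After this change the gateway stops only when nothing is connected
        # AND nothing is queued for background reconnection.
        return not adapters_remaining and not queued_for_retry

    assert should_shut_down(False, False) is True   # last adapter gone, nothing queued
    assert should_shut_down(False, True) is False   # new: reconnection still pending
    assert should_shut_down(True, False) is False   # other adapters still connected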
@@ -940,13 +967,32 @@ class GatewayRunner:
                     target.append(
                         f"{platform.value}: {adapter.fatal_error_message}"
                     )
+                    # Queue for reconnection if the error is retryable
+                    if adapter.fatal_error_retryable:
+                        self._failed_platforms[platform] = {
+                            "config": platform_config,
+                            "attempts": 1,
+                            "next_retry": time.monotonic() + 30,
+                        }
                 else:
                     startup_retryable_errors.append(
                         f"{platform.value}: failed to connect"
                     )
+                    # No fatal error info means likely a transient issue — queue for retry
+                    self._failed_platforms[platform] = {
+                        "config": platform_config,
+                        "attempts": 1,
+                        "next_retry": time.monotonic() + 30,
+                    }
             except Exception as e:
                 logger.error("✗ %s error: %s", platform.value, e)
                 startup_retryable_errors.append(f"{platform.value}: {e}")
+                # Unexpected exceptions are typically transient — queue for retry
+                self._failed_platforms[platform] = {
+                    "config": platform_config,
+                    "attempts": 1,
+                    "next_retry": time.monotonic() + 30,
+                }
 
         if connected_count == 0:
             if startup_nonretryable_errors:
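Review note: the three startup failure paths above all write the same entry shape with `attempts: 1`. If this spreads further, the queueing could live in one place; a sketch of such a helper (hypothetical `queue_failed_platform`, not part of this diff):

    import time

    def queue_failed_platform(failed_platforms: dict, platform, platform_config,
                              attempts: int = 1, delay: float = 30.0) -> None:
        # Hypothetical helper collecting the entry-building logic duplicated
        # across the startup paths and the runtime fatal-error handler.
        failed_platforms[platform] = {
            "config": platform_config,
            "attempts": attempts,
            "next_retry": time.monotonic() + delay,
        }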
@@ -1026,6 +1072,15 @@ class GatewayRunner:
         # Start background session expiry watcher for proactive memory flushing
         asyncio.create_task(self._session_expiry_watcher())
 
+        # Start background reconnection watcher for platforms that failed at startup
+        if self._failed_platforms:
+            logger.info(
+                "Starting reconnection watcher for %d failed platform(s): %s",
+                len(self._failed_platforms),
+                ", ".join(p.value for p in self._failed_platforms),
+            )
+            asyncio.create_task(self._platform_reconnect_watcher())
+
         logger.info("Press Ctrl+C to stop")
 
         return True
@@ -1068,6 +1123,107 @@ class GatewayRunner:
                 break
             await asyncio.sleep(1)
 
+    async def _platform_reconnect_watcher(self) -> None:
+        """Background task that periodically retries connecting failed platforms.
+
+        Uses exponential backoff: 30s → 60s → 120s → 240s → 300s (cap).
+        Stops retrying a platform after 20 failed attempts or if the error
+        is non-retryable (e.g. bad auth token).
+        """
+        _MAX_ATTEMPTS = 20
+        _BACKOFF_CAP = 300  # 5 minutes max between retries
+
+        await asyncio.sleep(10)  # initial delay — let startup finish
+        while self._running:
+            if not self._failed_platforms:
+                # Nothing to reconnect — sleep and check again
+                for _ in range(30):
+                    if not self._running:
+                        return
+                    await asyncio.sleep(1)
+                continue
+
+            now = time.monotonic()
+            for platform in list(self._failed_platforms.keys()):
+                if not self._running:
+                    return
+                info = self._failed_platforms[platform]
+                if now < info["next_retry"]:
+                    continue  # not time yet
+
+                if info["attempts"] >= _MAX_ATTEMPTS:
+                    logger.warning(
+                        "Giving up reconnecting %s after %d attempts",
+                        platform.value, info["attempts"],
+                    )
+                    del self._failed_platforms[platform]
+                    continue
+
+                platform_config = info["config"]
+                attempt = info["attempts"] + 1
+                logger.info(
+                    "Reconnecting %s (attempt %d/%d)...",
+                    platform.value, attempt, _MAX_ATTEMPTS,
+                )
+
+                try:
+                    adapter = self._create_adapter(platform, platform_config)
+                    if not adapter:
+                        logger.warning(
+                            "Reconnect %s: adapter creation returned None, removing from retry queue",
+                            platform.value,
+                        )
+                        del self._failed_platforms[platform]
+                        continue
+
+                    adapter.set_message_handler(self._handle_message)
+                    adapter.set_fatal_error_handler(self._handle_adapter_fatal_error)
+
+                    success = await adapter.connect()
+                    if success:
+                        self.adapters[platform] = adapter
+                        self._sync_voice_mode_state_to_adapter(adapter)
+                        self.delivery_router.adapters = self.adapters
+                        del self._failed_platforms[platform]
+                        logger.info("✓ %s reconnected successfully", platform.value)
+
+                        # Rebuild channel directory with the new adapter
+                        try:
+                            from gateway.channel_directory import build_channel_directory
+                            build_channel_directory(self.adapters)
+                        except Exception:
+                            pass
+                    else:
+                        # Check if the failure is non-retryable
+                        if adapter.has_fatal_error and not adapter.fatal_error_retryable:
+                            logger.warning(
+                                "Reconnect %s: non-retryable error (%s), removing from retry queue",
+                                platform.value, adapter.fatal_error_message,
+                            )
+                            del self._failed_platforms[platform]
+                        else:
+                            backoff = min(30 * (2 ** (attempt - 1)), _BACKOFF_CAP)
+                            info["attempts"] = attempt
+                            info["next_retry"] = time.monotonic() + backoff
+                            logger.info(
+                                "Reconnect %s failed, next retry in %ds",
+                                platform.value, backoff,
+                            )
+                except Exception as e:
+                    backoff = min(30 * (2 ** (attempt - 1)), _BACKOFF_CAP)
+                    info["attempts"] = attempt
+                    info["next_retry"] = time.monotonic() + backoff
+                    logger.warning(
+                        "Reconnect %s error: %s, next retry in %ds",
+                        platform.value, e, backoff,
+                    )
+
+            # Check every 10 seconds for platforms that need reconnection
+            for _ in range(10):
+                if not self._running:
+                    return
+                await asyncio.sleep(1)
+
     async def stop(self) -> None:
         """Stop the gateway and disconnect all adapters."""
         logger.info("Stopping gateway...")
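Review note: the docstring's backoff schedule matches the `min(30 * (2 ** (attempt - 1)), _BACKOFF_CAP)` expression in the method body. A quick check of the arithmetic:

    _BACKOFF_CAP = 300  # same cap as in the watcher

    for attempt in range(1, 7):
        backoff = min(30 * (2 ** (attempt - 1)), _BACKOFF_CAP)
        print(attempt, backoff)
    # attempt 1 -> 30, 2 -> 60, 3 -> 120, 4 -> 240, 5 -> 300 (capped), 6 -> 300, ...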
tests/gateway/test_platform_reconnect.py (new file, 401 lines)
@@ -0,0 +1,401 @@
+"""Tests for the gateway platform reconnection watcher."""
+
+import asyncio
+import time
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+
+from gateway.config import GatewayConfig, Platform, PlatformConfig
+from gateway.platforms.base import BasePlatformAdapter, MessageEvent, SendResult
+from gateway.run import GatewayRunner
+
+
+class StubAdapter(BasePlatformAdapter):
+    """Adapter whose connect() result can be controlled."""
+
+    def __init__(self, *, succeed=True, fatal_error=None, fatal_retryable=True):
+        super().__init__(PlatformConfig(enabled=True, token="test"), Platform.TELEGRAM)
+        self._succeed = succeed
+        self._fatal_error = fatal_error
+        self._fatal_retryable = fatal_retryable
+
+    async def connect(self):
+        if self._fatal_error:
+            self._set_fatal_error("test_error", self._fatal_error, retryable=self._fatal_retryable)
+            return False
+        return self._succeed
+
+    async def disconnect(self):
+        return None
+
+    async def send(self, chat_id, content, reply_to=None, metadata=None):
+        return SendResult(success=True, message_id="1")
+
+    async def send_typing(self, chat_id, metadata=None):
+        return None
+
+    async def get_chat_info(self, chat_id):
+        return {"id": chat_id}
+
+
+def _make_runner():
+    """Create a minimal GatewayRunner via object.__new__ to skip __init__."""
+    runner = object.__new__(GatewayRunner)
+    runner.config = GatewayConfig(
+        platforms={Platform.TELEGRAM: PlatformConfig(enabled=True, token="test")}
+    )
+    runner._running = True
+    runner._shutdown_event = asyncio.Event()
+    runner._exit_reason = None
+    runner._exit_with_failure = False
+    runner._exit_cleanly = False
+    runner._failed_platforms = {}
+    runner.adapters = {}
+    runner.delivery_router = MagicMock()
+    runner._running_agents = {}
+    runner._pending_messages = {}
+    runner._pending_approvals = {}
+    runner._honcho_managers = {}
+    runner._honcho_configs = {}
+    runner._shutdown_all_gateway_honcho = lambda: None
+    return runner
+
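Review note: `_make_runner` relies on `object.__new__` to allocate a `GatewayRunner` without running its `__init__`, so the instance carries only the attributes the tests assign. The pattern in isolation:

    class Heavy:
        def __init__(self):
            raise RuntimeError("expensive side effects we want to skip in tests")

    obj = object.__new__(Heavy)  # allocate without calling __init__
    obj.ready = True             # set only what the test needs
    assert isinstance(obj, Heavy)
    assert not hasattr(obj, "other_attr")  # nothing from __init__ exists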
+
+# --- Startup queueing ---
+
+class TestStartupFailureQueuing:
+    """Verify that failed platforms are queued during startup."""
+
+    def test_failed_platform_queued_on_connect_failure(self):
+        """When adapter.connect() returns False without fatal error, queue for retry."""
+        runner = _make_runner()
+        platform_config = PlatformConfig(enabled=True, token="test")
+        runner._failed_platforms[Platform.TELEGRAM] = {
+            "config": platform_config,
+            "attempts": 1,
+            "next_retry": time.monotonic() + 30,
+        }
+        assert Platform.TELEGRAM in runner._failed_platforms
+        assert runner._failed_platforms[Platform.TELEGRAM]["attempts"] == 1
+
+    def test_failed_platform_not_queued_for_nonretryable(self):
+        """Non-retryable errors should not be in the retry queue."""
+        runner = _make_runner()
+        # Simulate: adapter had a non-retryable error, wasn't queued
+        assert Platform.TELEGRAM not in runner._failed_platforms
+
+
+# --- Reconnect watcher ---
+
+class TestPlatformReconnectWatcher:
+    """Test the _platform_reconnect_watcher background task."""
+
+    @pytest.mark.asyncio
+    async def test_reconnect_succeeds_on_retry(self):
+        """Watcher should reconnect a failed platform when connect() succeeds."""
+        runner = _make_runner()
+        runner._sync_voice_mode_state_to_adapter = MagicMock()
+
+        platform_config = PlatformConfig(enabled=True, token="test")
+        runner._failed_platforms[Platform.TELEGRAM] = {
+            "config": platform_config,
+            "attempts": 1,
+            "next_retry": time.monotonic() - 1,  # Already past retry time
+        }
+
+        succeed_adapter = StubAdapter(succeed=True)
+        real_sleep = asyncio.sleep
+
+        with patch.object(runner, "_create_adapter", return_value=succeed_adapter):
+            with patch("gateway.run.build_channel_directory", create=True):
+                # Run one iteration of the watcher then stop
+                async def run_one_iteration():
+                    runner._running = True
+                    # Patch the sleep to exit after first check
+                    call_count = 0
+
+                    async def fake_sleep(n):
+                        nonlocal call_count
+                        call_count += 1
+                        if call_count > 1:
+                            runner._running = False
+                        await real_sleep(0)
+
+                    with patch("asyncio.sleep", side_effect=fake_sleep):
+                        await runner._platform_reconnect_watcher()
+
+                await run_one_iteration()
+
+        assert Platform.TELEGRAM not in runner._failed_platforms
+        assert Platform.TELEGRAM in runner.adapters
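Review note: the `fake_sleep` idiom above (and repeated in the tests below) drives the otherwise-infinite watcher loop for exactly one pass: the patched `asyncio.sleep` flips `_running` after its first call and still yields control via the saved real sleep. Distilled against a toy loop:

    import asyncio
    from unittest.mock import patch

    async def watcher(state):
        while state["running"]:
            state["iterations"] += 1
            await asyncio.sleep(1)

    async def main():
        state = {"running": True, "iterations": 0}
        real_sleep = asyncio.sleep  # grab a handle before patching
        calls = 0

        async def fake_sleep(n):
            nonlocal calls
            calls += 1
            if calls > 1:
                state["running"] = False  # stop after the first full pass
            await real_sleep(0)           # still yield to the event loop

        with patch("asyncio.sleep", side_effect=fake_sleep):
            await watcher(state)
        assert state["iterations"] == 2  # loop ran, then shut down cooperatively

    asyncio.run(main())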
+
+    @pytest.mark.asyncio
+    async def test_reconnect_nonretryable_removed_from_queue(self):
+        """Non-retryable errors should remove the platform from the retry queue."""
+        runner = _make_runner()
+
+        platform_config = PlatformConfig(enabled=True, token="test")
+        runner._failed_platforms[Platform.TELEGRAM] = {
+            "config": platform_config,
+            "attempts": 1,
+            "next_retry": time.monotonic() - 1,
+        }
+
+        fail_adapter = StubAdapter(
+            succeed=False, fatal_error="bad token", fatal_retryable=False
+        )
+
+        real_sleep = asyncio.sleep
+
+        with patch.object(runner, "_create_adapter", return_value=fail_adapter):
+            async def run_one_iteration():
+                runner._running = True
+                call_count = 0
+
+                async def fake_sleep(n):
+                    nonlocal call_count
+                    call_count += 1
+                    if call_count > 1:
+                        runner._running = False
+                    await real_sleep(0)
+
+                with patch("asyncio.sleep", side_effect=fake_sleep):
+                    await runner._platform_reconnect_watcher()
+
+            await run_one_iteration()
+
+        assert Platform.TELEGRAM not in runner._failed_platforms
+        assert Platform.TELEGRAM not in runner.adapters
+
+    @pytest.mark.asyncio
+    async def test_reconnect_retryable_stays_in_queue(self):
+        """Retryable failures should remain in the queue with incremented attempts."""
+        runner = _make_runner()
+
+        platform_config = PlatformConfig(enabled=True, token="test")
+        runner._failed_platforms[Platform.TELEGRAM] = {
+            "config": platform_config,
+            "attempts": 1,
+            "next_retry": time.monotonic() - 1,
+        }
+
+        fail_adapter = StubAdapter(
+            succeed=False, fatal_error="DNS failure", fatal_retryable=True
+        )
+
+        real_sleep = asyncio.sleep
+
+        with patch.object(runner, "_create_adapter", return_value=fail_adapter):
+            async def run_one_iteration():
+                runner._running = True
+                call_count = 0
+
+                async def fake_sleep(n):
+                    nonlocal call_count
+                    call_count += 1
+                    if call_count > 1:
+                        runner._running = False
+                    await real_sleep(0)
+
+                with patch("asyncio.sleep", side_effect=fake_sleep):
+                    await runner._platform_reconnect_watcher()
+
+            await run_one_iteration()
+
+        assert Platform.TELEGRAM in runner._failed_platforms
+        assert runner._failed_platforms[Platform.TELEGRAM]["attempts"] == 2
+
+    @pytest.mark.asyncio
+    async def test_reconnect_gives_up_after_max_attempts(self):
+        """After max attempts, platform should be removed from retry queue."""
+        runner = _make_runner()
+
+        platform_config = PlatformConfig(enabled=True, token="test")
+        runner._failed_platforms[Platform.TELEGRAM] = {
+            "config": platform_config,
+            "attempts": 20,  # At max
+            "next_retry": time.monotonic() - 1,
+        }
+
+        real_sleep = asyncio.sleep
+
+        with patch.object(runner, "_create_adapter") as mock_create:
+            async def run_one_iteration():
+                runner._running = True
+                call_count = 0
+
+                async def fake_sleep(n):
+                    nonlocal call_count
+                    call_count += 1
+                    if call_count > 1:
+                        runner._running = False
+                    await real_sleep(0)
+
+                with patch("asyncio.sleep", side_effect=fake_sleep):
+                    await runner._platform_reconnect_watcher()
+
+            await run_one_iteration()
+
+        assert Platform.TELEGRAM not in runner._failed_platforms
+        mock_create.assert_not_called()  # Should give up without trying
+
+    @pytest.mark.asyncio
+    async def test_reconnect_skips_when_not_time_yet(self):
+        """Watcher should skip platforms whose next_retry is in the future."""
+        runner = _make_runner()
+
+        platform_config = PlatformConfig(enabled=True, token="test")
+        runner._failed_platforms[Platform.TELEGRAM] = {
+            "config": platform_config,
+            "attempts": 1,
+            "next_retry": time.monotonic() + 9999,  # Far in the future
+        }
+
+        real_sleep = asyncio.sleep
+
+        with patch.object(runner, "_create_adapter") as mock_create:
+            async def run_one_iteration():
+                runner._running = True
+                call_count = 0
+
+                async def fake_sleep(n):
+                    nonlocal call_count
+                    call_count += 1
+                    if call_count > 1:
+                        runner._running = False
+                    await real_sleep(0)
+
+                with patch("asyncio.sleep", side_effect=fake_sleep):
+                    await runner._platform_reconnect_watcher()
+
+            await run_one_iteration()
+
+        assert Platform.TELEGRAM in runner._failed_platforms
+        mock_create.assert_not_called()
+
+    @pytest.mark.asyncio
+    async def test_no_failed_platforms_watcher_idles(self):
+        """When no platforms are failed, watcher should just idle."""
+        runner = _make_runner()
+        # No failed platforms
+
+        real_sleep = asyncio.sleep
+
+        with patch.object(runner, "_create_adapter") as mock_create:
+            async def run_briefly():
+                runner._running = True
+                call_count = 0
+
+                async def fake_sleep(n):
+                    nonlocal call_count
+                    call_count += 1
+                    if call_count > 2:
+                        runner._running = False
+                    await real_sleep(0)
+
+                with patch("asyncio.sleep", side_effect=fake_sleep):
+                    await runner._platform_reconnect_watcher()
+
+            await run_briefly()
+
+        mock_create.assert_not_called()
+
+    @pytest.mark.asyncio
+    async def test_adapter_create_returns_none(self):
+        """If _create_adapter returns None, remove from queue (missing deps)."""
+        runner = _make_runner()
+
+        platform_config = PlatformConfig(enabled=True, token="test")
+        runner._failed_platforms[Platform.TELEGRAM] = {
+            "config": platform_config,
+            "attempts": 1,
+            "next_retry": time.monotonic() - 1,
+        }
+
+        real_sleep = asyncio.sleep
+
+        with patch.object(runner, "_create_adapter", return_value=None):
+            async def run_one_iteration():
+                runner._running = True
+                call_count = 0
+
+                async def fake_sleep(n):
+                    nonlocal call_count
+                    call_count += 1
+                    if call_count > 1:
+                        runner._running = False
+                    await real_sleep(0)
+
+                with patch("asyncio.sleep", side_effect=fake_sleep):
+                    await runner._platform_reconnect_watcher()
+
+            await run_one_iteration()
+
+        assert Platform.TELEGRAM not in runner._failed_platforms
+
+
+# --- Runtime disconnection queueing ---
+
+class TestRuntimeDisconnectQueuing:
+    """Test that _handle_adapter_fatal_error queues retryable disconnections."""
+
+    @pytest.mark.asyncio
+    async def test_retryable_runtime_error_queued_for_reconnect(self):
+        """Retryable runtime errors should add the platform to _failed_platforms."""
+        runner = _make_runner()
+
+        adapter = StubAdapter(succeed=True)
+        adapter._set_fatal_error("network_error", "DNS failure", retryable=True)
+        runner.adapters[Platform.TELEGRAM] = adapter
+
+        await runner._handle_adapter_fatal_error(adapter)
+
+        assert Platform.TELEGRAM in runner._failed_platforms
+        assert runner._failed_platforms[Platform.TELEGRAM]["attempts"] == 0
+
+    @pytest.mark.asyncio
+    async def test_nonretryable_runtime_error_not_queued(self):
+        """Non-retryable runtime errors should not be queued for reconnection."""
+        runner = _make_runner()
+
+        adapter = StubAdapter(succeed=True)
+        adapter._set_fatal_error("auth_error", "bad token", retryable=False)
+        runner.adapters[Platform.TELEGRAM] = adapter
+
+        # Need to prevent stop() from running fully
+        runner.stop = AsyncMock()
+
+        await runner._handle_adapter_fatal_error(adapter)
+
+        assert Platform.TELEGRAM not in runner._failed_platforms
+
+    @pytest.mark.asyncio
+    async def test_retryable_error_prevents_shutdown_when_queued(self):
+        """Gateway should not shut down if failed platforms are queued for reconnection."""
+        runner = _make_runner()
+        runner.stop = AsyncMock()
+
+        adapter = StubAdapter(succeed=True)
+        adapter._set_fatal_error("network_error", "DNS failure", retryable=True)
+        runner.adapters[Platform.TELEGRAM] = adapter
+
+        await runner._handle_adapter_fatal_error(adapter)
+
+        # stop() should NOT have been called since we have platforms queued
+        runner.stop.assert_not_called()
+        assert Platform.TELEGRAM in runner._failed_platforms
+
+    @pytest.mark.asyncio
+    async def test_nonretryable_error_triggers_shutdown(self):
+        """Gateway should shut down when no adapters remain and nothing is queued."""
+        runner = _make_runner()
+        runner.stop = AsyncMock()
+
+        adapter = StubAdapter(succeed=True)
+        adapter._set_fatal_error("auth_error", "bad token", retryable=False)
+        runner.adapters[Platform.TELEGRAM] = adapter
+
+        await runner._handle_adapter_fatal_error(adapter)
+
+        runner.stop.assert_called_once()
(diff to a second, pre-existing test file; filename not captured in this view)

@@ -66,7 +66,9 @@ async def test_runner_requests_clean_exit_for_nonretryable_startup_conflict(monk
 
 
 @pytest.mark.asyncio
-async def test_runner_requests_failure_exit_for_retryable_runtime_fatal(monkeypatch, tmp_path):
+async def test_runner_queues_retryable_runtime_fatal_for_reconnection(monkeypatch, tmp_path):
+    """Retryable runtime fatal errors queue the platform for reconnection
+    instead of shutting down the gateway."""
     config = GatewayConfig(
         platforms={
             Platform.WHATSAPP: PlatformConfig(enabled=True, token="token")
@@ -87,7 +89,7 @@ async def test_runner_requests_failure_exit_for_retryable_runtime_fatal(monkeypa
 
     await runner._handle_adapter_fatal_error(adapter)
 
-    assert runner.should_exit_cleanly is False
-    assert runner.should_exit_with_failure is True
-    assert "exited unexpectedly" in runner.exit_reason
-    runner.stop.assert_awaited_once()
+    # Should NOT shut down — platform is queued for reconnection
+    runner.stop.assert_not_awaited()
+    assert Platform.WHATSAPP in runner._failed_platforms
+    assert runner._failed_platforms[Platform.WHATSAPP]["attempts"] == 0