From cbf66fcfcd4f44b77eab0a52189bbe528cc28daa Mon Sep 17 00:00:00 2001 From: Teknium Date: Sun, 26 Apr 2026 21:18:59 -0700 Subject: [PATCH] fix(feishu): send WebSocket CLOSE frame on disconnect (#10202) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Feishu adapter's disconnect() cancelled WSS-thread tasks but never called the lark_oapi client's _disconnect() coroutine, so no WebSocket CLOSE frame was sent. Feishu's server kept routing messages to the stale endpoint for minutes (CLOSE-WAIT timeout), silencing the channel across every shutdown path — systemd restart, hermes update, hermes gateway restart, and the --replace takeover during 'hermes dashboard' invocations. Schedule ws_client._disconnect() on the WSS thread loop via run_coroutine_threadsafe with a 5s timeout before the existing task-cancel + loop-stop sequence. Defensive hasattr guard + broad except keeps disconnect() resilient if lark_oapi's internals shift. Fixes #10202 --- gateway/platforms/feishu.py | 41 ++++++++++++++++++++- tests/gateway/test_feishu.py | 71 ++++++++++++++++++++++++++++++++++++ 2 files changed, 111 insertions(+), 1 deletion(-) diff --git a/gateway/platforms/feishu.py b/gateway/platforms/feishu.py index 718f01e995..061f8e7652 100644 --- a/gateway/platforms/feishu.py +++ b/gateway/platforms/feishu.py @@ -1556,10 +1556,49 @@ class FeishuAdapter(BasePlatformAdapter): await self._cancel_pending_tasks(self._pending_text_batch_tasks) await self._cancel_pending_tasks(self._pending_media_batch_tasks) self._reset_batch_buffers() + + # Send a WebSocket CLOSE frame to Feishu BEFORE tearing down the + # thread loop. Without this, Feishu's server never learns the + # connection is dead and continues routing messages to the stale + # endpoint — the channel goes silent until the server-side + # CLOSE-WAIT expires (minutes to hours). See issue #10202. + # + # ``_disable_websocket_auto_reconnect()`` nils ``self._ws_client``, + # so capture the client reference first. + ws_client = self._ws_client + ws_thread_loop = self._ws_thread_loop self._disable_websocket_auto_reconnect() await self._stop_webhook_server() - ws_thread_loop = self._ws_thread_loop + if ( + ws_client is not None + and ws_thread_loop is not None + and not ws_thread_loop.is_closed() + and hasattr(ws_client, "_disconnect") + ): + try: + future = asyncio.run_coroutine_threadsafe( + ws_client._disconnect(), ws_thread_loop + ) + # 5s is generous — the CLOSE frame is a single WebSocket + # control frame. If it takes longer than that the + # connection is already wedged and we gain nothing by + # waiting further. + await asyncio.wait_for(asyncio.wrap_future(future), timeout=5.0) + logger.debug("[Feishu] Sent WebSocket CLOSE frame to Feishu") + except asyncio.TimeoutError: + logger.warning( + "[Feishu] CLOSE frame not acknowledged within 5s — " + "Feishu may briefly route messages to the stale " + "connection until server-side timeout" + ) + except Exception as exc: + logger.debug( + "[Feishu] Could not send WebSocket CLOSE frame: %s", + exc, + exc_info=True, + ) + if ws_thread_loop is not None and not ws_thread_loop.is_closed(): logger.debug("[Feishu] Cancelling websocket thread tasks and stopping loop") diff --git a/tests/gateway/test_feishu.py b/tests/gateway/test_feishu.py index f21b7dcef8..f3664f41b3 100644 --- a/tests/gateway/test_feishu.py +++ b/tests/gateway/test_feishu.py @@ -249,6 +249,77 @@ class TestFeishuAdapterMessaging(unittest.TestCase): ) release_lock.assert_called_once_with("feishu-app-id", "cli_app") + def test_disconnect_sends_websocket_close_frame(self): + """Regression test for #10202: disconnect() must call the WSS + client's ``_disconnect()`` coroutine so a WebSocket CLOSE frame + is sent to Feishu. Without this, Feishu's server continues + routing to the stale connection, silencing the channel. + """ + import threading + from types import SimpleNamespace + from gateway.config import PlatformConfig + from gateway.platforms.feishu import FeishuAdapter + + adapter = FeishuAdapter(PlatformConfig()) + + # Real thread loop to schedule the close coroutine on. + ws_thread_loop = asyncio.new_event_loop() + ready = threading.Event() + + def _run_loop() -> None: + asyncio.set_event_loop(ws_thread_loop) + ready.set() + ws_thread_loop.run_forever() + + thread = threading.Thread(target=_run_loop, daemon=True) + thread.start() + ready.wait() + + close_called = threading.Event() + + async def _fake_disconnect() -> None: + close_called.set() + + ws_client = SimpleNamespace(_disconnect=_fake_disconnect, _auto_reconnect=True) + adapter._ws_client = ws_client + adapter._ws_thread_loop = ws_thread_loop + adapter._ws_future = None + + try: + asyncio.run(adapter.disconnect()) + finally: + if not ws_thread_loop.is_closed(): + ws_thread_loop.call_soon_threadsafe(ws_thread_loop.stop) + thread.join(timeout=2.0) + if not ws_thread_loop.is_closed(): + ws_thread_loop.close() + + self.assertTrue( + close_called.is_set(), + "disconnect() must schedule ws_client._disconnect() on the ws thread loop", + ) + # _disable_websocket_auto_reconnect() must still run. + self.assertIsNone(adapter._ws_client) + + def test_disconnect_tolerates_missing_internal_disconnect(self): + """If the lark_oapi client layout changes and ``_disconnect`` + disappears, disconnect() must not raise — fall through to the + existing task-cancel path. + """ + from types import SimpleNamespace + from gateway.config import PlatformConfig + from gateway.platforms.feishu import FeishuAdapter + + adapter = FeishuAdapter(PlatformConfig()) + # No ``_disconnect`` attribute — ``hasattr`` guard should skip. + adapter._ws_client = SimpleNamespace(_auto_reconnect=True) + adapter._ws_thread_loop = None + adapter._ws_future = None + + # Must not raise. + asyncio.run(adapter.disconnect()) + self.assertIsNone(adapter._ws_client) + @patch.dict(os.environ, { "FEISHU_APP_ID": "cli_app", "FEISHU_APP_SECRET": "secret_app",