mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-28 06:51:16 +08:00
fix(feishu): send WebSocket CLOSE frame on disconnect (#10202)
Feishu adapter's disconnect() cancelled WSS-thread tasks but never called the lark_oapi client's _disconnect() coroutine, so no WebSocket CLOSE frame was sent. Feishu's server kept routing messages to the stale endpoint for minutes (CLOSE-WAIT timeout), silencing the channel across every shutdown path — systemd restart, hermes update, hermes gateway restart, and the --replace takeover during 'hermes dashboard' invocations. Schedule ws_client._disconnect() on the WSS thread loop via run_coroutine_threadsafe with a 5s timeout before the existing task-cancel + loop-stop sequence. Defensive hasattr guard + broad except keeps disconnect() resilient if lark_oapi's internals shift. Fixes #10202
This commit is contained in:
@@ -1556,10 +1556,49 @@ class FeishuAdapter(BasePlatformAdapter):
|
|||||||
await self._cancel_pending_tasks(self._pending_text_batch_tasks)
|
await self._cancel_pending_tasks(self._pending_text_batch_tasks)
|
||||||
await self._cancel_pending_tasks(self._pending_media_batch_tasks)
|
await self._cancel_pending_tasks(self._pending_media_batch_tasks)
|
||||||
self._reset_batch_buffers()
|
self._reset_batch_buffers()
|
||||||
|
|
||||||
|
# Send a WebSocket CLOSE frame to Feishu BEFORE tearing down the
|
||||||
|
# thread loop. Without this, Feishu's server never learns the
|
||||||
|
# connection is dead and continues routing messages to the stale
|
||||||
|
# endpoint — the channel goes silent until the server-side
|
||||||
|
# CLOSE-WAIT expires (minutes to hours). See issue #10202.
|
||||||
|
#
|
||||||
|
# ``_disable_websocket_auto_reconnect()`` nils ``self._ws_client``,
|
||||||
|
# so capture the client reference first.
|
||||||
|
ws_client = self._ws_client
|
||||||
|
ws_thread_loop = self._ws_thread_loop
|
||||||
self._disable_websocket_auto_reconnect()
|
self._disable_websocket_auto_reconnect()
|
||||||
await self._stop_webhook_server()
|
await self._stop_webhook_server()
|
||||||
|
|
||||||
ws_thread_loop = self._ws_thread_loop
|
if (
|
||||||
|
ws_client is not None
|
||||||
|
and ws_thread_loop is not None
|
||||||
|
and not ws_thread_loop.is_closed()
|
||||||
|
and hasattr(ws_client, "_disconnect")
|
||||||
|
):
|
||||||
|
try:
|
||||||
|
future = asyncio.run_coroutine_threadsafe(
|
||||||
|
ws_client._disconnect(), ws_thread_loop
|
||||||
|
)
|
||||||
|
# 5s is generous — the CLOSE frame is a single WebSocket
|
||||||
|
# control frame. If it takes longer than that the
|
||||||
|
# connection is already wedged and we gain nothing by
|
||||||
|
# waiting further.
|
||||||
|
await asyncio.wait_for(asyncio.wrap_future(future), timeout=5.0)
|
||||||
|
logger.debug("[Feishu] Sent WebSocket CLOSE frame to Feishu")
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
logger.warning(
|
||||||
|
"[Feishu] CLOSE frame not acknowledged within 5s — "
|
||||||
|
"Feishu may briefly route messages to the stale "
|
||||||
|
"connection until server-side timeout"
|
||||||
|
)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.debug(
|
||||||
|
"[Feishu] Could not send WebSocket CLOSE frame: %s",
|
||||||
|
exc,
|
||||||
|
exc_info=True,
|
||||||
|
)
|
||||||
|
|
||||||
if ws_thread_loop is not None and not ws_thread_loop.is_closed():
|
if ws_thread_loop is not None and not ws_thread_loop.is_closed():
|
||||||
logger.debug("[Feishu] Cancelling websocket thread tasks and stopping loop")
|
logger.debug("[Feishu] Cancelling websocket thread tasks and stopping loop")
|
||||||
|
|
||||||
|
|||||||
@@ -249,6 +249,77 @@ class TestFeishuAdapterMessaging(unittest.TestCase):
|
|||||||
)
|
)
|
||||||
release_lock.assert_called_once_with("feishu-app-id", "cli_app")
|
release_lock.assert_called_once_with("feishu-app-id", "cli_app")
|
||||||
|
|
||||||
|
def test_disconnect_sends_websocket_close_frame(self):
|
||||||
|
"""Regression test for #10202: disconnect() must call the WSS
|
||||||
|
client's ``_disconnect()`` coroutine so a WebSocket CLOSE frame
|
||||||
|
is sent to Feishu. Without this, Feishu's server continues
|
||||||
|
routing to the stale connection, silencing the channel.
|
||||||
|
"""
|
||||||
|
import threading
|
||||||
|
from types import SimpleNamespace
|
||||||
|
from gateway.config import PlatformConfig
|
||||||
|
from gateway.platforms.feishu import FeishuAdapter
|
||||||
|
|
||||||
|
adapter = FeishuAdapter(PlatformConfig())
|
||||||
|
|
||||||
|
# Real thread loop to schedule the close coroutine on.
|
||||||
|
ws_thread_loop = asyncio.new_event_loop()
|
||||||
|
ready = threading.Event()
|
||||||
|
|
||||||
|
def _run_loop() -> None:
|
||||||
|
asyncio.set_event_loop(ws_thread_loop)
|
||||||
|
ready.set()
|
||||||
|
ws_thread_loop.run_forever()
|
||||||
|
|
||||||
|
thread = threading.Thread(target=_run_loop, daemon=True)
|
||||||
|
thread.start()
|
||||||
|
ready.wait()
|
||||||
|
|
||||||
|
close_called = threading.Event()
|
||||||
|
|
||||||
|
async def _fake_disconnect() -> None:
|
||||||
|
close_called.set()
|
||||||
|
|
||||||
|
ws_client = SimpleNamespace(_disconnect=_fake_disconnect, _auto_reconnect=True)
|
||||||
|
adapter._ws_client = ws_client
|
||||||
|
adapter._ws_thread_loop = ws_thread_loop
|
||||||
|
adapter._ws_future = None
|
||||||
|
|
||||||
|
try:
|
||||||
|
asyncio.run(adapter.disconnect())
|
||||||
|
finally:
|
||||||
|
if not ws_thread_loop.is_closed():
|
||||||
|
ws_thread_loop.call_soon_threadsafe(ws_thread_loop.stop)
|
||||||
|
thread.join(timeout=2.0)
|
||||||
|
if not ws_thread_loop.is_closed():
|
||||||
|
ws_thread_loop.close()
|
||||||
|
|
||||||
|
self.assertTrue(
|
||||||
|
close_called.is_set(),
|
||||||
|
"disconnect() must schedule ws_client._disconnect() on the ws thread loop",
|
||||||
|
)
|
||||||
|
# _disable_websocket_auto_reconnect() must still run.
|
||||||
|
self.assertIsNone(adapter._ws_client)
|
||||||
|
|
||||||
|
def test_disconnect_tolerates_missing_internal_disconnect(self):
|
||||||
|
"""If the lark_oapi client layout changes and ``_disconnect``
|
||||||
|
disappears, disconnect() must not raise — fall through to the
|
||||||
|
existing task-cancel path.
|
||||||
|
"""
|
||||||
|
from types import SimpleNamespace
|
||||||
|
from gateway.config import PlatformConfig
|
||||||
|
from gateway.platforms.feishu import FeishuAdapter
|
||||||
|
|
||||||
|
adapter = FeishuAdapter(PlatformConfig())
|
||||||
|
# No ``_disconnect`` attribute — ``hasattr`` guard should skip.
|
||||||
|
adapter._ws_client = SimpleNamespace(_auto_reconnect=True)
|
||||||
|
adapter._ws_thread_loop = None
|
||||||
|
adapter._ws_future = None
|
||||||
|
|
||||||
|
# Must not raise.
|
||||||
|
asyncio.run(adapter.disconnect())
|
||||||
|
self.assertIsNone(adapter._ws_client)
|
||||||
|
|
||||||
@patch.dict(os.environ, {
|
@patch.dict(os.environ, {
|
||||||
"FEISHU_APP_ID": "cli_app",
|
"FEISHU_APP_ID": "cli_app",
|
||||||
"FEISHU_APP_SECRET": "secret_app",
|
"FEISHU_APP_SECRET": "secret_app",
|
||||||
|
|||||||
Reference in New Issue
Block a user