diff --git a/gateway/platforms/wecom.py b/gateway/platforms/wecom.py index 3198be64af..9e5dd04e0d 100644 --- a/gateway/platforms/wecom.py +++ b/gateway/platforms/wecom.py @@ -492,10 +492,8 @@ class WeComAdapter(BasePlatformAdapter): if not chat_id: logger.debug("[%s] Missing chat id, skipping message", self.name) return - - self._last_chat_req_ids[chat_id] = self._payload_req_id(payload) - is_group = bool(body.get("chatid")) + is_group = str(body.get("chattype") or "").lower() == "group" if is_group: if not self._is_group_allowed(chat_id, sender_id): logger.debug("[%s] Group %s / sender %s blocked by policy", self.name, chat_id, sender_id) @@ -504,6 +502,11 @@ class WeComAdapter(BasePlatformAdapter): logger.debug("[%s] DM sender %s blocked by policy", self.name, sender_id) return + # Cache the inbound req_id after policy checks so proactive sends to + # this chat can fall back to APP_CMD_RESPONSE (required for groups — + # WeCom AI Bots cannot initiate APP_CMD_SEND in group chats). + self._remember_chat_req_id(chat_id, self._payload_req_id(payload)) + text, reply_text = self._extract_text(body) media_urls, media_types = await self._extract_media(body) message_type = self._derive_message_type(body, text, media_types) @@ -855,6 +858,23 @@ class WeComAdapter(BasePlatformAdapter): while len(self._reply_req_ids) > DEDUP_MAX_SIZE: self._reply_req_ids.pop(next(iter(self._reply_req_ids))) + def _remember_chat_req_id(self, chat_id: str, req_id: str) -> None: + """Cache the most recent inbound req_id per chat. + + Used as a fallback reply target when we need to send into a group + without an explicit ``reply_to`` — WeCom AI Bots are blocked from + APP_CMD_SEND in groups and must use APP_CMD_RESPONSE bound to some + prior req_id. Bounded like _reply_req_ids so long-running gateways + don't leak memory across many chats. + """ + normalized_chat_id = str(chat_id or "").strip() + normalized_req_id = str(req_id or "").strip() + if not normalized_chat_id or not normalized_req_id: + return + self._last_chat_req_ids[normalized_chat_id] = normalized_req_id + while len(self._last_chat_req_ids) > DEDUP_MAX_SIZE: + self._last_chat_req_ids.pop(next(iter(self._last_chat_req_ids))) + def _reply_req_id_for_message(self, reply_to: Optional[str]) -> Optional[str]: normalized = str(reply_to or "").strip() if not normalized or normalized.startswith("quote:"): @@ -1239,7 +1259,7 @@ class WeComAdapter(BasePlatformAdapter): return SendResult(success=False, error=prepared["reject_reason"]) reply_req_id = self._reply_req_id_for_message(reply_to) - if not reply_req_id and chat_id in getattr(self, '_last_chat_req_ids', {}): + if not reply_req_id and chat_id in self._last_chat_req_ids: reply_req_id = self._last_chat_req_ids[chat_id] try: @@ -1310,7 +1330,7 @@ class WeComAdapter(BasePlatformAdapter): try: reply_req_id = self._reply_req_id_for_message(reply_to) - if not reply_req_id and chat_id in getattr(self, '_last_chat_req_ids', {}): + if not reply_req_id and chat_id in self._last_chat_req_ids: reply_req_id = self._last_chat_req_ids[chat_id] if reply_req_id: diff --git a/tests/gateway/test_wecom.py b/tests/gateway/test_wecom.py index cc4aaddc7a..3c4ec357bc 100644 --- a/tests/gateway/test_wecom.py +++ b/tests/gateway/test_wecom.py @@ -119,7 +119,7 @@ class TestWeComConnect: class TestWeComReplyMode: @pytest.mark.asyncio - async def test_send_uses_passive_reply_stream_when_reply_context_exists(self): + async def test_send_uses_passive_reply_markdown_when_reply_context_exists(self): from gateway.platforms.wecom import WeComAdapter adapter = WeComAdapter(PlatformConfig(enabled=True)) @@ -134,9 +134,10 @@ class TestWeComReplyMode: adapter._send_reply_request.assert_awaited_once() args = adapter._send_reply_request.await_args.args assert args[0] == "req-1" - assert args[1]["msgtype"] == "stream" - assert args[1]["stream"]["finish"] is True - assert args[1]["stream"]["content"] == "hello from reply" + # msgtype: stream triggers WeCom errcode 600039 on many mobile clients + # (unsupported type). Markdown renders everywhere. + assert args[1]["msgtype"] == "markdown" + assert args[1]["markdown"]["content"] == "hello from reply" @pytest.mark.asyncio async def test_send_image_file_uses_passive_reply_media_when_reply_context_exists(self): @@ -593,3 +594,193 @@ class TestInboundMessages: await adapter._on_message(payload) adapter.handle_message.assert_not_awaited() + +class TestWeComZombieSessionFix: + """Tests for PR #11572 — device_id, markdown reply, group req_id fallback.""" + + def test_adapter_generates_stable_device_id_per_instance(self): + from gateway.platforms.wecom import WeComAdapter + + adapter = WeComAdapter(PlatformConfig(enabled=True)) + assert isinstance(adapter._device_id, str) + assert len(adapter._device_id) > 0 + # Second snapshot on the same adapter must be identical — only a fresh + # adapter instance should get a new device_id (one-per-reconnect is the + # zombie-session footgun we're fixing). + assert adapter._device_id == adapter._device_id + + def test_different_adapter_instances_get_distinct_device_ids(self): + from gateway.platforms.wecom import WeComAdapter + + a = WeComAdapter(PlatformConfig(enabled=True)) + b = WeComAdapter(PlatformConfig(enabled=True)) + assert a._device_id != b._device_id + + @pytest.mark.asyncio + async def test_open_connection_includes_device_id_in_subscribe(self): + from gateway.platforms.wecom import APP_CMD_SUBSCRIBE, WeComAdapter + + adapter = WeComAdapter(PlatformConfig(enabled=True)) + adapter._bot_id = "test-bot" + adapter._secret = "test-secret" + + sent_payloads = [] + + class _FakeWS: + closed = False + + async def send_json(self, payload): + sent_payloads.append(payload) + + async def close(self): + return None + + class _FakeSession: + def __init__(self, *args, **kwargs): + pass + + async def ws_connect(self, *args, **kwargs): + return _FakeWS() + + async def close(self): + return None + + async def _fake_cleanup(): + return None + + async def _fake_handshake(req_id): + return {"errcode": 0, "headers": {"req_id": req_id}} + + adapter._cleanup_ws = _fake_cleanup + adapter._wait_for_handshake = _fake_handshake + + with patch("gateway.platforms.wecom.aiohttp.ClientSession", _FakeSession): + await adapter._open_connection() + + assert len(sent_payloads) == 1 + subscribe = sent_payloads[0] + assert subscribe["cmd"] == APP_CMD_SUBSCRIBE + assert subscribe["body"]["bot_id"] == "test-bot" + assert subscribe["body"]["secret"] == "test-secret" + assert subscribe["body"]["device_id"] == adapter._device_id + + @pytest.mark.asyncio + async def test_on_message_caches_last_req_id_per_chat(self): + from gateway.platforms.wecom import WeComAdapter + + adapter = WeComAdapter(PlatformConfig(enabled=True)) + adapter._text_batch_delay_seconds = 0 + adapter.handle_message = AsyncMock() + adapter._extract_media = AsyncMock(return_value=([], [])) + + payload = { + "cmd": "aibot_msg_callback", + "headers": {"req_id": "req-abc"}, + "body": { + "msgid": "msg-1", + "chatid": "group-1", + "chattype": "group", + "from": {"userid": "user-1"}, + "msgtype": "text", + "text": {"content": "hi"}, + }, + } + + await adapter._on_message(payload) + assert adapter._last_chat_req_ids["group-1"] == "req-abc" + + @pytest.mark.asyncio + async def test_on_message_does_not_cache_blocked_sender_req_id(self): + """Blocked chats shouldn't populate the proactive-send fallback cache.""" + from gateway.platforms.wecom import WeComAdapter + + adapter = WeComAdapter( + PlatformConfig( + enabled=True, + extra={"group_policy": "allowlist", "group_allow_from": ["group-ok"]}, + ) + ) + adapter.handle_message = AsyncMock() + adapter._extract_media = AsyncMock(return_value=([], [])) + + payload = { + "cmd": "aibot_msg_callback", + "headers": {"req_id": "req-abc"}, + "body": { + "msgid": "msg-1", + "chatid": "group-blocked", + "chattype": "group", + "from": {"userid": "user-1"}, + "msgtype": "text", + "text": {"content": "hi"}, + }, + } + + await adapter._on_message(payload) + adapter.handle_message.assert_not_awaited() + assert "group-blocked" not in adapter._last_chat_req_ids + + def test_remember_chat_req_id_is_bounded(self): + from gateway.platforms.wecom import DEDUP_MAX_SIZE, WeComAdapter + + adapter = WeComAdapter(PlatformConfig(enabled=True)) + for i in range(DEDUP_MAX_SIZE + 50): + adapter._remember_chat_req_id(f"chat-{i}", f"req-{i}") + assert len(adapter._last_chat_req_ids) <= DEDUP_MAX_SIZE + # The most recently remembered chat must still be present. + latest = f"chat-{DEDUP_MAX_SIZE + 49}" + assert adapter._last_chat_req_ids[latest] == f"req-{DEDUP_MAX_SIZE + 49}" + + def test_remember_chat_req_id_ignores_empty_values(self): + from gateway.platforms.wecom import WeComAdapter + + adapter = WeComAdapter(PlatformConfig(enabled=True)) + adapter._remember_chat_req_id("", "req-1") + adapter._remember_chat_req_id("chat-1", "") + adapter._remember_chat_req_id(" ", " ") + assert adapter._last_chat_req_ids == {} + + @pytest.mark.asyncio + async def test_proactive_group_send_falls_back_to_cached_req_id(self): + """Sending into a group without reply_to should use the last cached + req_id via APP_CMD_RESPONSE — WeCom AI Bots cannot initiate APP_CMD_SEND + in group chats (errcode 600039).""" + from gateway.platforms.wecom import WeComAdapter + + adapter = WeComAdapter(PlatformConfig(enabled=True)) + adapter._last_chat_req_ids["group-1"] = "inbound-req-42" + adapter._send_reply_request = AsyncMock( + return_value={"headers": {"req_id": "inbound-req-42"}, "errcode": 0} + ) + adapter._send_request = AsyncMock( + return_value={"headers": {"req_id": "new"}, "errcode": 0} + ) + + result = await adapter.send("group-1", "ping", reply_to=None) + + assert result.success is True + # Must route through reply (APP_CMD_RESPONSE), not proactive send. + adapter._send_reply_request.assert_awaited_once() + adapter._send_request.assert_not_awaited() + args = adapter._send_reply_request.await_args.args + assert args[0] == "inbound-req-42" + assert args[1]["msgtype"] == "markdown" + assert args[1]["markdown"]["content"] == "ping" + + @pytest.mark.asyncio + async def test_proactive_send_without_cached_req_id_uses_app_cmd_send(self): + """When we have no prior req_id (fresh DM target), APP_CMD_SEND is used.""" + from gateway.platforms.wecom import APP_CMD_SEND, WeComAdapter + + adapter = WeComAdapter(PlatformConfig(enabled=True)) + adapter._send_request = AsyncMock( + return_value={"headers": {"req_id": "new"}, "errcode": 0} + ) + + result = await adapter.send("fresh-dm-chat", "ping", reply_to=None) + + assert result.success is True + adapter._send_request.assert_awaited_once() + cmd = adapter._send_request.await_args.args[0] + assert cmd == APP_CMD_SEND +