fix(wecom): bound req_id cache, revert undocumented is_group change, add tests

Follow-up to the cherry-picked contributor fix:

- Extract `_remember_chat_req_id()` and bound it at DEDUP_MAX_SIZE like
  `_reply_req_ids` — the unbounded dict would grow forever on a long-
  running gateway with many chats.
- Move the cache write to AFTER the group/DM policy check so we don't
  cache req_ids from blocked senders.
- Revert the undocumented `is_group` change: the contributor flipped
  `chattype == 'group'` to `bool(chatid)`, which wasn't mentioned in
  the PR description and weakens the signal (chattype is the explicit
  hint; relying on chatid presence assumes DMs never carry it). Keep
  the original check.
- Drop the defensive `getattr(self, '_last_chat_req_ids', {})` reads
  at both send sites — the attribute is initialized in __init__.
- Update `test_send_uses_passive_reply_stream_...` → `_markdown_...`
  to match the new msgtype, and add a new TestWeComZombieSessionFix
  class covering device_id presence in subscribe, per-chat req_id
  caching + bounding, blocked-sender cache exclusion, and the group
  APP_CMD_RESPONSE fallback path.
This commit is contained in:
Teknium
2026-04-17 18:59:43 -07:00
committed by Teknium
parent 2992802b35
commit 9b14b76eb3
2 changed files with 220 additions and 9 deletions

View File

@@ -492,10 +492,8 @@ class WeComAdapter(BasePlatformAdapter):
if not chat_id:
logger.debug("[%s] Missing chat id, skipping message", self.name)
return
self._last_chat_req_ids[chat_id] = self._payload_req_id(payload)
is_group = bool(body.get("chatid"))
is_group = str(body.get("chattype") or "").lower() == "group"
if is_group:
if not self._is_group_allowed(chat_id, sender_id):
logger.debug("[%s] Group %s / sender %s blocked by policy", self.name, chat_id, sender_id)
@@ -504,6 +502,11 @@ class WeComAdapter(BasePlatformAdapter):
logger.debug("[%s] DM sender %s blocked by policy", self.name, sender_id)
return
# Cache the inbound req_id after policy checks so proactive sends to
# this chat can fall back to APP_CMD_RESPONSE (required for groups —
# WeCom AI Bots cannot initiate APP_CMD_SEND in group chats).
self._remember_chat_req_id(chat_id, self._payload_req_id(payload))
text, reply_text = self._extract_text(body)
media_urls, media_types = await self._extract_media(body)
message_type = self._derive_message_type(body, text, media_types)
@@ -855,6 +858,23 @@ class WeComAdapter(BasePlatformAdapter):
while len(self._reply_req_ids) > DEDUP_MAX_SIZE:
self._reply_req_ids.pop(next(iter(self._reply_req_ids)))
def _remember_chat_req_id(self, chat_id: str, req_id: str) -> None:
"""Cache the most recent inbound req_id per chat.
Used as a fallback reply target when we need to send into a group
without an explicit ``reply_to`` — WeCom AI Bots are blocked from
APP_CMD_SEND in groups and must use APP_CMD_RESPONSE bound to some
prior req_id. Bounded like _reply_req_ids so long-running gateways
don't leak memory across many chats.
"""
normalized_chat_id = str(chat_id or "").strip()
normalized_req_id = str(req_id or "").strip()
if not normalized_chat_id or not normalized_req_id:
return
self._last_chat_req_ids[normalized_chat_id] = normalized_req_id
while len(self._last_chat_req_ids) > DEDUP_MAX_SIZE:
self._last_chat_req_ids.pop(next(iter(self._last_chat_req_ids)))
def _reply_req_id_for_message(self, reply_to: Optional[str]) -> Optional[str]:
normalized = str(reply_to or "").strip()
if not normalized or normalized.startswith("quote:"):
@@ -1239,7 +1259,7 @@ class WeComAdapter(BasePlatformAdapter):
return SendResult(success=False, error=prepared["reject_reason"])
reply_req_id = self._reply_req_id_for_message(reply_to)
if not reply_req_id and chat_id in getattr(self, '_last_chat_req_ids', {}):
if not reply_req_id and chat_id in self._last_chat_req_ids:
reply_req_id = self._last_chat_req_ids[chat_id]
try:
@@ -1310,7 +1330,7 @@ class WeComAdapter(BasePlatformAdapter):
try:
reply_req_id = self._reply_req_id_for_message(reply_to)
if not reply_req_id and chat_id in getattr(self, '_last_chat_req_ids', {}):
if not reply_req_id and chat_id in self._last_chat_req_ids:
reply_req_id = self._last_chat_req_ids[chat_id]
if reply_req_id:

View File

@@ -119,7 +119,7 @@ class TestWeComConnect:
class TestWeComReplyMode:
@pytest.mark.asyncio
async def test_send_uses_passive_reply_stream_when_reply_context_exists(self):
async def test_send_uses_passive_reply_markdown_when_reply_context_exists(self):
from gateway.platforms.wecom import WeComAdapter
adapter = WeComAdapter(PlatformConfig(enabled=True))
@@ -134,9 +134,10 @@ class TestWeComReplyMode:
adapter._send_reply_request.assert_awaited_once()
args = adapter._send_reply_request.await_args.args
assert args[0] == "req-1"
assert args[1]["msgtype"] == "stream"
assert args[1]["stream"]["finish"] is True
assert args[1]["stream"]["content"] == "hello from reply"
# msgtype: stream triggers WeCom errcode 600039 on many mobile clients
# (unsupported type). Markdown renders everywhere.
assert args[1]["msgtype"] == "markdown"
assert args[1]["markdown"]["content"] == "hello from reply"
@pytest.mark.asyncio
async def test_send_image_file_uses_passive_reply_media_when_reply_context_exists(self):
@@ -593,3 +594,193 @@ class TestInboundMessages:
await adapter._on_message(payload)
adapter.handle_message.assert_not_awaited()
class TestWeComZombieSessionFix:
"""Tests for PR #11572 — device_id, markdown reply, group req_id fallback."""
def test_adapter_generates_stable_device_id_per_instance(self):
from gateway.platforms.wecom import WeComAdapter
adapter = WeComAdapter(PlatformConfig(enabled=True))
assert isinstance(adapter._device_id, str)
assert len(adapter._device_id) > 0
# Second snapshot on the same adapter must be identical — only a fresh
# adapter instance should get a new device_id (one-per-reconnect is the
# zombie-session footgun we're fixing).
assert adapter._device_id == adapter._device_id
def test_different_adapter_instances_get_distinct_device_ids(self):
from gateway.platforms.wecom import WeComAdapter
a = WeComAdapter(PlatformConfig(enabled=True))
b = WeComAdapter(PlatformConfig(enabled=True))
assert a._device_id != b._device_id
@pytest.mark.asyncio
async def test_open_connection_includes_device_id_in_subscribe(self):
from gateway.platforms.wecom import APP_CMD_SUBSCRIBE, WeComAdapter
adapter = WeComAdapter(PlatformConfig(enabled=True))
adapter._bot_id = "test-bot"
adapter._secret = "test-secret"
sent_payloads = []
class _FakeWS:
closed = False
async def send_json(self, payload):
sent_payloads.append(payload)
async def close(self):
return None
class _FakeSession:
def __init__(self, *args, **kwargs):
pass
async def ws_connect(self, *args, **kwargs):
return _FakeWS()
async def close(self):
return None
async def _fake_cleanup():
return None
async def _fake_handshake(req_id):
return {"errcode": 0, "headers": {"req_id": req_id}}
adapter._cleanup_ws = _fake_cleanup
adapter._wait_for_handshake = _fake_handshake
with patch("gateway.platforms.wecom.aiohttp.ClientSession", _FakeSession):
await adapter._open_connection()
assert len(sent_payloads) == 1
subscribe = sent_payloads[0]
assert subscribe["cmd"] == APP_CMD_SUBSCRIBE
assert subscribe["body"]["bot_id"] == "test-bot"
assert subscribe["body"]["secret"] == "test-secret"
assert subscribe["body"]["device_id"] == adapter._device_id
@pytest.mark.asyncio
async def test_on_message_caches_last_req_id_per_chat(self):
from gateway.platforms.wecom import WeComAdapter
adapter = WeComAdapter(PlatformConfig(enabled=True))
adapter._text_batch_delay_seconds = 0
adapter.handle_message = AsyncMock()
adapter._extract_media = AsyncMock(return_value=([], []))
payload = {
"cmd": "aibot_msg_callback",
"headers": {"req_id": "req-abc"},
"body": {
"msgid": "msg-1",
"chatid": "group-1",
"chattype": "group",
"from": {"userid": "user-1"},
"msgtype": "text",
"text": {"content": "hi"},
},
}
await adapter._on_message(payload)
assert adapter._last_chat_req_ids["group-1"] == "req-abc"
@pytest.mark.asyncio
async def test_on_message_does_not_cache_blocked_sender_req_id(self):
"""Blocked chats shouldn't populate the proactive-send fallback cache."""
from gateway.platforms.wecom import WeComAdapter
adapter = WeComAdapter(
PlatformConfig(
enabled=True,
extra={"group_policy": "allowlist", "group_allow_from": ["group-ok"]},
)
)
adapter.handle_message = AsyncMock()
adapter._extract_media = AsyncMock(return_value=([], []))
payload = {
"cmd": "aibot_msg_callback",
"headers": {"req_id": "req-abc"},
"body": {
"msgid": "msg-1",
"chatid": "group-blocked",
"chattype": "group",
"from": {"userid": "user-1"},
"msgtype": "text",
"text": {"content": "hi"},
},
}
await adapter._on_message(payload)
adapter.handle_message.assert_not_awaited()
assert "group-blocked" not in adapter._last_chat_req_ids
def test_remember_chat_req_id_is_bounded(self):
from gateway.platforms.wecom import DEDUP_MAX_SIZE, WeComAdapter
adapter = WeComAdapter(PlatformConfig(enabled=True))
for i in range(DEDUP_MAX_SIZE + 50):
adapter._remember_chat_req_id(f"chat-{i}", f"req-{i}")
assert len(adapter._last_chat_req_ids) <= DEDUP_MAX_SIZE
# The most recently remembered chat must still be present.
latest = f"chat-{DEDUP_MAX_SIZE + 49}"
assert adapter._last_chat_req_ids[latest] == f"req-{DEDUP_MAX_SIZE + 49}"
def test_remember_chat_req_id_ignores_empty_values(self):
from gateway.platforms.wecom import WeComAdapter
adapter = WeComAdapter(PlatformConfig(enabled=True))
adapter._remember_chat_req_id("", "req-1")
adapter._remember_chat_req_id("chat-1", "")
adapter._remember_chat_req_id(" ", " ")
assert adapter._last_chat_req_ids == {}
@pytest.mark.asyncio
async def test_proactive_group_send_falls_back_to_cached_req_id(self):
"""Sending into a group without reply_to should use the last cached
req_id via APP_CMD_RESPONSE — WeCom AI Bots cannot initiate APP_CMD_SEND
in group chats (errcode 600039)."""
from gateway.platforms.wecom import WeComAdapter
adapter = WeComAdapter(PlatformConfig(enabled=True))
adapter._last_chat_req_ids["group-1"] = "inbound-req-42"
adapter._send_reply_request = AsyncMock(
return_value={"headers": {"req_id": "inbound-req-42"}, "errcode": 0}
)
adapter._send_request = AsyncMock(
return_value={"headers": {"req_id": "new"}, "errcode": 0}
)
result = await adapter.send("group-1", "ping", reply_to=None)
assert result.success is True
# Must route through reply (APP_CMD_RESPONSE), not proactive send.
adapter._send_reply_request.assert_awaited_once()
adapter._send_request.assert_not_awaited()
args = adapter._send_reply_request.await_args.args
assert args[0] == "inbound-req-42"
assert args[1]["msgtype"] == "markdown"
assert args[1]["markdown"]["content"] == "ping"
@pytest.mark.asyncio
async def test_proactive_send_without_cached_req_id_uses_app_cmd_send(self):
"""When we have no prior req_id (fresh DM target), APP_CMD_SEND is used."""
from gateway.platforms.wecom import APP_CMD_SEND, WeComAdapter
adapter = WeComAdapter(PlatformConfig(enabled=True))
adapter._send_request = AsyncMock(
return_value={"headers": {"req_id": "new"}, "errcode": 0}
)
result = await adapter.send("fresh-dm-chat", "ping", reply_to=None)
assert result.success is True
adapter._send_request.assert_awaited_once()
cmd = adapter._send_request.await_args.args[0]
assert cmd == APP_CMD_SEND