From cedc95c100eb25e45097449ee2f3b62e0095d213 Mon Sep 17 00:00:00 2001 From: memosr Date: Mon, 13 Apr 2026 23:52:18 +0300 Subject: [PATCH] fix(security): validate WeChat media URLs against CDN allowlist to prevent SSRF --- gateway/platforms/weixin.py | 36 +++++++++++++++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/gateway/platforms/weixin.py b/gateway/platforms/weixin.py index baf344ee9c..1c303b72ad 100644 --- a/gateway/platforms/weixin.py +++ b/gateway/platforms/weixin.py @@ -28,7 +28,7 @@ import uuid from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional, Tuple -from urllib.parse import quote +from urllib.parse import quote, urlparse logger = logging.getLogger(__name__) @@ -535,6 +535,39 @@ async def _download_bytes( return await response.read() +_WEIXIN_CDN_ALLOWLIST: frozenset[str] = frozenset( + { + "novac2c.cdn.weixin.qq.com", + "ilinkai.weixin.qq.com", + "wx.qlogo.cn", + "thirdwx.qlogo.cn", + "res.wx.qq.com", + "mmbiz.qpic.cn", + "mmbiz.qlogo.cn", + } +) + + +def _assert_weixin_cdn_url(url: str) -> None: + """Raise ValueError if *url* does not point at a known WeChat CDN host.""" + try: + parsed = urlparse(url) + scheme = parsed.scheme.lower() + host = parsed.hostname or "" + except Exception as exc: # noqa: BLE001 + raise ValueError(f"Unparseable media URL: {url!r}") from exc + + if scheme not in ("http", "https"): + raise ValueError( + f"Media URL has disallowed scheme {scheme!r}; only http/https are permitted." + ) + if host not in _WEIXIN_CDN_ALLOWLIST: + raise ValueError( + f"Media URL host {host!r} is not in the WeChat CDN allowlist. " + "Refusing to fetch to prevent SSRF." + ) + + def _media_reference(item: Dict[str, Any], key: str) -> Dict[str, Any]: return (item.get(key) or {}).get("media") or {} @@ -555,6 +588,7 @@ async def _download_and_decrypt_media( timeout_seconds=timeout_seconds, ) elif full_url: + _assert_weixin_cdn_url(full_url) raw = await _download_bytes(session, url=full_url, timeout_seconds=timeout_seconds) else: raise RuntimeError("media item had neither encrypt_query_param nor full_url")