mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-28 06:51:16 +08:00
fix: allow trusted QQ CDN benchmark IP resolution
This commit is contained in:
@@ -152,6 +152,34 @@ class TestIsSafeUrl:
|
||||
# 100.0.0.1 is a global IP, not in CGNAT range
|
||||
assert is_safe_url("http://legit-host.example/") is True
|
||||
|
||||
def test_benchmark_ip_blocked_for_non_allowlisted_host(self):
|
||||
with patch("socket.getaddrinfo", return_value=[
|
||||
(2, 1, 6, "", ("198.18.0.23", 0)),
|
||||
]):
|
||||
assert is_safe_url("https://example.com/file.jpg") is False
|
||||
|
||||
def test_qq_multimedia_hostname_allowed_with_benchmark_ip(self):
|
||||
with patch("socket.getaddrinfo", return_value=[
|
||||
(2, 1, 6, "", ("198.18.0.23", 0)),
|
||||
]):
|
||||
assert is_safe_url("https://multimedia.nt.qq.com.cn/download?id=123") is True
|
||||
|
||||
def test_qq_multimedia_hostname_exception_is_exact_match(self):
|
||||
with patch("socket.getaddrinfo", return_value=[
|
||||
(2, 1, 6, "", ("198.18.0.23", 0)),
|
||||
]):
|
||||
assert is_safe_url("https://sub.multimedia.nt.qq.com.cn/download?id=123") is False
|
||||
|
||||
def test_qq_multimedia_hostname_exception_requires_https(self):
|
||||
with patch("socket.getaddrinfo", return_value=[
|
||||
(2, 1, 6, "", ("198.18.0.23", 0)),
|
||||
]):
|
||||
assert is_safe_url("http://multimedia.nt.qq.com.cn/download?id=123") is False
|
||||
|
||||
def test_qq_multimedia_hostname_dns_failure_still_blocked(self):
|
||||
with patch("socket.getaddrinfo", side_effect=socket.gaierror("Name resolution failed")):
|
||||
assert is_safe_url("https://multimedia.nt.qq.com.cn/download?id=123") is False
|
||||
|
||||
|
||||
class TestIsBlockedIp:
|
||||
"""Direct tests for the _is_blocked_ip helper."""
|
||||
@@ -159,7 +187,7 @@ class TestIsBlockedIp:
|
||||
@pytest.mark.parametrize("ip_str", [
|
||||
"127.0.0.1", "10.0.0.1", "172.16.0.1", "192.168.1.1",
|
||||
"169.254.169.254", "0.0.0.0", "224.0.0.1", "255.255.255.255",
|
||||
"100.64.0.1", "100.100.100.100", "100.127.255.254",
|
||||
"100.64.0.1", "100.100.100.100", "100.127.255.254", "198.18.0.23",
|
||||
"::1", "fe80::1", "fc00::1", "fd12::1", "ff02::1",
|
||||
"::ffff:127.0.0.1", "::ffff:169.254.169.254",
|
||||
])
|
||||
|
||||
@@ -29,6 +29,13 @@ _BLOCKED_HOSTNAMES = frozenset({
|
||||
"metadata.goog",
|
||||
})
|
||||
|
||||
# Exact HTTPS hostnames allowed to resolve to private/benchmark-space IPs.
|
||||
# This is intentionally narrow: QQ media downloads can legitimately resolve
|
||||
# to 198.18.0.0/15 behind local proxy/benchmark infrastructure.
|
||||
_TRUSTED_PRIVATE_IP_HOSTS = frozenset({
|
||||
"multimedia.nt.qq.com.cn",
|
||||
})
|
||||
|
||||
# 100.64.0.0/10 (CGNAT / Shared Address Space, RFC 6598) is NOT covered by
|
||||
# ipaddress.is_private — it returns False for both is_private and is_global.
|
||||
# Must be blocked explicitly. Used by carrier-grade NAT, Tailscale/WireGuard
|
||||
@@ -48,6 +55,11 @@ def _is_blocked_ip(ip: ipaddress.IPv4Address | ipaddress.IPv6Address) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
def _allows_private_ip_resolution(hostname: str, scheme: str) -> bool:
|
||||
"""Return True when a trusted HTTPS hostname may bypass IP-class blocking."""
|
||||
return scheme == "https" and hostname in _TRUSTED_PRIVATE_IP_HOSTS
|
||||
|
||||
|
||||
def is_safe_url(url: str) -> bool:
|
||||
"""Return True if the URL target is not a private/internal address.
|
||||
|
||||
@@ -56,7 +68,8 @@ def is_safe_url(url: str) -> bool:
|
||||
"""
|
||||
try:
|
||||
parsed = urlparse(url)
|
||||
hostname = (parsed.hostname or "").strip().lower()
|
||||
hostname = (parsed.hostname or "").strip().lower().rstrip(".")
|
||||
scheme = (parsed.scheme or "").strip().lower()
|
||||
if not hostname:
|
||||
return False
|
||||
|
||||
@@ -65,6 +78,8 @@ def is_safe_url(url: str) -> bool:
|
||||
logger.warning("Blocked request to internal hostname: %s", hostname)
|
||||
return False
|
||||
|
||||
allow_private_ip = _allows_private_ip_resolution(hostname, scheme)
|
||||
|
||||
# Try to resolve and check IP
|
||||
try:
|
||||
addr_info = socket.getaddrinfo(hostname, None, socket.AF_UNSPEC, socket.SOCK_STREAM)
|
||||
@@ -81,13 +96,19 @@ def is_safe_url(url: str) -> bool:
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
if _is_blocked_ip(ip):
|
||||
if not allow_private_ip and _is_blocked_ip(ip):
|
||||
logger.warning(
|
||||
"Blocked request to private/internal address: %s -> %s",
|
||||
hostname, ip_str,
|
||||
)
|
||||
return False
|
||||
|
||||
if allow_private_ip:
|
||||
logger.debug(
|
||||
"Allowing trusted hostname despite private/internal resolution: %s",
|
||||
hostname,
|
||||
)
|
||||
|
||||
return True
|
||||
|
||||
except Exception as exc:
|
||||
|
||||
Reference in New Issue
Block a user