handle port variants in Twilio signatures

This commit is contained in:
Mariano Nicolini
2026-04-11 15:15:33 -03:00
committed by Teknium
parent c22bffc92e
commit ad1e8804a6
2 changed files with 109 additions and 0 deletions

View File

@@ -208,8 +208,24 @@ class SmsAdapter(BasePlatformAdapter):
) -> bool:
"""Validate ``X-Twilio-Signature`` header (HMAC-SHA1, base64).
Tries both with and without the default port for the URL scheme,
since Twilio may sign with either variant.
Algorithm: https://www.twilio.com/docs/usage/security#validating-requests
"""
if self._check_signature(url, post_params, signature):
return True
variant = self._port_variant_url(url)
if variant and self._check_signature(variant, post_params, signature):
return True
return False
def _check_signature(
self, url: str, post_params: dict, signature: str,
) -> bool:
"""Compute and compare a single Twilio signature."""
data_to_sign = url
for key in sorted(post_params.keys()):
data_to_sign += key + post_params[key]
@@ -221,6 +237,36 @@ class SmsAdapter(BasePlatformAdapter):
computed = base64.b64encode(mac.digest()).decode("utf-8")
return hmac.compare_digest(computed, signature)
@staticmethod
def _port_variant_url(url: str) -> str | None:
"""Return the URL with the default port toggled, or None.
Only toggles default ports (443 for https, 80 for http).
Non-standard ports are never modified.
"""
parsed = urllib.parse.urlparse(url)
default_ports = {"https": 443, "http": 80}
default_port = default_ports.get(parsed.scheme)
if default_port is None:
return None
if parsed.port == default_port:
# Has explicit default port → strip it
return urllib.parse.urlunparse(
(parsed.scheme, parsed.hostname, parsed.path,
parsed.params, parsed.query, parsed.fragment)
)
elif parsed.port is None:
# No port → add default
netloc = f"{parsed.hostname}:{default_port}"
return urllib.parse.urlunparse(
(parsed.scheme, netloc, parsed.path,
parsed.params, parsed.query, parsed.fragment)
)
# Non-standard port — no variant
return None
# ------------------------------------------------------------------
# Twilio webhook handler
# ------------------------------------------------------------------

View File

@@ -348,6 +348,50 @@ class TestTwilioSignatureValidation:
"https://b.com/webhooks/twilio", params, sig
) is False
def test_port_variant_443_matches_without_port(self):
"""Signature for https URL with :443 validates against URL without port."""
adapter = self._make_adapter()
params = {"From": "+15551234567", "Body": "hello"}
sig = _compute_twilio_signature(
"test_token_secret", "https://example.com:443/webhooks/twilio", params
)
assert adapter._validate_twilio_signature(
"https://example.com/webhooks/twilio", params, sig
) is True
def test_port_variant_without_port_matches_443(self):
"""Signature for https URL without port validates against URL with :443."""
adapter = self._make_adapter()
params = {"From": "+15551234567", "Body": "hello"}
sig = _compute_twilio_signature(
"test_token_secret", "https://example.com/webhooks/twilio", params
)
assert adapter._validate_twilio_signature(
"https://example.com:443/webhooks/twilio", params, sig
) is True
def test_non_standard_port_no_variant(self):
"""Non-standard port must NOT match URL without port."""
adapter = self._make_adapter()
params = {"From": "+15551234567", "Body": "hello"}
sig = _compute_twilio_signature(
"test_token_secret", "https://example.com/webhooks/twilio", params
)
assert adapter._validate_twilio_signature(
"https://example.com:8080/webhooks/twilio", params, sig
) is False
def test_port_variant_http_80(self):
"""Port variant also works for http with port 80."""
adapter = self._make_adapter()
params = {"From": "+15551234567", "Body": "hello"}
sig = _compute_twilio_signature(
"test_token_secret", "http://example.com:80/webhooks/twilio", params
)
assert adapter._validate_twilio_signature(
"http://example.com/webhooks/twilio", params, sig
) is True
# ── Webhook signature enforcement (handler-level) ──────────────────
@@ -415,3 +459,22 @@ class TestWebhookSignatureEnforcement:
request = self._mock_request(body, headers={"X-Twilio-Signature": sig})
resp = await adapter._handle_webhook(request)
assert resp.status == 200
@pytest.mark.asyncio
async def test_port_variant_signature_returns_200(self):
"""Signature computed with :443 should pass when URL configured without port."""
webhook_url = "https://example.com/webhooks/twilio"
adapter = self._make_adapter(webhook_url=webhook_url)
params = {
"From": "+15551234567",
"To": "+15550001111",
"Body": "hello",
"MessageSid": "SM123",
}
sig = _compute_twilio_signature(
"test_token_secret", "https://example.com:443/webhooks/twilio", params
)
body = b"From=%2B15551234567&To=%2B15550001111&Body=hello&MessageSid=SM123"
request = self._mock_request(body, headers={"X-Twilio-Signature": sig})
resp = await adapter._handle_webhook(request)
assert resp.status == 200