From ad1e8804a60e4548dc125bb2bb64a9c178aba3f0 Mon Sep 17 00:00:00 2001 From: Mariano Nicolini Date: Sat, 11 Apr 2026 15:15:33 -0300 Subject: [PATCH] handle port variants in Twilio signatures --- gateway/platforms/sms.py | 46 ++++++++++++++++++++++++++++ tests/gateway/test_sms.py | 63 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 109 insertions(+) diff --git a/gateway/platforms/sms.py b/gateway/platforms/sms.py index bdd64d1791..5bc66d8818 100644 --- a/gateway/platforms/sms.py +++ b/gateway/platforms/sms.py @@ -208,8 +208,24 @@ class SmsAdapter(BasePlatformAdapter): ) -> bool: """Validate ``X-Twilio-Signature`` header (HMAC-SHA1, base64). + Tries both with and without the default port for the URL scheme, + since Twilio may sign with either variant. + Algorithm: https://www.twilio.com/docs/usage/security#validating-requests """ + if self._check_signature(url, post_params, signature): + return True + + variant = self._port_variant_url(url) + if variant and self._check_signature(variant, post_params, signature): + return True + + return False + + def _check_signature( + self, url: str, post_params: dict, signature: str, + ) -> bool: + """Compute and compare a single Twilio signature.""" data_to_sign = url for key in sorted(post_params.keys()): data_to_sign += key + post_params[key] @@ -221,6 +237,36 @@ class SmsAdapter(BasePlatformAdapter): computed = base64.b64encode(mac.digest()).decode("utf-8") return hmac.compare_digest(computed, signature) + @staticmethod + def _port_variant_url(url: str) -> str | None: + """Return the URL with the default port toggled, or None. + + Only toggles default ports (443 for https, 80 for http). + Non-standard ports are never modified. + """ + parsed = urllib.parse.urlparse(url) + default_ports = {"https": 443, "http": 80} + default_port = default_ports.get(parsed.scheme) + if default_port is None: + return None + + if parsed.port == default_port: + # Has explicit default port → strip it + return urllib.parse.urlunparse( + (parsed.scheme, parsed.hostname, parsed.path, + parsed.params, parsed.query, parsed.fragment) + ) + elif parsed.port is None: + # No port → add default + netloc = f"{parsed.hostname}:{default_port}" + return urllib.parse.urlunparse( + (parsed.scheme, netloc, parsed.path, + parsed.params, parsed.query, parsed.fragment) + ) + + # Non-standard port — no variant + return None + # ------------------------------------------------------------------ # Twilio webhook handler # ------------------------------------------------------------------ diff --git a/tests/gateway/test_sms.py b/tests/gateway/test_sms.py index cfe06df983..670e506932 100644 --- a/tests/gateway/test_sms.py +++ b/tests/gateway/test_sms.py @@ -348,6 +348,50 @@ class TestTwilioSignatureValidation: "https://b.com/webhooks/twilio", params, sig ) is False + def test_port_variant_443_matches_without_port(self): + """Signature for https URL with :443 validates against URL without port.""" + adapter = self._make_adapter() + params = {"From": "+15551234567", "Body": "hello"} + sig = _compute_twilio_signature( + "test_token_secret", "https://example.com:443/webhooks/twilio", params + ) + assert adapter._validate_twilio_signature( + "https://example.com/webhooks/twilio", params, sig + ) is True + + def test_port_variant_without_port_matches_443(self): + """Signature for https URL without port validates against URL with :443.""" + adapter = self._make_adapter() + params = {"From": "+15551234567", "Body": "hello"} + sig = _compute_twilio_signature( + "test_token_secret", "https://example.com/webhooks/twilio", params + ) + assert adapter._validate_twilio_signature( + "https://example.com:443/webhooks/twilio", params, sig + ) is True + + def test_non_standard_port_no_variant(self): + """Non-standard port must NOT match URL without port.""" + adapter = self._make_adapter() + params = {"From": "+15551234567", "Body": "hello"} + sig = _compute_twilio_signature( + "test_token_secret", "https://example.com/webhooks/twilio", params + ) + assert adapter._validate_twilio_signature( + "https://example.com:8080/webhooks/twilio", params, sig + ) is False + + def test_port_variant_http_80(self): + """Port variant also works for http with port 80.""" + adapter = self._make_adapter() + params = {"From": "+15551234567", "Body": "hello"} + sig = _compute_twilio_signature( + "test_token_secret", "http://example.com:80/webhooks/twilio", params + ) + assert adapter._validate_twilio_signature( + "http://example.com/webhooks/twilio", params, sig + ) is True + # ── Webhook signature enforcement (handler-level) ────────────────── @@ -415,3 +459,22 @@ class TestWebhookSignatureEnforcement: request = self._mock_request(body, headers={"X-Twilio-Signature": sig}) resp = await adapter._handle_webhook(request) assert resp.status == 200 + + @pytest.mark.asyncio + async def test_port_variant_signature_returns_200(self): + """Signature computed with :443 should pass when URL configured without port.""" + webhook_url = "https://example.com/webhooks/twilio" + adapter = self._make_adapter(webhook_url=webhook_url) + params = { + "From": "+15551234567", + "To": "+15550001111", + "Body": "hello", + "MessageSid": "SM123", + } + sig = _compute_twilio_signature( + "test_token_secret", "https://example.com:443/webhooks/twilio", params + ) + body = b"From=%2B15551234567&To=%2B15550001111&Body=hello&MessageSid=SM123" + request = self._mock_request(body, headers={"X-Twilio-Signature": sig}) + resp = await adapter._handle_webhook(request) + assert resp.status == 200