Compare commits

...

2 Commits

Author SHA1 Message Date
teknium1
434e998956 test(webhook): regression tests for empty-secret hot-reload skip (#8306)
Five tests under TestDynamicRouteSecretValidation cover the cases the
salvage of #8306 fixes:

- empty 'secret' string with no global -> skipped
- missing 'secret' key with no global -> skipped
- missing 'secret' key WITH global -> falls back to global, admitted
- empty 'secret' string WITH global -> still skipped (dict.get returns ''
  when key is present and empty, NOT the default)
- 'INSECURE_NO_AUTH' as secret -> still admitted (explicit opt-in)
2026-05-20 23:50:21 -07:00
memosr
578f8c15be fix(security): validate secret in _reload_dynamic_routes to prevent HMAC bypass 2026-05-20 23:42:42 -07:00
2 changed files with 87 additions and 5 deletions

View File

@@ -308,11 +308,26 @@ class WebhookAdapter(BasePlatformAdapter):
data = json.loads(subs_path.read_text(encoding="utf-8"))
if not isinstance(data, dict):
return
# Merge: static routes take precedence over dynamic ones
self._dynamic_routes = {
k: v for k, v in data.items()
if k not in self._static_routes
}
# Merge: static routes take precedence over dynamic ones.
# Reject any dynamic route whose effective secret is empty —
# an empty secret would cause _handle_webhook to skip HMAC
# validation entirely, letting unauthenticated callers in.
new_dynamic: Dict[str, dict] = {}
for k, v in data.items():
if k in self._static_routes:
continue
effective_secret = v.get("secret", self._global_secret)
if not effective_secret:
logger.warning(
"[webhook] Dynamic route '%s' skipped: 'secret' is "
"missing or empty. Set a valid HMAC secret, or use "
"'%s' to explicitly disable auth (testing only).",
k,
_INSECURE_NO_AUTH,
)
continue
new_dynamic[k] = v
self._dynamic_routes = new_dynamic
self._routes = {**self._dynamic_routes, **self._static_routes}
self._dynamic_routes_mtime = mtime
logger.info(

View File

@@ -85,3 +85,70 @@ class TestDynamicRouteLoading:
adapter._reload_dynamic_routes()
assert "static" in adapter._routes
assert len(adapter._dynamic_routes) == 0
class TestDynamicRouteSecretValidation:
"""Regression tests for #8306 — hot-reloaded dynamic routes with empty
or missing secrets must be rejected so the per-request HMAC check in
``_handle_webhook`` can never see a falsy secret and skip validation."""
def test_empty_secret_string_rejected(self, tmp_path, caplog):
(tmp_path / _DYNAMIC_ROUTES_FILENAME).write_text(
json.dumps({"monitor": {"secret": "", "prompt": "test"}})
)
# No global secret: _global_secret falls back to None, so the route
# has no effective secret at all.
config = PlatformConfig(
enabled=True,
extra={"routes": {"static": {"secret": "s"}}},
)
adapter = WebhookAdapter(config)
adapter._reload_dynamic_routes()
assert "monitor" not in adapter._routes
assert "monitor" not in adapter._dynamic_routes
def test_missing_secret_key_rejected_without_global(self, tmp_path):
(tmp_path / _DYNAMIC_ROUTES_FILENAME).write_text(
json.dumps({"hook-a": {"prompt": "test"}})
)
config = PlatformConfig(
enabled=True,
extra={"routes": {"static": {"secret": "s"}}},
)
adapter = WebhookAdapter(config)
adapter._reload_dynamic_routes()
assert "hook-a" not in adapter._routes
assert "hook-a" not in adapter._dynamic_routes
def test_missing_secret_falls_back_to_global(self, tmp_path):
# When 'secret' key is absent, dict.get() returns the global default,
# so a route with no per-route secret is still admitted when a global
# secret is configured.
(tmp_path / _DYNAMIC_ROUTES_FILENAME).write_text(
json.dumps({"hook-b": {"prompt": "test"}})
)
adapter = _make_adapter() # _make_adapter sets a global secret
adapter._reload_dynamic_routes()
assert "hook-b" in adapter._dynamic_routes
def test_empty_per_route_secret_rejected_even_with_global(self, tmp_path):
# Critical case: dict.get('secret', default) returns '' when the key
# is present and empty, NOT the default. So even with a global secret
# set, an empty per-route secret must still be skipped.
(tmp_path / _DYNAMIC_ROUTES_FILENAME).write_text(
json.dumps({"sneaky": {"secret": "", "prompt": "test"}})
)
adapter = _make_adapter() # has a global secret
adapter._reload_dynamic_routes()
assert "sneaky" not in adapter._dynamic_routes
def test_insecure_no_auth_secret_still_admitted(self, tmp_path):
# INSECURE_NO_AUTH is the explicit opt-in for unauthenticated routes
# (loopback-only enforcement happens at startup). Hot-reload must keep
# it working — it's a non-empty string so the validator accepts it.
(tmp_path / _DYNAMIC_ROUTES_FILENAME).write_text(
json.dumps({"local-test": {"secret": "INSECURE_NO_AUTH", "prompt": "t"}})
)
adapter = _make_adapter()
adapter._reload_dynamic_routes()
assert "local-test" in adapter._dynamic_routes