From 4aef0558054f332212f40c6cebe5147c6488e311 Mon Sep 17 00:00:00 2001 From: Jeff Escalante <556932+jescalan@users.noreply.github.com> Date: Tue, 7 Apr 2026 11:17:20 -0400 Subject: [PATCH] fix(gateway/webhook): don't pop delivery_info on send MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The webhook adapter stored per-request `deliver`/`deliver_extra` config in `_delivery_info[chat_id]` during POST handling and consumed it via `.pop()` inside `send()`. That worked for routes whose agent run produced exactly one outbound message — the final response — but it broke whenever the agent emitted any interim status message before the final response. Status messages flow through the same `send(chat_id, ...)` path as the final response (see `gateway/run.py::_status_callback_sync` → `adapter.send(...)`). Common triggers include: - "🔄 Primary model failed — switching to fallback: ..." (run_agent.py::_emit_status when `fallback_providers` activates) - context-pressure / compression notices - any other lifecycle event routed through `status_callback` When any of those fired, the first `send()` call popped the entry, so the subsequent final-response `send()` saw an empty dict and silently downgraded `deliver_type` from `"telegram"` (or `discord`/`slack`/etc.) to the default `"log"`. The agent's response was logged to the gateway log instead of being delivered to the configured cross-platform target — no warning, no error, just a missing message. This was easy to hit in practice. Any user with `fallback_providers` configured saw it the first time their primary provider hiccuped on a webhook-triggered run. Routes that worked perfectly in dev (where the primary stays healthy) silently dropped responses in prod. Fix: read `_delivery_info` with `.get()` so multiple `send()` calls for the same `chat_id` all see the same delivery config. To keep the dict bounded without relying on per-send cleanup, add a parallel `_delivery_info_created` timestamp dict and a `_prune_delivery_info()` helper that drops entries older than `_idempotency_ttl` (1h, same window already used by `_seen_deliveries`). Pruning runs on each POST, mirroring the existing `_seen_deliveries` cleanup pattern. Worst-case memory footprint is now `rate_limit * TTL = 30/min * 60min = 1800` entries, each ~1KB → under 2 MB. In practice it'll be far smaller because most webhooks complete in seconds, not the full hour. Test changes: - `test_delivery_info_cleaned_after_send` is replaced with `test_delivery_info_survives_multiple_sends`, which is now the regression test for this bug — it asserts that two consecutive `send()` calls both see the delivery config. - A new `test_delivery_info_pruned_via_ttl` covers the TTL cleanup behavior. - The two integration tests that asserted `chat_id not in adapter._delivery_info` after `send()` now assert the opposite, with a comment explaining why. All 40 tests in `tests/gateway/test_webhook_adapter.py` and `tests/gateway/test_webhook_integration.py` pass. Verified end-to-end locally against a dynamic `hermes webhook subscribe` route configured with `--deliver telegram --deliver-chat-id `: with `gpt-5.4` as the primary (currently flaky) and `claude-opus-4.6` as the fallback, the fallback notification fires, the agent finishes, and the final response is delivered to Telegram as expected. --- gateway/platforms/webhook.py | 44 ++++++++++++++++++--- tests/gateway/test_webhook_adapter.py | 47 ++++++++++++++++++++--- tests/gateway/test_webhook_integration.py | 10 +++-- 3 files changed, 87 insertions(+), 14 deletions(-) diff --git a/gateway/platforms/webhook.py b/gateway/platforms/webhook.py index daaf4f5dc3..6d4885d2b0 100644 --- a/gateway/platforms/webhook.py +++ b/gateway/platforms/webhook.py @@ -76,8 +76,17 @@ class WebhookAdapter(BasePlatformAdapter): self._routes: Dict[str, dict] = dict(self._static_routes) self._runner = None - # Delivery info keyed by session chat_id — consumed by send() + # Delivery info keyed by session chat_id. + # + # Read by every send() invocation for the chat_id (status messages + # AND the final response). Cleaned up via TTL on each POST so the + # dict stays bounded — see _prune_delivery_info(). Do NOT pop on + # send(), or interim status messages (e.g. fallback notifications, + # context-pressure warnings) will consume the entry before the + # final response arrives, causing the response to silently fall + # back to the "log" deliver type. self._delivery_info: Dict[str, dict] = {} + self._delivery_info_created: Dict[str, float] = {} # Reference to gateway runner for cross-platform delivery (set externally) self.gateway_runner = None @@ -160,10 +169,14 @@ class WebhookAdapter(BasePlatformAdapter): ) -> SendResult: """Deliver the agent's response to the configured destination. - chat_id is ``webhook:{route}:{delivery_id}`` — we pop the delivery - info stored during webhook receipt so it doesn't leak memory. + chat_id is ``webhook:{route}:{delivery_id}``. The delivery info + stored during webhook receipt is read with ``.get()`` (not popped) + so that interim status messages emitted before the final response + — fallback-model notifications, context-pressure warnings, etc. — + do not consume the entry and silently downgrade the final response + to the ``log`` deliver type. TTL cleanup happens on POST. """ - delivery = self._delivery_info.pop(chat_id, {}) + delivery = self._delivery_info.get(chat_id, {}) deliver_type = delivery.get("deliver", "log") if deliver_type == "log": @@ -190,6 +203,23 @@ class WebhookAdapter(BasePlatformAdapter): success=False, error=f"Unknown deliver type: {deliver_type}" ) + def _prune_delivery_info(self, now: float) -> None: + """Drop delivery_info entries older than the idempotency TTL. + + Mirrors the cleanup pattern used for ``_seen_deliveries``. Called + on each POST so the dict size is bounded by ``rate_limit * TTL`` + even if many webhooks fire and never receive a final response. + """ + cutoff = now - self._idempotency_ttl + stale = [ + k + for k, t in self._delivery_info_created.items() + if t < cutoff + ] + for k in stale: + self._delivery_info.pop(k, None) + self._delivery_info_created.pop(k, None) + async def get_chat_info(self, chat_id: str) -> Dict[str, Any]: return {"name": chat_id, "type": "webhook"} @@ -382,7 +412,9 @@ class WebhookAdapter(BasePlatformAdapter): # same route get independent agent runs (not queued/interrupted). session_chat_id = f"webhook:{route_name}:{delivery_id}" - # Store delivery info for send() — consumed (popped) on delivery + # Store delivery info for send(). Read by every send() invocation + # for this chat_id (interim status messages and the final response), + # so we do NOT pop on send. TTL-based cleanup keeps the dict bounded. deliver_config = { "deliver": route_config.get("deliver", "log"), "deliver_extra": self._render_delivery_extra( @@ -391,6 +423,8 @@ class WebhookAdapter(BasePlatformAdapter): "payload": payload, } self._delivery_info[session_chat_id] = deliver_config + self._delivery_info_created[session_chat_id] = now + self._prune_delivery_info(now) # Build source and event source = self.build_source( diff --git a/tests/gateway/test_webhook_adapter.py b/tests/gateway/test_webhook_adapter.py index f323b95af2..bedf254a15 100644 --- a/tests/gateway/test_webhook_adapter.py +++ b/tests/gateway/test_webhook_adapter.py @@ -590,8 +590,15 @@ class TestSessionIsolation: class TestDeliveryCleanup: @pytest.mark.asyncio - async def test_delivery_info_cleaned_after_send(self): - """send() pops delivery_info so the entry doesn't leak memory.""" + async def test_delivery_info_survives_multiple_sends(self): + """send() must NOT pop delivery_info. + + Interim status messages (fallback notifications, context-pressure + warnings, etc.) flow through the same send() path as the final + response. If the entry were popped on the first send, the final + response would silently downgrade to the ``log`` deliver type. + Regression test for that bug. + """ adapter = _make_adapter() chat_id = "webhook:test:d-xyz" adapter._delivery_info[chat_id] = { @@ -599,10 +606,40 @@ class TestDeliveryCleanup: "deliver_extra": {}, "payload": {"x": 1}, } + adapter._delivery_info_created[chat_id] = time.time() - result = await adapter.send(chat_id, "Agent response here") - assert result.success is True - assert chat_id not in adapter._delivery_info + # First send (e.g. an interim status message) + result1 = await adapter.send(chat_id, "Status: switching to fallback") + assert result1.success is True + # Entry must still be present so the final send can read it + assert chat_id in adapter._delivery_info + + # Second send (the final agent response) + result2 = await adapter.send(chat_id, "Final agent response") + assert result2.success is True + assert chat_id in adapter._delivery_info + + @pytest.mark.asyncio + async def test_delivery_info_pruned_via_ttl(self): + """Stale delivery_info entries are dropped on the next POST.""" + adapter = _make_adapter() + adapter._idempotency_ttl = 60 # short TTL for the test + now = time.time() + + # Stale entry — older than TTL + adapter._delivery_info["webhook:test:old"] = {"deliver": "log"} + adapter._delivery_info_created["webhook:test:old"] = now - 120 + + # Fresh entry — should survive + adapter._delivery_info["webhook:test:new"] = {"deliver": "log"} + adapter._delivery_info_created["webhook:test:new"] = now - 5 + + adapter._prune_delivery_info(now) + + assert "webhook:test:old" not in adapter._delivery_info + assert "webhook:test:old" not in adapter._delivery_info_created + assert "webhook:test:new" in adapter._delivery_info + assert "webhook:test:new" in adapter._delivery_info_created # =================================================================== diff --git a/tests/gateway/test_webhook_integration.py b/tests/gateway/test_webhook_integration.py index 8999898103..5c6fe01111 100644 --- a/tests/gateway/test_webhook_integration.py +++ b/tests/gateway/test_webhook_integration.py @@ -259,8 +259,9 @@ class TestCrossPlatformDelivery: mock_tg_adapter.send.assert_awaited_once_with( "12345", "I've acknowledged the alert.", metadata=None ) - # Delivery info should be cleaned up - assert chat_id not in adapter._delivery_info + # Delivery info is retained after send() so interim status messages + # don't strand the final response (TTL-based cleanup happens on POST). + assert chat_id in adapter._delivery_info # =================================================================== @@ -333,5 +334,6 @@ class TestGitHubCommentDelivery: text=True, timeout=30, ) - # Delivery info cleaned up - assert chat_id not in adapter._delivery_info + # Delivery info is retained after send() so interim status messages + # don't strand the final response (TTL-based cleanup happens on POST). + assert chat_id in adapter._delivery_info