fix(gateway): drain pending messages via fresh task, not recursion (#17758)

`_process_message_background` finished a turn, found a queued
follow-up, and drained it via `await
self._process_message_background(pending_event, session_key)`. Each
chained follow-up added a frame to the call stack instead of starting
fresh. Under sustained pending-queue activity (e.g. a user sending
follow-ups faster than the agent finishes turns) the C stack would
exhaust at ~2000 nested frames and SIGSEGV the process.

Mirror the late-arrival drain pattern that already exists in the same
function: spawn a new `asyncio.create_task(...)` for the pending event
and return so the current frame can unwind. The new task takes
ownership via `_session_tasks[session_key]`.
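
A minimal sketch of the shape of the change (the surrounding
bookkeeping is elided; `self._pending_messages` is the per-session
pending slot the regression test below also pokes, and `drain_task` is
a local name introduced here for illustration):

```python
# Before (recursive): each queued follow-up deepened the call stack.
#     await self._process_message_background(pending_event, session_key)

# After (task spawn): hand the follow-up to a fresh task and unwind.
pending_event = self._pending_messages.pop(session_key, None)
if pending_event is not None:
    drain_task = asyncio.create_task(
        self._process_message_background(pending_event, session_key)
    )
    # The drain task takes over ownership of the session.
    self._session_tasks[session_key] = drain_task
    return
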
The late-arrival drain in `finally` could now race with the in-band
drain across the `await typing_task` / `await stop_typing` window, so
add a guard: if `_session_tasks[session_key]` is no longer the current
task, an in-band drain already spawned a follow-up task — re-queue the
late-arrival event so that task picks it up after its current event,
instead of spawning a second concurrent task for the same session_key.
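
In sketch form, with `late_event` standing in for the event that
arrived during the cleanup awaits (a hypothetical local name; the real
check lives in the `finally` block):

```python
# Late-arrival drain guard in `finally` (sketch).
if self._session_tasks.get(session_key) is not asyncio.current_task():
    # An in-band drain already handed this session to a follow-up task.
    # Re-queue so that task drains it after its current event, rather
    # than spawning a second concurrent task for the same session_key.
    self._pending_messages[session_key] = late_event
else:
    self._session_tasks[session_key] = asyncio.create_task(
        self._process_message_background(late_event, session_key)
    )
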
Regression test (`test_pending_drain_no_recursion.py`) chains 12
follow-ups and asserts the recorded `_process_message_background`
stack depth stays bounded at handler entry. Pre-fix: depths grow
linearly `[1,2,3,…,12]`. Post-fix: all depths are `1`.

`test_duplicate_reply_suppression::test_stale_response_suppressed_when_interrupted`
called `_process_message_background` directly and implicitly relied on
the old recursive `await` semantics; it was updated to wait for the
spawned drain task before checking the sent list.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 22:19:00 -07:00

fix(gateway): preserve session guard across in-band drain handoff

When the in-band pending-message drain spawns a fresh task and
transfers ownership via `_session_tasks[session_key] = drain_task`,
the original task still unwinds through the `finally` block. The
drain task picks up the same `interrupt_event` in its own
`_process_message_background` entry, so an unconditional
`_release_session_guard(session_key, guard=interrupt_event)` at the
end of the `finally` matches and deletes `_active_sessions[session_key]`
while the drain task is still pending its first await.

A concurrent inbound message arriving in that handoff window passes
the Level-1 guard (no entry exists) and spawns a second
`_process_message_background` for the same session — two agents on
one session_key, duplicate responses, duplicate tool calls.

Fix: only call `_release_session_guard` when the current task still
owns `_session_tasks[session_key]`. When ownership has been
transferred to a drain task, leave `_active_sessions` populated; the
drain task's own lifecycle releases it. This mirrors the
late-arrival drain path in the same `finally` block, which already
leaves both entries alone after handing off.
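
A minimal sketch of that conditional release (the identifiers are the
ones named above; the normal-path pop of `_session_tasks` is an
assumption consistent with the normal-path regression test below):

```python
# End of the `finally` block in _process_message_background (sketch).
if self._session_tasks.get(session_key) is asyncio.current_task():
    # No handoff happened: this task still owns the session, so it
    # releases the Level-1 guard and its task-registry slot.
    self._release_session_guard(session_key, guard=interrupt_event)
    self._session_tasks.pop(session_key, None)
# else: ownership moved to a drain task; leave _active_sessions and
# _session_tasks alone, and let the drain task's own unwind release them.
```
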
Also reorder stdlib imports in the new regression test file to
match the gateway test convention (stdlib before third-party).

Regression test: capture `_active_sessions[sk]` identity at every
handler entry across a 2-step in-band drain chain and assert the
guard Event identity stays the same. Pre-fix, the original task's
finally deletes the entry, the drain task falls through to the
`or asyncio.Event()` branch, and a fresh Event is installed —
identity diverges. Post-fix, the entry is preserved and the drain
task reuses the original Event.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 23:12:08 -07:00

"""Regression test for #17758 — chained pending-message drains must not
grow the call stack.

Before the fix, ``_process_message_background`` finished a turn, found a
pending follow-up, and drained it via ``await
self._process_message_background(pending_event, session_key)``. Each
queued follow-up added a frame to the call stack instead of starting
fresh, so under sustained pending-queue activity the C stack would
exhaust at ~2000 nested frames and the process would crash with
SIGSEGV.

After the fix, the in-band drain spawns a fresh task (mirroring the
late-arrival drain pattern), so the stack stays bounded regardless of
chain length.

We assert the invariant directly: count nested
``_process_message_background`` frames at handler entry across a chain
of N follow-ups. Recursion makes depth grow linearly (1, 2, 3, …, N);
task spawning keeps it constant (1 every time).
"""

import asyncio
import sys
from unittest.mock import AsyncMock

import pytest

from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import (
    BasePlatformAdapter,
    MessageEvent,
    MessageType,
)
from gateway.session import SessionSource, build_session_key


class _StubAdapter(BasePlatformAdapter):
    async def connect(self):
        pass

    async def disconnect(self):
        pass

    async def send(self, chat_id, text, **kwargs):
        return None

    async def get_chat_info(self, chat_id):
        return {}


def _make_adapter():
    adapter = _StubAdapter(PlatformConfig(enabled=True, token="t"), Platform.TELEGRAM)
    adapter._send_with_retry = AsyncMock(return_value=None)
    return adapter


def _make_event(text="hi", chat_id="42"):
    return MessageEvent(
        text=text,
        message_type=MessageType.TEXT,
        source=SessionSource(platform=Platform.TELEGRAM, chat_id=chat_id, chat_type="dm"),
    )


def _sk(chat_id="42"):
    return build_session_key(
        SessionSource(platform=Platform.TELEGRAM, chat_id=chat_id, chat_type="dm")
    )


def _count_pmb_frames() -> int:
    """Walk the current call stack and count nested
    ``_process_message_background`` frames. Used to detect recursive
    in-band drains."""
    f = sys._getframe()
    n = 0
    while f is not None:
        if f.f_code.co_name == "_process_message_background":
            n += 1
        f = f.f_back
    return n


@pytest.mark.asyncio
async def test_in_band_drain_does_not_grow_stack():
    """Issue #17758: chained pending-message drains must not recurse.

    Queue a fresh pending message inside each handler invocation so the
    in-band drain block fires for every turn in the chain. After N
    turns, the recorded stack depth at handler entry must stay bounded.
    Pre-fix, depths would be 1, 2, 3, …, N; post-fix, depths are 1
    every time because each drain runs in its own task.
    """
    N = 12
    adapter = _make_adapter()
    sk = _sk()

    depths: list[int] = []
    next_index = [1]

    async def handler(event):
        depths.append(_count_pmb_frames())
        if next_index[0] < N:
            adapter._pending_messages[sk] = _make_event(text=f"M{next_index[0]}")
            next_index[0] += 1
        return "ok"

    adapter._message_handler = handler

    await adapter.handle_message(_make_event(text="M0"))

    # Drain the chain. Each turn schedules the next via the in-band
    # drain block, so we wait until N handler runs have completed and
    # the session has been released.
    for _ in range(400):
        if len(depths) >= N and sk not in adapter._active_sessions:
            break
        await asyncio.sleep(0.01)

    await adapter.cancel_background_tasks()

    assert len(depths) == N, (
        f"expected {N} handler runs in the chain, got {len(depths)}: depths={depths!r}"
    )
    max_depth = max(depths)
    assert max_depth <= 2, (
        "in-band drain is recursing instead of spawning a fresh task — "
        f"stack depth grew with chain length: {depths!r}"
    )


@pytest.mark.asyncio
async def test_in_band_drain_preserves_active_session_guard():
    """The original task must NOT release ``_active_sessions[session_key]``
    after handing off to the drain task.

    When the in-band drain spawns ``drain_task`` and transfers ownership
    via ``_session_tasks[session_key] = drain_task``, the original task
    still unwinds through the ``finally`` block. The drain task picks
    up the same ``interrupt_event`` in its own
    ``_process_message_background`` entry, so a naive
    ``_release_session_guard(session_key, guard=interrupt_event)`` in
    the unwind matches and deletes ``_active_sessions[session_key]``.
    That briefly reopens the Level-1 guard between the original task's
    finally and the drain task's first await — a concurrent inbound
    arriving in that window passes the guard and spawns a second
    handler for the same session.

    Invariant: ``_active_sessions[sk]`` must hold the SAME interrupt
    Event identity at every handler entry across an in-band drain
    chain. Pre-fix, the original task's finally deletes the entry, so
    the drain task falls through to the ``or asyncio.Event()`` branch
    in ``_process_message_background`` and installs a *new* Event —
    the identity diverges. Post-fix, the entry is preserved across
    handoff and the drain task reuses the original Event.
    """
    adapter = _make_adapter()
    sk = _sk()

    seen_guards: list = []

    async def handler(event):
        seen_guards.append(adapter._active_sessions.get(sk))
        if len(seen_guards) == 1:
            adapter._pending_messages[sk] = _make_event(text="M1")
        return "ok"

    adapter._message_handler = handler

    await adapter.handle_message(_make_event(text="M0"))

    for _ in range(400):
        if len(seen_guards) >= 2 and sk not in adapter._active_sessions:
            break
        await asyncio.sleep(0.01)

    await adapter.cancel_background_tasks()

    assert len(seen_guards) == 2, f"expected 2 handler runs, got {len(seen_guards)}"
    assert seen_guards[0] is not None, "M0 saw no active-session guard"
    assert seen_guards[1] is not None, "M1 saw no active-session guard"
    assert seen_guards[0] is seen_guards[1], (
        "in-band drain handoff replaced the active-session guard — the "
        "original task's finally deleted _active_sessions[sk] and the "
        "drain task installed a new Event. Concurrent inbounds during "
        "the handoff window would bypass the Level-1 guard and spawn a "
        "second handler for the same session."
    )


# ---------------------------------------------------------------------------
# Follow-up guardrails (belt-and-suspenders on top of the #17758 fix).
#
# The in-band drain hand-off changed cleanup semantics in three subtle ways
# that the original fix reasoned about but didn't test directly. These
# tests pin each invariant so future refactors can't silently regress them.
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_normal_path_releases_session_guard():
    """The common path — one message, nothing queued — must still
    fully release ``_active_sessions[sk]`` and ``_session_tasks[sk]``
    through the end-of-finally block.

    The #17758 fix moved ``_release_session_guard(...)`` under an
    ``if current_task is self._session_tasks.get(session_key)``
    conditional. For the 99%-common case (no pending message, no
    handoff) ``current_task`` IS the stored task, so the guard must
    still fire. This test would fail if the conditional were ever
    tightened in a way that dropped the normal path."""
    adapter = _make_adapter()
    sk = _sk()

    async def handler(event):
        return "ok"

    adapter._message_handler = handler

    await adapter.handle_message(_make_event(text="solo"))

    # Wait for the single-shot handler to fully unwind.
    for _ in range(200):
        if sk not in adapter._active_sessions and sk not in adapter._session_tasks:
            break
        await asyncio.sleep(0.01)

    await adapter.cancel_background_tasks()

    assert sk not in adapter._active_sessions, (
        "normal-path unwind left _active_sessions[sk] populated — future "
        "messages would take the busy-handler path forever"
    )
    assert sk not in adapter._session_tasks, (
        "normal-path unwind left _session_tasks[sk] populated — "
        "stale-lock detection will treat a dead task as alive"
    )


@pytest.mark.asyncio
async def test_drain_task_cancellation_releases_session():
    """If the in-band drain task is cancelled (e.g. user sent ``/stop``
    mid-drain), the session guard and task registry must still get
    cleaned up — the cancelled drain task's own ``finally`` runs and
    fires ``_release_session_guard``.

    The #17758 fix transfers ownership of ``_session_tasks[sk]`` to
    the drain task; the drain task's ``except asyncio.CancelledError``
    branch must then own the cleanup. Without this test a future
    refactor could move cancellation handling in a way that leaves
    the session permanently pinned as busy after a cancel."""
    adapter = _make_adapter()
    sk = _sk()

    turn_started = asyncio.Event()
    drain_hit_handler = asyncio.Event()

    async def handler(event):
        if event.text == "M0":
            # Queue a pending follow-up so an in-band drain task gets spawned.
            adapter._pending_messages[sk] = _make_event(text="M1")
            turn_started.set()
            return "ok"
        # M1 is the drained follow-up — hang so we can cancel the drain task.
        drain_hit_handler.set()
        try:
            await asyncio.sleep(10)
        except asyncio.CancelledError:
            raise

    adapter._message_handler = handler

    await adapter.handle_message(_make_event(text="M0"))

    # Wait for the drain task to actually start running M1.
    await asyncio.wait_for(drain_hit_handler.wait(), timeout=2)

    # Cancel the drain task mid-handler.
    drain_task = adapter._session_tasks.get(sk)
    assert drain_task is not None, "in-band drain did not install a drain task"
    assert not drain_task.done(), "drain task finished before we could cancel"
    drain_task.cancel()

    # Drain task's finally must release both registries.
    for _ in range(200):
        if sk not in adapter._active_sessions and sk not in adapter._session_tasks:
            break
        await asyncio.sleep(0.01)

    await adapter.cancel_background_tasks()

    assert sk not in adapter._active_sessions, (
        "cancelled drain task did not release _active_sessions[sk] — "
        "the session stays permanently pinned as busy after a /stop mid-drain"
    )
    assert sk not in adapter._session_tasks, (
        "cancelled drain task did not release _session_tasks[sk] — "
        "stale-lock detection will treat the dead task as alive"
    )


@pytest.mark.asyncio
async def test_late_arrival_drain_still_fires_when_no_in_band_drain():
    """The late-arrival drain in ``finally`` must still spawn a fresh
    task when no in-band drain preceded it.

    Pre-#17758 this path already existed; the #17758 follow-up guard
    only re-queues when ``_session_tasks[sk] is not current_task``.
    For a late-arrival with no in-band drain, ``_session_tasks[sk]``
    IS the current task, so the ``else`` branch must fire and spawn
    a drain task for the queued message.

    Queue a pending message *after* the first message's handler
    returns (so the in-band drain block sees nothing) but *before*
    ``finally`` runs the late-arrival check — we do this by hooking
    ``stop_typing``, which runs in finally before the late-arrival
    check."""
    adapter = _make_adapter()
    sk = _sk()

    results: list[str] = []
    original_stop_typing = getattr(adapter, "stop_typing", None)

    async def injecting_stop_typing(chat_id):
        # Simulate a message landing during the cleanup awaits.
        adapter._pending_messages[sk] = _make_event(text="late")
        if original_stop_typing:
            await original_stop_typing(chat_id)

    adapter.stop_typing = injecting_stop_typing

    async def handler(event):
        results.append(event.text)
        return "ok"

    adapter._message_handler = handler

    await adapter.handle_message(_make_event(text="first"))

    # Wait for the late-arrival drain task to finish the second event.
    for _ in range(400):
        if "late" in results and sk not in adapter._active_sessions:
            break
        await asyncio.sleep(0.01)

    await adapter.cancel_background_tasks()

    assert "first" in results, "original message handler did not run"
    assert "late" in results, (
        "late-arrival drain did not spawn a drain task — a message that "
        "landed during cleanup awaits was silently dropped"
    )