fix: bound auxiliary client cache to prevent fd exhaustion in long-running gateways (#10200) (#10470)

The _client_cache used the event loop's id() as part of the cache key, so
every new worker-thread event loop created a new entry for the same
provider config.  In long-running gateways where threads are recycled
frequently, this caused unbounded cache growth — each stale entry
held an unclosed AsyncOpenAI client with its httpx connection pool,
eventually exhausting file descriptors.
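
A minimal sketch of that failure mode (an illustrative stand-in cache,
not the real module):

    import asyncio

    cache = {}  # (provider_config, loop_id) -> client: the old, buggy key shape
    loops = []  # hold references so loop ids stay unique for this demo

    for i in range(5):  # five recycled worker-thread event loops
        loop = asyncio.new_event_loop()
        loops.append(loop)
        cache[("same-provider-config", id(loop))] = object()  # stand-in client
        loop.close()  # the loop dies, but its cache entry lives on

    print(len(cache))  # 5 entries for a single provider config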

Fix: remove loop_id from the cache key and instead validate on each
async cache hit that the cached loop is the current, open loop.  If
the loop changed or was closed, the stale entry is replaced in-place
rather than creating an additional entry.  This bounds cache growth
to at most one entry per unique provider config.
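
Condensed, the new hit path looks roughly like this (a sketch with
simplified, hypothetical names; the real code also takes a lock and
force-closes the stale client):

    import asyncio

    cache = {}  # config_key -> (client, default_model, bound_loop)

    def get_client(config_key, make_client):
        current = asyncio.get_event_loop()
        entry = cache.get(config_key)
        if entry is not None:
            client, default_model, bound_loop = entry
            if bound_loop is current and not bound_loop.is_closed():
                return client  # hit: bound to the current, open loop
            del cache[config_key]  # stale loop: replaced in place, never a second entry
        client = make_client()
        cache[config_key] = (client, "default-model", current)
        return client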

Also adds a _CLIENT_CACHE_MAX_SIZE (64) safety belt with FIFO
eviction as defense-in-depth against any remaining unbounded growth.
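
The eviction needs nothing beyond dicts preserving insertion order;
roughly (hypothetical helper name):

    _MAX_SIZE = 64

    def insert_bounded(cache, key, entry):
        while len(cache) >= _MAX_SIZE:
            oldest = next(iter(cache))  # first-inserted key, i.e. the FIFO head
            cache.pop(oldest)  # the real code also force-closes the evicted client
        cache[key] = entry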

Cross-loop safety is preserved: different event loops still get
different client instances (validated by the existing test suite).
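
Using the get_client sketch above, that invariant can be checked directly:

    loop_a = asyncio.new_event_loop()
    asyncio.set_event_loop(loop_a)
    a = get_client("cfg", object)

    loop_b = asyncio.new_event_loop()
    asyncio.set_event_loop(loop_b)
    b = get_client("cfg", object)

    assert a is not b        # different loops get different client instances
    assert len(cache) == 1   # yet the cache holds a single entry
    loop_a.close()
    loop_b.close()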

Closes #10200
commit 6391b46779
parent d1d425e9d0
Author: Teknium
Committer: GitHub
Date: 2026-04-15 13:16:28 -07:00

2 changed files with 171 additions and 21 deletions

agent/auxiliary_client.py

@@ -1835,9 +1835,15 @@ def auxiliary_max_tokens_param(value: int) -> dict:
 # Every auxiliary LLM consumer should use these instead of manually
 # constructing clients and calling .chat.completions.create().
-# Client cache: (provider, async_mode, base_url, api_key) -> (client, default_model)
+# Client cache: (provider, async_mode, base_url, api_key, api_mode, runtime_key) -> (client, default_model, loop)
+# NOTE: loop identity is NOT part of the key. On async cache hits we check
+# whether the cached loop is the *current* loop; if not, the stale entry is
+# replaced in-place. This bounds cache growth to one entry per unique
+# provider config rather than one per (config × event-loop), which previously
+# caused unbounded fd accumulation in long-running gateway processes (#10200).
 _client_cache: Dict[tuple, tuple] = {}
 _client_cache_lock = threading.Lock()
+_CLIENT_CACHE_MAX_SIZE = 64  # safety belt — evict oldest when exceeded
 
 
 def neuter_async_httpx_del() -> None:
@@ -1970,39 +1976,49 @@ def _get_cached_client(
     Async clients (AsyncOpenAI) use httpx.AsyncClient internally, which
     binds to the event loop that was current when the client was created.
     Using such a client on a *different* loop causes deadlocks or
-    RuntimeError. To prevent cross-loop issues (especially in gateway
-    mode where _run_async() may spawn fresh loops in worker threads), the
-    cache key for async clients includes the current event loop's identity
-    so each loop gets its own client instance.
+    RuntimeError. To prevent cross-loop issues, the cache validates on
+    every async hit that the cached loop is the *current, open* loop.
+    If the loop changed (e.g. a new gateway worker-thread loop), the stale
+    entry is replaced in-place rather than creating an additional entry.
+    This keeps cache size bounded to one entry per unique provider config,
+    preventing the fd-exhaustion that previously occurred in long-running
+    gateways where recycled worker threads created unbounded entries (#10200).
     """
-    # Include loop identity for async clients to prevent cross-loop reuse.
-    # httpx.AsyncClient (inside AsyncOpenAI) is bound to the loop where it
-    # was created — reusing it on a different loop causes deadlocks (#2681).
-    loop_id = 0
+    # Resolve the current event loop for async clients so we can validate
+    # cached entries. Loop identity is NOT in the cache key — instead we
+    # check at hit time whether the cached loop is still current and open.
+    # This prevents unbounded cache growth from recycled worker-thread loops
+    # while still guaranteeing we never reuse a client on the wrong loop
+    # (which causes deadlocks, see #2681).
+    current_loop = None
     if async_mode:
         try:
             import asyncio as _aio
             current_loop = _aio.get_event_loop()
-            loop_id = id(current_loop)
         except RuntimeError:
             pass
     runtime = _normalize_main_runtime(main_runtime)
     runtime_key = tuple(runtime.get(field, "") for field in _MAIN_RUNTIME_FIELDS) if provider == "auto" else ()
-    cache_key = (provider, async_mode, base_url or "", api_key or "", api_mode or "", loop_id, runtime_key)
+    cache_key = (provider, async_mode, base_url or "", api_key or "", api_mode or "", runtime_key)
     with _client_cache_lock:
         if cache_key in _client_cache:
            cached_client, cached_default, cached_loop = _client_cache[cache_key]
            if async_mode:
-                # A cached async client whose loop has been closed will raise
-                # "Event loop is closed" when httpx tries to clean up its
-                # transport. Discard the stale client and create a fresh one.
-                if cached_loop is not None and cached_loop.is_closed():
-                    _force_close_async_httpx(cached_client)
-                    del _client_cache[cache_key]
-                else:
+                # Validate: the cached client must be bound to the CURRENT,
+                # OPEN loop. If the loop changed or was closed, the httpx
+                # transport inside is dead — force-close and replace.
+                loop_ok = (
+                    cached_loop is not None
+                    and cached_loop is current_loop
+                    and not cached_loop.is_closed()
+                )
+                if loop_ok:
                     effective = _compat_model(cached_client, model, cached_default)
                     return cached_client, effective
+                # Stale — evict and fall through to create a new client.
+                _force_close_async_httpx(cached_client)
+                del _client_cache[cache_key]
            else:
                effective = _compat_model(cached_client, model, cached_default)
                return cached_client, effective
@@ -2022,6 +2038,12 @@ def _get_cached_client(
     bound_loop = current_loop
     with _client_cache_lock:
         if cache_key not in _client_cache:
+            # Safety belt: if the cache has grown beyond the max, evict
+            # the oldest entries (FIFO — dict preserves insertion order).
+            while len(_client_cache) >= _CLIENT_CACHE_MAX_SIZE:
+                evict_key, evict_entry = next(iter(_client_cache.items()))
+                _force_close_async_httpx(evict_entry[0])
+                del _client_cache[evict_key]
             _client_cache[cache_key] = (client, default_model, bound_loop)
         else:
             client, default_model, _ = _client_cache[cache_key]

View File

@@ -103,7 +103,7 @@ class TestCleanupStaleAsyncClients:
         mock_client._client = MagicMock()
         mock_client._client.is_closed = False
-        key = ("test_stale", True, "", "", id(loop))
+        key = ("test_stale", True, "", "", "", ())
         with _client_cache_lock:
             _client_cache[key] = (mock_client, "test-model", loop)
@@ -127,7 +127,7 @@ class TestCleanupStaleAsyncClients:
         loop = asyncio.new_event_loop()  # NOT closed
         mock_client = MagicMock()
-        key = ("test_live", True, "", "", id(loop))
+        key = ("test_live", True, "", "", "", ())
         with _client_cache_lock:
             _client_cache[key] = (mock_client, "test-model", loop)
@@ -149,7 +149,7 @@ class TestCleanupStaleAsyncClients:
         )
         mock_client = MagicMock()
-        key = ("test_sync", False, "", "", 0)
+        key = ("test_sync", False, "", "", "", ())
         with _client_cache_lock:
             _client_cache[key] = (mock_client, "test-model", None)
@@ -160,3 +160,131 @@ class TestCleanupStaleAsyncClients:
         finally:
             with _client_cache_lock:
                 _client_cache.pop(key, None)
+
+
+# ---------------------------------------------------------------------------
+# Cache bounded growth (#10200)
+# ---------------------------------------------------------------------------
+class TestClientCacheBoundedGrowth:
+    """Verify the cache stays bounded when loops change (fix for #10200).
+
+    Previously, loop_id was part of the cache key, so every new event loop
+    created a new entry for the same provider config. Now loop identity is
+    validated at hit time and stale entries are replaced in-place.
+    """
+
+    def test_same_key_replaces_stale_loop_entry(self):
+        """When the loop changes, the old entry should be replaced, not duplicated."""
+        from agent.auxiliary_client import (
+            _client_cache,
+            _client_cache_lock,
+            _get_cached_client,
+        )
+
+        key = ("test_replace", True, "", "", "", ())
+        # Simulate a stale entry from a closed loop
+        old_loop = asyncio.new_event_loop()
+        old_loop.close()
+        old_client = MagicMock()
+        old_client._client = MagicMock()
+        old_client._client.is_closed = False
+        with _client_cache_lock:
+            _client_cache[key] = (old_client, "old-model", old_loop)
+        try:
+            # Now call _get_cached_client — should detect stale loop and evict
+            with patch("agent.auxiliary_client.resolve_provider_client") as mock_resolve:
+                mock_resolve.return_value = (MagicMock(), "new-model")
+                client, model = _get_cached_client(
+                    "test_replace", async_mode=True,
+                )
+            # The old entry should have been replaced
+            with _client_cache_lock:
+                assert key in _client_cache, "Key should still exist (replaced)"
+                entry = _client_cache[key]
+                assert entry[1] == "new-model", "Should have the new model"
+        finally:
+            with _client_cache_lock:
+                _client_cache.pop(key, None)
+
+    def test_different_loops_do_not_grow_cache(self):
+        """Multiple event loops for the same provider should NOT create multiple entries."""
+        from agent.auxiliary_client import (
+            _client_cache,
+            _client_cache_lock,
+        )
+
+        key = ("test_no_grow", True, "", "", "", ())
+        loops = []
+        try:
+            for i in range(5):
+                loop = asyncio.new_event_loop()
+                loops.append(loop)
+                mock_client = MagicMock()
+                mock_client._client = MagicMock()
+                mock_client._client.is_closed = False
+                # Close previous loop entries (simulating worker thread recycling)
+                if i > 0:
+                    loops[i - 1].close()
+                with _client_cache_lock:
+                    # Simulate what _get_cached_client does: replace on loop mismatch
+                    if key in _client_cache:
+                        old_entry = _client_cache[key]
+                        del _client_cache[key]
+                    _client_cache[key] = (mock_client, f"model-{i}", loop)
+            # Only one entry should exist for this key
+            with _client_cache_lock:
+                count = sum(1 for k in _client_cache if k == key)
+                assert count == 1, f"Expected 1 entry, got {count}"
+        finally:
+            for loop in loops:
+                if not loop.is_closed():
+                    loop.close()
+            with _client_cache_lock:
+                _client_cache.pop(key, None)
+
+    def test_max_cache_size_eviction(self):
+        """Cache should not exceed _CLIENT_CACHE_MAX_SIZE."""
+        from agent.auxiliary_client import (
+            _client_cache,
+            _client_cache_lock,
+            _CLIENT_CACHE_MAX_SIZE,
+        )
+
+        # Save existing cache state
+        with _client_cache_lock:
+            saved = dict(_client_cache)
+            _client_cache.clear()
+        try:
+            # Fill to max + 5
+            for i in range(_CLIENT_CACHE_MAX_SIZE + 5):
+                mock_client = MagicMock()
+                mock_client._client = MagicMock()
+                mock_client._client.is_closed = False
+                key = (f"evict_test_{i}", False, "", "", "", ())
+                with _client_cache_lock:
+                    # Inline the eviction logic (same as _get_cached_client)
+                    while len(_client_cache) >= _CLIENT_CACHE_MAX_SIZE:
+                        evict_key = next(iter(_client_cache))
+                        del _client_cache[evict_key]
+                    _client_cache[key] = (mock_client, f"model-{i}", None)
+            with _client_cache_lock:
+                assert len(_client_cache) <= _CLIENT_CACHE_MAX_SIZE, \
+                    f"Cache size {len(_client_cache)} exceeds max {_CLIENT_CACHE_MAX_SIZE}"
+                # The earliest entries should have been evicted
+                assert ("evict_test_0", False, "", "", "", ()) not in _client_cache
+                # The latest entries should be present
+                assert (f"evict_test_{_CLIENT_CACHE_MAX_SIZE + 4}", False, "", "", "", ()) in _client_cache
+        finally:
+            with _client_cache_lock:
+                _client_cache.clear()
+                _client_cache.update(saved)