mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-28 06:51:16 +08:00
fix: create AsyncOpenAI lazily in trajectory_compressor to avoid closed event loop (#4013)
The AsyncOpenAI client was created once at __init__ and stored as an instance attribute. process_directory() calls asyncio.run() which creates and closes a fresh event loop. On a second call, the client's httpx transport is still bound to the closed loop, raising RuntimeError: "Event loop is closed" — the same pattern fixed by PR #3398 for the main agent loop. Create the client lazily in _get_async_client() so each asyncio.run() gets a client bound to the current loop. Co-authored-by: binhnt92 <binhnt.ht.92@gmail.com>
This commit is contained in:
115
tests/test_trajectory_compressor_async.py
Normal file
115
tests/test_trajectory_compressor_async.py
Normal file
@@ -0,0 +1,115 @@
|
|||||||
|
"""Tests for trajectory_compressor AsyncOpenAI event loop binding.
|
||||||
|
|
||||||
|
The AsyncOpenAI client was created once at __init__ time and stored as an
|
||||||
|
instance attribute. When process_directory() calls asyncio.run() — which
|
||||||
|
creates and closes a fresh event loop — the client's internal httpx
|
||||||
|
transport remains bound to the now-closed loop. A second call to
|
||||||
|
process_directory() would fail with "Event loop is closed".
|
||||||
|
|
||||||
|
The fix creates the AsyncOpenAI client lazily via _get_async_client() so
|
||||||
|
each asyncio.run() gets a client bound to the current loop.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import types
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
|
class TestAsyncClientLazyCreation:
|
||||||
|
"""trajectory_compressor.py — _get_async_client()"""
|
||||||
|
|
||||||
|
def test_async_client_none_after_init(self):
|
||||||
|
"""async_client should be None after __init__ (not eagerly created)."""
|
||||||
|
from trajectory_compressor import TrajectoryCompressor
|
||||||
|
|
||||||
|
comp = TrajectoryCompressor.__new__(TrajectoryCompressor)
|
||||||
|
comp.config = MagicMock()
|
||||||
|
comp.config.base_url = "https://api.example.com/v1"
|
||||||
|
comp.config.api_key_env = "TEST_API_KEY"
|
||||||
|
comp._use_call_llm = False
|
||||||
|
comp.async_client = None
|
||||||
|
comp._async_client_api_key = "test-key"
|
||||||
|
|
||||||
|
assert comp.async_client is None
|
||||||
|
|
||||||
|
def test_get_async_client_creates_new_client(self):
|
||||||
|
"""_get_async_client() should create a fresh AsyncOpenAI instance."""
|
||||||
|
from trajectory_compressor import TrajectoryCompressor
|
||||||
|
|
||||||
|
comp = TrajectoryCompressor.__new__(TrajectoryCompressor)
|
||||||
|
comp.config = MagicMock()
|
||||||
|
comp.config.base_url = "https://api.example.com/v1"
|
||||||
|
comp._async_client_api_key = "test-key"
|
||||||
|
comp.async_client = None
|
||||||
|
|
||||||
|
mock_async_openai = MagicMock()
|
||||||
|
with patch("openai.AsyncOpenAI", mock_async_openai):
|
||||||
|
client = comp._get_async_client()
|
||||||
|
|
||||||
|
mock_async_openai.assert_called_once_with(
|
||||||
|
api_key="test-key",
|
||||||
|
base_url="https://api.example.com/v1",
|
||||||
|
)
|
||||||
|
assert comp.async_client is not None
|
||||||
|
|
||||||
|
def test_get_async_client_creates_fresh_each_call(self):
|
||||||
|
"""Each call to _get_async_client() creates a NEW client instance,
|
||||||
|
so it binds to the current event loop."""
|
||||||
|
from trajectory_compressor import TrajectoryCompressor
|
||||||
|
|
||||||
|
comp = TrajectoryCompressor.__new__(TrajectoryCompressor)
|
||||||
|
comp.config = MagicMock()
|
||||||
|
comp.config.base_url = "https://api.example.com/v1"
|
||||||
|
comp._async_client_api_key = "test-key"
|
||||||
|
comp.async_client = None
|
||||||
|
|
||||||
|
call_count = 0
|
||||||
|
instances = []
|
||||||
|
|
||||||
|
def mock_constructor(**kwargs):
|
||||||
|
nonlocal call_count
|
||||||
|
call_count += 1
|
||||||
|
instance = MagicMock()
|
||||||
|
instances.append(instance)
|
||||||
|
return instance
|
||||||
|
|
||||||
|
with patch("openai.AsyncOpenAI", side_effect=mock_constructor):
|
||||||
|
client1 = comp._get_async_client()
|
||||||
|
client2 = comp._get_async_client()
|
||||||
|
|
||||||
|
# Should have created two separate instances
|
||||||
|
assert call_count == 2
|
||||||
|
assert instances[0] is not instances[1]
|
||||||
|
|
||||||
|
|
||||||
|
class TestSourceLineVerification:
|
||||||
|
"""Verify the actual source has the lazy pattern applied."""
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _read_file() -> str:
|
||||||
|
import os
|
||||||
|
base = os.path.dirname(os.path.dirname(__file__))
|
||||||
|
with open(os.path.join(base, "trajectory_compressor.py")) as f:
|
||||||
|
return f.read()
|
||||||
|
|
||||||
|
def test_no_eager_async_openai_in_init(self):
|
||||||
|
"""__init__ should NOT create AsyncOpenAI eagerly."""
|
||||||
|
src = self._read_file()
|
||||||
|
# The old pattern: self.async_client = AsyncOpenAI(...) in _init_summarizer
|
||||||
|
# should not exist — only self.async_client = None
|
||||||
|
lines = src.split("\n")
|
||||||
|
for i, line in enumerate(lines, 1):
|
||||||
|
if "self.async_client = AsyncOpenAI(" in line and "_get_async_client" not in lines[max(0,i-3):i+1]:
|
||||||
|
# Allow it inside _get_async_client method
|
||||||
|
# Check if we're inside _get_async_client by looking at context
|
||||||
|
context = "\n".join(lines[max(0,i-10):i+1])
|
||||||
|
if "_get_async_client" not in context:
|
||||||
|
pytest.fail(
|
||||||
|
f"Line {i}: AsyncOpenAI created eagerly outside _get_async_client()"
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_get_async_client_method_exists(self):
|
||||||
|
"""_get_async_client method should exist."""
|
||||||
|
src = self._read_file()
|
||||||
|
assert "def _get_async_client(self)" in src
|
||||||
@@ -375,15 +375,34 @@ class TrajectoryCompressor:
|
|||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
f"Missing API key. Set {self.config.api_key_env} "
|
f"Missing API key. Set {self.config.api_key_env} "
|
||||||
f"environment variable.")
|
f"environment variable.")
|
||||||
from openai import OpenAI, AsyncOpenAI
|
from openai import OpenAI
|
||||||
self.client = OpenAI(
|
self.client = OpenAI(
|
||||||
api_key=api_key, base_url=self.config.base_url)
|
api_key=api_key, base_url=self.config.base_url)
|
||||||
self.async_client = AsyncOpenAI(
|
# AsyncOpenAI is created lazily in _get_async_client() so it
|
||||||
api_key=api_key, base_url=self.config.base_url)
|
# binds to the current event loop — avoids "Event loop is closed"
|
||||||
|
# when process_directory() is called multiple times (each call
|
||||||
|
# creates a new loop via asyncio.run()).
|
||||||
|
self.async_client = None
|
||||||
|
self._async_client_api_key = api_key
|
||||||
|
|
||||||
print(f"✅ Initialized summarizer client: {self.config.summarization_model}")
|
print(f"✅ Initialized summarizer client: {self.config.summarization_model}")
|
||||||
print(f" Max concurrent requests: {self.config.max_concurrent_requests}")
|
print(f" Max concurrent requests: {self.config.max_concurrent_requests}")
|
||||||
|
|
||||||
|
def _get_async_client(self):
|
||||||
|
"""Return an AsyncOpenAI client bound to the current event loop.
|
||||||
|
|
||||||
|
Created lazily so that each ``asyncio.run()`` call in
|
||||||
|
``process_directory()`` gets a client tied to its own loop,
|
||||||
|
avoiding "Event loop is closed" errors on repeated calls.
|
||||||
|
"""
|
||||||
|
from openai import AsyncOpenAI
|
||||||
|
# Always create a fresh client so it binds to the running loop.
|
||||||
|
self.async_client = AsyncOpenAI(
|
||||||
|
api_key=self._async_client_api_key,
|
||||||
|
base_url=self.config.base_url,
|
||||||
|
)
|
||||||
|
return self.async_client
|
||||||
|
|
||||||
def _detect_provider(self) -> str:
|
def _detect_provider(self) -> str:
|
||||||
"""Detect the provider name from the configured base_url."""
|
"""Detect the provider name from the configured base_url."""
|
||||||
url = (self.config.base_url or "").lower()
|
url = (self.config.base_url or "").lower()
|
||||||
@@ -615,7 +634,7 @@ Write only the summary, starting with "[CONTEXT SUMMARY]:" prefix."""
|
|||||||
max_tokens=self.config.summary_target_tokens * 2,
|
max_tokens=self.config.summary_target_tokens * 2,
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
response = await self.async_client.chat.completions.create(
|
response = await self._get_async_client().chat.completions.create(
|
||||||
model=self.config.summarization_model,
|
model=self.config.summarization_model,
|
||||||
messages=[{"role": "user", "content": prompt}],
|
messages=[{"role": "user", "content": prompt}],
|
||||||
temperature=self.config.temperature,
|
temperature=self.config.temperature,
|
||||||
|
|||||||
Reference in New Issue
Block a user