diff --git a/tests/test_trajectory_compressor_async.py b/tests/test_trajectory_compressor_async.py
new file mode 100644
index 0000000000..2b276d03d0
--- /dev/null
+++ b/tests/test_trajectory_compressor_async.py
@@ -0,0 +1,115 @@
+"""Tests for trajectory_compressor AsyncOpenAI event loop binding.
+
+The AsyncOpenAI client was created once at __init__ time and stored as an
+instance attribute. When process_directory() calls asyncio.run() — which
+creates and closes a fresh event loop — the client's internal httpx
+transport remains bound to the now-closed loop. A second call to
+process_directory() would fail with "Event loop is closed".
+
+The fix creates the AsyncOpenAI client lazily via _get_async_client() so
+each asyncio.run() gets a client bound to the current loop.
+"""
+
+import types
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+
+class TestAsyncClientLazyCreation:
+    """trajectory_compressor.py — _get_async_client()"""
+
+    def test_async_client_none_after_init(self):
+        """async_client should be None after __init__ (not eagerly created)."""
+        from trajectory_compressor import TrajectoryCompressor
+
+        comp = TrajectoryCompressor.__new__(TrajectoryCompressor)
+        comp.config = MagicMock()
+        comp.config.base_url = "https://api.example.com/v1"
+        comp.config.api_key_env = "TEST_API_KEY"
+        comp._use_call_llm = False
+        comp.async_client = None
+        comp._async_client_api_key = "test-key"
+
+        assert comp.async_client is None
+
+    def test_get_async_client_creates_new_client(self):
+        """_get_async_client() should create a fresh AsyncOpenAI instance."""
+        from trajectory_compressor import TrajectoryCompressor
+
+        comp = TrajectoryCompressor.__new__(TrajectoryCompressor)
+        comp.config = MagicMock()
+        comp.config.base_url = "https://api.example.com/v1"
+        comp._async_client_api_key = "test-key"
+        comp.async_client = None
+
+        mock_async_openai = MagicMock()
+        with patch("openai.AsyncOpenAI", mock_async_openai):
+            client = comp._get_async_client()
+
+        mock_async_openai.assert_called_once_with(
+            api_key="test-key",
+            base_url="https://api.example.com/v1",
+        )
+        assert comp.async_client is not None
+
+    def test_get_async_client_creates_fresh_each_call(self):
+        """Each call to _get_async_client() creates a NEW client instance,
+        so it binds to the current event loop."""
+        from trajectory_compressor import TrajectoryCompressor
+
+        comp = TrajectoryCompressor.__new__(TrajectoryCompressor)
+        comp.config = MagicMock()
+        comp.config.base_url = "https://api.example.com/v1"
+        comp._async_client_api_key = "test-key"
+        comp.async_client = None
+
+        call_count = 0
+        instances = []
+
+        def mock_constructor(**kwargs):
+            nonlocal call_count
+            call_count += 1
+            instance = MagicMock()
+            instances.append(instance)
+            return instance
+
+        with patch("openai.AsyncOpenAI", side_effect=mock_constructor):
+            client1 = comp._get_async_client()
+            client2 = comp._get_async_client()
+
+        # Should have created two separate instances
+        assert call_count == 2
+        assert instances[0] is not instances[1]
+
+
+class TestSourceLineVerification:
+    """Verify the actual source has the lazy pattern applied."""
+
+    @staticmethod
+    def _read_file() -> str:
+        import os
+        base = os.path.dirname(os.path.dirname(__file__))
+        with open(os.path.join(base, "trajectory_compressor.py")) as f:
+            return f.read()
+
+    def test_no_eager_async_openai_in_init(self):
+        """__init__ should NOT create AsyncOpenAI eagerly."""
+        src = self._read_file()
+        # The old pattern: self.async_client = AsyncOpenAI(...) in _init_summarizer
+        # should not exist — only self.async_client = None
+        lines = src.split("\n")
+        for i, line in enumerate(lines, 1):
+            if "self.async_client = AsyncOpenAI(" in line and "_get_async_client" not in lines[max(0,i-3):i+1]:
+                # Allow it inside _get_async_client method
+                # Check if we're inside _get_async_client by looking at context
+                context = "\n".join(lines[max(0,i-10):i+1])
+                if "_get_async_client" not in context:
+                    pytest.fail(
+                        f"Line {i}: AsyncOpenAI created eagerly outside _get_async_client()"
+                    )
+
+    def test_get_async_client_method_exists(self):
+        """_get_async_client method should exist."""
+        src = self._read_file()
+        assert "def _get_async_client(self)" in src
diff --git a/trajectory_compressor.py b/trajectory_compressor.py
index fd69cd18a6..2dfdda7af3 100644
--- a/trajectory_compressor.py
+++ b/trajectory_compressor.py
@@ -375,15 +375,34 @@ class TrajectoryCompressor:
             raise RuntimeError(
                 f"Missing API key. Set {self.config.api_key_env} "
                 f"environment variable.")
-        from openai import OpenAI, AsyncOpenAI
+        from openai import OpenAI
         self.client = OpenAI(
             api_key=api_key, base_url=self.config.base_url)
-        self.async_client = AsyncOpenAI(
-            api_key=api_key, base_url=self.config.base_url)
+        # AsyncOpenAI is created lazily in _get_async_client() so it
+        # binds to the current event loop — avoids "Event loop is closed"
+        # when process_directory() is called multiple times (each call
+        # creates a new loop via asyncio.run()).
+        self.async_client = None
+        self._async_client_api_key = api_key
         print(f"✅ Initialized summarizer client: {self.config.summarization_model}")
         print(f"   Max concurrent requests: {self.config.max_concurrent_requests}")
 
+    def _get_async_client(self):
+        """Return an AsyncOpenAI client bound to the current event loop.
+
+        Created lazily so that each ``asyncio.run()`` call in
+        ``process_directory()`` gets a client tied to its own loop,
+        avoiding "Event loop is closed" errors on repeated calls.
+        """
+        from openai import AsyncOpenAI
+        # Always create a fresh client so it binds to the running loop.
+        self.async_client = AsyncOpenAI(
+            api_key=self._async_client_api_key,
+            base_url=self.config.base_url,
+        )
+        return self.async_client
+
     def _detect_provider(self) -> str:
         """Detect the provider name from the configured base_url."""
         url = (self.config.base_url or "").lower()
@@ -615,7 +634,7 @@ Write only the summary, starting with "[CONTEXT SUMMARY]:" prefix."""
                 max_tokens=self.config.summary_target_tokens * 2,
             )
         else:
-            response = await self.async_client.chat.completions.create(
+            response = await self._get_async_client().chat.completions.create(
                 model=self.config.summarization_model,
                 messages=[{"role": "user", "content": prompt}],
                 temperature=self.config.temperature,