From 9b55365f6f0191ac21eee371f8197454ca6f69fb Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Mon, 27 Apr 2026 07:41:42 -0700 Subject: [PATCH] fix(gateway,cron): close ephemeral agents + reap stale aux clients (salvage #13979) (#16598) * fix: clean gateway auxiliary client caches on teardown * fix(gateway): recover from stale pid files and close cron agents Two issues were keeping the gateway from surviving long runs: 1. `_cleanup_invalid_pid_path` delegated to `remove_pid_file`, which refuses to unlink when the file's pid differs from our own. That safety check exists for the --replace atexit handoff, but it also applied to stale-record cleanup, so after a crashy exit the pid file was orphaned: `write_pid_file()`'s O_EXCL create then failed with `FileExistsError`, and systemd looped on "PID file race lost to another gateway instance". Unlink unconditionally from this helper since the caller has already verified the record is dead. 2. The cron scheduler never closed the ephemeral `AIAgent` it creates per tick, and never swept the process-global auxiliary-client cache. Over days of 10-minute ticks this leaked subprocesses and async httpx transports until the gateway hit EMFILE. Release the agent and call `cleanup_stale_async_clients()` in `run_job`'s outer `finally`, matching the gateway's own per-turn cleanup. * chore(release): map bloodcarter@gmail.com -> bloodcarter --------- Co-authored-by: bloodcarter --- cron/scheduler.py | 20 +++++++ gateway/run.py | 22 ++++++++ scripts/release.py | 1 + tests/cron/test_scheduler.py | 73 ++++++++++++++++++++++++++ tests/gateway/test_gateway_shutdown.py | 19 ++++++- tests/gateway/test_status.py | 23 ++++++++ 6 files changed, 157 insertions(+), 1 deletion(-) diff --git a/cron/scheduler.py b/cron/scheduler.py index 12dae811fd..21ec8dbdec 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -822,6 +822,8 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]: logger.info("Running job '%s' (ID: %s)", job_name, job_id) logger.info("Prompt: %s", prompt[:100]) + agent = None + # Mark this as a cron session so the approval system can apply cron_mode. # This env var is process-wide and persists for the lifetime of the # scheduler process — every job this process runs is a cron job. @@ -1170,6 +1172,24 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]: _session_db.close() except (Exception, KeyboardInterrupt) as e: logger.debug("Job '%s': failed to close SQLite session store: %s", job_id, e) + # Release subprocesses, terminal sandboxes, browser daemons, and the + # main OpenAI/httpx client held by this ephemeral cron agent. Without + # this, a gateway that ticks cron every N minutes leaks fds per job + # until it hits EMFILE (#10200 / "too many open files"). + try: + if agent is not None: + agent.close() + except (Exception, KeyboardInterrupt) as e: + logger.debug("Job '%s': failed to close agent resources: %s", job_id, e) + # Each cron run spins up a short-lived worker thread whose event loop + # dies as soon as the ``ThreadPoolExecutor`` shuts down. Any async + # httpx clients cached under that loop are now unusable — reap them + # so their transports don't accumulate in the process-global cache. + try: + from agent.auxiliary_client import cleanup_stale_async_clients + cleanup_stale_async_clients() + except Exception as e: + logger.debug("Job '%s': failed to reap stale auxiliary clients: %s", job_id, e) def tick(verbose: bool = True, adapters=None, loop=None) -> int: diff --git a/gateway/run.py b/gateway/run.py index 8bdd6c4160..b50bbc5851 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -1968,6 +1968,15 @@ class GatewayRunner: agent.close() except Exception: pass + # Auxiliary async clients (session_search/web/vision/etc.) live in a + # process-global cache and are created inside worker threads. Clean up + # any entries whose event loop is now dead so their httpx transports do + # not accumulate across gateway turns. + try: + from agent.auxiliary_client import cleanup_stale_async_clients + cleanup_stale_async_clients() + except Exception: + pass _STUCK_LOOP_THRESHOLD = 3 # restarts while active before auto-suspend _STUCK_LOOP_FILE = ".restart_failure_counts" @@ -2931,6 +2940,19 @@ class GatewayRunner: # disconnect (defense in depth; safe to call repeatedly). _kill_tool_subprocesses("final-cleanup") + # Reap the process-global auxiliary-client cache once at the very + # end of teardown. Per-turn cleanup runs in _cleanup_agent_resources + # for each active agent, but clients bound to worker-thread loops + # that died with their ThreadPoolExecutor (notably cron ticks) only + # get swept here. Without this, long-running gateways accumulate + # async httpx transports until they hit EMFILE on macOS's default + # RLIMIT_NOFILE=256. See #14210. + try: + from agent.auxiliary_client import shutdown_cached_clients + shutdown_cached_clients() + except Exception as _e: + logger.debug("shutdown_cached_clients error: %s", _e) + # Close SQLite session DBs so the WAL write lock is released. # Without this, --replace and similar restart flows leave the # old gateway's connection holding the WAL lock until Python diff --git a/scripts/release.py b/scripts/release.py index f85e51ad13..c1dac5bb23 100755 --- a/scripts/release.py +++ b/scripts/release.py @@ -63,6 +63,7 @@ AUTHOR_MAP = { "yoimexex@gmail.com": "Yoimex", "6548898+romanornr@users.noreply.github.com": "romanornr", "foxion37@gmail.com": "foxion37", + "bloodcarter@gmail.com": "bloodcarter", # contributors (from noreply pattern) "david.vv@icloud.com": "davidvv", "wangqiang@wangqiangdeMac-mini.local": "xiaoqiang243", diff --git a/tests/cron/test_scheduler.py b/tests/cron/test_scheduler.py index 4cd4b7cd75..25f707efd8 100644 --- a/tests/cron/test_scheduler.py +++ b/tests/cron/test_scheduler.py @@ -672,6 +672,79 @@ class TestRunJobSessionPersistence: assert call_args[0][0].startswith("cron_test-job_") assert call_args[0][1] == "cron_complete" fake_db.close.assert_called_once() + mock_agent.close.assert_called_once() + + def test_run_job_closes_agent_on_failure_to_prevent_fd_leak(self, tmp_path): + # Regression: if ``run_conversation`` raises, the ephemeral cron + # agent was previously leaked — over days of ticks this accumulated + # httpx transports and hit EMFILE / "too many open files". + job = { + "id": "failing-job", + "name": "failing", + "prompt": "hello", + } + fake_db = MagicMock() + + with patch("cron.scheduler._hermes_home", tmp_path), \ + patch("cron.scheduler._resolve_origin", return_value=None), \ + patch("dotenv.load_dotenv"), \ + patch("hermes_state.SessionDB", return_value=fake_db), \ + patch( + "hermes_cli.runtime_provider.resolve_runtime_provider", + return_value={ + "api_key": "***", + "base_url": "https://example.invalid/v1", + "provider": "openrouter", + "api_mode": "chat_completions", + }, + ), \ + patch("run_agent.AIAgent") as mock_agent_cls: + mock_agent = MagicMock() + mock_agent.run_conversation.side_effect = RuntimeError("boom") + mock_agent_cls.return_value = mock_agent + + success, output, final_response, error = run_job(job) + + assert success is False + assert final_response == "" + assert "RuntimeError: boom" in error + mock_agent.close.assert_called_once() + + def test_run_job_reaps_stale_auxiliary_clients_per_tick(self, tmp_path): + # Regression: auxiliary clients bound to the cron worker's dead + # event loop must be reaped each tick. Without this, ``_client_cache`` + # holds onto transports whose underlying sockets can no longer be + # closed (their loop is gone), leaking one fd batch per cron run. + job = { + "id": "aux-clean-job", + "name": "aux-clean", + "prompt": "hello", + } + fake_db = MagicMock() + + with patch("cron.scheduler._hermes_home", tmp_path), \ + patch("cron.scheduler._resolve_origin", return_value=None), \ + patch("dotenv.load_dotenv"), \ + patch("hermes_state.SessionDB", return_value=fake_db), \ + patch( + "hermes_cli.runtime_provider.resolve_runtime_provider", + return_value={ + "api_key": "***", + "base_url": "https://example.invalid/v1", + "provider": "openrouter", + "api_mode": "chat_completions", + }, + ), \ + patch("run_agent.AIAgent") as mock_agent_cls, \ + patch("agent.auxiliary_client.cleanup_stale_async_clients") as cleanup_mock: + mock_agent = MagicMock() + mock_agent.run_conversation.return_value = {"final_response": "ok"} + mock_agent_cls.return_value = mock_agent + + success, _output, _final_response, _error = run_job(job) + + assert success is True + cleanup_mock.assert_called_once() def _make_run_job_patches(self, tmp_path): """Common patches for run_job tests.""" diff --git a/tests/gateway/test_gateway_shutdown.py b/tests/gateway/test_gateway_shutdown.py index 137ddfd036..d12fac14bb 100644 --- a/tests/gateway/test_gateway_shutdown.py +++ b/tests/gateway/test_gateway_shutdown.py @@ -35,6 +35,18 @@ async def test_cancel_background_tasks_cancels_inflight_message_processing(): assert adapter._pending_messages == {} +def test_cleanup_agent_resources_reaps_stale_aux_clients(): + runner, _adapter = make_restart_runner() + agent = MagicMock() + + with patch("agent.auxiliary_client.cleanup_stale_async_clients") as cleanup_mock: + runner._cleanup_agent_resources(agent) + + agent.shutdown_memory_provider.assert_called_once() + agent.close.assert_called_once() + cleanup_mock.assert_called_once() + + @pytest.mark.asyncio async def test_gateway_stop_interrupts_running_agents_and_cancels_adapter_tasks(): runner, adapter = make_restart_runner() @@ -60,11 +72,16 @@ async def test_gateway_stop_interrupts_running_agents_and_cancels_adapter_tasks( running_agent = MagicMock() runner._running_agents = {session_key: running_agent} - with patch("gateway.status.remove_pid_file"), patch("gateway.status.write_runtime_status"): + with ( + patch("gateway.status.remove_pid_file"), + patch("gateway.status.write_runtime_status"), + patch("agent.auxiliary_client.shutdown_cached_clients") as shutdown_cached_clients, + ): await runner.stop() running_agent.interrupt.assert_called_once_with("Gateway shutting down") disconnect_mock.assert_awaited_once() + shutdown_cached_clients.assert_called_once() assert runner.adapters == {} assert runner._running_agents == {} assert runner._pending_messages == {} diff --git a/tests/gateway/test_status.py b/tests/gateway/test_status.py index e91bb6e419..e56b2107e5 100644 --- a/tests/gateway/test_status.py +++ b/tests/gateway/test_status.py @@ -51,6 +51,29 @@ class TestGatewayPidState: assert status.get_running_pid() is None assert not pid_path.exists() + def test_get_running_pid_cleans_stale_record_from_dead_process(self, tmp_path, monkeypatch): + # Simulates the aftermath of a crash: the PID file still points at a + # process that no longer exists. The next gateway startup must be + # able to unlink it so ``write_pid_file``'s O_EXCL create succeeds — + # otherwise systemd's restart loop hits "PID file race lost" forever. + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + pid_path = tmp_path / "gateway.pid" + dead_pid = 999999 # not our pid, and below we simulate it's dead + pid_path.write_text(json.dumps({ + "pid": dead_pid, + "kind": "hermes-gateway", + "argv": ["python", "-m", "hermes_cli.main", "gateway", "run"], + "start_time": 111, + })) + + def _dead_process(pid, sig): + raise ProcessLookupError + + monkeypatch.setattr(status.os, "kill", _dead_process) + + assert status.get_running_pid() is None + assert not pid_path.exists() + def test_get_running_pid_accepts_gateway_metadata_when_cmdline_unavailable(self, tmp_path, monkeypatch): monkeypatch.setenv("HERMES_HOME", str(tmp_path)) pid_path = tmp_path / "gateway.pid"