fix(gateway,cron): close ephemeral agents + reap stale aux clients (salvage #13979) (#16598)

* fix: clean gateway auxiliary client caches on teardown

* fix(gateway): recover from stale pid files and close cron agents

Two issues were keeping the gateway from surviving long runs:

1. `_cleanup_invalid_pid_path` delegated to `remove_pid_file`, which
   refuses to unlink when the file's pid differs from our own. That
   safety check exists for the --replace atexit handoff, but it also
   applied to stale-record cleanup, so after a crashy exit the pid
   file was orphaned: `write_pid_file()`'s O_EXCL create then failed
   with `FileExistsError`, and systemd looped on "PID file race lost
   to another gateway instance". Unlink unconditionally from this
   helper since the caller has already verified the record is dead
   (see the sketch after this list).

2. The cron scheduler never closed the ephemeral `AIAgent` it creates
   per tick, and never swept the process-global auxiliary-client
   cache. Over days of 10-minute ticks this leaked subprocesses and
   async httpx transports until the gateway hit EMFILE. Release the
   agent and call `cleanup_stale_async_clients()` in `run_job`'s
   outer `finally`, matching the gateway's own per-turn cleanup.
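
The first fix, in sketch form: once `get_running_pid` has established that
the recorded pid is dead, the helper unlinks the file itself instead of
delegating to `remove_pid_file`. A minimal illustration, assuming the helper
receives the path only after the liveness probe has already failed; the
guard clauses are illustrative, not the exact implementation:

```python
from pathlib import Path

def _cleanup_invalid_pid_path(pid_path: Path) -> None:
    # The caller has already verified the recorded pid is dead (its
    # os.kill(pid, 0) probe raised ProcessLookupError) or the record is
    # malformed, so unlink unconditionally. Delegating to remove_pid_file()
    # was the bug: its "only unlink our own pid" guard protects the
    # --replace atexit handoff, not stale-record cleanup.
    try:
        pid_path.unlink()
    except FileNotFoundError:
        pass  # lost a race with another cleaner; nothing left to do
```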

* chore(release): map bloodcarter@gmail.com -> bloodcarter

---------

Co-authored-by: bloodcarter <bloodcarter@gmail.com>
commit 9b55365f6f (parent ac0325c257)
Author: Teknium
Date: 2026-04-27 07:41:42 -07:00
Committed by: GitHub

6 changed files with 157 additions and 1 deletion


@@ -822,6 +822,8 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
    logger.info("Running job '%s' (ID: %s)", job_name, job_id)
    logger.info("Prompt: %s", prompt[:100])
    agent = None
    # Mark this as a cron session so the approval system can apply cron_mode.
    # This env var is process-wide and persists for the lifetime of the
    # scheduler process — every job this process runs is a cron job.
@@ -1170,6 +1172,24 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
            _session_db.close()
        except (Exception, KeyboardInterrupt) as e:
            logger.debug("Job '%s': failed to close SQLite session store: %s", job_id, e)
        # Release subprocesses, terminal sandboxes, browser daemons, and the
        # main OpenAI/httpx client held by this ephemeral cron agent. Without
        # this, a gateway that ticks cron every N minutes leaks fds per job
        # until it hits EMFILE (#10200 / "too many open files").
        try:
            if agent is not None:
                agent.close()
        except (Exception, KeyboardInterrupt) as e:
            logger.debug("Job '%s': failed to close agent resources: %s", job_id, e)
        # Each cron run spins up a short-lived worker thread whose event loop
        # dies as soon as the ``ThreadPoolExecutor`` shuts down. Any async
        # httpx clients cached under that loop are now unusable — reap them
        # so their transports don't accumulate in the process-global cache.
        try:
            from agent.auxiliary_client import cleanup_stale_async_clients

            cleanup_stale_async_clients()
        except Exception as e:
            logger.debug("Job '%s': failed to reap stale auxiliary clients: %s", job_id, e)


def tick(verbose: bool = True, adapters=None, loop=None) -> int:
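
`agent.auxiliary_client` itself is not touched by this diff, so the cache
shape below is an assumption made for illustration: clients keyed by the
event loop that created them, which is what lets a reaper tell live entries
apart from ones stranded by a dead worker-thread loop. Both entry points
used by this commit are sketched; the real module's layout may differ.

```python
import asyncio
import httpx

# Hypothetical cache shape: purpose -> (owning loop, client). Only the
# loop-aware eviction matters for this sketch.
_client_cache: dict[str, tuple[asyncio.AbstractEventLoop, httpx.AsyncClient]] = {}

def cleanup_stale_async_clients() -> None:
    """Per-tick sweep: evict clients whose owning loop has closed.

    Their transports can no longer be aclose()d (that requires the loop),
    so the best available move is dropping the cache's reference and
    letting garbage collection reclaim the sockets.
    """
    for key in [k for k, (loop, _) in _client_cache.items() if loop.is_closed()]:
        _client_cache.pop(key, None)

def shutdown_cached_clients() -> None:
    """Teardown sweep: close what still has a running loop, drop the rest."""
    for key, (loop, client) in list(_client_cache.items()):
        if loop.is_running():
            try:
                # aclose() must execute on the client's own loop.
                asyncio.run_coroutine_threadsafe(client.aclose(), loop).result(timeout=5)
            except Exception:
                pass
        _client_cache.pop(key, None)
```

The per-loop keying is the load-bearing detail: an httpx async client is
tied to the loop its connections were opened under, so a client cached by a
cron worker thread becomes unusable once that thread's loop dies with the
``ThreadPoolExecutor``, and holding it only pins its transports.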


@@ -1968,6 +1968,15 @@ class GatewayRunner:
            agent.close()
        except Exception:
            pass
        # Auxiliary async clients (session_search/web/vision/etc.) live in a
        # process-global cache and are created inside worker threads. Clean up
        # any entries whose event loop is now dead so their httpx transports do
        # not accumulate across gateway turns.
        try:
            from agent.auxiliary_client import cleanup_stale_async_clients

            cleanup_stale_async_clients()
        except Exception:
            pass

    _STUCK_LOOP_THRESHOLD = 3  # restarts while active before auto-suspend
    _STUCK_LOOP_FILE = ".restart_failure_counts"
@@ -2931,6 +2940,19 @@ class GatewayRunner:
        # disconnect (defense in depth; safe to call repeatedly).
        _kill_tool_subprocesses("final-cleanup")
        # Reap the process-global auxiliary-client cache once at the very
        # end of teardown. Per-turn cleanup runs in _cleanup_agent_resources
        # for each active agent, but clients bound to worker-thread loops
        # that died with their ThreadPoolExecutor (notably cron ticks) only
        # get swept here. Without this, long-running gateways accumulate
        # async httpx transports until they hit EMFILE on macOS's default
        # RLIMIT_NOFILE=256. See #14210.
        try:
            from agent.auxiliary_client import shutdown_cached_clients

            shutdown_cached_clients()
        except Exception as _e:
            logger.debug("shutdown_cached_clients error: %s", _e)
        # Close SQLite session DBs so the WAL write lock is released.
        # Without this, --replace and similar restart flows leave the
        # old gateway's connection holding the WAL lock until Python
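
Not part of the change, but a quick way to confirm this class of leak (and
the fix) in a soak test is to sample the process's open-descriptor count
between cron ticks and compare it against the soft limit. A
standard-library-only helper, written here for illustration:

```python
import os
import resource

def open_fd_count() -> int:
    # Linux exposes /proc/self/fd; macOS offers the equivalent /dev/fd.
    fd_dir = "/proc/self/fd" if os.path.isdir("/proc/self/fd") else "/dev/fd"
    return len(os.listdir(fd_dir))

soft, _hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"open fds: {open_fd_count()} / soft limit {soft}")
```

Before the fix the count should grow by a fixed batch every tick; after it,
the number should plateau.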


@@ -63,6 +63,7 @@ AUTHOR_MAP = {
    "yoimexex@gmail.com": "Yoimex",
    "6548898+romanornr@users.noreply.github.com": "romanornr",
    "foxion37@gmail.com": "foxion37",
    "bloodcarter@gmail.com": "bloodcarter",
    # contributors (from noreply pattern)
    "david.vv@icloud.com": "davidvv",
    "wangqiang@wangqiangdeMac-mini.local": "xiaoqiang243",


@@ -672,6 +672,79 @@ class TestRunJobSessionPersistence:
        assert call_args[0][0].startswith("cron_test-job_")
        assert call_args[0][1] == "cron_complete"
        fake_db.close.assert_called_once()
        mock_agent.close.assert_called_once()

    def test_run_job_closes_agent_on_failure_to_prevent_fd_leak(self, tmp_path):
        # Regression: if ``run_conversation`` raises, the ephemeral cron
        # agent was previously leaked — over days of ticks this accumulated
        # httpx transports and hit EMFILE / "too many open files".
        job = {
            "id": "failing-job",
            "name": "failing",
            "prompt": "hello",
        }
        fake_db = MagicMock()
        with patch("cron.scheduler._hermes_home", tmp_path), \
                patch("cron.scheduler._resolve_origin", return_value=None), \
                patch("dotenv.load_dotenv"), \
                patch("hermes_state.SessionDB", return_value=fake_db), \
                patch(
                    "hermes_cli.runtime_provider.resolve_runtime_provider",
                    return_value={
                        "api_key": "***",
                        "base_url": "https://example.invalid/v1",
                        "provider": "openrouter",
                        "api_mode": "chat_completions",
                    },
                ), \
                patch("run_agent.AIAgent") as mock_agent_cls:
            mock_agent = MagicMock()
            mock_agent.run_conversation.side_effect = RuntimeError("boom")
            mock_agent_cls.return_value = mock_agent

            success, output, final_response, error = run_job(job)

            assert success is False
            assert final_response == ""
            assert "RuntimeError: boom" in error
            mock_agent.close.assert_called_once()

    def test_run_job_reaps_stale_auxiliary_clients_per_tick(self, tmp_path):
        # Regression: auxiliary clients bound to the cron worker's dead
        # event loop must be reaped each tick. Without this, ``_client_cache``
        # holds onto transports whose underlying sockets can no longer be
        # closed (their loop is gone), leaking one fd batch per cron run.
        job = {
            "id": "aux-clean-job",
            "name": "aux-clean",
            "prompt": "hello",
        }
        fake_db = MagicMock()
        with patch("cron.scheduler._hermes_home", tmp_path), \
                patch("cron.scheduler._resolve_origin", return_value=None), \
                patch("dotenv.load_dotenv"), \
                patch("hermes_state.SessionDB", return_value=fake_db), \
                patch(
                    "hermes_cli.runtime_provider.resolve_runtime_provider",
                    return_value={
                        "api_key": "***",
                        "base_url": "https://example.invalid/v1",
                        "provider": "openrouter",
                        "api_mode": "chat_completions",
                    },
                ), \
                patch("run_agent.AIAgent") as mock_agent_cls, \
                patch("agent.auxiliary_client.cleanup_stale_async_clients") as cleanup_mock:
            mock_agent = MagicMock()
            mock_agent.run_conversation.return_value = {"final_response": "ok"}
            mock_agent_cls.return_value = mock_agent

            success, _output, _final_response, _error = run_job(job)

            assert success is True
            cleanup_mock.assert_called_once()

    def _make_run_job_patches(self, tmp_path):
        """Common patches for run_job tests."""


@@ -35,6 +35,18 @@ async def test_cancel_background_tasks_cancels_inflight_message_processing():
    assert adapter._pending_messages == {}


def test_cleanup_agent_resources_reaps_stale_aux_clients():
    runner, _adapter = make_restart_runner()
    agent = MagicMock()

    with patch("agent.auxiliary_client.cleanup_stale_async_clients") as cleanup_mock:
        runner._cleanup_agent_resources(agent)

    agent.shutdown_memory_provider.assert_called_once()
    agent.close.assert_called_once()
    cleanup_mock.assert_called_once()


@pytest.mark.asyncio
async def test_gateway_stop_interrupts_running_agents_and_cancels_adapter_tasks():
    runner, adapter = make_restart_runner()

@@ -60,11 +72,16 @@ async def test_gateway_stop_interrupts_running_agents_and_cancels_adapter_tasks(
    running_agent = MagicMock()
    runner._running_agents = {session_key: running_agent}

-    with patch("gateway.status.remove_pid_file"), patch("gateway.status.write_runtime_status"):
+    with (
+        patch("gateway.status.remove_pid_file"),
+        patch("gateway.status.write_runtime_status"),
+        patch("agent.auxiliary_client.shutdown_cached_clients") as shutdown_cached_clients,
+    ):
        await runner.stop()

    running_agent.interrupt.assert_called_once_with("Gateway shutting down")
    disconnect_mock.assert_awaited_once()
    shutdown_cached_clients.assert_called_once()
    assert runner.adapters == {}
    assert runner._running_agents == {}
    assert runner._pending_messages == {}


@@ -51,6 +51,29 @@ class TestGatewayPidState:
        assert status.get_running_pid() is None
        assert not pid_path.exists()

    def test_get_running_pid_cleans_stale_record_from_dead_process(self, tmp_path, monkeypatch):
        # Simulates the aftermath of a crash: the PID file still points at a
        # process that no longer exists. The next gateway startup must be
        # able to unlink it so ``write_pid_file``'s O_EXCL create succeeds —
        # otherwise systemd's restart loop hits "PID file race lost" forever.
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        pid_path = tmp_path / "gateway.pid"
        dead_pid = 999999  # not our pid, and below we simulate it's dead
        pid_path.write_text(json.dumps({
            "pid": dead_pid,
            "kind": "hermes-gateway",
            "argv": ["python", "-m", "hermes_cli.main", "gateway", "run"],
            "start_time": 111,
        }))

        def _dead_process(pid, sig):
            raise ProcessLookupError

        monkeypatch.setattr(status.os, "kill", _dead_process)

        assert status.get_running_pid() is None
        assert not pid_path.exists()
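
For context on why the stale file must be unlinked first: `write_pid_file`'s
create is atomic, so any leftover file, live or stale, fails it. A rough
sketch of that create path (not `gateway.status` verbatim):

```python
import json
import os

def write_pid_file_sketch(pid_path: str, record: dict) -> None:
    # O_CREAT | O_EXCL means exactly one process can create the file;
    # a stale leftover blocks startup until something unlinks it.
    fd = os.open(pid_path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
    with os.fdopen(fd, "w") as fh:
        json.dump(record, fh)
```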
    def test_get_running_pid_accepts_gateway_metadata_when_cmdline_unavailable(self, tmp_path, monkeypatch):
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        pid_path = tmp_path / "gateway.pid"