mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-28 06:51:16 +08:00
fix(cron): reap orphaned MCP stdio subprocesses after each tick
MCP stdio servers are spawned via the SDK's stdio_client, which on Linux uses start_new_session=True (setsid). When a cron job is cancelled mid-way (timeout, agent finish, exception), the subprocess often escapes the SDK's teardown and survives as a session leader. Because setsid() detaches the child from the gateway's process group / cgroup tree, systemd does not reap it on service restart either — so every cron tick that touches an MCP tool leaks a dangling server process.

Fix:

* tools/mcp_tool.py — _run_stdio now wraps the whole stdio+session context in try/finally. On any exit path (clean, exception, cancellation), PIDs still alive are moved from the active _stdio_pids set into a new _orphan_stdio_pids set. Orphan detection is done via os.kill(pid, 0) — a cheap liveness probe that never signals the target.

* tools/mcp_tool.py — _kill_orphaned_mcp_children gains an include_active=False flag. Default behaviour now only reaps the orphan set so concurrent sessions (other parallel cron jobs or live user chats) are never disrupted. The existing shutdown path passes include_active=True to keep the previous "kill everything" semantics after the MCP loop is stopped.

* cron/scheduler.py — the cleanup hook is moved from run_job()'s finally (which would race with parallel siblings after #13021) into tick() after the ThreadPoolExecutor has joined every future. At that point there are no in-flight sessions from this tick, so sweeping the orphan set is always safe.

Net effect: zero regression for healthy sessions, and orphan MCP servers no longer accumulate between gateway restarts.

Made-with: Cursor
This commit is contained in:
@@ -1308,6 +1308,17 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int:
|
||||
_futures.append(_tick_pool.submit(_ctx.run, _process_job, job))
|
||||
_results.extend(f.result() for f in _futures)
|
||||
|
||||
# Best-effort sweep of MCP stdio subprocesses that survived their
|
||||
# session teardown during this tick. Runs AFTER every job has
|
||||
# finished so active sessions (including live user chats) are
|
||||
# never touched — only PIDs explicitly detected as orphans in
|
||||
# tools.mcp_tool._run_stdio's finally block are reaped.
|
||||
try:
|
||||
from tools.mcp_tool import _kill_orphaned_mcp_children
|
||||
_kill_orphaned_mcp_children()
|
||||
except Exception as _e:
|
||||
logger.debug("Post-tick MCP orphan cleanup failed: %s", _e)
|
||||
|
||||
return sum(_results)
|
||||
finally:
|
||||
if fcntl:
|
||||
|
||||
@@ -81,37 +81,51 @@ class TestStdioPidTracking:
|
||||
|
||||
def test_kill_orphaned_noop_when_empty(self):
|
||||
"""_kill_orphaned_mcp_children does nothing when no PIDs tracked."""
|
||||
from tools.mcp_tool import _kill_orphaned_mcp_children, _stdio_pids, _lock
|
||||
from tools.mcp_tool import (
|
||||
_kill_orphaned_mcp_children,
|
||||
_orphan_stdio_pids,
|
||||
_stdio_pids,
|
||||
_lock,
|
||||
)
|
||||
|
||||
with _lock:
|
||||
_stdio_pids.clear()
|
||||
_orphan_stdio_pids.clear()
|
||||
|
||||
# Should not raise
|
||||
_kill_orphaned_mcp_children()
|
||||
|
||||
def test_kill_orphaned_handles_dead_pids(self):
|
||||
"""_kill_orphaned_mcp_children gracefully handles already-dead PIDs."""
|
||||
from tools.mcp_tool import _kill_orphaned_mcp_children, _stdio_pids, _lock
|
||||
from tools.mcp_tool import (
|
||||
_kill_orphaned_mcp_children,
|
||||
_orphan_stdio_pids,
|
||||
_lock,
|
||||
)
|
||||
|
||||
# Use a PID that definitely doesn't exist
|
||||
fake_pid = 999999999
|
||||
with _lock:
|
||||
_stdio_pids[fake_pid] = "test"
|
||||
_orphan_stdio_pids.add(fake_pid)
|
||||
|
||||
# Should not raise (ProcessLookupError is caught)
|
||||
_kill_orphaned_mcp_children()
|
||||
|
||||
with _lock:
|
||||
assert fake_pid not in _stdio_pids
|
||||
assert fake_pid not in _orphan_stdio_pids
|
||||
|
||||
def test_kill_orphaned_uses_sigkill_when_available(self, monkeypatch):
|
||||
"""SIGTERM-first then SIGKILL after 2s for orphan cleanup."""
|
||||
from tools.mcp_tool import _kill_orphaned_mcp_children, _stdio_pids, _lock
|
||||
from tools.mcp_tool import (
|
||||
_kill_orphaned_mcp_children,
|
||||
_orphan_stdio_pids,
|
||||
_lock,
|
||||
)
|
||||
|
||||
fake_pid = 424242
|
||||
with _lock:
|
||||
_stdio_pids.clear()
|
||||
_stdio_pids[fake_pid] = "test"
|
||||
_orphan_stdio_pids.clear()
|
||||
_orphan_stdio_pids.add(fake_pid)
|
||||
|
||||
fake_sigkill = 9
|
||||
monkeypatch.setattr(signal, "SIGKILL", fake_sigkill, raising=False)
|
||||
@@ -128,16 +142,20 @@ class TestStdioPidTracking:
|
||||
mock_sleep.assert_called_once_with(2)
|
||||
|
||||
with _lock:
|
||||
assert fake_pid not in _stdio_pids
|
||||
assert fake_pid not in _orphan_stdio_pids
|
||||
|
||||
def test_kill_orphaned_falls_back_without_sigkill(self, monkeypatch):
|
||||
"""Without SIGKILL, SIGTERM is used for both phases."""
|
||||
from tools.mcp_tool import _kill_orphaned_mcp_children, _stdio_pids, _lock
|
||||
from tools.mcp_tool import (
|
||||
_kill_orphaned_mcp_children,
|
||||
_orphan_stdio_pids,
|
||||
_lock,
|
||||
)
|
||||
|
||||
fake_pid = 434343
|
||||
with _lock:
|
||||
_stdio_pids.clear()
|
||||
_stdio_pids[fake_pid] = "test"
|
||||
_orphan_stdio_pids.clear()
|
||||
_orphan_stdio_pids.add(fake_pid)
|
||||
|
||||
monkeypatch.delattr(signal, "SIGKILL", raising=False)
|
||||
|
||||
@@ -150,7 +168,7 @@ class TestStdioPidTracking:
|
||||
assert mock_sleep.called
|
||||
|
||||
with _lock:
|
||||
assert fake_pid not in _stdio_pids
|
||||
assert fake_pid not in _orphan_stdio_pids
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@@ -1044,33 +1044,51 @@ class MCPServerTask:
|
||||
|
||||
# Snapshot child PIDs before spawning so we can track the new one.
|
||||
pids_before = _snapshot_child_pids()
|
||||
new_pids: set = set()
|
||||
# Redirect subprocess stderr into a shared log file so MCP servers
|
||||
# (FastMCP banners, slack-mcp startup JSON, etc.) don't dump onto
|
||||
# the user's TTY and corrupt the TUI. Preserves debuggability via
|
||||
# ~/.hermes/logs/mcp-stderr.log.
|
||||
_write_stderr_log_header(self.name)
|
||||
_errlog = _get_mcp_stderr_log()
|
||||
async with stdio_client(server_params, errlog=_errlog) as (read_stream, write_stream):
|
||||
# Capture the newly spawned subprocess PID for force-kill cleanup.
|
||||
new_pids = _snapshot_child_pids() - pids_before
|
||||
try:
|
||||
async with stdio_client(server_params, errlog=_errlog) as (
|
||||
read_stream,
|
||||
write_stream,
|
||||
):
|
||||
# Capture the newly spawned subprocess PID for force-kill cleanup.
|
||||
new_pids = _snapshot_child_pids() - pids_before
|
||||
if new_pids:
|
||||
with _lock:
|
||||
for _pid in new_pids:
|
||||
_stdio_pids[_pid] = self.name
|
||||
async with ClientSession(
|
||||
read_stream, write_stream, **sampling_kwargs
|
||||
) as session:
|
||||
await session.initialize()
|
||||
self.session = session
|
||||
await self._discover_tools()
|
||||
self._ready.set()
|
||||
# stdio transport does not use OAuth, but we still honor
|
||||
# _reconnect_event (e.g. future manual /mcp refresh) for
|
||||
# consistency with _run_http.
|
||||
await self._wait_for_lifecycle_event()
|
||||
finally:
|
||||
# Runs on clean exit, exceptions, AND asyncio cancellation.
|
||||
# If any of the spawned PIDs are still alive, the SDK's
|
||||
# teardown failed (common when the task is cancelled mid-way
|
||||
# on Linux, where setsid() children escape the parent cgroup).
|
||||
# Mark them as orphans so the next cleanup sweep can reap them.
|
||||
if new_pids:
|
||||
with _lock:
|
||||
for _pid in new_pids:
|
||||
_stdio_pids[_pid] = self.name
|
||||
async with ClientSession(read_stream, write_stream, **sampling_kwargs) as session:
|
||||
await session.initialize()
|
||||
self.session = session
|
||||
await self._discover_tools()
|
||||
self._ready.set()
|
||||
# stdio transport does not use OAuth, but we still honor
|
||||
# _reconnect_event (e.g. future manual /mcp refresh) for
|
||||
# consistency with _run_http.
|
||||
await self._wait_for_lifecycle_event()
|
||||
# Context exited cleanly — subprocess was terminated by the SDK.
|
||||
if new_pids:
|
||||
with _lock:
|
||||
for _pid in new_pids:
|
||||
_stdio_pids.pop(_pid, None)
|
||||
_stdio_pids.pop(_pid, None)
|
||||
for pid in new_pids:
|
||||
try:
|
||||
os.kill(pid, 0) # signal 0: probe liveness only
|
||||
except (ProcessLookupError, PermissionError, OSError):
|
||||
continue # process already exited — nothing to do
|
||||
_orphan_stdio_pids.add(pid)
|
||||
|
||||
async def _run_http(self, config: dict):
|
||||
"""Run the server using HTTP/StreamableHTTP transport."""
|
||||
@@ -1718,6 +1736,13 @@ _lock = threading.Lock()
|
||||
# normal server shutdown.
|
||||
_stdio_pids: Dict[int, str] = {} # pid -> server_name
|
||||
|
||||
# PIDs that survived their session context exit (SDK teardown failed to
|
||||
# terminate them). These are detected in _run_stdio's finally block and
|
||||
# can be cleaned up asynchronously by _kill_orphaned_mcp_children().
|
||||
# Separate from _stdio_pids so cleanup sweeps never race with active
|
||||
# sessions (e.g. concurrent cron jobs or live user chats).
|
||||
_orphan_stdio_pids: set = set()
|
||||
|
||||
|
||||
def _snapshot_child_pids() -> set:
|
||||
"""Return a set of current child process PIDs.
|
||||
@@ -2959,21 +2984,34 @@ def shutdown_mcp_servers():
|
||||
_stop_mcp_loop()
|
||||
|
||||
|
||||
def _kill_orphaned_mcp_children() -> None:
|
||||
"""Graceful shutdown of MCP stdio subprocesses that survived loop cleanup.
|
||||
def _kill_orphaned_mcp_children(include_active: bool = False) -> None:
|
||||
"""Best-effort graceful shutdown of stdio MCP subprocesses to reap orphans.
|
||||
|
||||
Sends SIGTERM first, waits 2 seconds, then escalates to SIGKILL.
|
||||
This prevents shared-resource collisions when multiple hermes processes
|
||||
run on the same host (each has its own _stdio_pids dict).
|
||||
Orphans are PIDs that survived their session context exit (SDK teardown
|
||||
did not terminate the process — common on Linux when stdio children escape
|
||||
the parent cgroup on cancellation). By default only entries in
|
||||
``_orphan_stdio_pids`` are reaped so concurrent cron jobs and live user
|
||||
sessions are not disrupted.
|
||||
|
||||
Only kills PIDs tracked in ``_stdio_pids`` — never arbitrary children.
|
||||
Sends SIGTERM, waits 2 seconds, then escalates to SIGKILL for any
|
||||
survivors, avoiding shared-resource collisions when multiple hermes
|
||||
processes run on the same host (each has its own ``_stdio_pids`` dict).
|
||||
|
||||
With ``include_active=True`` also kills every PID in ``_stdio_pids`` —
|
||||
used only at final shutdown, after the MCP event loop has stopped and no
|
||||
sessions can still be in flight.
|
||||
"""
|
||||
import signal as _signal
|
||||
import time as _time
|
||||
|
||||
with _lock:
|
||||
pids = dict(_stdio_pids)
|
||||
_stdio_pids.clear()
|
||||
pids: Dict[int, str] = {}
|
||||
for opid in _orphan_stdio_pids:
|
||||
pids[opid] = "orphan"
|
||||
_orphan_stdio_pids.clear()
|
||||
if include_active:
|
||||
pids.update(dict(_stdio_pids))
|
||||
_stdio_pids.clear()
|
||||
|
||||
# Fast path: no tracked stdio PIDs to reap. Skip the SIGTERM/sleep/SIGKILL
|
||||
# dance entirely — otherwise every MCP-free shutdown pays a 2s sleep tax.
|
||||
@@ -3022,5 +3060,6 @@ def _stop_mcp_loop():
|
||||
except Exception:
|
||||
pass
|
||||
# After closing the loop, any stdio subprocesses that survived the
|
||||
# graceful shutdown are now orphaned. Force-kill them.
|
||||
_kill_orphaned_mcp_children()
|
||||
# graceful shutdown are now orphaned — include active PIDs too
|
||||
# since the loop is gone and no session can still be in flight.
|
||||
_kill_orphaned_mcp_children(include_active=True)
|
||||
|
||||
Reference in New Issue
Block a user