mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-29 07:21:37 +08:00
Compare commits
6 Commits
skill/gith
...
terminate-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6b70739a79 | ||
|
|
44dc3f7e31 | ||
|
|
5b1c504026 | ||
|
|
7f04364d24 | ||
|
|
54ff20e27b | ||
|
|
a8c17b383d |
@@ -1348,12 +1348,28 @@ class GatewayRunner:
|
|||||||
for key, entry in _expired_entries:
|
for key, entry in _expired_entries:
|
||||||
try:
|
try:
|
||||||
await self._async_flush_memories(entry.session_id)
|
await self._async_flush_memories(entry.session_id)
|
||||||
# Shut down memory provider on the cached agent
|
# Shut down memory provider and close tool resources
|
||||||
cached_agent = self._running_agents.get(key)
|
# on the cached agent. Idle agents live in
|
||||||
if cached_agent and cached_agent is not _AGENT_PENDING_SENTINEL:
|
# _agent_cache (not _running_agents), so look there.
|
||||||
|
_cached_agent = None
|
||||||
|
_cache_lock = getattr(self, "_agent_cache_lock", None)
|
||||||
|
if _cache_lock is not None:
|
||||||
|
with _cache_lock:
|
||||||
|
_cached = self._agent_cache.get(key)
|
||||||
|
_cached_agent = _cached[0] if isinstance(_cached, tuple) else _cached if _cached else None
|
||||||
|
# Fall back to _running_agents in case the agent is
|
||||||
|
# still mid-turn when the expiry fires.
|
||||||
|
if _cached_agent is None:
|
||||||
|
_cached_agent = self._running_agents.get(key)
|
||||||
|
if _cached_agent and _cached_agent is not _AGENT_PENDING_SENTINEL:
|
||||||
try:
|
try:
|
||||||
if hasattr(cached_agent, 'shutdown_memory_provider'):
|
if hasattr(_cached_agent, 'shutdown_memory_provider'):
|
||||||
cached_agent.shutdown_memory_provider()
|
_cached_agent.shutdown_memory_provider()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
if hasattr(_cached_agent, 'close'):
|
||||||
|
_cached_agent.close()
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
# Mark as flushed and persist to disk so the flag
|
# Mark as flushed and persist to disk so the flag
|
||||||
@@ -1536,6 +1552,14 @@ class GatewayRunner:
|
|||||||
agent.shutdown_memory_provider()
|
agent.shutdown_memory_provider()
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
# Close tool resources (terminal sandboxes, browser daemons,
|
||||||
|
# background processes, httpx clients) to prevent zombie
|
||||||
|
# process accumulation.
|
||||||
|
try:
|
||||||
|
if hasattr(agent, 'close'):
|
||||||
|
agent.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
for platform, adapter in list(self.adapters.items()):
|
for platform, adapter in list(self.adapters.items()):
|
||||||
try:
|
try:
|
||||||
@@ -1559,6 +1583,24 @@ class GatewayRunner:
|
|||||||
self._pending_approvals.clear()
|
self._pending_approvals.clear()
|
||||||
self._shutdown_event.set()
|
self._shutdown_event.set()
|
||||||
|
|
||||||
|
# Global cleanup: kill any remaining tool subprocesses not tied
|
||||||
|
# to a specific agent (catch-all for zombie prevention).
|
||||||
|
try:
|
||||||
|
from tools.process_registry import process_registry
|
||||||
|
process_registry.kill_all()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
from tools.terminal_tool import cleanup_all_environments
|
||||||
|
cleanup_all_environments()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
from tools.browser_tool import cleanup_all_browsers
|
||||||
|
cleanup_all_browsers()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
from gateway.status import remove_pid_file, write_runtime_status
|
from gateway.status import remove_pid_file, write_runtime_status
|
||||||
remove_pid_file()
|
remove_pid_file()
|
||||||
try:
|
try:
|
||||||
@@ -3335,6 +3377,20 @@ class GatewayRunner:
|
|||||||
_flush_task.add_done_callback(self._background_tasks.discard)
|
_flush_task.add_done_callback(self._background_tasks.discard)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.debug("Gateway memory flush on reset failed: %s", e)
|
logger.debug("Gateway memory flush on reset failed: %s", e)
|
||||||
|
# Close tool resources on the old agent (terminal sandboxes, browser
|
||||||
|
# daemons, background processes) before evicting from cache.
|
||||||
|
# Guard with getattr because test fixtures may skip __init__.
|
||||||
|
_cache_lock = getattr(self, "_agent_cache_lock", None)
|
||||||
|
if _cache_lock is not None:
|
||||||
|
with _cache_lock:
|
||||||
|
_cached = self._agent_cache.get(session_key)
|
||||||
|
_old_agent = _cached[0] if isinstance(_cached, tuple) else _cached if _cached else None
|
||||||
|
if _old_agent is not None:
|
||||||
|
try:
|
||||||
|
if hasattr(_old_agent, "close"):
|
||||||
|
_old_agent.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
self._evict_cached_agent(session_key)
|
self._evict_cached_agent(session_key)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
|||||||
71
run_agent.py
71
run_agent.py
@@ -1893,17 +1893,12 @@ class AIAgent:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.debug("Background memory/skill review failed: %s", e)
|
logger.debug("Background memory/skill review failed: %s", e)
|
||||||
finally:
|
finally:
|
||||||
# Explicitly close the OpenAI/httpx client so GC doesn't
|
# Close all resources (httpx client, subprocesses, etc.) so
|
||||||
# try to clean it up on a dead asyncio event loop (which
|
# GC doesn't try to clean them up on a dead asyncio event
|
||||||
# produces "Event loop is closed" errors in the terminal).
|
# loop (which produces "Event loop is closed" errors).
|
||||||
if review_agent is not None:
|
if review_agent is not None:
|
||||||
client = getattr(review_agent, "client", None)
|
|
||||||
if client is not None:
|
|
||||||
try:
|
try:
|
||||||
review_agent._close_openai_client(
|
review_agent.close()
|
||||||
client, reason="bg_review_done", shared=True
|
|
||||||
)
|
|
||||||
review_agent.client = None
|
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@@ -2645,6 +2640,64 @@ class AIAgent:
|
|||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def close(self) -> None:
    """Release the external resources held by this agent instance.

    Cleanup runs in this order, keyed by the agent's ``session_id`` (an
    empty string is used when the attribute is missing or falsy):

    1. Background processes tracked by ``tools.process_registry``.
    2. The terminal sandbox environment (``tools.terminal_tool.cleanup_vm``).
    3. The browser daemon session (``tools.browser_tool.cleanup_browser``).
    4. Active child agents (subagent delegation), via their own ``close()``.
    5. The OpenAI/httpx client, after which ``self.client`` is ``None``.

    Every step is guarded independently, so a failure in one step never
    prevents the remaining steps from running.  Failures are logged at
    debug level instead of being raised, which keeps the method usable
    from shutdown paths while still leaving a trace when a cleanup step
    did not succeed.

    Repeated calls are intended to be harmless: the child list is emptied
    and the client reference is cleared on the first call.
    """
    # NOTE(review): when session_id is missing this passes task_id="" to the
    # helpers below -- confirm they treat an empty id as "nothing to clean"
    # rather than "clean everything".
    task_id = getattr(self, "session_id", None) or ""

    # 1. Kill background processes for this task (helper imported at call time).
    try:
        from tools.process_registry import process_registry
        process_registry.kill_all(task_id=task_id)
    except Exception as exc:
        logger.debug("Agent close: killing background processes failed: %s", exc)

    # 2. Clean terminal sandbox environments.
    try:
        from tools.terminal_tool import cleanup_vm
        cleanup_vm(task_id)
    except Exception as exc:
        logger.debug("Agent close: terminal sandbox cleanup failed: %s", exc)

    # 3. Clean browser daemon sessions.
    try:
        from tools.browser_tool import cleanup_browser
        cleanup_browser(task_id)
    except Exception as exc:
        logger.debug("Agent close: browser cleanup failed: %s", exc)

    # 4. Close active child agents.  The list is snapshotted and cleared
    #    under the lock; the children are closed after it is released.
    try:
        with self._active_children_lock:
            children = list(self._active_children)
            self._active_children.clear()
        for child in children:
            try:
                child.close()
            except Exception as exc:
                logger.debug("Agent close: closing child agent failed: %s", exc)
    except Exception as exc:
        logger.debug("Agent close: child agent cleanup failed: %s", exc)

    # 5. Close the OpenAI/httpx client and drop the reference to it.
    try:
        client = getattr(self, "client", None)
        if client is not None:
            self._close_openai_client(client, reason="agent_close", shared=True)
            self.client = None
    except Exception as exc:
        logger.debug("Agent close: closing API client failed: %s", exc)
|
||||||
|
|
||||||
def _hydrate_todo_store(self, history: List[Dict[str, Any]]) -> None:
|
def _hydrate_todo_store(self, history: List[Dict[str, Any]]) -> None:
|
||||||
"""
|
"""
|
||||||
Recover todo state from conversation history.
|
Recover todo state from conversation history.
|
||||||
|
|||||||
274
tests/tools/test_zombie_process_cleanup.py
Normal file
274
tests/tools/test_zombie_process_cleanup.py
Normal file
@@ -0,0 +1,274 @@
|
|||||||
|
"""Tests for zombie process cleanup — verifies processes spawned by tools
|
||||||
|
are properly reaped when agent sessions end.
|
||||||
|
|
||||||
|
Reproduction for issue #7131: zombie process accumulation on long-running
|
||||||
|
gateway deployments.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import signal
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import threading
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
|
def _spawn_sleep(seconds: float = 60) -> subprocess.Popen:
|
||||||
|
"""Spawn a portable long-lived Python sleep process (no shell wrapper)."""
|
||||||
|
return subprocess.Popen(
|
||||||
|
[sys.executable, "-c", f"import time; time.sleep({seconds})"],
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _pid_alive(pid: int) -> bool:
|
||||||
|
"""Return True if a process with the given PID is still running."""
|
||||||
|
try:
|
||||||
|
os.kill(pid, 0)
|
||||||
|
return True
|
||||||
|
except (ProcessLookupError, PermissionError):
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
class TestZombieReproduction:
    """Demonstrate that subprocesses survive when cleanup is not called."""

    def test_orphaned_processes_survive_without_cleanup(self):
        """REPRODUCTION: processes spawned directly survive if no one kills
        them — this models the gap that causes zombie accumulation when
        the gateway drops agent references without calling close()."""
        pids = []

        try:
            # Only the PIDs are kept; each Popen handle is dropped as soon as
            # the name is rebound on the next iteration.
            for _ in range(3):
                proc = _spawn_sleep(60)
                pids.append(proc.pid)

            for pid in pids:
                assert _pid_alive(pid), f"PID {pid} should be alive after spawn"

            # Simulate "session end" by just dropping the reference
            del proc  # noqa: F821

            # BUG: processes are still alive after reference is dropped
            for pid in pids:
                assert _pid_alive(pid), (
                    f"PID {pid} died after ref drop — "
                    f"expected it to survive (demonstrating the bug)"
                )
        finally:
            for pid in pids:
                try:
                    os.kill(pid, signal.SIGKILL)
                except (ProcessLookupError, PermissionError):
                    continue
                # Killing is not enough: without a wait the dead child stays
                # in the process table until this test process exits.
                try:
                    os.waitpid(pid, 0)
                except ChildProcessError:
                    # Already reaped elsewhere.
                    pass

    def test_explicit_terminate_reaps_processes(self):
        """Explicitly terminating+waiting on Popen handles works.
        This models what ProcessRegistry.kill_process does internally."""
        procs = []

        try:
            for _ in range(3):
                procs.append(_spawn_sleep(60))

            for proc in procs:
                assert _pid_alive(proc.pid)

            for proc in procs:
                proc.terminate()
                proc.wait(timeout=5)

            for proc in procs:
                assert proc.returncode is not None, (
                    f"PID {proc.pid} should have exited after terminate+wait"
                )
        finally:
            # Safety net for the failure path: make sure nothing outlives
            # the test even when an assertion above fired early.
            for proc in procs:
                try:
                    proc.kill()
                    proc.wait(timeout=1)
                except Exception:
                    pass
|
||||||
|
|
||||||
|
|
||||||
|
class TestAgentCloseMethod:
    """Verify AIAgent.close() exists, is idempotent, and calls cleanup."""

    @staticmethod
    def _make_agent(session_id, children=None):
        """Build a bare AIAgent carrying only the attributes close() reads.

        ``AIAgent.__new__`` allocates the instance without running
        ``__init__``, so no constructor patching is required.
        """
        from run_agent import AIAgent

        agent = AIAgent.__new__(AIAgent)
        agent.session_id = session_id
        agent._active_children = [] if children is None else children
        agent._active_children_lock = threading.Lock()
        agent.client = None
        return agent

    def test_close_calls_cleanup_functions(self):
        """close() should call kill_all, cleanup_vm, cleanup_browser."""
        from unittest.mock import patch

        agent = self._make_agent("test-close-cleanup")

        with patch("tools.process_registry.process_registry") as mock_registry, \
             patch("tools.terminal_tool.cleanup_vm") as mock_cleanup_vm, \
             patch("tools.browser_tool.cleanup_browser") as mock_cleanup_browser:
            agent.close()

        mock_registry.kill_all.assert_called_once_with(
            task_id="test-close-cleanup"
        )
        mock_cleanup_vm.assert_called_once_with("test-close-cleanup")
        mock_cleanup_browser.assert_called_once_with("test-close-cleanup")

    def test_close_is_idempotent(self):
        """close() can be called multiple times without error."""
        agent = self._make_agent("test-close-idempotent")

        agent.close()
        agent.close()
        agent.close()

    def test_close_propagates_to_children(self):
        """close() should call close() on all active child agents."""
        from unittest.mock import MagicMock

        child_1 = MagicMock()
        child_2 = MagicMock()
        agent = self._make_agent("test-close-children", [child_1, child_2])

        agent.close()

        child_1.close.assert_called_once()
        child_2.close.assert_called_once()
        assert agent._active_children == []

    def test_close_survives_partial_failures(self):
        """close() continues cleanup even if one step fails."""
        from unittest.mock import patch

        agent = self._make_agent("test-close-partial")

        with patch(
            "tools.process_registry.process_registry"
        ) as mock_reg, patch(
            "tools.terminal_tool.cleanup_vm"
        ) as mock_vm, patch(
            "tools.browser_tool.cleanup_browser"
        ) as mock_browser:
            # The first cleanup step blows up; the later ones must still run.
            mock_reg.kill_all.side_effect = RuntimeError("boom")

            agent.close()

        mock_vm.assert_called_once()
        mock_browser.assert_called_once()
|
||||||
|
|
||||||
|
|
||||||
|
class TestGatewayCleanupWiring:
    """Verify gateway lifecycle calls close() on agents."""

    def test_gateway_stop_calls_close(self):
        """gateway stop() should call close() on all running agents."""
        import asyncio
        from unittest.mock import MagicMock, patch

        mock_agent_1 = MagicMock()
        mock_agent_2 = MagicMock()

        # A MagicMock stands in for the runner; only the attributes set
        # below are given concrete values.
        runner = MagicMock()
        runner._running = True
        runner._running_agents = {
            "session-1": mock_agent_1,
            "session-2": mock_agent_2,
        }
        runner.adapters = {}
        runner._background_tasks = set()
        runner._pending_messages = {}
        runner._pending_approvals = {}
        runner._shutdown_event = asyncio.Event()
        runner._exit_reason = None

        from gateway.run import GatewayRunner

        loop = asyncio.new_event_loop()
        try:
            # Patch every process-wide cleanup hook so the test never touches
            # the real process registry, terminal environments or browsers.
            with patch("gateway.status.remove_pid_file"), \
                 patch("gateway.status.write_runtime_status"), \
                 patch("tools.process_registry.process_registry"), \
                 patch("tools.terminal_tool.cleanup_all_environments"), \
                 patch("tools.browser_tool.cleanup_all_browsers"):
                loop.run_until_complete(GatewayRunner.stop(runner))
        finally:
            loop.close()

        mock_agent_1.close.assert_called()
        mock_agent_2.close.assert_called()

    def test_evict_does_not_call_close(self):
        """_evict_cached_agent() should NOT call close() — it's also used
        for non-destructive refreshes (model switch, branch, fallback)."""
        from unittest.mock import MagicMock

        from gateway.run import GatewayRunner

        # Allocate without running __init__; only the cache and its lock are
        # populated here.
        runner = object.__new__(GatewayRunner)
        runner._agent_cache_lock = threading.Lock()

        mock_agent = MagicMock()
        runner._agent_cache = {"session-key": (mock_agent, 12345)}

        GatewayRunner._evict_cached_agent(runner, "session-key")

        mock_agent.close.assert_not_called()
        assert "session-key" not in runner._agent_cache
|
||||||
|
|
||||||
|
|
||||||
|
class TestDelegationCleanup:
    """Verify subagent delegation cleans up child agents."""

    def test_run_single_child_calls_close(self):
        """_run_single_child finally block should call close() on child."""
        from unittest.mock import MagicMock
        from tools.delegate_tool import _run_single_child

        # Child whose conversation aborts immediately.
        failing_child = MagicMock()
        failing_child._delegate_saved_tool_names = ["tool1"]
        failing_child.run_conversation.side_effect = RuntimeError("test abort")

        # Parent that already tracks the child as active.
        parent_agent = MagicMock()
        parent_agent._active_children_lock = threading.Lock()
        parent_agent._active_children = [failing_child]

        outcome = _run_single_child(
            task_index=0,
            goal="test goal",
            child=failing_child,
            parent_agent=parent_agent,
        )

        # The child is closed and unregistered, and the failure is reported.
        failing_child.close.assert_called_once()
        assert failing_child not in parent_agent._active_children
        assert outcome["status"] == "error"
|
||||||
@@ -507,6 +507,15 @@ def _run_single_child(
|
|||||||
except (ValueError, UnboundLocalError) as e:
|
except (ValueError, UnboundLocalError) as e:
|
||||||
logger.debug("Could not remove child from active_children: %s", e)
|
logger.debug("Could not remove child from active_children: %s", e)
|
||||||
|
|
||||||
|
# Close tool resources (terminal sandboxes, browser daemons,
|
||||||
|
# background processes, httpx clients) so subagent subprocesses
|
||||||
|
# don't outlive the delegation.
|
||||||
|
try:
|
||||||
|
if hasattr(child, 'close'):
|
||||||
|
child.close()
|
||||||
|
except Exception:
|
||||||
|
logger.debug("Failed to close child agent after delegation")
|
||||||
|
|
||||||
def delegate_task(
|
def delegate_task(
|
||||||
goal: Optional[str] = None,
|
goal: Optional[str] = None,
|
||||||
context: Optional[str] = None,
|
context: Optional[str] = None,
|
||||||
|
|||||||
Reference in New Issue
Block a user