Compare commits

...

6 Commits

Author SHA1 Message Date
pefontana
6b70739a79 fix(gateway): look up expired agents in _agent_cache, add global kill_all
Two fixes from PR review:

1. Session expiry was looking in _running_agents for the cached agent,
   but idle expired sessions live in _agent_cache. Now checks
   _agent_cache first, falls back to _running_agents.

2. Global cleanup in stop() was missing process_registry.kill_all(),
   so background processes from agents evicted without close() (branch,
   fallback) survived shutdown.
2026-04-10 17:26:10 -03:00
pefontana
44dc3f7e31 fix(gateway): guard _agent_cache_lock access in reset handler
Use getattr guard for _agent_cache_lock in _handle_reset_command
because test fixtures may create GatewayRunner without calling
__init__, leaving the attribute unset.

Fixes e2e test failure: test_new_resets_session,
test_new_then_status_reflects_reset, test_new_is_idempotent.
2026-04-10 16:58:42 -03:00
pefontana
5b1c504026 test: add zombie process cleanup tests
Add 9 tests covering the full zombie process prevention chain:

- TestZombieReproduction: demonstrates that processes survive when
  references are dropped without explicit cleanup (the original bug)
- TestAgentCloseMethod: verifies close() calls all cleanup functions,
  is idempotent, propagates to children, and continues cleanup even
  when individual steps fail
- TestGatewayCleanupWiring: verifies stop() calls close() and that
  _evict_cached_agent() does NOT call close() (since it's also used
  for non-destructive cache refreshes)
- TestDelegationCleanup: calls the real _run_single_child function and
  verifies close() is called on the child agent

Ref: #7131
2026-04-10 16:31:34 -03:00
pefontana
7f04364d24 fix(delegate): close child agent after delegation completes
Call child.close() in the _run_single_child finally block after
unregistering the child from the parent's active children list.

Previously child AIAgent instances were only removed from the tracking
list but never had their resources released — the OpenAI/httpx client
and any tool subprocesses relied entirely on garbage collection.

Ref: #7131
2026-04-10 16:23:23 -03:00
pefontana
54ff20e27b fix(gateway): call agent.close() on session end to prevent zombies
Wire AIAgent.close() into every gateway code path where an agent's
session is actually ending:

- stop(): close all running agents after interrupt + memory shutdown,
  then call cleanup_all_environments() and cleanup_all_browsers() as
  a global catch-all
- _session_expiry_watcher(): close agents when sessions expire after
  the 5-minute idle timeout
- _handle_reset_command(): close the old agent before evicting it from
  cache on /new or /reset

Note: _evict_cached_agent() intentionally does NOT call close() because
it is also used for non-destructive cache refreshes (model switch,
branch, fallback) where tool resources should persist.

Ref: #7131
2026-04-10 16:22:59 -03:00
pefontana
a8c17b383d feat(agent): add AIAgent.close() for subprocess cleanup
Add a close() method to AIAgent that acts as a single entry point for
releasing all resources held by an agent instance. This prevents zombie
process accumulation on long-running gateway deployments by explicitly
cleaning up:

- Background processes tracked in ProcessRegistry
- Terminal sandbox environments
- Browser daemon sessions
- Active child agents (subagent delegation)
- OpenAI/httpx client connections

Each cleanup step is independently guarded so a failure in one does not
prevent the rest. The method is idempotent and safe to call multiple
times.

Also simplifies the background review cleanup to use close() instead
of manually closing the OpenAI client.

Ref: #7131
2026-04-10 16:22:05 -03:00
4 changed files with 411 additions and 19 deletions

View File

@@ -1348,12 +1348,28 @@ class GatewayRunner:
for key, entry in _expired_entries:
try:
await self._async_flush_memories(entry.session_id)
# Shut down memory provider on the cached agent
cached_agent = self._running_agents.get(key)
if cached_agent and cached_agent is not _AGENT_PENDING_SENTINEL:
# Shut down memory provider and close tool resources
# on the cached agent. Idle agents live in
# _agent_cache (not _running_agents), so look there.
_cached_agent = None
_cache_lock = getattr(self, "_agent_cache_lock", None)
if _cache_lock is not None:
with _cache_lock:
_cached = self._agent_cache.get(key)
_cached_agent = _cached[0] if isinstance(_cached, tuple) else _cached if _cached else None
# Fall back to _running_agents in case the agent is
# still mid-turn when the expiry fires.
if _cached_agent is None:
_cached_agent = self._running_agents.get(key)
if _cached_agent and _cached_agent is not _AGENT_PENDING_SENTINEL:
try:
if hasattr(cached_agent, 'shutdown_memory_provider'):
cached_agent.shutdown_memory_provider()
if hasattr(_cached_agent, 'shutdown_memory_provider'):
_cached_agent.shutdown_memory_provider()
except Exception:
pass
try:
if hasattr(_cached_agent, 'close'):
_cached_agent.close()
except Exception:
pass
# Mark as flushed and persist to disk so the flag
@@ -1536,6 +1552,14 @@ class GatewayRunner:
agent.shutdown_memory_provider()
except Exception:
pass
# Close tool resources (terminal sandboxes, browser daemons,
# background processes, httpx clients) to prevent zombie
# process accumulation.
try:
if hasattr(agent, 'close'):
agent.close()
except Exception:
pass
for platform, adapter in list(self.adapters.items()):
try:
@@ -1558,7 +1582,25 @@ class GatewayRunner:
self._pending_messages.clear()
self._pending_approvals.clear()
self._shutdown_event.set()
# Global cleanup: kill any remaining tool subprocesses not tied
# to a specific agent (catch-all for zombie prevention).
try:
from tools.process_registry import process_registry
process_registry.kill_all()
except Exception:
pass
try:
from tools.terminal_tool import cleanup_all_environments
cleanup_all_environments()
except Exception:
pass
try:
from tools.browser_tool import cleanup_all_browsers
cleanup_all_browsers()
except Exception:
pass
from gateway.status import remove_pid_file, write_runtime_status
remove_pid_file()
try:
@@ -3335,8 +3377,22 @@ class GatewayRunner:
_flush_task.add_done_callback(self._background_tasks.discard)
except Exception as e:
logger.debug("Gateway memory flush on reset failed: %s", e)
# Close tool resources on the old agent (terminal sandboxes, browser
# daemons, background processes) before evicting from cache.
# Guard with getattr because test fixtures may skip __init__.
_cache_lock = getattr(self, "_agent_cache_lock", None)
if _cache_lock is not None:
with _cache_lock:
_cached = self._agent_cache.get(session_key)
_old_agent = _cached[0] if isinstance(_cached, tuple) else _cached if _cached else None
if _old_agent is not None:
try:
if hasattr(_old_agent, "close"):
_old_agent.close()
except Exception:
pass
self._evict_cached_agent(session_key)
try:
from tools.env_passthrough import clear_env_passthrough
clear_env_passthrough()

View File

@@ -1893,19 +1893,14 @@ class AIAgent:
except Exception as e:
logger.debug("Background memory/skill review failed: %s", e)
finally:
# Explicitly close the OpenAI/httpx client so GC doesn't
# try to clean it up on a dead asyncio event loop (which
# produces "Event loop is closed" errors in the terminal).
# Close all resources (httpx client, subprocesses, etc.) so
# GC doesn't try to clean them up on a dead asyncio event
# loop (which produces "Event loop is closed" errors).
if review_agent is not None:
client = getattr(review_agent, "client", None)
if client is not None:
try:
review_agent._close_openai_client(
client, reason="bg_review_done", shared=True
)
review_agent.client = None
except Exception:
pass
try:
review_agent.close()
except Exception:
pass
t = threading.Thread(target=_run_review, daemon=True, name="bg-review")
t.start()
@@ -2645,6 +2640,64 @@ class AIAgent:
except Exception:
pass
def close(self) -> None:
"""Release all resources held by this agent instance.
Cleans up subprocess resources that would otherwise become orphans:
- Background processes tracked in ProcessRegistry
- Terminal sandbox environments
- Browser daemon sessions
- Active child agents (subagent delegation)
- OpenAI/httpx client connections
Safe to call multiple times (idempotent). Each cleanup step is
independently guarded so a failure in one does not prevent the rest.
"""
task_id = getattr(self, "session_id", None) or ""
# 1. Kill background processes for this task
try:
from tools.process_registry import process_registry
process_registry.kill_all(task_id=task_id)
except Exception:
pass
# 2. Clean terminal sandbox environments
try:
from tools.terminal_tool import cleanup_vm
cleanup_vm(task_id)
except Exception:
pass
# 3. Clean browser daemon sessions
try:
from tools.browser_tool import cleanup_browser
cleanup_browser(task_id)
except Exception:
pass
# 4. Close active child agents
try:
with self._active_children_lock:
children = list(self._active_children)
self._active_children.clear()
for child in children:
try:
child.close()
except Exception:
pass
except Exception:
pass
# 5. Close the OpenAI/httpx client
try:
client = getattr(self, "client", None)
if client is not None:
self._close_openai_client(client, reason="agent_close", shared=True)
self.client = None
except Exception:
pass
def _hydrate_todo_store(self, history: List[Dict[str, Any]]) -> None:
"""
Recover todo state from conversation history.

View File

@@ -0,0 +1,274 @@
"""Tests for zombie process cleanup — verifies processes spawned by tools
are properly reaped when agent sessions end.
Reproduction for issue #7131: zombie process accumulation on long-running
gateway deployments.
"""
import os
import signal
import subprocess
import sys
import time
import threading
import pytest
def _spawn_sleep(seconds: float = 60) -> subprocess.Popen:
"""Spawn a portable long-lived Python sleep process (no shell wrapper)."""
return subprocess.Popen(
[sys.executable, "-c", f"import time; time.sleep({seconds})"],
)
def _pid_alive(pid: int) -> bool:
"""Return True if a process with the given PID is still running."""
try:
os.kill(pid, 0)
return True
except (ProcessLookupError, PermissionError):
return False
class TestZombieReproduction:
"""Demonstrate that subprocesses survive when cleanup is not called."""
def test_orphaned_processes_survive_without_cleanup(self):
"""REPRODUCTION: processes spawned directly survive if no one kills
them — this models the gap that causes zombie accumulation when
the gateway drops agent references without calling close()."""
pids = []
try:
for _ in range(3):
proc = _spawn_sleep(60)
pids.append(proc.pid)
for pid in pids:
assert _pid_alive(pid), f"PID {pid} should be alive after spawn"
# Simulate "session end" by just dropping the reference
del proc # noqa: F821
# BUG: processes are still alive after reference is dropped
for pid in pids:
assert _pid_alive(pid), (
f"PID {pid} died after ref drop — "
f"expected it to survive (demonstrating the bug)"
)
finally:
for pid in pids:
try:
os.kill(pid, signal.SIGKILL)
except (ProcessLookupError, PermissionError):
pass
def test_explicit_terminate_reaps_processes(self):
"""Explicitly terminating+waiting on Popen handles works.
This models what ProcessRegistry.kill_process does internally."""
procs = []
try:
for _ in range(3):
proc = _spawn_sleep(60)
procs.append(proc)
for proc in procs:
assert _pid_alive(proc.pid)
for proc in procs:
proc.terminate()
proc.wait(timeout=5)
for proc in procs:
assert proc.returncode is not None, (
f"PID {proc.pid} should have exited after terminate+wait"
)
finally:
for proc in procs:
try:
proc.kill()
proc.wait(timeout=1)
except Exception:
pass
class TestAgentCloseMethod:
"""Verify AIAgent.close() exists, is idempotent, and calls cleanup."""
def test_close_calls_cleanup_functions(self):
"""close() should call kill_all, cleanup_vm, cleanup_browser."""
from unittest.mock import patch
with patch("run_agent.AIAgent.__init__", return_value=None):
from run_agent import AIAgent
agent = AIAgent.__new__(AIAgent)
agent.session_id = "test-close-cleanup"
agent._active_children = []
agent._active_children_lock = threading.Lock()
agent.client = None
with patch("tools.process_registry.process_registry") as mock_registry, \
patch("tools.terminal_tool.cleanup_vm") as mock_cleanup_vm, \
patch("tools.browser_tool.cleanup_browser") as mock_cleanup_browser:
agent.close()
mock_registry.kill_all.assert_called_once_with(
task_id="test-close-cleanup"
)
mock_cleanup_vm.assert_called_once_with("test-close-cleanup")
mock_cleanup_browser.assert_called_once_with("test-close-cleanup")
def test_close_is_idempotent(self):
"""close() can be called multiple times without error."""
from unittest.mock import patch
with patch("run_agent.AIAgent.__init__", return_value=None):
from run_agent import AIAgent
agent = AIAgent.__new__(AIAgent)
agent.session_id = "test-close-idempotent"
agent._active_children = []
agent._active_children_lock = threading.Lock()
agent.client = None
agent.close()
agent.close()
agent.close()
def test_close_propagates_to_children(self):
"""close() should call close() on all active child agents."""
from unittest.mock import MagicMock, patch
with patch("run_agent.AIAgent.__init__", return_value=None):
from run_agent import AIAgent
agent = AIAgent.__new__(AIAgent)
agent.session_id = "test-close-children"
agent._active_children_lock = threading.Lock()
agent.client = None
child_1 = MagicMock()
child_2 = MagicMock()
agent._active_children = [child_1, child_2]
agent.close()
child_1.close.assert_called_once()
child_2.close.assert_called_once()
assert agent._active_children == []
def test_close_survives_partial_failures(self):
"""close() continues cleanup even if one step fails."""
from unittest.mock import patch
with patch("run_agent.AIAgent.__init__", return_value=None):
from run_agent import AIAgent
agent = AIAgent.__new__(AIAgent)
agent.session_id = "test-close-partial"
agent._active_children = []
agent._active_children_lock = threading.Lock()
agent.client = None
with patch(
"tools.process_registry.process_registry"
) as mock_reg, patch(
"tools.terminal_tool.cleanup_vm"
) as mock_vm, patch(
"tools.browser_tool.cleanup_browser"
) as mock_browser:
mock_reg.kill_all.side_effect = RuntimeError("boom")
agent.close()
mock_vm.assert_called_once()
mock_browser.assert_called_once()
class TestGatewayCleanupWiring:
"""Verify gateway lifecycle calls close() on agents."""
def test_gateway_stop_calls_close(self):
"""gateway stop() should call close() on all running agents."""
import asyncio
from unittest.mock import MagicMock, patch
runner = MagicMock()
runner._running = True
runner._running_agents = {}
runner.adapters = {}
runner._background_tasks = set()
runner._pending_messages = {}
runner._pending_approvals = {}
runner._shutdown_event = asyncio.Event()
runner._exit_reason = None
mock_agent_1 = MagicMock()
mock_agent_2 = MagicMock()
runner._running_agents = {
"session-1": mock_agent_1,
"session-2": mock_agent_2,
}
from gateway.run import GatewayRunner
loop = asyncio.new_event_loop()
try:
with patch("gateway.status.remove_pid_file"), \
patch("gateway.status.write_runtime_status"), \
patch("tools.terminal_tool.cleanup_all_environments"), \
patch("tools.browser_tool.cleanup_all_browsers"):
loop.run_until_complete(GatewayRunner.stop(runner))
finally:
loop.close()
mock_agent_1.close.assert_called()
mock_agent_2.close.assert_called()
def test_evict_does_not_call_close(self):
"""_evict_cached_agent() should NOT call close() — it's also used
for non-destructive refreshes (model switch, branch, fallback)."""
import threading
from unittest.mock import MagicMock
from gateway.run import GatewayRunner
runner = object.__new__(GatewayRunner)
runner._agent_cache_lock = threading.Lock()
mock_agent = MagicMock()
runner._agent_cache = {"session-key": (mock_agent, 12345)}
GatewayRunner._evict_cached_agent(runner, "session-key")
mock_agent.close.assert_not_called()
assert "session-key" not in runner._agent_cache
class TestDelegationCleanup:
"""Verify subagent delegation cleans up child agents."""
def test_run_single_child_calls_close(self):
"""_run_single_child finally block should call close() on child."""
from unittest.mock import MagicMock
from tools.delegate_tool import _run_single_child
parent = MagicMock()
parent._active_children = []
parent._active_children_lock = threading.Lock()
child = MagicMock()
child._delegate_saved_tool_names = ["tool1"]
child.run_conversation.side_effect = RuntimeError("test abort")
parent._active_children.append(child)
result = _run_single_child(
task_index=0,
goal="test goal",
child=child,
parent_agent=parent,
)
child.close.assert_called_once()
assert child not in parent._active_children
assert result["status"] == "error"

View File

@@ -507,6 +507,15 @@ def _run_single_child(
except (ValueError, UnboundLocalError) as e:
logger.debug("Could not remove child from active_children: %s", e)
# Close tool resources (terminal sandboxes, browser daemons,
# background processes, httpx clients) so subagent subprocesses
# don't outlive the delegation.
try:
if hasattr(child, 'close'):
child.close()
except Exception:
logger.debug("Failed to close child agent after delegation")
def delegate_task(
goal: Optional[str] = None,
context: Optional[str] = None,