Fix browser cleanup consistency and screenshot recovery

Unify browser session teardown so manual close, inactivity cleanup, and emergency shutdown all follow the same cleanup path instead of partially duplicating logic.

This changes browser_close() to delegate to cleanup_browser(), which means recording shutdown, Browserbase release, activity bookkeeping cleanup, and local socket-directory removal now happen consistently. It also updates emergency cleanup to route through cleanup_all_browsers() and explicitly clear in-memory tracking state after teardown so stale active-session, last-activity, and recording entries are not left behind on exit.

The screenshot fallback path has also been fixed. _extract_screenshot_path_from_text() now matches real absolute PNG paths, including quoted output, so browser_vision() can recover screenshots when agent-browser emits human-readable text instead of JSON.

Regression coverage was added in tests/tools/test_browser_cleanup.py for screenshot path extraction, cleanup_browser() state removal, browser_close() delegation, and emergency cleanup state clearing.

Verified with:
- python -m pytest tests/tools/test_browser_cleanup.py -q
- python -m pytest tests/tools/test_browser_console.py tests/gateway/test_send_image_file.py -q
This commit is contained in:
Dave Tist
2026-03-12 02:49:24 +01:00
committed by teknium1
parent 6d8286f396
commit 895fe5a5d3
2 changed files with 469 additions and 325 deletions

View File

@@ -0,0 +1,96 @@
"""Regression tests for browser session cleanup and screenshot recovery."""
from unittest.mock import patch
class TestScreenshotPathRecovery:
def test_extracts_standard_absolute_path(self):
from tools.browser_tool import _extract_screenshot_path_from_text
assert (
_extract_screenshot_path_from_text("Screenshot saved to /tmp/foo.png")
== "/tmp/foo.png"
)
def test_extracts_quoted_absolute_path(self):
from tools.browser_tool import _extract_screenshot_path_from_text
assert (
_extract_screenshot_path_from_text(
"Screenshot saved to '/Users/david/.hermes/browser_screenshots/shot.png'"
)
== "/Users/david/.hermes/browser_screenshots/shot.png"
)
class TestBrowserCleanup:
def setup_method(self):
from tools import browser_tool
self.browser_tool = browser_tool
self.orig_active_sessions = browser_tool._active_sessions.copy()
self.orig_session_last_activity = browser_tool._session_last_activity.copy()
self.orig_recording_sessions = browser_tool._recording_sessions.copy()
self.orig_cleanup_done = browser_tool._cleanup_done
def teardown_method(self):
self.browser_tool._active_sessions.clear()
self.browser_tool._active_sessions.update(self.orig_active_sessions)
self.browser_tool._session_last_activity.clear()
self.browser_tool._session_last_activity.update(self.orig_session_last_activity)
self.browser_tool._recording_sessions.clear()
self.browser_tool._recording_sessions.update(self.orig_recording_sessions)
self.browser_tool._cleanup_done = self.orig_cleanup_done
def test_cleanup_browser_clears_tracking_state(self):
browser_tool = self.browser_tool
browser_tool._active_sessions["task-1"] = {
"session_name": "sess-1",
"bb_session_id": None,
}
browser_tool._session_last_activity["task-1"] = 123.0
with (
patch("tools.browser_tool._maybe_stop_recording") as mock_stop,
patch(
"tools.browser_tool._run_browser_command",
return_value={"success": True},
) as mock_run,
patch("tools.browser_tool.os.path.exists", return_value=False),
):
browser_tool.cleanup_browser("task-1")
assert "task-1" not in browser_tool._active_sessions
assert "task-1" not in browser_tool._session_last_activity
mock_stop.assert_called_once_with("task-1")
mock_run.assert_called_once_with("task-1", "close", [], timeout=10)
def test_browser_close_delegates_to_cleanup_browser(self):
import json
browser_tool = self.browser_tool
browser_tool._active_sessions["task-2"] = {"session_name": "sess-2"}
with patch("tools.browser_tool.cleanup_browser") as mock_cleanup:
result = json.loads(browser_tool.browser_close("task-2"))
assert result == {"success": True, "closed": True}
mock_cleanup.assert_called_once_with("task-2")
def test_emergency_cleanup_clears_all_tracking_state(self):
browser_tool = self.browser_tool
browser_tool._cleanup_done = False
browser_tool._active_sessions["task-1"] = {"session_name": "sess-1"}
browser_tool._active_sessions["task-2"] = {"session_name": "sess-2"}
browser_tool._session_last_activity["task-1"] = 1.0
browser_tool._session_last_activity["task-2"] = 2.0
browser_tool._recording_sessions.update({"task-1", "task-2"})
with patch("tools.browser_tool.cleanup_all_browsers") as mock_cleanup_all:
browser_tool._emergency_cleanup_all_sessions()
mock_cleanup_all.assert_called_once_with()
assert browser_tool._active_sessions == {}
assert browser_tool._session_last_activity == {}
assert browser_tool._recording_sessions == set()
assert browser_tool._cleanup_done is True

View File

@@ -49,10 +49,12 @@ Usage:
browser_click("@e5", task_id="task_123")
"""
from tools.registry import registry
import atexit
import json
import logging
import os
import re
import signal
import subprocess
import shutil
@@ -126,7 +128,8 @@ def _socket_safe_tmpdir() -> str:
# Track active sessions per task
# Stores: session_name (always), bb_session_id + cdp_url (cloud mode only)
_active_sessions: Dict[str, Dict[str, str]] = {} # task_id -> {session_name, ...}
# task_id -> {session_name, ...}
_active_sessions: Dict[str, Dict[str, str]] = {}
_recording_sessions: set = set() # task_ids with active recordings
# Flag to track if cleanup has been done
@@ -139,7 +142,8 @@ _cleanup_done = False
# Session inactivity timeout (seconds) - cleanup if no activity for this long
# Default: 5 minutes. Needs headroom for LLM reasoning between browser commands,
# especially when subagents are doing multi-step browser tasks.
BROWSER_SESSION_INACTIVITY_TIMEOUT = int(os.environ.get("BROWSER_INACTIVITY_TIMEOUT", "300"))
BROWSER_SESSION_INACTIVITY_TIMEOUT = int(
os.environ.get("BROWSER_INACTIVITY_TIMEOUT", "300"))
# Track last activity time per session
_session_last_activity: Dict[str, float] = {}
@@ -165,63 +169,18 @@ def _emergency_cleanup_all_sessions():
if not _active_sessions:
return
logger.info("Emergency cleanup: closing %s active session(s)...", len(_active_sessions))
logger.info("Emergency cleanup: closing %s active session(s)...",
len(_active_sessions))
try:
if _is_local_mode():
# Local mode: just close agent-browser sessions via CLI
for task_id, session_info in list(_active_sessions.items()):
session_name = session_info.get("session_name")
if session_name:
try:
browser_cmd = _find_agent_browser()
task_socket_dir = os.path.join(
_socket_safe_tmpdir(),
f"agent-browser-{session_name}"
)
env = {**os.environ, "AGENT_BROWSER_SOCKET_DIR": task_socket_dir}
subprocess.run(
browser_cmd.split() + ["--session", session_name, "--json", "close"],
capture_output=True, timeout=5, env=env,
)
logger.info("Closed local session %s", session_name)
except Exception as e:
logger.debug("Error closing local session %s: %s", session_name, e)
else:
# Cloud mode: release Browserbase sessions via API
api_key = os.environ.get("BROWSERBASE_API_KEY")
project_id = os.environ.get("BROWSERBASE_PROJECT_ID")
if not api_key or not project_id:
logger.warning("Cannot cleanup - missing BROWSERBASE credentials")
return
for task_id, session_info in list(_active_sessions.items()):
bb_session_id = session_info.get("bb_session_id")
if bb_session_id:
try:
response = requests.post(
f"https://api.browserbase.com/v1/sessions/{bb_session_id}",
headers={
"X-BB-API-Key": api_key,
"Content-Type": "application/json"
},
json={
"projectId": project_id,
"status": "REQUEST_RELEASE"
},
timeout=5 # Short timeout for cleanup
)
if response.status_code in (200, 201, 204):
logger.info("Closed session %s", bb_session_id)
else:
logger.warning("Failed to close session %s: HTTP %s", bb_session_id, response.status_code)
except Exception as e:
logger.error("Error closing session %s: %s", bb_session_id, e)
_active_sessions.clear()
cleanup_all_browsers()
except Exception as e:
logger.error("Emergency cleanup error: %s", e)
finally:
with _cleanup_lock:
_active_sessions.clear()
_session_last_activity.clear()
_recording_sessions.clear()
# Register cleanup via atexit only. Previous versions installed SIGINT/SIGTERM
@@ -255,14 +214,17 @@ def _cleanup_inactive_browser_sessions():
for task_id in sessions_to_cleanup:
try:
elapsed = int(current_time - _session_last_activity.get(task_id, current_time))
logger.info("Cleaning up inactive session for task: %s (inactive for %ss)", task_id, elapsed)
elapsed = int(
current_time - _session_last_activity.get(task_id, current_time))
logger.info(
"Cleaning up inactive session for task: %s (inactive for %ss)", task_id, elapsed)
cleanup_browser(task_id)
with _cleanup_lock:
if task_id in _session_last_activity:
del _session_last_activity[task_id]
except Exception as e:
logger.warning("Error cleaning up inactive session %s: %s", task_id, e)
logger.warning(
"Error cleaning up inactive session %s: %s", task_id, e)
def _browser_cleanup_thread_worker():
@@ -300,7 +262,8 @@ def _start_browser_cleanup_thread():
name="browser-cleanup"
)
_cleanup_thread.start()
logger.info("Started inactivity cleanup thread (timeout: %ss)", BROWSER_SESSION_INACTIVITY_TIMEOUT)
logger.info("Started inactivity cleanup thread (timeout: %ss)",
BROWSER_SESSION_INACTIVITY_TIMEOUT)
def _stop_browser_cleanup_thread():
@@ -511,11 +474,14 @@ def _create_browserbase_session(task_id: str) -> Dict[str, str]:
# Check for optional settings from environment
# Proxies: enabled by default for better CAPTCHA solving
enable_proxies = os.environ.get("BROWSERBASE_PROXIES", "true").lower() != "false"
enable_proxies = os.environ.get(
"BROWSERBASE_PROXIES", "true").lower() != "false"
# Advanced Stealth: requires Scale Plan, disabled by default
enable_advanced_stealth = os.environ.get("BROWSERBASE_ADVANCED_STEALTH", "false").lower() == "true"
enable_advanced_stealth = os.environ.get(
"BROWSERBASE_ADVANCED_STEALTH", "false").lower() == "true"
# keepAlive: enabled by default (requires paid plan) - allows reconnection after disconnects
enable_keep_alive = os.environ.get("BROWSERBASE_KEEP_ALIVE", "true").lower() != "false"
enable_keep_alive = os.environ.get(
"BROWSERBASE_KEEP_ALIVE", "true").lower() != "false"
# Custom session timeout in milliseconds (optional) - extends session beyond project default
custom_timeout_ms = os.environ.get("BROWSERBASE_SESSION_TIMEOUT")
@@ -547,7 +513,8 @@ def _create_browserbase_session(task_id: str) -> Dict[str, str]:
if timeout_val > 0:
session_config["timeout"] = timeout_val
except ValueError:
logger.warning("Invalid BROWSERBASE_SESSION_TIMEOUT value: %s", custom_timeout_ms)
logger.warning(
"Invalid BROWSERBASE_SESSION_TIMEOUT value: %s", custom_timeout_ms)
# Enable proxies for better CAPTCHA solving (default: true)
# Routes traffic through residential IPs for more reliable access
@@ -583,7 +550,7 @@ def _create_browserbase_session(task_id: str) -> Dict[str, str]:
if enable_keep_alive:
keepalive_fallback = True
logger.warning("keepAlive may require paid plan (402), retrying without it. "
"Sessions may timeout during long operations.")
"Sessions may timeout during long operations.")
session_config.pop("keepAlive", None)
response = requests.post(
"https://api.browserbase.com/v1/sessions",
@@ -599,7 +566,7 @@ def _create_browserbase_session(task_id: str) -> Dict[str, str]:
if response.status_code == 402 and enable_proxies:
proxies_fallback = True
logger.warning("Proxies unavailable (402), retrying without proxies. "
"Bot detection may be less effective.")
"Bot detection may be less effective.")
session_config.pop("proxies", None)
response = requests.post(
"https://api.browserbase.com/v1/sessions",
@@ -612,7 +579,8 @@ def _create_browserbase_session(task_id: str) -> Dict[str, str]:
)
if not response.ok:
raise RuntimeError(f"Failed to create Browserbase session: {response.status_code} {response.text}")
raise RuntimeError(
f"Failed to create Browserbase session: {response.status_code} {response.text}")
session_data = response.json()
session_name = f"hermes_{task_id}_{uuid.uuid4().hex[:8]}"
@@ -629,7 +597,8 @@ def _create_browserbase_session(task_id: str) -> Dict[str, str]:
# Log session info for debugging
feature_str = ", ".join(k for k, v in features_enabled.items() if v)
logger.info("Created session %s with features: %s", session_name, feature_str)
logger.info("Created session %s with features: %s",
session_name, feature_str)
return {
"session_name": session_name,
@@ -640,18 +609,14 @@ def _create_browserbase_session(task_id: str) -> Dict[str, str]:
def _create_local_session(task_id: str) -> Dict[str, str]:
"""Create a lightweight local browser session (no cloud API call).
Returns the same dict shape as ``_create_browserbase_session`` so the rest
of the code can treat both modes uniformly.
"""
import uuid
session_name = f"hermes_{task_id}_{uuid.uuid4().hex[:8]}"
logger.info("Created local browser session %s", session_name)
session_name = f"h_{uuid.uuid4().hex[:10]}"
logger.info("Created local browser session %s for task %s",
session_name, task_id)
return {
"session_name": session_name,
"bb_session_id": None, # Not applicable in local mode
"cdp_url": None, # Not applicable in local mode
"bb_session_id": None,
"cdp_url": None,
"features": {"local": True},
}
@@ -772,6 +737,27 @@ def _find_agent_browser() -> str:
)
def _extract_screenshot_path_from_text(text: str) -> Optional[str]:
"""Extract a screenshot file path from agent-browser human-readable output."""
if not text:
return None
patterns = [
r"Screenshot saved to ['\"](?P<path>/[^'\"]+?\.png)['\"]",
r"Screenshot saved to (?P<path>/\S+?\.png)(?:\s|$)",
r"(?P<path>/\S+?\.png)(?:\s|$)",
]
for pattern in patterns:
match = re.search(pattern, text)
if match:
path = match.group("path").strip().strip("'\"")
if path:
return path
return None
def _run_browser_command(
task_id: str,
command: str,
@@ -807,7 +793,8 @@ def _run_browser_command(
try:
session_info = _get_session_info(task_id)
except Exception as e:
logger.warning("Failed to create browser session for task=%s: %s", task_id, e)
logger.warning(
"Failed to create browser session for task=%s: %s", task_id, e)
return {"success": False, "error": f"Failed to create browser session: {str(e)}"}
# Build the command with the appropriate backend flag.
@@ -841,11 +828,29 @@ def _run_browser_command(
command, task_id, task_socket_dir, len(task_socket_dir))
browser_env = {**os.environ}
# Ensure PATH includes standard dirs (systemd services may have minimal PATH)
if "/usr/bin" not in browser_env.get("PATH", "").split(":"):
browser_env["PATH"] = f"{browser_env.get('PATH', '')}:{_SANE_PATH}"
# Ensure PATH includes Hermes-managed Node first, then standard system dirs.
hermes_home = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))
hermes_node_bin = str(hermes_home / "node" / "bin")
existing_path = browser_env.get("PATH", "")
path_parts = [p for p in existing_path.split(":") if p]
candidate_dirs = [hermes_node_bin] + [p for p in _SANE_PATH.split(":") if p]
for part in reversed(candidate_dirs):
if os.path.isdir(part) and part not in path_parts:
path_parts.insert(0, part)
browser_env["PATH"] = ":".join(path_parts)
browser_env["AGENT_BROWSER_SOCKET_DIR"] = task_socket_dir
node_path = shutil.which("node", path=browser_env["PATH"])
if node_path:
logger.debug("browser subprocess using node at: %s", node_path)
else:
logger.warning("node not found in browser PATH: %s",
browser_env["PATH"])
result = subprocess.run(
cmd_parts,
capture_output=True,
@@ -857,7 +862,8 @@ def _run_browser_command(
# Log stderr for diagnostics — use warning level on failure so it's visible
if result.stderr and result.stderr.strip():
level = logging.WARNING if result.returncode != 0 else logging.DEBUG
logger.log(level, "browser '%s' stderr: %s", command, result.stderr.strip()[:500])
logger.log(level, "browser '%s' stderr: %s",
command, result.stderr.strip()[:500])
# Log empty output as warning — common sign of broken agent-browser
if not result.stdout.strip() and result.returncode == 0:
@@ -866,11 +872,11 @@ def _run_browser_command(
command, " ".join(cmd_parts[:4]) + "...",
(result.stderr or "")[:200])
# Parse JSON output
if result.stdout.strip():
stdout_text = result.stdout.strip()
if stdout_text:
try:
parsed = json.loads(result.stdout.strip())
# Warn if snapshot came back empty (common sign of daemon/CDP issues)
parsed = json.loads(stdout_text)
if command == "snapshot" and parsed.get("success"):
snap_data = parsed.get("data", {})
if not snap_data.get("snapshot") and not snap_data.get("refs"):
@@ -879,19 +885,42 @@ def _run_browser_command(
"returncode=%s", result.returncode)
return parsed
except json.JSONDecodeError:
# Non-JSON output indicates agent-browser crash or version mismatch
raw = result.stdout.strip()[:500]
raw = stdout_text[:2000]
logger.warning("browser '%s' returned non-JSON output (rc=%s): %s",
command, result.returncode, raw[:200])
command, result.returncode, raw[:500])
if command == "screenshot":
stderr_text = (result.stderr or "").strip()
combined_text = "\n".join(
part for part in [stdout_text, stderr_text] if part
)
recovered_path = _extract_screenshot_path_from_text(
combined_text)
if recovered_path and Path(recovered_path).exists():
logger.info(
"browser 'screenshot' recovered file from non-JSON output: %s",
recovered_path,
)
return {
"success": True,
"data": {
"path": recovered_path,
"raw": raw,
},
}
return {
"success": True,
"data": {"raw": raw}
"success": False,
"error": f"Non-JSON output from agent-browser for '{command}': {raw}"
}
# Check for errors
if result.returncode != 0:
error_msg = result.stderr.strip() if result.stderr else f"Command failed with code {result.returncode}"
logger.warning("browser '%s' failed (rc=%s): %s", command, result.returncode, error_msg[:300])
error_msg = result.stderr.strip(
) if result.stderr else f"Command failed with code {result.returncode}"
logger.warning("browser '%s' failed (rc=%s): %s",
command, result.returncode, error_msg[:300])
return {"success": False, "error": error_msg}
return {"success": True, "data": {}}
@@ -1258,38 +1287,18 @@ def browser_close(task_id: Optional[str] = None) -> str:
JSON string with close result
"""
effective_task_id = task_id or "default"
with _cleanup_lock:
had_session = effective_task_id in _active_sessions
# Stop auto-recording before closing
_maybe_stop_recording(effective_task_id)
cleanup_browser(effective_task_id)
result = _run_browser_command(effective_task_id, "close", [])
# Close the backend session (Browserbase API in cloud mode, nothing extra in local mode)
session_key = task_id if task_id and task_id in _active_sessions else "default"
if session_key in _active_sessions:
session_info = _active_sessions[session_key]
bb_session_id = session_info.get("bb_session_id")
if bb_session_id:
# Cloud mode: release the Browserbase session via API
try:
config = _get_browserbase_config()
_close_browserbase_session(bb_session_id, config["api_key"], config["project_id"])
except Exception as e:
logger.warning("Could not close BrowserBase session: %s", e)
del _active_sessions[session_key]
if result.get("success"):
return json.dumps({
"success": True,
"closed": True
}, ensure_ascii=False)
else:
# Even if close fails, session was released
return json.dumps({
"success": True,
"closed": True,
"warning": result.get("error", "Session may not have been active")
}, ensure_ascii=False)
response = {
"success": True,
"closed": True,
}
if not had_session:
response["warning"] = "Session may not have been active"
return json.dumps(response, ensure_ascii=False)
def browser_console(clear: bool = False, task_id: Optional[str] = None) -> str:
@@ -1310,8 +1319,10 @@ def browser_console(clear: bool = False, task_id: Optional[str] = None) -> str:
console_args = ["--clear"] if clear else []
error_args = ["--clear"] if clear else []
console_result = _run_browser_command(effective_task_id, "console", console_args)
errors_result = _run_browser_command(effective_task_id, "errors", error_args)
console_result = _run_browser_command(
effective_task_id, "console", console_args)
errors_result = _run_browser_command(
effective_task_id, "errors", error_args)
messages = []
if console_result.get("success"):
@@ -1344,14 +1355,16 @@ def _maybe_start_recording(task_id: str):
if task_id in _recording_sessions:
return
try:
hermes_home = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))
hermes_home = Path(os.environ.get(
"HERMES_HOME", Path.home() / ".hermes"))
config_path = hermes_home / "config.yaml"
record_enabled = False
if config_path.exists():
import yaml
with open(config_path) as f:
cfg = yaml.safe_load(f) or {}
record_enabled = cfg.get("browser", {}).get("record_sessions", False)
record_enabled = cfg.get("browser", {}).get(
"record_sessions", False)
if not record_enabled:
return
@@ -1362,14 +1375,18 @@ def _maybe_start_recording(task_id: str):
import time
timestamp = time.strftime("%Y%m%d_%H%M%S")
recording_path = recordings_dir / f"session_{timestamp}_{task_id[:16]}.webm"
recording_path = recordings_dir / \
f"session_{timestamp}_{task_id[:16]}.webm"
result = _run_browser_command(task_id, "record", ["start", str(recording_path)])
result = _run_browser_command(
task_id, "record", ["start", str(recording_path)])
if result.get("success"):
_recording_sessions.add(task_id)
logger.info("Auto-recording browser session %s to %s", task_id, recording_path)
logger.info("Auto-recording browser session %s to %s",
task_id, recording_path)
else:
logger.debug("Could not start auto-recording: %s", result.get("error"))
logger.debug("Could not start auto-recording: %s",
result.get("error"))
except Exception as e:
logger.debug("Auto-recording setup failed: %s", e)
@@ -1382,7 +1399,8 @@ def _maybe_stop_recording(task_id: str):
result = _run_browser_command(task_id, "record", ["stop"])
if result.get("success"):
path = result.get("data", {}).get("path", "")
logger.info("Saved browser recording for session %s: %s", task_id, path)
logger.info(
"Saved browser recording for session %s: %s", task_id, path)
except Exception as e:
logger.debug("Could not stop recording for %s: %s", task_id, e)
finally:
@@ -1468,11 +1486,11 @@ def browser_vision(question: str, annotate: bool = False, task_id: Optional[str]
from pathlib import Path
effective_task_id = task_id or "default"
# Save screenshot to persistent location so it can be shared with users
hermes_home = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))
screenshots_dir = hermes_home / "browser_screenshots"
screenshot_path = screenshots_dir / f"browser_screenshot_{uuid_mod.uuid4().hex}.png"
screenshot_path = screenshots_dir / \
f"browser_screenshot_{uuid_mod.uuid4().hex}.png"
try:
screenshots_dir.mkdir(parents=True, exist_ok=True)
@@ -1481,9 +1499,11 @@ def browser_vision(question: str, annotate: bool = False, task_id: Optional[str]
_cleanup_old_screenshots(screenshots_dir, max_age_hours=24)
# Take screenshot using agent-browser
screenshot_args = [str(screenshot_path)]
screenshot_args = []
if annotate:
screenshot_args.insert(0, "--annotate")
screenshot_args.append("--annotate")
screenshot_args.append("--full")
screenshot_args.append(str(screenshot_path))
result = _run_browser_command(
effective_task_id,
"screenshot",
@@ -1499,6 +1519,10 @@ def browser_vision(question: str, annotate: bool = False, task_id: Optional[str]
"error": f"Failed to take screenshot ({mode} mode): {error_detail}"
}, ensure_ascii=False)
actual_screenshot_path = result.get("data", {}).get("path")
if actual_screenshot_path:
screenshot_path = Path(actual_screenshot_path)
# Check if screenshot file was created
if not screenshot_path.exists():
mode = "local" if _is_local_mode() else "cloud"
@@ -1565,7 +1589,8 @@ def browser_vision(question: str, annotate: bool = False, task_id: Optional[str]
# screenshot loses evidence the user might need. The 24-hour cleanup
# in _cleanup_old_screenshots prevents unbounded disk growth.
logger.warning("browser_vision failed: %s", e, exc_info=True)
error_info = {"success": False, "error": f"Error during vision analysis: {str(e)}"}
error_info = {"success": False,
"error": f"Error during vision analysis: {str(e)}"}
if screenshot_path.exists():
error_info["screenshot_path"] = str(screenshot_path)
error_info["note"] = "Screenshot was captured but vision analysis failed. You can still share it via MEDIA:<path>."
@@ -1600,7 +1625,8 @@ def _cleanup_old_recordings(max_age_hours=72):
"""Remove browser recordings older than max_age_hours to prevent disk bloat."""
import time
try:
hermes_home = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))
hermes_home = Path(os.environ.get(
"HERMES_HOME", Path.home() / ".hermes"))
recordings_dir = hermes_home / "browser_recordings"
if not recordings_dir.exists():
return
@@ -1650,10 +1676,12 @@ def _close_browserbase_session(session_id: str, api_key: str, project_id: str) -
)
if response.status_code in (200, 201, 204):
logger.debug("Successfully closed BrowserBase session %s", session_id)
logger.debug(
"Successfully closed BrowserBase session %s", session_id)
return True
else:
logger.warning("Failed to close session %s: HTTP %s - %s", session_id, response.status_code, response.text[:200])
logger.warning("Failed to close session %s: HTTP %s - %s",
session_id, response.status_code, response.text[:200])
return False
except Exception as e:
@@ -1684,7 +1712,8 @@ def cleanup_browser(task_id: Optional[str] = None) -> None:
if session_info:
bb_session_id = session_info.get("bb_session_id", "unknown")
logger.debug("Found session for task %s: bb_session_id=%s", task_id, bb_session_id)
logger.debug("Found session for task %s: bb_session_id=%s",
task_id, bb_session_id)
# Stop auto-recording before closing (saves the file)
_maybe_stop_recording(task_id)
@@ -1692,9 +1721,11 @@ def cleanup_browser(task_id: Optional[str] = None) -> None:
# Try to close via agent-browser first (needs session in _active_sessions)
try:
_run_browser_command(task_id, "close", [], timeout=10)
logger.debug("agent-browser close command completed for task %s", task_id)
logger.debug(
"agent-browser close command completed for task %s", task_id)
except Exception as e:
logger.warning("agent-browser close failed for task %s: %s", task_id, e)
logger.warning(
"agent-browser close failed for task %s: %s", task_id, e)
# Now remove from tracking under lock
with _cleanup_lock:
@@ -1705,16 +1736,20 @@ def cleanup_browser(task_id: Optional[str] = None) -> None:
if bb_session_id and not _is_local_mode():
try:
config = _get_browserbase_config()
success = _close_browserbase_session(bb_session_id, config["api_key"], config["project_id"])
success = _close_browserbase_session(
bb_session_id, config["api_key"], config["project_id"])
if not success:
logger.warning("Could not close BrowserBase session %s", bb_session_id)
logger.warning(
"Could not close BrowserBase session %s", bb_session_id)
except Exception as e:
logger.error("Exception during BrowserBase session close: %s", e)
logger.error(
"Exception during BrowserBase session close: %s", e)
# Kill the daemon process and clean up socket directory
session_name = session_info.get("session_name", "")
if session_name:
socket_dir = os.path.join(_socket_safe_tmpdir(), f"agent-browser-{session_name}")
socket_dir = os.path.join(
_socket_safe_tmpdir(), f"agent-browser-{session_name}")
if os.path.exists(socket_dir):
# agent-browser writes {session}.pid in the socket dir
pid_file = os.path.join(socket_dir, f"{session_name}.pid")
@@ -1722,9 +1757,11 @@ def cleanup_browser(task_id: Optional[str] = None) -> None:
try:
daemon_pid = int(Path(pid_file).read_text().strip())
os.kill(daemon_pid, signal.SIGTERM)
logger.debug("Killed daemon pid %s for %s", daemon_pid, session_name)
logger.debug("Killed daemon pid %s for %s",
daemon_pid, session_name)
except (ProcessLookupError, ValueError, PermissionError, OSError):
logger.debug("Could not kill daemon pid for %s (already dead or inaccessible)", session_name)
logger.debug(
"Could not kill daemon pid for %s (already dead or inaccessible)", session_name)
shutil.rmtree(socket_dir, ignore_errors=True)
logger.debug("Removed task %s from active sessions", task_id)
@@ -1811,7 +1848,8 @@ if __name__ == "__main__":
_find_agent_browser()
except FileNotFoundError:
print(" - agent-browser CLI not found")
print(" Install: npm install -g agent-browser && agent-browser install --with-deps")
print(
" Install: npm install -g agent-browser && agent-browser install --with-deps")
if not _is_local_mode():
if not os.environ.get("BROWSERBASE_API_KEY"):
print(" - BROWSERBASE_API_KEY not set (required for cloud mode)")
@@ -1832,7 +1870,6 @@ if __name__ == "__main__":
# ---------------------------------------------------------------------------
# Registry
# ---------------------------------------------------------------------------
from tools.registry import registry
_BROWSER_SCHEMA_MAP = {s["name"]: s for s in BROWSER_TOOL_SCHEMAS}
@@ -1840,7 +1877,8 @@ registry.register(
name="browser_navigate",
toolset="browser",
schema=_BROWSER_SCHEMA_MAP["browser_navigate"],
handler=lambda args, **kw: browser_navigate(url=args.get("url", ""), task_id=kw.get("task_id")),
handler=lambda args, **kw: browser_navigate(
url=args.get("url", ""), task_id=kw.get("task_id")),
check_fn=check_browser_requirements,
)
registry.register(
@@ -1855,7 +1893,8 @@ registry.register(
name="browser_click",
toolset="browser",
schema=_BROWSER_SCHEMA_MAP["browser_click"],
handler=lambda args, **kw: browser_click(**args, task_id=kw.get("task_id")),
handler=lambda args, **kw: browser_click(**
args, task_id=kw.get("task_id")),
check_fn=check_browser_requirements,
)
registry.register(
@@ -1869,7 +1908,8 @@ registry.register(
name="browser_scroll",
toolset="browser",
schema=_BROWSER_SCHEMA_MAP["browser_scroll"],
handler=lambda args, **kw: browser_scroll(**args, task_id=kw.get("task_id")),
handler=lambda args, **kw: browser_scroll(**
args, task_id=kw.get("task_id")),
check_fn=check_browser_requirements,
)
registry.register(
@@ -1883,7 +1923,8 @@ registry.register(
name="browser_press",
toolset="browser",
schema=_BROWSER_SCHEMA_MAP["browser_press"],
handler=lambda args, **kw: browser_press(key=args.get("key", ""), task_id=kw.get("task_id")),
handler=lambda args, **kw: browser_press(
key=args.get("key", ""), task_id=kw.get("task_id")),
check_fn=check_browser_requirements,
)
registry.register(
@@ -1904,13 +1945,20 @@ registry.register(
name="browser_vision",
toolset="browser",
schema=_BROWSER_SCHEMA_MAP["browser_vision"],
handler=lambda args, **kw: browser_vision(question=args.get("question", ""), annotate=args.get("annotate", False), task_id=kw.get("task_id")),
handler=lambda args, **kw: browser_vision(
question=args.get("question", ""),
annotate=args.get("annotate", False),
task_id=kw.get("task_id"),
),
check_fn=check_browser_requirements,
)
registry.register(
name="browser_console",
toolset="browser",
schema=_BROWSER_SCHEMA_MAP["browser_console"],
handler=lambda args, **kw: browser_console(clear=args.get("clear", False), task_id=kw.get("task_id")),
handler=lambda args, **kw: browser_console(
clear=args.get("clear", False),
task_id=kw.get("task_id"),
),
check_fn=check_browser_requirements,
)