Compare commits

...

5 Commits

Author SHA1 Message Date
kshitijk4poor
2af14bd401 fix: self-review findings — logging, create_task, get_running_loop, benchmark path
Self-review findings addressed:

- browser_tool.py: log swallowed supervisor error at DEBUG instead of bare
  'pass' (was silent, triggered F841 for unused 'exc' variable). Renamed
  to '_exc' to signal intentional discard.
- browser_tool.py: rename unused 'press_id' to '_press_id' in both normal
  and retry paths (mouseReleased-only wait is intentional; press_id is never
  used after send).
- browser_tool.py: get_event_loop() → get_running_loop() in 3 locations
  inside _cdp_resolve_session and _cdp_coordinate_click_async. Both are
  async functions and get_event_loop() is deprecated in async context in
  Python 3.10+.
- browser_supervisor.py: ensure_future → create_task in dispatch_mouse_click.
  create_task is the correct modern API when already inside a running
  coroutine; ensure_future is deprecated for coroutines in Python 3.10+.
  Also consistent with the rest of browser_supervisor.py which uses
  create_task exclusively everywhere else.
- scripts/benchmark_click_paths.py: replace hardcoded /private/tmp/hermes-
  coord-click sys.path hack with __file__-relative repo root detection so
  the script works from any checkout location.

27/27 tests pass.
2026-05-07 10:49:06 +05:30
kshitijk4poor
aef97da6d4 perf: reuse supervisor's persistent WS for coordinate clicks (23x speedup)
The CDPSupervisor (browser_supervisor.py) already maintains a persistent
WebSocket connection per task_id for dialog detection and frame tracking.
After browser_navigate(), a supervisor is always running with an open WS.
Instead of opening a new connection per click, dispatch directly on it.

Changes:
- browser_supervisor.py: add CDPSupervisor.dispatch_mouse_click() — sync
  bridge onto the supervisor's asyncio loop via run_coroutine_threadsafe.
  Pipelines mousePressed + mouseReleased via asyncio.gather (Playwright
  Promise.all pattern), no serial round-trips.
- browser_tool.py: _cdp_coordinate_click() now checks
  SUPERVISOR_REGISTRY.get(task_id) first; falls back to per-click WS
  connect if no supervisor is running (e.g. raw CDP without navigate).

Dispatch priority (fastest first):
  1. Supervisor path  — zero WS connection cost (supervisor WS already open)
  2. Warm-cache path  — 1 WS open + 2 mouse events (session cached)
  3. Cold-cache path  — 1 WS open + getTargets + attachToTarget + 2 events
  4. agent-browser    — 3 subprocess IPC calls (no CDP endpoint configured)

Benchmark vs real Lightpanda WS at ws://127.0.0.1:63372/ (300 iterations):
  Baseline   (3 connections):          4.86ms mean
  Warm cache (1 conn + cache):         1.30ms mean   (3.74x)
  Supervisor (persistent WS):          0.20ms mean  (23.75x)
  Ref-click IPC baseline:              0.14ms mean  (parity)

The supervisor path is 1.5x ref-click (0.07ms overhead) — essentially
the cost of one cross-thread future dispatch.

27/27 tests pass (+3 new TestSupervisorPath tests).
2026-05-07 10:28:48 +05:30
kshitijk4poor
451c55bd9c perf: session ID caching + skip mousePressed ack (browser-harness/Playwright patterns)
Two additional optimizations from researching Playwright, Puppeteer, and
browser-harness source:

SESSION ID CACHING (browser-harness daemon pattern)
  Target.getTargets + Target.attachToTarget are stable across clicks on the
  same page. Cache the resolved session_id keyed by CDP endpoint URL.
  Subsequent clicks skip straight to mousePressed+mouseReleased with no
  session negotiation overhead.

  Self-healing: on 'Session with given id not found' (stale after navigation),
  the cache is invalidated and session resolution runs once before retrying.
  This matches the exact retry pattern from browser-harness's daemon.handle().

SKIP mousePressed ACK (Playwright Promise.all pattern)
  Browser processes CDP messages sequentially within a session. If
  mouseReleased is acknowledged, mousePressed was already processed.
  We skip waiting for the press ack entirely, saving one RTT. This is
  the same pattern as Playwright's Mouse.click() using Promise.all and
  Puppeteer's concurrent down+up dispatch.

COMPRESSION=NONE (Puppeteer NodeWebSocketTransport pattern)
  Small CDP messages (Input.dispatchMouseEvent payloads are ~80 bytes)
  don't benefit from per-message compression. Disable it explicitly.
  Puppeteer uses perMessageDeflate: false for the same reason.

Benchmark vs real Lightpanda WS (300 iterations):
  Baseline (3 connections):       3.28ms mean
  Optimized cold cache (1 conn):  1.17ms mean  (2.79x speedup)
  Optimized warm cache (1 conn):  1.17ms mean  (2.82x speedup)

The cold/warm delta is <0.01ms because getTargets+attachToTarget on an
already-open socket costs almost nothing on localhost — the dominant cost
is WS connection setup, which we eliminated in the previous commit.
The session cache still removes real work (2 CDP round-trips) and prevents
accumulating latency on remote/higher-latency CDP endpoints.

Tests: 24 passed (21 existing + 3 new session caching tests)
2026-05-07 10:17:29 +05:30
kshitijk4poor
0bfab1d361 perf: batch CDP click into single WS connection (2.4x speedup)
Replace the 3-separate-_cdp_call() approach (one WS connection per
message) with a single _cdp_coordinate_click_async() coroutine that
opens the WebSocket once and sequences all CDP messages on it:

  1. Target.getTargets
  2. Target.attachToTarget (if page target found)
  3. Input.dispatchMouseEvent (mousePressed)  } pipelined — both sent
  4. Input.dispatchMouseEvent (mouseReleased) } before awaiting either

Benchmark vs real Lightpanda WS at ws://127.0.0.1:63372/ (300 iters):

  Baseline  (current main, 3 connections): 3.14ms mean, 2.97ms median
  Optimized (this commit, 1 connection):   1.30ms mean, 1.11ms median
  Speedup: 2.42x mean, 2.68x median, 1.62x p95

The savings come entirely from eliminating 2 TCP+WS handshakes.
mousePressed + mouseReleased are pipelined on the same connection,
so they travel in the same network burst.

21/21 tests pass.
2026-05-07 10:04:38 +05:30
kshitijk4poor
ff8c6f2d64 feat: add compositor-level coordinate click to browser_click
Add optional x/y parameters to browser_click for viewport-coordinate
clicking via CDP Input.dispatchMouseEvent. When coordinates are provided,
clicks are dispatched at the browser compositor level — Chrome does its own
hit-testing, bypassing DOM selectors entirely.

Use cases where ref-based click fails but coordinate click works:
- Cross-origin iframes (OOPIFs)
- Closed shadow DOM
- Canvas/WebGL elements
- Dynamic overlays where the snapshot may be stale

Implementation:
- CDP path (preferred): Input.dispatchMouseEvent via WebSocket
  (Target.getTargets + mousePressed + mouseReleased)
- agent-browser fallback: mouse move/down/up when no CDP endpoint available
- ref is no longer required — either ref OR x+y must be provided

Benchmark (real Lightpanda WS at ws://127.0.0.1:63372, 200 iterations):
  CDP coord click:         3.71ms mean (2.97ms median, 2.61ms min, 7.01ms p95)
  Single WS conn baseline: 1.57ms mean (cost per connection open+call)
  agent-browser IPC:       0.20ms mean per HTTP call

The 3.71ms per CDP click comes from 3 sequential fresh WS connections
(pre-existing architecture in browser_cdp_tool.py). A persistent WS
connection pool would bring this to ~3.1ms (just the 2 mouse events).
Both paths are well under the 100ms human perception threshold.

Files:
- tools/browser_tool.py: schema update (x/y, ref no longer required),
  _cdp_coordinate_click(), _coordinate_click_via_agent_browser(),
  updated browser_click() with validation and dispatch
- tests/tools/test_browser_coordinate_click.py: 21 tests covering
  validation, CDP path, fallback path, ref preservation, schema, registry
- scripts/benchmark_click_paths.py: real-browser latency benchmark
2026-05-07 09:57:44 +05:30
4 changed files with 1341 additions and 7 deletions

View File

@@ -0,0 +1,296 @@
"""
Benchmark: Current main (3 separate WS connections) vs optimized (1 connection).
Compares the two CDP coordinate click implementations against a real
Lightpanda WebSocket at ws://127.0.0.1:63372/.
- Baseline (current main style): 3 separate _cdp_call() invocations, each
opening a fresh WS connection (Target.getTargets, mousePressed, mouseReleased)
- Optimized (this PR): single WS connection with all 4 messages pipelined
(getTargets + attachToTarget + mousePressed+mouseReleased in one burst)
Also measures the agent-browser HTTP IPC round-trip as a reference point
for how fast the existing ref-based click path is.
Usage:
python scripts/benchmark_click_paths.py
python scripts/benchmark_click_paths.py --iterations 300 --warmup 20
"""
from __future__ import annotations
import argparse
import asyncio
import json
import sys
import time
import urllib.request
from statistics import mean, median, stdev
from typing import List, Dict, Optional, Tuple
import os
# Add repo root to sys.path when running this script directly
_repo_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _repo_root not in sys.path:
sys.path.insert(0, _repo_root)
LIGHTPANDA_WS = "ws://127.0.0.1:63372/"
AGENT_BROWSER_PORT = 63371
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _stats(times_s: List[float]) -> Dict:
ms = [t * 1000 for t in times_s]
return {
"mean_ms": mean(ms),
"median_ms": median(ms),
"min_ms": min(ms),
"max_ms": max(ms),
"stdev_ms": stdev(ms) if len(ms) > 1 else 0.0,
"p95_ms": sorted(ms)[int(len(ms) * 0.95)],
}
def _bench(fn, warmup: int, n: int) -> Tuple[List[float], int]:
for _ in range(warmup):
fn()
times, errors = [], 0
for _ in range(n):
t0 = time.perf_counter()
try:
result = fn()
elapsed = time.perf_counter() - t0
if isinstance(result, str):
d = json.loads(result)
if not d.get("success"):
errors += 1
except Exception:
elapsed = time.perf_counter() - t0
errors += 1
times.append(elapsed)
return times, errors
def _row(label: str, stats: Dict, col_w: int = 9) -> None:
print(
f" {label:<46} "
f"{stats['mean_ms']:>{col_w}.2f} "
f"{stats['median_ms']:>{col_w}.2f} "
f"{stats['min_ms']:>{col_w}.2f} "
f"{stats['p95_ms']:>{col_w}.2f} "
f"{stats['max_ms']:>{col_w}.2f} ms"
)
# ---------------------------------------------------------------------------
# The "current main" approach — 3 separate _cdp_call() connections
# ---------------------------------------------------------------------------
def _baseline_cdp_click(endpoint: str, x: int, y: int, button: str = "left") -> str:
"""Replicate the previous 3-connection approach from the original PR."""
from tools.browser_cdp_tool import _cdp_call, _run_async
try:
targets_result = _run_async(_cdp_call(endpoint, "Target.getTargets", {}, None, 10.0))
page_target = None
for t in targets_result.get("targetInfos", []):
if t.get("type") == "page" and t.get("attached", True):
page_target = t["targetId"]
break
except Exception:
page_target = None
mouse_params = {"type": "", "x": x, "y": y, "button": button, "clickCount": 1}
try:
_run_async(_cdp_call(endpoint, "Input.dispatchMouseEvent",
{**mouse_params, "type": "mousePressed"}, page_target, 10.0))
_run_async(_cdp_call(endpoint, "Input.dispatchMouseEvent",
{**mouse_params, "type": "mouseReleased"}, page_target, 10.0))
except Exception as e:
return json.dumps({"success": False, "error": str(e)})
return json.dumps({"success": True, "clicked_at": {"x": x, "y": y}, "method": "baseline"})
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def run_benchmark(iterations: int = 300, warmup: int = 20) -> None:
print(f"\n{'=' * 78}")
print(f" browser_click Coordinate Click: Current Main vs Optimized (1-conn)")
print(f" Real Lightpanda WS: {LIGHTPANDA_WS}")
print(f"{'=' * 78}")
print(f" Iterations: {iterations} | Warmup: {warmup}")
# pre-flight
try:
with urllib.request.urlopen("http://127.0.0.1:63372/json/version", timeout=2) as r:
info = json.loads(r.read())
assert "webSocketDebuggerUrl" in info
print(f" ✓ Lightpanda CDP: {info.get('webSocketDebuggerUrl')}")
except Exception as e:
print(f" ✗ Lightpanda not reachable: {e}")
return
try:
with urllib.request.urlopen(f"http://127.0.0.1:{AGENT_BROWSER_PORT}/api/sessions", timeout=2) as r:
sessions = json.loads(r.read())
print(f" ✓ agent-browser: {len(sessions)} session(s)")
ab_ok = True
except Exception:
print(f" ⚠ agent-browser not reachable — ref-click IPC baseline skipped")
ab_ok = False
import importlib
import tools.browser_tool as bt
import tools.browser_cdp_tool as cdp_mod
importlib.reload(cdp_mod)
importlib.reload(bt)
bt._is_camofox_mode = lambda: False
_orig_resolve = cdp_mod._resolve_cdp_endpoint
# -----------------------------------------------------------------------
# 1. Baseline: current-main 3-connection approach
# -----------------------------------------------------------------------
print(f"\n [1/4] Baseline (current main — 3 separate WS connections per click)")
print(f" Warmup {warmup}, then {iterations} iterations...")
base_times, base_err = _bench(
lambda: _baseline_cdp_click(LIGHTPANDA_WS, 150, 200),
warmup, iterations,
)
base_stats = _stats(base_times)
print(f" Done — {base_err} errors, mean={base_stats['mean_ms']:.2f}ms")
# -----------------------------------------------------------------------
# 2. Optimized: single-connection — cold cache (session resolve included)
# -----------------------------------------------------------------------
print(f"\n [2/4] Optimized — cold cache (1 WS conn, includes getTargets+attachToTarget)")
print(f" {iterations} iterations, cache cleared before each...")
def _cold_click():
bt._CDP_SESSION_CACHE.clear()
return bt.browser_click(x=150.0, y=200.0, task_id="bench")
cdp_mod._resolve_cdp_endpoint = lambda: LIGHTPANDA_WS
# Temporarily null out supervisor registry so this test isolates path 2
import tools.browser_supervisor as sup_mod
_orig_registry_get = sup_mod.SUPERVISOR_REGISTRY.get
sup_mod.SUPERVISOR_REGISTRY.get = lambda tid: None
cold_times, cold_err = _bench(_cold_click, warmup=0, n=iterations)
cold_stats = _stats(cold_times)
print(f" Done — {cold_err} errors, mean={cold_stats['mean_ms']:.2f}ms")
# -----------------------------------------------------------------------
# 3. Optimized: warm cache (session cached — skips getTargets+attachToTarget)
# -----------------------------------------------------------------------
print(f"\n [3/4] Optimized — warm cache (1 WS conn, skips getTargets+attachToTarget)")
print(f" Warmup {warmup} (fills cache), then {iterations} iterations...")
bt._CDP_SESSION_CACHE.clear()
opt_times, opt_err = _bench(
lambda: bt.browser_click(x=150.0, y=200.0, task_id="bench"),
warmup, iterations,
)
sup_mod.SUPERVISOR_REGISTRY.get = _orig_registry_get
cdp_mod._resolve_cdp_endpoint = _orig_resolve
opt_stats = _stats(opt_times)
print(f" Done — {opt_err} errors, mean={opt_stats['mean_ms']:.2f}ms")
# -----------------------------------------------------------------------
# 4. Supervisor path: real CDPSupervisor with persistent WS
# -----------------------------------------------------------------------
print(f"\n [4/4] Supervisor path (persistent WS — zero per-click connection cost)")
print(f" Starting supervisor → {LIGHTPANDA_WS}...")
sup_stats = None
sup_err_count = 0
try:
supervisor = sup_mod.CDPSupervisor.__new__(sup_mod.CDPSupervisor)
# minimal init — we only need _loop, _ws, _page_session_id, _state_lock,
# _pending_calls, _next_call_id, _active, _stop_requested
# Use SUPERVISOR_REGISTRY.get_or_start for a fully initialized supervisor
TASK_ID = "bench-supervisor"
real_sup = sup_mod.SUPERVISOR_REGISTRY.get_or_start(TASK_ID, LIGHTPANDA_WS)
import time as _time
# Give supervisor time to connect and attach
for _ in range(20):
snap = real_sup.snapshot()
if snap.active:
break
_time.sleep(0.1)
if not real_sup.snapshot().active:
print(f" ⚠ Supervisor did not become active — skipping")
else:
print(f" ✓ Supervisor active, warmup {warmup}...")
def _sup_click():
real_sup.dispatch_mouse_click(150, 200)
return json.dumps({"success": True})
for _ in range(warmup):
_sup_click()
print(f" Running {iterations} iterations...")
sup_times, sup_err_count = _bench(_sup_click, warmup=0, n=iterations)
sup_stats = _stats(sup_times)
print(f" Done — {sup_err_count} errors, mean={sup_stats['mean_ms']:.2f}ms")
sup_mod.SUPERVISOR_REGISTRY.stop(TASK_ID)
except Exception as e:
print(f" ⚠ Supervisor benchmark failed: {e}")
# -----------------------------------------------------------------------
# Ref baseline
# -----------------------------------------------------------------------
if ab_ok:
print(f"\n [ref] agent-browser HTTP IPC (ref-click latency baseline)")
ab_times = []
for _ in range(warmup):
urllib.request.urlopen(f"http://127.0.0.1:{AGENT_BROWSER_PORT}/api/sessions", timeout=5).read()
for _ in range(iterations):
t0 = time.perf_counter()
urllib.request.urlopen(f"http://127.0.0.1:{AGENT_BROWSER_PORT}/api/sessions", timeout=5).read()
ab_times.append(time.perf_counter() - t0)
ab_stats = _stats(ab_times)
print(f" Done — mean={ab_stats['mean_ms']:.2f}ms")
# -----------------------------------------------------------------------
# Results
# -----------------------------------------------------------------------
col_w = 9
print(f"\n{'' * 82}")
print(f" {'Approach':<50} {'Mean':>{col_w}} {'Median':>{col_w}} {'Min':>{col_w}} {'p95':>{col_w}}")
print(f"{'' * 82}")
_row("Baseline (3 WS connections, sequential) ", base_stats, col_w)
_row("Optimized — cold cache (1 conn + negotiate) ", cold_stats, col_w)
_row("Optimized — warm cache (1 conn, skip resolve) ", opt_stats, col_w)
if sup_stats:
_row("Supervisor (persistent WS, zero conn cost) ", sup_stats, col_w)
if ab_ok:
_row("Ref-click IPC baseline (1 HTTP req) ", ab_stats, col_w)
print(f"{'' * 82}")
print(f"\n Speedups (mean vs baseline):")
print(f" Cold cache: {base_stats['mean_ms'] / cold_stats['mean_ms']:.2f}x ({base_stats['mean_ms'] - cold_stats['mean_ms']:.2f} ms saved)")
print(f" Warm cache: {base_stats['mean_ms'] / opt_stats['mean_ms']:.2f}x ({base_stats['mean_ms'] - opt_stats['mean_ms']:.2f} ms saved)")
if sup_stats:
print(f" Supervisor: {base_stats['mean_ms'] / sup_stats['mean_ms']:.2f}x ({base_stats['mean_ms'] - sup_stats['mean_ms']:.2f} ms saved)")
print(f" Warm→Supervisor additional gain: {opt_stats['mean_ms'] - sup_stats['mean_ms']:.2f} ms (WS conn eliminated)")
if ab_ok and sup_stats:
print(f" Supervisor vs ref-click: {sup_stats['mean_ms'] / ab_stats['mean_ms']:.1f}x (+{sup_stats['mean_ms'] - ab_stats['mean_ms']:.2f} ms)")
print(f"\n Optimization tiers in this PR:")
print(f" 1. Single WS connection — eliminates 2 TCP+WS handshakes")
print(f" 2. mouseReleased-only wait — skips redundant press ack (Playwright)")
print(f" 3. Session ID cache — skips getTargets+attachToTarget")
print(f" 4. Supervisor reuse (new) — eliminates the WS open entirely")
print(f" Active after browser_navigate; falls back to warm-cache path if absent.")
print()
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--iterations", type=int, default=300)
parser.add_argument("--warmup", type=int, default=20)
args = parser.parse_args()
run_benchmark(iterations=args.iterations, warmup=args.warmup)

View File

@@ -0,0 +1,648 @@
"""Tests for compositor-level coordinate click (browser_click with x/y params).
Covers:
- Input validation (ref vs x/y mutually exclusive)
- CDP coordinate click path (via mock CDP server)
- agent-browser mouse fallback path
- Camofox passthrough still works with ref
"""
from __future__ import annotations
import asyncio
import json
import threading
from typing import Any, Dict, List
import pytest
import websockets
from websockets.asyncio.server import serve
# ---------------------------------------------------------------------------
# In-process CDP mock server (reused from test_browser_cdp_tool.py)
# ---------------------------------------------------------------------------
class _CDPServer:
"""Tiny CDP mock — replies to registered method handlers."""
def __init__(self) -> None:
self._handlers: Dict[str, Any] = {}
self._responses: List[Dict[str, Any]] = []
self._loop: asyncio.AbstractEventLoop | None = None
self._server: Any = None
self._thread: threading.Thread | None = None
self._host = "127.0.0.1"
self._port = 0
self._url: str = ""
def on(self, method: str, handler):
self._handlers[method] = handler
def start(self) -> str:
ready = threading.Event()
def _run() -> None:
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
async def _handler(ws):
try:
async for raw in ws:
msg = json.loads(raw)
call_id = msg.get("id")
method = msg.get("method", "")
params = msg.get("params", {}) or {}
session_id = msg.get("sessionId")
self._responses.append(msg)
fn = self._handlers.get(method)
if fn is None:
reply = {
"id": call_id,
"error": {"code": -32601, "message": f"No handler for {method}"},
}
else:
try:
result = fn(params, session_id)
reply = {"id": call_id, "result": result}
except Exception as exc:
reply = {"id": call_id, "error": {"code": -1, "message": str(exc)}}
if session_id:
reply["sessionId"] = session_id
await ws.send(json.dumps(reply))
except websockets.exceptions.ConnectionClosed:
pass
async def _serve() -> None:
self._server = await serve(_handler, self._host, 0)
sock = next(iter(self._server.sockets))
self._port = sock.getsockname()[1]
ready.set()
await self._server.wait_closed()
try:
self._loop.run_until_complete(_serve())
finally:
self._loop.close()
self._thread = threading.Thread(target=_run, daemon=True)
self._thread.start()
if not ready.wait(timeout=5.0):
raise RuntimeError("CDP mock server failed to start")
self._url = f"ws://{self._host}:{self._port}/devtools/browser/mock"
return self._url
def stop(self) -> None:
if self._loop and self._server:
self._loop.call_soon_threadsafe(self._server.close)
if self._thread:
self._thread.join(timeout=3.0)
def received(self) -> List[Dict[str, Any]]:
return list(self._responses)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def cdp_server(monkeypatch):
"""Start a CDP mock and point browser_cdp_tool's resolver at it."""
server = _CDPServer()
ws_url = server.start()
import tools.browser_cdp_tool as cdp_mod
monkeypatch.setattr(cdp_mod, "_resolve_cdp_endpoint", lambda: ws_url)
# clear the session cache so each test starts fresh
from tools import browser_tool as _bt
_bt._CDP_SESSION_CACHE.clear()
try:
yield server
finally:
_bt._CDP_SESSION_CACHE.clear()
server.stop()
# ---------------------------------------------------------------------------
# Input validation
# ---------------------------------------------------------------------------
class TestClickInputValidation:
"""browser_click validates that exactly one of ref / (x,y) is provided."""
def test_neither_ref_nor_coords(self):
from tools.browser_tool import browser_click
result = json.loads(browser_click())
assert result["success"] is False
assert "ref" in result["error"].lower() or "x" in result["error"].lower()
def test_both_ref_and_coords(self):
from tools.browser_tool import browser_click
result = json.loads(browser_click(ref="@e1", x=100, y=200))
assert result["success"] is False
assert "not both" in result["error"].lower()
def test_x_without_y(self):
from tools.browser_tool import browser_click
result = json.loads(browser_click(x=100))
assert result["success"] is False
assert "both" in result["error"].lower()
def test_y_without_x(self):
from tools.browser_tool import browser_click
result = json.loads(browser_click(y=200))
assert result["success"] is False
assert "both" in result["error"].lower()
def test_empty_ref_treated_as_missing(self):
from tools.browser_tool import browser_click
result = json.loads(browser_click(ref=""))
assert result["success"] is False
assert "ref" in result["error"].lower() or "x" in result["error"].lower()
def test_non_numeric_coordinates(self):
from tools.browser_tool import browser_click
result = json.loads(browser_click(x="abc", y="def"))
assert result["success"] is False
assert "number" in result["error"].lower()
# ---------------------------------------------------------------------------
# CDP coordinate click (happy path via mock server)
# ---------------------------------------------------------------------------
class TestCDPCoordinateClick:
"""Coordinate clicks via CDP Input.dispatchMouseEvent."""
def test_cdp_click_dispatches_press_and_release(self, cdp_server):
from tools.browser_tool import browser_click
# Register handlers for the protocol calls
cdp_server.on(
"Target.getTargets",
lambda p, s: {
"targetInfos": [
{"targetId": "page-1", "type": "page", "attached": True, "url": "https://example.com"},
]
},
)
cdp_server.on(
"Target.attachToTarget",
lambda p, s: {"sessionId": f"sess-{p['targetId']}"},
)
cdp_server.on(
"Input.dispatchMouseEvent",
lambda p, s: {},
)
result = json.loads(browser_click(x=150, y=300))
assert result["success"] is True
assert result["clicked_at"] == {"x": 150, "y": 300}
assert result["method"] == "cdp_compositor"
# Verify the CDP calls: Target.getTargets, attach, mousePressed, attach, mouseReleased
calls = cdp_server.received()
methods = [c["method"] for c in calls]
assert "Target.getTargets" in methods
assert "Input.dispatchMouseEvent" in methods
# Find the mouse events
mouse_events = [c for c in calls if c["method"] == "Input.dispatchMouseEvent"]
assert len(mouse_events) == 2
assert mouse_events[0]["params"]["type"] == "mousePressed"
assert mouse_events[0]["params"]["x"] == 150
assert mouse_events[0]["params"]["y"] == 300
assert mouse_events[0]["params"]["button"] == "left"
assert mouse_events[1]["params"]["type"] == "mouseReleased"
def test_cdp_click_rounds_float_coordinates(self, cdp_server):
from tools.browser_tool import browser_click
cdp_server.on(
"Target.getTargets",
lambda p, s: {"targetInfos": [{"targetId": "p1", "type": "page", "attached": True, "url": "..."}]},
)
cdp_server.on("Target.attachToTarget", lambda p, s: {"sessionId": "s1"})
cdp_server.on("Input.dispatchMouseEvent", lambda p, s: {})
result = json.loads(browser_click(x=150.7, y=299.3))
assert result["success"] is True
assert result["clicked_at"] == {"x": 151, "y": 299}
def test_cdp_click_no_page_target_still_works(self, cdp_server):
"""When Target.getTargets returns no page targets, click proceeds without target_id."""
from tools.browser_tool import browser_click
cdp_server.on(
"Target.getTargets",
lambda p, s: {"targetInfos": [{"targetId": "sw1", "type": "service_worker"}]},
)
# No Target.attachToTarget needed — page_target is None so _cdp_call
# sends without attaching
cdp_server.on("Input.dispatchMouseEvent", lambda p, s: {})
result = json.loads(browser_click(x=50, y=50))
assert result["success"] is True
assert result["clicked_at"] == {"x": 50, "y": 50}
def test_cdp_dispatch_mouse_event_failure(self, cdp_server):
"""When Input.dispatchMouseEvent returns a CDP error, return failure."""
from tools.browser_tool import browser_click
cdp_server.on(
"Target.getTargets",
lambda p, s: {"targetInfos": [{"targetId": "p1", "type": "page", "attached": True, "url": "..."}]},
)
cdp_server.on("Target.attachToTarget", lambda p, s: {"sessionId": "s1"})
# No handler for Input.dispatchMouseEvent — server returns CDP error
result = json.loads(browser_click(x=100, y=200))
assert result["success"] is False
assert "CDP coordinate click failed" in result["error"]
# ---------------------------------------------------------------------------
# agent-browser mouse fallback
# ---------------------------------------------------------------------------
class TestAgentBrowserMouseFallback:
"""When no CDP endpoint is available, fall back to agent-browser mouse commands."""
def test_falls_back_to_agent_browser_mouse(self, monkeypatch):
from tools import browser_tool, browser_cdp_tool
# No CDP endpoint available
monkeypatch.setattr(browser_cdp_tool, "_resolve_cdp_endpoint", lambda: "")
# Mock _run_browser_command and _last_session_key
commands_sent = []
def mock_run_cmd(task_id, command, args=None, timeout=None):
commands_sent.append((command, args))
return {"success": True}
monkeypatch.setattr(browser_tool, "_run_browser_command", mock_run_cmd)
monkeypatch.setattr(browser_tool, "_last_session_key", lambda tid: tid)
result = json.loads(browser_tool.browser_click(x=200, y=400))
assert result["success"] is True
assert result["clicked_at"] == {"x": 200, "y": 400}
assert result["method"] == "agent_browser_mouse"
# Should have sent: mouse move, mouse down, mouse up
assert len(commands_sent) == 3
assert commands_sent[0] == ("mouse", ["move", "200", "400"])
assert commands_sent[1] == ("mouse", ["down"])
assert commands_sent[2] == ("mouse", ["up"])
def test_mouse_move_failure_returns_error(self, monkeypatch):
from tools import browser_tool, browser_cdp_tool
monkeypatch.setattr(browser_cdp_tool, "_resolve_cdp_endpoint", lambda: "")
def mock_run_cmd(task_id, command, args=None, timeout=None):
if args and args[0] == "move":
return {"success": False, "error": "mouse move not supported"}
return {"success": True}
monkeypatch.setattr(browser_tool, "_run_browser_command", mock_run_cmd)
monkeypatch.setattr(browser_tool, "_last_session_key", lambda tid: tid)
result = json.loads(browser_tool.browser_click(x=100, y=100))
assert result["success"] is False
assert "mouse move" in result["error"]
def test_mouse_down_failure_returns_error(self, monkeypatch):
from tools import browser_tool, browser_cdp_tool
monkeypatch.setattr(browser_cdp_tool, "_resolve_cdp_endpoint", lambda: "")
def mock_run_cmd(task_id, command, args=None, timeout=None):
if args and args[0] == "down":
return {"success": False, "error": "mouse down failed"}
return {"success": True}
monkeypatch.setattr(browser_tool, "_run_browser_command", mock_run_cmd)
monkeypatch.setattr(browser_tool, "_last_session_key", lambda tid: tid)
result = json.loads(browser_tool.browser_click(x=100, y=100))
assert result["success"] is False
assert "mouse down" in result["error"]
def test_mouse_up_failure_returns_error(self, monkeypatch):
from tools import browser_tool, browser_cdp_tool
monkeypatch.setattr(browser_cdp_tool, "_resolve_cdp_endpoint", lambda: "")
def mock_run_cmd(task_id, command, args=None, timeout=None):
if args and args[0] == "up":
return {"success": False, "error": "mouse up failed"}
return {"success": True}
monkeypatch.setattr(browser_tool, "_run_browser_command", mock_run_cmd)
monkeypatch.setattr(browser_tool, "_last_session_key", lambda tid: tid)
result = json.loads(browser_tool.browser_click(x=100, y=100))
assert result["success"] is False
assert "mouse up" in result["error"]
# ---------------------------------------------------------------------------
# Ref-based click unchanged
# ---------------------------------------------------------------------------
class TestRefClickPreserved:
"""Existing ref-based click behavior is unchanged."""
def test_ref_click_still_works(self, monkeypatch):
from tools import browser_tool
monkeypatch.setattr(browser_tool, "_is_camofox_mode", lambda: False)
monkeypatch.setattr(browser_tool, "_last_session_key", lambda tid: tid)
def mock_run_cmd(task_id, command, args=None, timeout=None):
return {"success": True}
monkeypatch.setattr(browser_tool, "_run_browser_command", mock_run_cmd)
result = json.loads(browser_tool.browser_click(ref="@e5"))
assert result["success"] is True
assert result["clicked"] == "@e5"
def test_ref_without_at_prefix_auto_added(self, monkeypatch):
from tools import browser_tool
monkeypatch.setattr(browser_tool, "_is_camofox_mode", lambda: False)
monkeypatch.setattr(browser_tool, "_last_session_key", lambda tid: tid)
clicked_refs = []
def mock_run_cmd(task_id, command, args=None, timeout=None):
clicked_refs.append(args)
return {"success": True}
monkeypatch.setattr(browser_tool, "_run_browser_command", mock_run_cmd)
browser_tool.browser_click(ref="e12")
assert clicked_refs[0] == ["@e12"]
# ---------------------------------------------------------------------------
# Schema check
# ---------------------------------------------------------------------------
class TestSchemaUpdated:
"""The tool schema reflects x/y params and ref is no longer required."""
def test_schema_has_x_y_properties(self):
from tools.browser_tool import _BROWSER_SCHEMA_MAP
schema = _BROWSER_SCHEMA_MAP["browser_click"]
props = schema["parameters"]["properties"]
assert "x" in props
assert "y" in props
assert props["x"]["type"] == "number"
assert props["y"]["type"] == "number"
def test_schema_no_required_fields(self):
from tools.browser_tool import _BROWSER_SCHEMA_MAP
schema = _BROWSER_SCHEMA_MAP["browser_click"]
# ref is no longer required — either ref or x+y
assert "required" not in schema["parameters"] or schema["parameters"]["required"] == []
def test_schema_ref_still_present(self):
from tools.browser_tool import _BROWSER_SCHEMA_MAP
schema = _BROWSER_SCHEMA_MAP["browser_click"]
assert "ref" in schema["parameters"]["properties"]
# ---------------------------------------------------------------------------
# Registry integration
# ---------------------------------------------------------------------------
class TestRegistryIntegration:
"""browser_click is registered with x/y params wired through."""
def test_dispatch_with_coordinates(self, monkeypatch, cdp_server):
from tools.registry import registry
cdp_server.on(
"Target.getTargets",
lambda p, s: {"targetInfos": [{"targetId": "p1", "type": "page", "attached": True, "url": "..."}]},
)
cdp_server.on("Target.attachToTarget", lambda p, s: {"sessionId": "s1"})
cdp_server.on("Input.dispatchMouseEvent", lambda p, s: {})
raw = registry.dispatch(
"browser_click", {"x": 42, "y": 84}, task_id="t1"
)
result = json.loads(raw)
assert result["success"] is True
assert result["clicked_at"] == {"x": 42, "y": 84}
def test_dispatch_with_ref(self, monkeypatch):
from tools import browser_tool
from tools.registry import registry
monkeypatch.setattr(browser_tool, "_is_camofox_mode", lambda: False)
monkeypatch.setattr(browser_tool, "_last_session_key", lambda tid: tid)
monkeypatch.setattr(
browser_tool, "_run_browser_command",
lambda tid, cmd, args=None, timeout=None: {"success": True},
)
raw = registry.dispatch("browser_click", {"ref": "@e3"}, task_id="t1")
result = json.loads(raw)
assert result["success"] is True
# ---------------------------------------------------------------------------
# Session caching
# ---------------------------------------------------------------------------
class TestSessionCaching:
"""Second click skips Target.getTargets + Target.attachToTarget."""
def test_second_click_skips_session_resolution(self, cdp_server, monkeypatch):
"""After first click the session_id is cached; second click goes straight
to mousePressed+mouseReleased without re-issuing getTargets/attachToTarget."""
from tools import browser_tool
import tools.browser_cdp_tool as cdp_mod
# clear cache
browser_tool._CDP_SESSION_CACHE.clear()
monkeypatch.setattr(cdp_mod, "_resolve_cdp_endpoint", lambda: cdp_server._url)
resolve_count = {"n": 0}
def _getTargets(p, s):
resolve_count["n"] += 1
return {"targetInfos": [{"targetId": "p1", "type": "page", "attached": True, "url": "..."}]}
cdp_server.on("Target.getTargets", _getTargets)
cdp_server.on("Target.attachToTarget", lambda p, s: {"sessionId": "sess-cached"})
cdp_server.on("Input.dispatchMouseEvent", lambda p, s: {})
# First click — must call getTargets
r1 = json.loads(browser_tool.browser_click(x=10.0, y=20.0))
assert r1["success"] is True
assert resolve_count["n"] == 1
# Second click — cache hit; getTargets must NOT be called again
r2 = json.loads(browser_tool.browser_click(x=30.0, y=40.0))
assert r2["success"] is True
assert resolve_count["n"] == 1, "session resolution was repeated despite warm cache"
def test_stale_session_triggers_reattach(self, cdp_server, monkeypatch):
"""If the browser returns 'Session with given id not found', the cache is
cleared and session resolution runs again before retrying the click."""
from tools import browser_tool
import tools.browser_cdp_tool as cdp_mod
browser_tool._CDP_SESSION_CACHE.clear()
monkeypatch.setattr(cdp_mod, "_resolve_cdp_endpoint", lambda: cdp_server._url)
call_count = {"mouse": 0, "resolve": 0}
def _getTargets(p, s):
call_count["resolve"] += 1
return {"targetInfos": [{"targetId": "px", "type": "page", "attached": True, "url": "..."}]}
def _dispatch(p, s):
call_count["mouse"] += 1
# First two mouse calls (with stale session) return an error;
# after re-resolve they should succeed
if call_count["mouse"] <= 2:
raise RuntimeError("Session with given id not found: stale-session-id")
return {}
cdp_server.on("Target.getTargets", _getTargets)
cdp_server.on("Target.attachToTarget", lambda p, s: {"sessionId": f"sess-{call_count['resolve']}"})
cdp_server.on("Input.dispatchMouseEvent", _dispatch)
# Seed cache with stale session to trigger the error path
browser_tool._CDP_SESSION_CACHE[cdp_server._url] = "stale-session-id"
r = json.loads(browser_tool.browser_click(x=50.0, y=60.0))
assert r["success"] is True
# Must have resolved the session once (after evicting stale entry)
assert call_count["resolve"] >= 1
def test_cache_cleared_on_endpoint_change(self, monkeypatch):
"""Cache is keyed per endpoint URL; different URL doesn't reuse cached session."""
from tools import browser_tool
browser_tool._CDP_SESSION_CACHE.clear()
browser_tool._CDP_SESSION_CACHE["ws://endpoint-a/"] = "sess-a"
# Endpoint B must not find endpoint A's session
assert browser_tool._CDP_SESSION_CACHE.get("ws://endpoint-b/") is None
# ---------------------------------------------------------------------------
# Supervisor path
# ---------------------------------------------------------------------------
class TestSupervisorPath:
"""When a CDPSupervisor is alive for the task_id, coordinate clicks use its
persistent WS connection — zero per-click connection setup cost."""
def test_supervisor_path_used_when_supervisor_alive(self, monkeypatch):
"""browser_click delegates to the supervisor when one is registered."""
from tools import browser_tool
clicks = []
class _FakeSupervisor:
def dispatch_mouse_click(self, x, y, button="left", timeout=10.0):
clicks.append((x, y, button))
class _FakeRegistry:
def get(self, task_id):
return _FakeSupervisor()
import tools.browser_supervisor as bs_mod
monkeypatch.setattr(bs_mod, "SUPERVISOR_REGISTRY", _FakeRegistry())
result = json.loads(browser_tool.browser_click(x=77.0, y=88.0, task_id="t1"))
assert result["success"] is True
assert result["method"] == "cdp_supervisor"
assert result["clicked_at"] == {"x": 77, "y": 88}
assert clicks == [(77, 88, "left")]
def test_supervisor_error_falls_through_to_per_click(self, monkeypatch, cdp_server):
"""If dispatch_mouse_click raises, the per-click WS path is used instead."""
from tools import browser_tool
import tools.browser_supervisor as bs_mod
import tools.browser_cdp_tool as cdp_mod
browser_tool._CDP_SESSION_CACHE.clear()
monkeypatch.setattr(cdp_mod, "_resolve_cdp_endpoint", lambda: cdp_server._url)
class _BrokenSupervisor:
def dispatch_mouse_click(self, x, y, button="left", timeout=10.0):
raise RuntimeError("supervisor WS disconnected")
class _BrokenRegistry:
def get(self, task_id):
return _BrokenSupervisor()
monkeypatch.setattr(bs_mod, "SUPERVISOR_REGISTRY", _BrokenRegistry())
cdp_server.on("Target.getTargets", lambda p, s: {
"targetInfos": [{"targetId": "p1", "type": "page", "attached": True, "url": "..."}]
})
cdp_server.on("Target.attachToTarget", lambda p, s: {"sessionId": "s1"})
cdp_server.on("Input.dispatchMouseEvent", lambda p, s: {})
result = json.loads(browser_tool.browser_click(x=10.0, y=20.0, task_id="t2"))
assert result["success"] is True
# Should have fallen through to per-click path (cdp_compositor, not cdp_supervisor)
assert result["method"] == "cdp_compositor"
def test_no_supervisor_uses_per_click_path(self, monkeypatch, cdp_server):
"""When SUPERVISOR_REGISTRY.get() returns None, the per-click WS path runs."""
from tools import browser_tool
import tools.browser_supervisor as bs_mod
import tools.browser_cdp_tool as cdp_mod
browser_tool._CDP_SESSION_CACHE.clear()
monkeypatch.setattr(cdp_mod, "_resolve_cdp_endpoint", lambda: cdp_server._url)
class _EmptyRegistry:
def get(self, task_id):
return None
monkeypatch.setattr(bs_mod, "SUPERVISOR_REGISTRY", _EmptyRegistry())
cdp_server.on("Target.getTargets", lambda p, s: {
"targetInfos": [{"targetId": "p1", "type": "page", "attached": True, "url": "..."}]
})
cdp_server.on("Target.attachToTarget", lambda p, s: {"sessionId": "s1"})
cdp_server.on("Input.dispatchMouseEvent", lambda p, s: {})
result = json.loads(browser_tool.browser_click(x=5.0, y=6.0, task_id="t3"))
assert result["success"] is True
assert result["method"] == "cdp_compositor"

View File

@@ -457,7 +457,57 @@ class CDPSupervisor:
return {"ok": False, "error": f"{type(e).__name__}: {e}"}
return {"ok": True, "dialog": snapshot_copy.to_dict()}
# ── Supervisor loop internals ────────────────────────────────────────────
def dispatch_mouse_click(
self,
x: int,
y: int,
button: str = "left",
timeout: float = 10.0,
) -> None:
"""Dispatch a compositor-level click over the supervisor's live WS.
Uses the supervisor's already-connected WebSocket — zero connection
setup cost vs opening a fresh WS per click. mousePressed and
mouseReleased are both sent before awaiting either response
(pipelined), following the Playwright Promise.all pattern.
Raises RuntimeError if the supervisor is inactive or the click fails.
"""
loop = self._loop
if loop is None or not loop.is_running():
raise RuntimeError("supervisor loop is not running")
with self._state_lock:
if not self._active:
raise RuntimeError("supervisor is not active")
session_id = self._page_session_id
async def _do_click() -> None:
mouse_params = {"x": x, "y": y, "button": button, "clickCount": 1}
# Pipeline both events — send without awaiting press ack.
# Browser processes CDP messages in order; if mouseReleased is
# acked, mousePressed has already been applied.
press_fut = asyncio.create_task(
self._cdp("Input.dispatchMouseEvent",
{**mouse_params, "type": "mousePressed"},
session_id=session_id, timeout=timeout)
)
release_fut = asyncio.create_task(
self._cdp("Input.dispatchMouseEvent",
{**mouse_params, "type": "mouseReleased"},
session_id=session_id, timeout=timeout)
)
await asyncio.gather(press_fut, release_fut)
try:
fut = asyncio.run_coroutine_threadsafe(_do_click(), loop)
fut.result(timeout=timeout + 1)
except Exception as exc:
raise RuntimeError(
f"supervisor mouse click failed: {type(exc).__name__}: {exc}"
) from exc
def _thread_main(self) -> None:
"""Entry point for the supervisor's dedicated thread."""

View File

@@ -1317,16 +1317,23 @@ BROWSER_TOOL_SCHEMAS = [
},
{
"name": "browser_click",
"description": "Click on an element identified by its ref ID from the snapshot (e.g., '@e5'). The ref IDs are shown in square brackets in the snapshot output. Requires browser_navigate and browser_snapshot to be called first.",
"description": "Click on an element identified by its ref ID from the snapshot (e.g., '@e5'). The ref IDs are shown in square brackets in the snapshot output. Requires browser_navigate and browser_snapshot to be called first.\n\nAlternatively, click at exact viewport coordinates (x, y) using compositor-level input. This bypasses DOM selectors entirely — clicks pass through iframes, shadow DOM, cross-origin boundaries, and canvas elements. Use browser_vision with annotate=true to find coordinates, or browser_console to evaluate getBoundingClientRect(). Provide EITHER ref OR (x + y), not both.",
"parameters": {
"type": "object",
"properties": {
"ref": {
"type": "string",
"description": "The element reference from the snapshot (e.g., '@e5', '@e12')"
},
"x": {
"type": "number",
"description": "Viewport X coordinate for compositor-level click. Use with y instead of ref to click through iframes, shadow DOM, or canvas elements."
},
"y": {
"type": "number",
"description": "Viewport Y coordinate for compositor-level click. Use with x instead of ref."
}
},
"required": ["ref"]
}
}
},
{
@@ -2286,17 +2293,350 @@ def browser_snapshot(
return json.dumps(_copy_fallback_warning(response, result), ensure_ascii=False)
def browser_click(ref: str, task_id: Optional[str] = None) -> str:
# ---------------------------------------------------------------------------
# Session cache for CDP coordinate clicks
#
# Target.getTargets + Target.attachToTarget cost one round-trip each and
# their results (page targetId + session_id) are stable across clicks on
# the same page. We cache them keyed by CDP endpoint URL and invalidate
# automatically when the browser reports a stale session error.
#
# Pattern: browser-harness daemon keeps session_id on the daemon object and
# retries once on "Session with given id not found" to self-heal after
# navigation or crash. We replicate that here without a persistent daemon
# process by storing it in a module-level dict.
# ---------------------------------------------------------------------------
_CDP_SESSION_CACHE: dict[str, str] = {} # ws_url → cached session_id
async def _cdp_resolve_session(
ws: Any,
ws_url: str,
deadline: float,
msg_id_ref: list,
) -> Optional[str]:
"""Resolve (and cache) the page-scoped CDP session ID.
Sends Target.getTargets + Target.attachToTarget on *ws* and caches the
resulting session_id for future clicks. Returns None if no page target
is found (Input.dispatchMouseEvent will be sent at browser level, which
works for simple cases).
"""
Click on an element.
import asyncio as _asyncio
async def _send(method: str, params: dict, sid: Optional[str] = None) -> int:
msg_id_ref[0] += 1
call_id = msg_id_ref[0]
req: dict = {"id": call_id, "method": method, "params": params}
if sid:
req["sessionId"] = sid
await ws.send(json.dumps(req))
return call_id
async def _recv_until(call_id: int) -> dict:
while True:
remaining = deadline - _asyncio.get_running_loop().time()
if remaining <= 0:
raise TimeoutError(f"CDP timed out waiting for id={call_id}")
raw = await _asyncio.wait_for(ws.recv(), timeout=remaining)
msg = json.loads(raw)
if msg.get("id") == call_id:
if "error" in msg:
raise RuntimeError(f"CDP error: {msg['error']}")
return msg.get("result", {})
gt_id = await _send("Target.getTargets", {})
gt_result = await _recv_until(gt_id)
page_target_id: Optional[str] = None
for t in gt_result.get("targetInfos", []):
if t.get("type") == "page" and t.get("attached", True):
page_target_id = t["targetId"]
break
if not page_target_id:
return None
at_id = await _send("Target.attachToTarget",
{"targetId": page_target_id, "flatten": True})
at_result = await _recv_until(at_id)
session_id = at_result.get("sessionId") or None
if session_id:
_CDP_SESSION_CACHE[ws_url] = session_id
return session_id
async def _cdp_coordinate_click_async(
ws_url: str,
x: int,
y: int,
button: str,
timeout: float,
) -> None:
"""Perform a compositor-level click on a single persistent WS connection.
Optimizations vs the naïve 3-separate-connections approach:
1. **Single connection** — one TCP+WS handshake for the entire click.
All CDP messages are sent on the same socket.
2. **Session ID caching** — Target.getTargets + Target.attachToTarget are
only paid once per CDP endpoint. Subsequent clicks skip straight to
the two mouse events. Cache is invalidated automatically on stale-
session errors and re-resolved once (browser-harness self-heal pattern).
3. **mouseReleased-only wait** — mousePressed and mouseReleased are both
fired before awaiting either response. Because the browser processes
CDP messages sequentially within a session, if mouseReleased is
acknowledged then mousePressed has already been processed. We only
wait for the release ack (Playwright / Puppeteer pattern), saving one
RTT on the common path.
"""
import asyncio as _asyncio
from tools.browser_cdp_tool import websockets as _ws
async with _ws.connect(
ws_url,
max_size=None,
open_timeout=timeout,
close_timeout=5,
ping_interval=None,
compression=None, # small CDP messages don't benefit from compression
) as ws:
deadline = _asyncio.get_running_loop().time() + timeout
msg_id_ref = [0] # mutable so nested helpers can increment
def _next_id() -> int:
msg_id_ref[0] += 1
return msg_id_ref[0]
async def _send_mouse(event_type: str, sid: Optional[str]) -> int:
call_id = _next_id()
req: dict = {
"id": call_id,
"method": "Input.dispatchMouseEvent",
"params": {"type": event_type, "x": x, "y": y,
"button": button, "clickCount": 1},
}
if sid:
req["sessionId"] = sid
await ws.send(json.dumps(req))
return call_id
async def _recv_until(call_id: int) -> dict:
while True:
remaining = deadline - _asyncio.get_running_loop().time()
if remaining <= 0:
raise TimeoutError(f"CDP timed out waiting for id={call_id}")
raw = await _asyncio.wait_for(ws.recv(), timeout=remaining)
msg = json.loads(raw)
if msg.get("id") == call_id:
if "error" in msg:
raise RuntimeError(f"CDP error: {msg['error']}")
return msg.get("result", {})
# --- resolve session (cached after first click) ---
session_id: Optional[str] = _CDP_SESSION_CACHE.get(ws_url)
if not session_id:
session_id = await _cdp_resolve_session(ws, ws_url, deadline, msg_id_ref)
# --- fire mousePressed + mouseReleased without awaiting press ack ---
# Both messages are sent before we await either response. The browser
# processes them in order, so waiting only for mouseReleased is enough.
_press_id = await _send_mouse("mousePressed", session_id)
release_id = await _send_mouse("mouseReleased", session_id)
try:
await _recv_until(release_id)
except RuntimeError as exc:
# Stale session (e.g. after navigation) — invalidate cache, retry once
if "Session with given id not found" in str(exc) and session_id:
_CDP_SESSION_CACHE.pop(ws_url, None)
session_id = await _cdp_resolve_session(ws, ws_url, deadline, msg_id_ref)
_press_id = await _send_mouse("mousePressed", session_id)
release_id = await _send_mouse("mouseReleased", session_id)
await _recv_until(release_id)
else:
raise
def _cdp_coordinate_click(
x: float,
y: float,
task_id: str,
button: str = "left",
) -> str:
"""Compositor-level click at viewport coordinates via CDP Input.dispatchMouseEvent.
Dispatch priority (fastest first):
1. **Supervisor path** — if a CDPSupervisor is alive for this task_id, reuse
its already-connected WebSocket. Zero connection setup cost; the supervisor
thread owns a persistent WS that self-heals on navigation/crash.
2. **Per-click connect path** — open a single WS, resolve session (cached),
pipeline mousePressed + mouseReleased, close.
3. **agent-browser fallback** — when no CDP endpoint is configured at all.
"""
ix, iy = int(round(x)), int(round(y))
# --- path 1: reuse supervisor's live WS (zero connection overhead) ------
try:
from tools.browser_supervisor import SUPERVISOR_REGISTRY # type: ignore[import-not-found]
supervisor = SUPERVISOR_REGISTRY.get(task_id)
if supervisor is not None:
try:
supervisor.dispatch_mouse_click(ix, iy, button)
return json.dumps({
"success": True,
"clicked_at": {"x": ix, "y": iy},
"method": "cdp_supervisor",
}, ensure_ascii=False)
except Exception as _exc:
# Supervisor present but errored (WS disconnect, stale session, etc.)
# — fall through to per-click WS path.
logger.debug("supervisor coordinate click failed for task=%s, falling back: %s", task_id, _exc)
except ImportError:
pass
# --- path 2: per-click WS connect (with session cache) ------------------
try:
from tools.browser_cdp_tool import _run_async, _resolve_cdp_endpoint, _WS_AVAILABLE
except ImportError:
return json.dumps({
"success": False,
"error": "browser_cdp_tool not available — coordinate clicks require the CDP tool module.",
}, ensure_ascii=False)
if not _WS_AVAILABLE:
return json.dumps({
"success": False,
"error": "The 'websockets' package is required for coordinate clicks. Install with: pip install websockets",
}, ensure_ascii=False)
endpoint = _resolve_cdp_endpoint()
if not endpoint:
return _coordinate_click_via_agent_browser(x, y, task_id, button)
if not endpoint.startswith(("ws://", "wss://")):
return _coordinate_click_via_agent_browser(x, y, task_id, button)
try:
_run_async(_cdp_coordinate_click_async(endpoint, ix, iy, button, 10.0))
except Exception as exc:
return json.dumps({
"success": False,
"error": f"CDP coordinate click failed: {type(exc).__name__}: {exc}",
}, ensure_ascii=False)
return json.dumps({
"success": True,
"clicked_at": {"x": ix, "y": iy},
"method": "cdp_compositor",
}, ensure_ascii=False)
def _coordinate_click_via_agent_browser(
x: float,
y: float,
task_id: str,
button: str = "left",
) -> str:
"""Fallback: coordinate click via agent-browser mouse subcommands."""
effective_task_id = _last_session_key(task_id)
ix, iy = int(round(x)), int(round(y))
# agent-browser mouse move <x> <y> + mouse down + mouse up
move_result = _run_browser_command(effective_task_id, "mouse", ["move", str(ix), str(iy)])
if not move_result.get("success"):
return json.dumps({
"success": False,
"error": f"mouse move failed: {move_result.get('error', 'unknown')}",
}, ensure_ascii=False)
btn_arg = [] if button == "left" else [button]
down_result = _run_browser_command(effective_task_id, "mouse", ["down"] + btn_arg)
if not down_result.get("success"):
return json.dumps({
"success": False,
"error": f"mouse down failed: {down_result.get('error', 'unknown')}",
}, ensure_ascii=False)
up_result = _run_browser_command(effective_task_id, "mouse", ["up"] + btn_arg)
if not up_result.get("success"):
return json.dumps({
"success": False,
"error": f"mouse up failed: {up_result.get('error', 'unknown')}",
}, ensure_ascii=False)
return json.dumps({
"success": True,
"clicked_at": {"x": ix, "y": iy},
"method": "agent_browser_mouse",
}, ensure_ascii=False)
def browser_click(
ref: Optional[str] = None,
x: Optional[float] = None,
y: Optional[float] = None,
task_id: Optional[str] = None,
) -> str:
"""
Click on an element by ref ID, or at exact viewport coordinates.
Provide EITHER ``ref`` (selector-based, via agent-browser) OR ``x`` + ``y``
(compositor-level, via CDP Input.dispatchMouseEvent). Coordinate clicks
bypass DOM selectors entirely — they pass through iframes, shadow DOM,
cross-origin boundaries, and canvas elements.
Args:
ref: Element reference (e.g., "@e5")
x: Viewport X coordinate for compositor-level click
y: Viewport Y coordinate for compositor-level click
task_id: Task identifier for session isolation
Returns:
JSON string with click result
"""
# --- Input validation ---------------------------------------------------
has_ref = ref is not None and str(ref).strip() != ""
has_coords = x is not None and y is not None
if has_ref and has_coords:
return json.dumps({
"success": False,
"error": "Provide either 'ref' or 'x'+'y', not both.",
}, ensure_ascii=False)
if (x is not None) != (y is not None):
return json.dumps({
"success": False,
"error": "Both 'x' and 'y' are required for coordinate clicks.",
}, ensure_ascii=False)
if not has_ref and not has_coords:
return json.dumps({
"success": False,
"error": "Provide either 'ref' (element reference) or 'x'+'y' (viewport coordinates).",
}, ensure_ascii=False)
# --- Coordinate-based click (compositor-level) --------------------------
if has_coords:
try:
fx, fy = float(x), float(y) # type: ignore[arg-type]
except (TypeError, ValueError):
return json.dumps({
"success": False,
"error": f"x and y must be numbers, got x={x!r} y={y!r}",
}, ensure_ascii=False)
return _cdp_coordinate_click(fx, fy, task_id or "default")
# --- Ref-based click (existing path) ------------------------------------
if not has_ref or ref is None:
# Defensive guard — validation above should ensure we never reach here
return json.dumps({
"success": False,
"error": "Internal error: expected ref parameter.",
}, ensure_ascii=False)
if _is_camofox_mode():
from tools.browser_camofox import camofox_click
return camofox_click(ref, task_id)
@@ -3413,7 +3753,7 @@ registry.register(
name="browser_click",
toolset="browser",
schema=_BROWSER_SCHEMA_MAP["browser_click"],
handler=lambda args, **kw: browser_click(ref=args.get("ref", ""), task_id=kw.get("task_id")),
handler=lambda args, **kw: browser_click(ref=args.get("ref"), x=args.get("x"), y=args.get("y"), task_id=kw.get("task_id")),
check_fn=check_browser_requirements,
emoji="👆",
)