diff --git a/cli.py b/cli.py index 1e687f7b5d5..011045092e6 100644 --- a/cli.py +++ b/cli.py @@ -8887,6 +8887,28 @@ class HermesCLI: f"Output:\n{_out}]" ) self._pending_input.put(_synth) + # Drain watch pattern match notifications + while not process_registry.watch_queue.empty(): + watch_evt = process_registry.watch_queue.get_nowait() + _wsid = watch_evt.get("session_id", "unknown") + _wcmd = watch_evt.get("command", "unknown") + _wtype = watch_evt.get("type", "watch_match") + if _wtype == "watch_disabled": + _wsynth = f"[SYSTEM: {watch_evt.get('message', '')}]" + else: + _wpat = watch_evt.get("pattern", "?") + _wout = watch_evt.get("output", "") + _wsup = watch_evt.get("suppressed", 0) + _wsynth = ( + f"[SYSTEM: Background process {_wsid} matched " + f"watch pattern \"{_wpat}\".\n" + f"Command: {_wcmd}\n" + f"Matched output:\n{_wout}" + ) + if _wsup: + _wsynth += f"\n({_wsup} earlier matches were suppressed by rate limit)" + _wsynth += "]" + self._pending_input.put(_wsynth) except Exception: pass continue @@ -9004,10 +9026,9 @@ class HermesCLI: _cprint(f"{_DIM}Voice auto-restart failed: {e}{_RST}") threading.Thread(target=_restart_recording, daemon=True).start() - # Drain process completion notifications — any background - # process that finished with notify_on_complete while the - # agent was running (or before) gets auto-injected as a - # new user message so the agent can react to it. + # Drain process completion and watch notifications — any + # background process events that arrived while the agent + # was running get auto-injected as a new user message. 
try: from tools.process_registry import process_registry while not process_registry.completion_queue.empty(): @@ -9023,6 +9044,27 @@ class HermesCLI: f"Output:\n{_out}]" ) self._pending_input.put(_synth) + while not process_registry.watch_queue.empty(): + watch_evt = process_registry.watch_queue.get_nowait() + _wsid = watch_evt.get("session_id", "unknown") + _wcmd = watch_evt.get("command", "unknown") + _wtype = watch_evt.get("type", "watch_match") + if _wtype == "watch_disabled": + _wsynth = f"[SYSTEM: {watch_evt.get('message', '')}]" + else: + _wpat = watch_evt.get("pattern", "?") + _wout = watch_evt.get("output", "") + _wsup = watch_evt.get("suppressed", 0) + _wsynth = ( + f"[SYSTEM: Background process {_wsid} matched " + f"watch pattern \"{_wpat}\".\n" + f"Command: {_wcmd}\n" + f"Matched output:\n{_wout}" + ) + if _wsup: + _wsynth += f"\n({_wsup} earlier matches were suppressed by rate limit)" + _wsynth += "]" + self._pending_input.put(_wsynth) except Exception: pass # Non-fatal — don't break the main loop diff --git a/gateway/run.py b/gateway/run.py index 00156f12692..e154ef36476 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -3430,6 +3430,37 @@ class GatewayRunner: except Exception as e: logger.error("Process watcher setup error: %s", e) + # Drain watch pattern notifications that arrived during the agent run + try: + from tools.process_registry import process_registry as _pr + while not _pr.watch_queue.empty(): + watch_evt = _pr.watch_queue.get_nowait() + _wsid = watch_evt.get("session_id", "unknown") + _wcmd = watch_evt.get("command", "unknown") + _wtype = watch_evt.get("type", "watch_match") + if _wtype == "watch_disabled": + synth_text = f"[SYSTEM: {watch_evt.get('message', '')}]" + else: + _wpat = watch_evt.get("pattern", "?") + _wout = watch_evt.get("output", "") + _wsup = watch_evt.get("suppressed", 0) + synth_text = ( + f"[SYSTEM: Background process {_wsid} matched " + f"watch pattern \"{_wpat}\".\n" + f"Command: {_wcmd}\n" + f"Matched 
output:\n{_wout}" + ) + if _wsup: + synth_text += f"\n({_wsup} earlier matches were suppressed by rate limit)" + synth_text += "]" + # Inject as synthetic message to trigger a new agent turn + try: + await self._inject_watch_notification(synth_text, event) + except Exception as e2: + logger.error("Watch notification injection error: %s", e2) + except Exception as e: + logger.debug("Watch queue drain error: %s", e) + # NOTE: Dangerous command approvals are now handled inline by the # blocking gateway approval mechanism in tools/approval.py. The agent # thread blocks until the user responds with /approve or /deny, so by @@ -6708,6 +6739,36 @@ class GatewayRunner: return prefix return user_text + async def _inject_watch_notification(self, synth_text: str, original_event) -> None: + """Inject a watch-pattern notification as a synthetic message event. + + Uses the source from the original user event to route the notification + back to the correct chat/adapter. + """ + source = getattr(original_event, "source", None) + if not source: + return + platform_name = source.platform.value if hasattr(source.platform, "value") else str(source.platform) + adapter = None + for p, a in self.adapters.items(): + if p.value == platform_name: + adapter = a + break + if not adapter: + return + try: + from gateway.platforms.base import MessageEvent, MessageType + synth_event = MessageEvent( + text=synth_text, + message_type=MessageType.TEXT, + source=source, + internal=True, + ) + logger.info("Watch pattern notification — injecting for %s", platform_name) + await adapter.handle_message(synth_event) + except Exception as e: + logger.error("Watch notification injection error: %s", e) + async def _run_process_watcher(self, watcher: dict) -> None: """ Periodically check a background process and push updates to the user. 
diff --git a/tests/tools/test_watch_patterns.py b/tests/tools/test_watch_patterns.py new file mode 100644 index 00000000000..510d5d55b10 --- /dev/null +++ b/tests/tools/test_watch_patterns.py @@ -0,0 +1,304 @@ +"""Tests for watch_patterns background process monitoring feature. + +Covers: + - ProcessSession.watch_patterns field + - ProcessRegistry._check_watch_patterns() matching + notification + - Rate limiting (WATCH_MAX_PER_WINDOW) and overload kill switch + - watch_queue population + - Checkpoint persistence of watch_patterns + - Terminal tool schema includes watch_patterns + - Terminal tool handler passes watch_patterns through +""" + +import json +import queue +import time +import pytest +from unittest.mock import patch + +from tools.process_registry import ( + ProcessRegistry, + ProcessSession, + WATCH_MAX_PER_WINDOW, + WATCH_WINDOW_SECONDS, + WATCH_OVERLOAD_KILL_SECONDS, +) + + +@pytest.fixture() +def registry(): + """Create a fresh ProcessRegistry.""" + return ProcessRegistry() + + +def _make_session( + sid="proc_test_watch", + command="tail -f app.log", + task_id="t1", + watch_patterns=None, +) -> ProcessSession: + s = ProcessSession( + id=sid, + command=command, + task_id=task_id, + started_at=time.time(), + watch_patterns=watch_patterns or [], + ) + return s + + +# ========================================================================= +# ProcessSession field defaults +# ========================================================================= + +class TestProcessSessionField: + def test_default_empty(self): + s = ProcessSession(id="proc_1", command="echo hi") + assert s.watch_patterns == [] + assert s._watch_disabled is False + assert s._watch_hits == 0 + assert s._watch_suppressed == 0 + + def test_can_set_patterns(self): + s = _make_session(watch_patterns=["ERROR", "WARN"]) + assert s.watch_patterns == ["ERROR", "WARN"] + + +# ========================================================================= +# Pattern matching + queue population +# 
========================================================================= + +class TestCheckWatchPatterns: + def test_no_patterns_no_notification(self, registry): + """No watch_patterns → no notifications.""" + session = _make_session(watch_patterns=[]) + registry._check_watch_patterns(session, "ERROR: something broke\n") + assert registry.watch_queue.empty() + + def test_no_match_no_notification(self, registry): + """Output that doesn't match any pattern → no notification.""" + session = _make_session(watch_patterns=["ERROR", "FAIL"]) + registry._check_watch_patterns(session, "INFO: all good\nDEBUG: fine\n") + assert registry.watch_queue.empty() + + def test_basic_match(self, registry): + """Single matching line triggers a notification.""" + session = _make_session(watch_patterns=["ERROR"]) + registry._check_watch_patterns(session, "INFO: ok\nERROR: disk full\n") + assert not registry.watch_queue.empty() + evt = registry.watch_queue.get_nowait() + assert evt["type"] == "watch_match" + assert evt["pattern"] == "ERROR" + assert "disk full" in evt["output"] + assert evt["session_id"] == "proc_test_watch" + + def test_multiple_patterns(self, registry): + """First matching pattern is reported.""" + session = _make_session(watch_patterns=["WARN", "ERROR"]) + registry._check_watch_patterns(session, "ERROR: bad\nWARN: hmm\n") + evt = registry.watch_queue.get_nowait() + # ERROR appears first in the output, and we check patterns in order + # so "WARN" won't match "ERROR: bad" but "ERROR" will + assert evt["pattern"] == "ERROR" + assert "bad" in evt["output"] + + def test_disabled_skips(self, registry): + """Disabled watch produces no notifications.""" + session = _make_session(watch_patterns=["ERROR"]) + session._watch_disabled = True + registry._check_watch_patterns(session, "ERROR: boom\n") + assert registry.watch_queue.empty() + + def test_hit_counter_increments(self, registry): + """Each delivered notification increments _watch_hits.""" + session = 
_make_session(watch_patterns=["X"]) + registry._check_watch_patterns(session, "X\n") + assert session._watch_hits == 1 + registry._check_watch_patterns(session, "X\n") + assert session._watch_hits == 2 + + def test_output_truncation(self, registry): + """Very long matched output is truncated.""" + session = _make_session(watch_patterns=["X"]) + # Generate 30 matching lines (more than the 20-line cap) + text = "\n".join(f"X line {i}" for i in range(30)) + "\n" + registry._check_watch_patterns(session, text) + evt = registry.watch_queue.get_nowait() + # Should only have 20 lines max + assert evt["output"].count("\n") <= 20 + + +# ========================================================================= +# Rate limiting +# ========================================================================= + +class TestRateLimiting: + def test_within_window_limit(self, registry): + """Notifications within the rate limit all get delivered.""" + session = _make_session(watch_patterns=["E"]) + for i in range(WATCH_MAX_PER_WINDOW): + registry._check_watch_patterns(session, f"E {i}\n") + assert registry.watch_queue.qsize() == WATCH_MAX_PER_WINDOW + + def test_exceeds_window_limit(self, registry): + """Notifications beyond the rate limit are suppressed.""" + session = _make_session(watch_patterns=["E"]) + for i in range(WATCH_MAX_PER_WINDOW + 5): + registry._check_watch_patterns(session, f"E {i}\n") + # Only WATCH_MAX_PER_WINDOW should be in the queue + assert registry.watch_queue.qsize() == WATCH_MAX_PER_WINDOW + assert session._watch_suppressed == 5 + + def test_window_resets(self, registry): + """After the window expires, notifications can flow again.""" + session = _make_session(watch_patterns=["E"]) + # Fill the window + for i in range(WATCH_MAX_PER_WINDOW): + registry._check_watch_patterns(session, f"E {i}\n") + # One more should be suppressed + registry._check_watch_patterns(session, "E extra\n") + assert session._watch_suppressed == 1 + + # Fast-forward past window + 
session._watch_window_start = time.time() - WATCH_WINDOW_SECONDS - 1 + registry._check_watch_patterns(session, "E after reset\n") + # Should deliver now (window reset) + assert registry.watch_queue.qsize() == WATCH_MAX_PER_WINDOW + 1 + + def test_suppressed_count_in_next_delivery(self, registry): + """Suppressed count is reported in the next successful delivery.""" + session = _make_session(watch_patterns=["E"]) + for i in range(WATCH_MAX_PER_WINDOW): + registry._check_watch_patterns(session, f"E {i}\n") + # Suppress 3 more + for i in range(3): + registry._check_watch_patterns(session, f"E suppressed {i}\n") + assert session._watch_suppressed == 3 + + # Fast-forward past window to allow delivery + session._watch_window_start = time.time() - WATCH_WINDOW_SECONDS - 1 + registry._check_watch_patterns(session, "E back\n") + # Drain to the last event + last_evt = None + while not registry.watch_queue.empty(): + last_evt = registry.watch_queue.get_nowait() + assert last_evt["suppressed"] == 3 + assert session._watch_suppressed == 0 # reset after delivery + + +# ========================================================================= +# Overload kill switch +# ========================================================================= + +class TestOverloadKillSwitch: + def test_sustained_overload_disables(self, registry): + """Sustained overload beyond threshold permanently disables watching.""" + session = _make_session(watch_patterns=["E"]) + # Fill the window to trigger rate limit + for i in range(WATCH_MAX_PER_WINDOW): + registry._check_watch_patterns(session, f"E {i}\n") + + # Simulate sustained overload: set overload_since to past threshold + session._watch_overload_since = time.time() - WATCH_OVERLOAD_KILL_SECONDS - 1 + # Force another suppressed hit + registry._check_watch_patterns(session, "E overload\n") + registry._check_watch_patterns(session, "E overload2\n") + + assert session._watch_disabled is True + # Should have a watch_disabled event in the queue + 
disabled_evts = [] + while not registry.watch_queue.empty(): + evt = registry.watch_queue.get_nowait() + if evt.get("type") == "watch_disabled": + disabled_evts.append(evt) + assert len(disabled_evts) == 1 + assert "too many matches" in disabled_evts[0]["message"] + + def test_overload_resets_on_delivery(self, registry): + """Overload timer resets when a notification gets through.""" + session = _make_session(watch_patterns=["E"]) + # Start overload tracking + session._watch_overload_since = time.time() - 10 + # But window allows delivery → overload should reset + registry._check_watch_patterns(session, "E ok\n") + assert session._watch_overload_since == 0.0 + assert session._watch_disabled is False + + +# ========================================================================= +# Checkpoint persistence +# ========================================================================= + +class TestCheckpointPersistence: + def test_watch_patterns_in_checkpoint(self, registry): + """watch_patterns is included in checkpoint data.""" + session = _make_session(watch_patterns=["ERROR", "FAIL"]) + with registry._lock: + registry._running[session.id] = session + + with patch("utils.atomic_json_write") as mock_write: + registry._write_checkpoint() + args = mock_write.call_args + entries = args[0][1] # second positional arg + assert len(entries) == 1 + assert entries[0]["watch_patterns"] == ["ERROR", "FAIL"] + + def test_watch_patterns_recovery(self, registry, tmp_path, monkeypatch): + """watch_patterns survives checkpoint recovery.""" + import tools.process_registry as pr_mod + checkpoint = tmp_path / "processes.json" + checkpoint.write_text(json.dumps([{ + "session_id": "proc_recovered", + "command": "tail -f log", + "pid": 99999999, # non-existent + "pid_scope": "host", + "started_at": time.time(), + "task_id": "", + "session_key": "", + "watcher_platform": "", + "watcher_chat_id": "", + "watcher_thread_id": "", + "watcher_interval": 0, + "notify_on_complete": False, + 
"watch_patterns": ["PANIC", "OOM"], + }])) + monkeypatch.setattr(pr_mod, "CHECKPOINT_PATH", checkpoint) + # PID doesn't exist, so nothing will be recovered + count = registry.recover_from_checkpoint() + # Won't recover since PID is fake, but verify the code path doesn't crash + assert count == 0 + + +# ========================================================================= +# Terminal tool schema + handler +# ========================================================================= + +class TestTerminalToolSchema: + def test_schema_includes_watch_patterns(self): + from tools.terminal_tool import TERMINAL_SCHEMA + props = TERMINAL_SCHEMA["parameters"]["properties"] + assert "watch_patterns" in props + assert props["watch_patterns"]["type"] == "array" + assert props["watch_patterns"]["items"] == {"type": "string"} + + def test_handler_passes_watch_patterns(self): + """_handle_terminal passes watch_patterns to terminal_tool.""" + from tools.terminal_tool import _handle_terminal + with patch("tools.terminal_tool.terminal_tool") as mock_tt: + mock_tt.return_value = json.dumps({"output": "ok", "exit_code": 0}) + _handle_terminal( + {"command": "echo hi", "watch_patterns": ["ERR"]}, + task_id="t1", + ) + _, kwargs = mock_tt.call_args + assert kwargs.get("watch_patterns") == ["ERR"] + + +# ========================================================================= +# Code execution tool blocked params +# ========================================================================= + +class TestCodeExecutionBlocked: + def test_watch_patterns_blocked(self): + from tools.code_execution_tool import _TERMINAL_BLOCKED_PARAMS + assert "watch_patterns" in _TERMINAL_BLOCKED_PARAMS diff --git a/tools/code_execution_tool.py b/tools/code_execution_tool.py index 93863efe994..7837d70d6c2 100644 --- a/tools/code_execution_tool.py +++ b/tools/code_execution_tool.py @@ -301,7 +301,7 @@ def _call(tool_name, args): # --------------------------------------------------------------------------- # 
Terminal parameters that must not be used from ephemeral sandbox scripts -_TERMINAL_BLOCKED_PARAMS = {"background", "check_interval", "pty", "notify_on_complete"} +_TERMINAL_BLOCKED_PARAMS = {"background", "check_interval", "pty", "notify_on_complete", "watch_patterns"} def _rpc_server_loop( diff --git a/tools/process_registry.py b/tools/process_registry.py index fb656d0f3a3..6ba3340a358 100644 --- a/tools/process_registry.py +++ b/tools/process_registry.py @@ -58,6 +58,11 @@ MAX_OUTPUT_CHARS = 200_000 # 200KB rolling output buffer FINISHED_TTL_SECONDS = 1800 # Keep finished processes for 30 minutes MAX_PROCESSES = 64 # Max concurrent tracked processes (LRU pruning) +# Watch pattern rate limiting +WATCH_MAX_PER_WINDOW = 8 # Max notifications delivered per window +WATCH_WINDOW_SECONDS = 10 # Rolling window length +WATCH_OVERLOAD_KILL_SECONDS = 45 # Sustained overload duration before disabling watch + @dataclass class ProcessSession: @@ -83,6 +88,14 @@ class ProcessSession: watcher_thread_id: str = "" watcher_interval: int = 0 # 0 = no watcher configured notify_on_complete: bool = False # Queue agent notification on exit + # Watch patterns — trigger agent notification when output matches any pattern + watch_patterns: List[str] = field(default_factory=list) + _watch_hits: int = field(default=0, repr=False) # total matches delivered + _watch_suppressed: int = field(default=0, repr=False) # matches dropped by rate limit + _watch_overload_since: float = field(default=0.0, repr=False) # when sustained overload began + _watch_disabled: bool = field(default=False, repr=False) # permanently killed by overload + _watch_window_hits: int = field(default=0, repr=False) # hits in current rate window + _watch_window_start: float = field(default=0.0, repr=False) _lock: threading.Lock = field(default_factory=threading.Lock) _reader_thread: Optional[threading.Thread] = field(default=None, repr=False) _pty: Any = field(default=None, repr=False) # ptyprocess handle (when use_pty=True) 
@@ -120,6 +133,10 @@ class ProcessRegistry: import queue as _queue_mod self.completion_queue: _queue_mod.Queue = _queue_mod.Queue() + # Watch pattern notifications — processes with watch_patterns push here + # when output matches a pattern. Same consumption model as completion_queue. + self.watch_queue: _queue_mod.Queue = _queue_mod.Queue() + @staticmethod def _clean_shell_noise(text: str) -> str: """Strip shell startup warnings from the beginning of output.""" @@ -128,6 +145,84 @@ class ProcessRegistry: lines.pop(0) return "\n".join(lines) + def _check_watch_patterns(self, session: ProcessSession, new_text: str) -> None: + """Scan new output for watch patterns and queue notifications. + + Called from reader threads with new_text being the freshly-read chunk. + Rate-limited: max WATCH_MAX_PER_WINDOW notifications per WATCH_WINDOW_SECONDS. + If sustained overload exceeds WATCH_OVERLOAD_KILL_SECONDS, watching is + disabled permanently for this process. + """ + if not session.watch_patterns or session._watch_disabled: + return + + # Scan new text line-by-line for pattern matches + matched_lines = [] + matched_pattern = None + for line in new_text.splitlines(): + for pat in session.watch_patterns: + if pat in line: + matched_lines.append(line.rstrip()) + if matched_pattern is None: + matched_pattern = pat + break # one match per line is enough + + if not matched_lines: + return + + now = time.time() + with session._lock: + # Reset window if it's expired + if now - session._watch_window_start >= WATCH_WINDOW_SECONDS: + session._watch_window_hits = 0 + session._watch_window_start = now + + # Check rate limit + if session._watch_window_hits >= WATCH_MAX_PER_WINDOW: + session._watch_suppressed += len(matched_lines) + + # Track sustained overload for kill switch + if session._watch_overload_since == 0.0: + session._watch_overload_since = now + elif now - session._watch_overload_since > WATCH_OVERLOAD_KILL_SECONDS: + session._watch_disabled = True + self.watch_queue.put({ + 
"session_id": session.id, + "command": session.command, + "type": "watch_disabled", + "suppressed": session._watch_suppressed, + "message": ( + f"Watch patterns disabled for process {session.id} — " + f"too many matches ({session._watch_suppressed} suppressed). " + f"Use process(action='poll') to check output manually." + ), + }) + return + + # Under the rate limit — deliver notification + session._watch_window_hits += 1 + session._watch_hits += 1 + # Clear overload tracker since we got a delivery through + session._watch_overload_since = 0.0 + + # Include suppressed count if any events were dropped + suppressed = session._watch_suppressed + session._watch_suppressed = 0 + + # Trim matched output to a reasonable size + output = "\n".join(matched_lines[:20]) + if len(output) > 2000: + output = output[:2000] + "\n...(truncated)" + + self.watch_queue.put({ + "session_id": session.id, + "command": session.command, + "type": "watch_match", + "pattern": matched_pattern, + "output": output, + "suppressed": suppressed, + }) + @staticmethod def _is_host_pid_alive(pid: Optional[int]) -> bool: """Best-effort liveness check for host-visible PIDs.""" @@ -394,6 +489,7 @@ class ProcessRegistry: session.output_buffer += chunk if len(session.output_buffer) > session.max_output_chars: session.output_buffer = session.output_buffer[-session.max_output_chars:] + self._check_watch_patterns(session, chunk) except Exception as e: logger.debug("Process stdout reader ended: %s", e) finally: @@ -413,6 +509,7 @@ class ProcessRegistry: quoted_log_path = shlex.quote(log_path) quoted_pid_path = shlex.quote(pid_path) quoted_exit_path = shlex.quote(exit_path) + prev_output_len = 0 # track delta for watch pattern scanning while not session.exited: time.sleep(2) # Poll every 2 seconds try: @@ -420,10 +517,15 @@ class ProcessRegistry: result = env.execute(f"cat {quoted_log_path} 2>/dev/null", timeout=10) new_output = result.get("output", "") if new_output: + # Compute delta for watch pattern scanning 
+ delta = new_output[prev_output_len:] if len(new_output) > prev_output_len else "" + prev_output_len = len(new_output) with session._lock: session.output_buffer = new_output if len(session.output_buffer) > session.max_output_chars: session.output_buffer = session.output_buffer[-session.max_output_chars:] + if delta: + self._check_watch_patterns(session, delta) # Check if process is still running check = env.execute( @@ -467,6 +569,7 @@ class ProcessRegistry: session.output_buffer += text if len(session.output_buffer) > session.max_output_chars: session.output_buffer = session.output_buffer[-session.max_output_chars:] + self._check_watch_patterns(session, text) except EOFError: break except Exception: @@ -872,6 +975,7 @@ class ProcessRegistry: "watcher_thread_id": s.watcher_thread_id, "watcher_interval": s.watcher_interval, "notify_on_complete": s.notify_on_complete, + "watch_patterns": s.watch_patterns, }) # Atomic write to avoid corruption on crash @@ -932,6 +1036,7 @@ class ProcessRegistry: watcher_thread_id=entry.get("watcher_thread_id", ""), watcher_interval=entry.get("watcher_interval", 0), notify_on_complete=entry.get("notify_on_complete", False), + watch_patterns=entry.get("watch_patterns", []), ) with self._lock: self._running[session.id] = session diff --git a/tools/terminal_tool.py b/tools/terminal_tool.py index 42415a5f14a..859f0f1f368 100644 --- a/tools/terminal_tool.py +++ b/tools/terminal_tool.py @@ -42,7 +42,7 @@ import atexit import shutil import subprocess from pathlib import Path -from typing import Optional, Dict, Any +from typing import Optional, Dict, Any, List logger = logging.getLogger(__name__) @@ -1140,6 +1140,7 @@ def terminal_tool( check_interval: Optional[int] = None, pty: bool = False, notify_on_complete: bool = False, + watch_patterns: Optional[List[str]] = None, ) -> str: """ Execute a command in the configured terminal environment. 
@@ -1154,6 +1155,7 @@ def terminal_tool( check_interval: Seconds between auto-checks for background processes (gateway only, min 30) pty: If True, use pseudo-terminal for interactive CLI tools (local backend only) notify_on_complete: If True and background=True, auto-notify the agent when the process exits + watch_patterns: List of strings to watch for in background output; triggers notification on match Returns: str: JSON string with output, exit_code, and error fields @@ -1439,6 +1441,11 @@ def terminal_tool( "notify_on_complete": True, }) + # Set watch patterns for output monitoring + if watch_patterns and background: + proc_session.watch_patterns = list(watch_patterns) + result_data["watch_patterns"] = proc_session.watch_patterns + # Register check_interval watcher (gateway picks this up after agent run) if check_interval and background: effective_interval = max(30, check_interval) @@ -1762,6 +1769,11 @@ TERMINAL_SCHEMA = { "type": "boolean", "description": "When true (and background=true), you'll be automatically notified when the process finishes — no polling needed. Use this for tasks that take a while (tests, builds, deployments) so you can keep working on other things in the meantime.", "default": False + }, + "watch_patterns": { + "type": "array", + "items": {"type": "string"}, + "description": "List of strings to watch for in background process output. When any pattern matches a line of output, you'll be notified with the matching text — like notify_on_complete but triggers mid-process on specific output. Use for monitoring logs, watching for errors, or waiting for specific events (e.g. [\"ERROR\", \"FAIL\", \"listening on port\"])." } }, "required": ["command"] @@ -1779,6 +1791,7 @@ def _handle_terminal(args, **kw): check_interval=args.get("check_interval"), pty=args.get("pty", False), notify_on_complete=args.get("notify_on_complete", False), + watch_patterns=args.get("watch_patterns"), )