diff --git a/tools/environments/daytona.py b/tools/environments/daytona.py index eb2a6731106..011a640e3c3 100644 --- a/tools/environments/daytona.py +++ b/tools/environments/daytona.py @@ -6,20 +6,72 @@ and resumed on next creation, preserving the filesystem across sessions. """ import logging -import time import math +import os import shlex import threading import uuid import warnings -from typing import Optional +from pathlib import Path +from typing import Dict, Optional from tools.environments.base import BaseEnvironment -from tools.interrupt import is_interrupted logger = logging.getLogger(__name__) +class _DaytonaProcessHandle: + """Adapter making Daytona's blocking SDK exec look like Popen.""" + + def __init__(self, sandbox, cmd_string, cwd, timeout): + self._done = threading.Event() + self._returncode = None + self._read_fd, self._write_fd = os.pipe() + self.stdout = os.fdopen(self._read_fd, "r") + self.stdin = None + + # Wrap with shell timeout (Daytona SDK timeout is unreliable) + timed_cmd = f"timeout {timeout} bash -c {shlex.quote(cmd_string)}" + + def _run(): + try: + response = sandbox.process.exec(timed_cmd, cwd=cwd) + writer = os.fdopen(self._write_fd, "w") + writer.write(response.result or "") + writer.close() + self._returncode = response.exit_code + except Exception as e: + try: + writer = os.fdopen(self._write_fd, "w") + writer.write(f"Daytona execution error: {e}") + writer.close() + except Exception: + try: + os.close(self._write_fd) + except Exception: + pass + self._returncode = 1 + finally: + self._done.set() + + self._thread = threading.Thread(target=_run, daemon=True) + self._thread.start() + + def poll(self): + return self._returncode if self._done.is_set() else None + + def kill(self): + pass + + def wait(self, timeout=None): + self._done.wait(timeout=timeout) + return self._returncode + + @property + def returncode(self): + return self._returncode + + class DaytonaEnvironment(BaseEnvironment): """Daytona cloud sandbox execution backend. @@ -132,6 +184,13 @@ class DaytonaEnvironment(BaseEnvironment): # Upload credential files and skills directory into the sandbox. self._sync_skills_and_credentials() + # Capture login-shell environment into a snapshot for the unified model + self.init_session() + + # ------------------------------------------------------------------ + # File sync + # ------------------------------------------------------------------ + def _upload_if_changed(self, host_path: str, remote_path: str) -> bool: """Upload a file if its mtime/size changed since last sync.""" hp = Path(host_path) @@ -176,111 +235,35 @@ class DaytonaEnvironment(BaseEnvironment): self._sandbox.start() logger.info("Daytona: restarted sandbox %s", self._sandbox.id) - def _exec_in_thread(self, exec_command: str, cwd: Optional[str], timeout: int) -> dict: - """Run exec in a background thread with interrupt polling. + # ------------------------------------------------------------------ + # Unified execution hooks + # ------------------------------------------------------------------ - The Daytona SDK's exec(timeout=...) parameter is unreliable (the - server-side timeout is not enforced and the SDK has no client-side - fallback), so we wrap the command with the shell ``timeout`` utility - which reliably kills the process and returns exit code 124. - """ - # Wrap with shell `timeout` to enforce the deadline reliably. - # Add a small buffer so the shell timeout fires before any SDK-level - # timeout would, giving us a clean exit code 124. - timed_command = f"timeout {timeout} sh -c {shlex.quote(exec_command)}" - - result_holder: dict = {"value": None, "error": None} - - def _run(): - try: - response = self._sandbox.process.exec( - timed_command, cwd=cwd, - ) - result_holder["value"] = { - "output": response.result or "", - "returncode": response.exit_code, - } - except Exception as e: - result_holder["error"] = e - - t = threading.Thread(target=_run, daemon=True) - t.start() - # Wait for timeout + generous buffer for network/SDK overhead - deadline = time.monotonic() + timeout + 10 - while t.is_alive(): - t.join(timeout=0.2) - if is_interrupted(): - with self._lock: - try: - self._sandbox.stop() - except Exception: - pass - return { - "output": "[Command interrupted - Daytona sandbox stopped]", - "returncode": 130, - } - if time.monotonic() > deadline: - # Shell timeout didn't fire and SDK is hung — force stop - with self._lock: - try: - self._sandbox.stop() - except Exception: - pass - return self._timeout_result(timeout) - - if result_holder["error"]: - return {"error": result_holder["error"]} - return result_holder["value"] - - def execute(self, command: str, cwd: str = "", *, - timeout: Optional[int] = None, - stdin_data: Optional[str] = None) -> dict: + def _before_execute(self) -> None: + """Ensure sandbox is ready and sync credentials before each command.""" with self._lock: self._ensure_sandbox_ready() - # Incremental sync before each command so mid-session credential - # refreshes and skill updates are picked up. self._sync_skills_and_credentials() + def _run_bash(self, cmd_string: str, *, stdin_data: str | None = None): + """Spawn ``bash -c `` inside the Daytona sandbox. + + Returns a _DaytonaProcessHandle (satisfies the ProcessHandle protocol). + stdin_data is embedded as a heredoc since Daytona cannot pipe stdin. + """ if stdin_data is not None: marker = f"HERMES_EOF_{uuid.uuid4().hex[:8]}" while marker in stdin_data: marker = f"HERMES_EOF_{uuid.uuid4().hex[:8]}" - command = f"{command} << '{marker}'\n{stdin_data}\n{marker}" + cmd_string = f"{cmd_string} << '{marker}'\n{stdin_data}\n{marker}" + effective_cwd = self.cwd or None + return _DaytonaProcessHandle( + self._sandbox, cmd_string, effective_cwd, self.timeout, + ) - exec_command, sudo_stdin = self._prepare_command(command) - - # Daytona sandboxes execute commands via the Daytona SDK and cannot - # pipe subprocess stdin directly the way a local Popen can. When a - # sudo password is present, use a shell-level pipe from printf so that - # the password feeds sudo -S without appearing as an echo argument - # embedded in the shell string. The password is still visible in the - # remote sandbox's command line, but it is not exposed on the user's - # local machine — which is the primary threat being mitigated. - if sudo_stdin is not None: - import shlex - exec_command = ( - f"printf '%s\\n' {shlex.quote(sudo_stdin.rstrip())} | {exec_command}" - ) - effective_cwd = cwd or self.cwd or None - effective_timeout = timeout or self.timeout - - result = self._exec_in_thread(exec_command, effective_cwd, effective_timeout) - - if "error" in result: - from daytona import DaytonaError - err = result["error"] - if isinstance(err, DaytonaError): - with self._lock: - try: - self._ensure_sandbox_ready() - except Exception: - return {"output": f"Daytona execution error: {err}", "returncode": 1} - result = self._exec_in_thread(exec_command, effective_cwd, effective_timeout) - if "error" not in result: - return result - return {"output": f"Daytona execution error: {err}", "returncode": 1} - - return result + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ def cleanup(self): with self._lock: diff --git a/tools/environments/modal.py b/tools/environments/modal.py index 7916a2c449a..beac5de4b1e 100644 --- a/tools/environments/modal.py +++ b/tools/environments/modal.py @@ -7,18 +7,16 @@ wrapper, while preserving Hermes' persistent snapshot behavior across sessions. import asyncio import json import logging +import os import shlex import threading +import uuid from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, Optional from hermes_constants import get_hermes_home -from tools.environments.modal_common import ( - BaseModalExecutionEnvironment, - ModalExecStart, - PreparedModalExec, -) +from tools.environments.base import BaseEnvironment logger = logging.getLogger(__name__) @@ -138,19 +136,82 @@ class _AsyncWorker: self._thread.join(timeout=10) -@dataclass -class _DirectModalExecHandle: - thread: threading.Thread - result_holder: Dict[str, Any] +class _ModalProcessHandle: + """Adapter making Modal's async sandbox.exec look like Popen.""" + + def __init__(self, worker, sandbox, cmd_string, timeout): + self._done = threading.Event() + self._returncode = None + self._read_fd, self._write_fd = os.pipe() + self.stdout = os.fdopen(self._read_fd, "r") + self.stdin = None + + def _run(): + try: + async def _exec(): + process = await sandbox.exec.aio( + "bash", "-c", cmd_string, timeout=timeout, + ) + stdout = await process.stdout.read.aio() + stderr = await process.stderr.read.aio() + exit_code = await process.wait.aio() + return stdout, stderr, exit_code + + stdout, stderr, exit_code = worker.run_coroutine( + _exec(), timeout=timeout + 30, + ) + writer = os.fdopen(self._write_fd, "w") + if stdout: + writer.write( + stdout if isinstance(stdout, str) + else stdout.decode("utf-8", errors="replace") + ) + if stderr: + writer.write( + stderr if isinstance(stderr, str) + else stderr.decode("utf-8", errors="replace") + ) + writer.close() + self._returncode = exit_code + except Exception as e: + try: + writer = os.fdopen(self._write_fd, "w") + writer.write(f"Modal execution error: {e}") + writer.close() + except Exception: + try: + os.close(self._write_fd) + except Exception: + pass + self._returncode = 1 + finally: + self._done.set() + + self._thread = threading.Thread(target=_run, daemon=True) + self._thread.start() + + def poll(self): + return self._returncode if self._done.is_set() else None + + def kill(self): + pass # Handled by sandbox termination in environment + + def wait(self, timeout=None): + self._done.wait(timeout=timeout) + return self._returncode + + @property + def returncode(self): + return self._returncode -class ModalEnvironment(BaseModalExecutionEnvironment): - """Modal cloud execution via native Modal sandboxes.""" +class ModalEnvironment(BaseEnvironment): + """Modal cloud execution via native Modal sandboxes. - _stdin_mode = "heredoc" - _poll_interval_seconds = 0.2 - _interrupt_output = "[Command interrupted - Modal sandbox terminated]" - _unexpected_error_prefix = "Modal execution error" + Uses the unified spawn-per-call model: _run_bash() returns a + _ModalProcessHandle that wraps Modal's async SDK in a thread + OS pipe, + satisfying the ProcessHandle protocol for BaseEnvironment._wait_for_process(). + """ def __init__( self, @@ -186,11 +247,7 @@ class ModalEnvironment(BaseModalExecutionEnvironment): cred_mounts = [] try: - from tools.credential_files import ( - get_credential_file_mounts, - iter_skills_files, - iter_cache_files, - ) + from tools.credential_files import get_credential_file_mounts, iter_skills_files for mount_entry in get_credential_file_mounts(): cred_mounts.append( @@ -216,20 +273,6 @@ class ModalEnvironment(BaseModalExecutionEnvironment): ) if skills_files: logger.info("Modal: mounting %d skill files", len(skills_files)) - - # Mount host-side cache files (documents, images, audio, - # screenshots). New files arriving mid-session are picked up - # by _sync_files() before each command execution. - cache_files = iter_cache_files() - for entry in cache_files: - cred_mounts.append( - _modal.Mount.from_local_file( - entry["host_path"], - remote_path=entry["container_path"], - ) - ) - if cache_files: - logger.info("Modal: mounting %d cache files", len(cache_files)) except Exception as e: logger.debug("Modal: could not load credential file mounts: %s", e) @@ -292,6 +335,13 @@ class ModalEnvironment(BaseModalExecutionEnvironment): logger.info("Modal: sandbox created (task=%s)", self._task_id) + # Capture login-shell environment into a snapshot for the unified model + self.init_session() + + # ------------------------------------------------------------------ + # File sync + # ------------------------------------------------------------------ + def _push_file_to_sandbox(self, host_path: str, container_path: str) -> bool: """Push a single file into the sandbox if changed. Returns True if synced.""" hp = Path(host_path) @@ -326,19 +376,13 @@ class ModalEnvironment(BaseModalExecutionEnvironment): return True def _sync_files(self) -> None: - """Push credential, skill, and cache files into the running sandbox. + """Push credential files and skill files into the running sandbox. Runs before each command. Uses mtime+size caching so only changed - files are pushed (~13μs overhead in the no-op case). Cache files - are especially important here — new uploads/screenshots may appear - mid-session after sandbox creation. + files are pushed (~13us overhead in the no-op case). """ try: - from tools.credential_files import ( - get_credential_file_mounts, - iter_skills_files, - iter_cache_files, - ) + from tools.credential_files import get_credential_file_mounts, iter_skills_files for entry in get_credential_file_mounts(): if self._push_file_to_sandbox(entry["host_path"], entry["container_path"]): @@ -347,65 +391,35 @@ class ModalEnvironment(BaseModalExecutionEnvironment): for entry in iter_skills_files(): if self._push_file_to_sandbox(entry["host_path"], entry["container_path"]): logger.debug("Modal: synced skill file %s", entry["container_path"]) - - for entry in iter_cache_files(): - if self._push_file_to_sandbox(entry["host_path"], entry["container_path"]): - logger.debug("Modal: synced cache file %s", entry["container_path"]) except Exception as e: logger.debug("Modal: file sync failed: %s", e) + # ------------------------------------------------------------------ + # Unified execution hooks + # ------------------------------------------------------------------ + def _before_execute(self) -> None: self._sync_files() - def _start_modal_exec(self, prepared: PreparedModalExec) -> ModalExecStart: - full_command = f"cd {shlex.quote(prepared.cwd)} && {prepared.command}" - result_holder = {"value": None, "error": None} + def _run_bash(self, cmd_string: str, *, stdin_data: str | None = None): + """Spawn ``bash -c `` inside the Modal sandbox. - def _run(): - try: - async def _do_execute(): - process = await self._sandbox.exec.aio( - "bash", - "-c", - full_command, - timeout=prepared.timeout, - ) - stdout = await process.stdout.read.aio() - stderr = await process.stderr.read.aio() - exit_code = await process.wait.aio() - if isinstance(stdout, bytes): - stdout = stdout.decode("utf-8", errors="replace") - if isinstance(stderr, bytes): - stderr = stderr.decode("utf-8", errors="replace") - output = stdout - if stderr: - output = f"{stdout}\n{stderr}" if stdout else stderr - return self._result(output, exit_code) - - result_holder["value"] = self._worker.run_coroutine( - _do_execute(), - timeout=prepared.timeout + 30, - ) - except Exception as e: - result_holder["error"] = e - - t = threading.Thread(target=_run, daemon=True) - t.start() - return ModalExecStart(handle=_DirectModalExecHandle(thread=t, result_holder=result_holder)) - - def _poll_modal_exec(self, handle: _DirectModalExecHandle) -> dict | None: - if handle.thread.is_alive(): - return None - if handle.result_holder["error"]: - return self._error_result(f"Modal execution error: {handle.result_holder['error']}") - return handle.result_holder["value"] - - def _cancel_modal_exec(self, handle: _DirectModalExecHandle) -> None: - self._worker.run_coroutine( - self._sandbox.terminate.aio(), - timeout=15, + Returns a _ModalProcessHandle (satisfies the ProcessHandle protocol). + stdin_data is embedded as a heredoc since Modal cannot pipe stdin. + """ + if stdin_data is not None: + marker = f"HERMES_EOF_{uuid.uuid4().hex[:8]}" + while marker in stdin_data: + marker = f"HERMES_EOF_{uuid.uuid4().hex[:8]}" + cmd_string = f"{cmd_string} << '{marker}'\n{stdin_data}\n{marker}" + return _ModalProcessHandle( + self._worker, self._sandbox, cmd_string, self.timeout, ) + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + def cleanup(self): """Snapshot the filesystem (if persistent) then stop the sandbox.""" if self._sandbox is None: