refactor(environments): migrate Modal + Daytona to unified model — Phase 5+6

ModalEnvironment:
- Create _ModalProcessHandle adapter (async SDK → ProcessHandle protocol)
- Routes async sandbox.exec through thread + OS pipe for stdout
- Remove BaseModalExecutionEnvironment inheritance, use BaseEnvironment
- Remove _start_modal_exec/_poll_modal_exec/_cancel_modal_exec
- Move file sync to _before_execute hook
- Preserve _AsyncWorker, sandbox lifecycle, snapshot management

DaytonaEnvironment:
- Create _DaytonaProcessHandle adapter (blocking SDK → ProcessHandle)
- Preserves shell timeout wrapper (SDK timeout unreliable)
- Add _run_bash with heredoc stdin embedding
- Move file sync to _before_execute hook
- Preserve sandbox lifecycle, persistent resume logic

ManagedModal left unchanged (HTTP-based, keeps modal_common.py dep).
42/42 local tests passing. SDK backends untested (require credentials).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
alt-glitch
2026-04-02 13:49:50 +05:30
committed by Hermes Agent
parent 0482133559
commit ef8a985ee3
2 changed files with 188 additions and 191 deletions

View File

@@ -6,20 +6,72 @@ and resumed on next creation, preserving the filesystem across sessions.
"""
import logging
import time
import math
import os
import shlex
import threading
import uuid
import warnings
from typing import Optional
from pathlib import Path
from typing import Dict, Optional
from tools.environments.base import BaseEnvironment
from tools.interrupt import is_interrupted
logger = logging.getLogger(__name__)
class _DaytonaProcessHandle:
    """Adapter making Daytona's blocking SDK exec look like Popen.

    Daytona's SDK returns the entire output in one blocking call, so this
    handle runs that call in a daemon thread and streams the result through
    an OS pipe exposed as a file-like ``stdout``, satisfying the
    ProcessHandle protocol (``poll`` / ``kill`` / ``wait`` / ``returncode`` /
    ``stdout``).
    """

    def __init__(self, sandbox, cmd_string, cwd, timeout):
        self._done = threading.Event()
        self._returncode = None
        self._read_fd, self._write_fd = os.pipe()
        self.stdout = os.fdopen(self._read_fd, "r")
        self.stdin = None  # stdin is embedded as a heredoc by the caller
        # Wrap with shell timeout (Daytona SDK timeout is unreliable)
        timed_cmd = f"timeout {timeout} bash -c {shlex.quote(cmd_string)}"

        def _run():
            # Open the write end of the pipe exactly once. Re-opening the
            # same fd as a second file object (as in an error fallback)
            # risks a double close / EBADF when both objects are finalized.
            writer = os.fdopen(self._write_fd, "w")
            try:
                response = sandbox.process.exec(timed_cmd, cwd=cwd)
                writer.write(response.result or "")
                self._returncode = response.exit_code
            except Exception as e:
                self._returncode = 1
                try:
                    writer.write(f"Daytona execution error: {e}")
                except Exception:
                    pass  # best effort — pipe may already be broken
            finally:
                try:
                    writer.close()
                except Exception:
                    pass
                self._done.set()

        self._thread = threading.Thread(target=_run, daemon=True)
        self._thread.start()

    def poll(self):
        """Return the exit code if finished, else None (Popen-compatible)."""
        return self._returncode if self._done.is_set() else None

    def kill(self):
        # No-op: cancellation is handled by stopping the sandbox itself.
        pass

    def wait(self, timeout=None):
        """Block until the exec thread finishes (or *timeout* elapses)."""
        self._done.wait(timeout=timeout)
        return self._returncode

    @property
    def returncode(self):
        return self._returncode
class DaytonaEnvironment(BaseEnvironment):
"""Daytona cloud sandbox execution backend.
@@ -132,6 +184,13 @@ class DaytonaEnvironment(BaseEnvironment):
# Upload credential files and skills directory into the sandbox.
self._sync_skills_and_credentials()
# Capture login-shell environment into a snapshot for the unified model
self.init_session()
# ------------------------------------------------------------------
# File sync
# ------------------------------------------------------------------
def _upload_if_changed(self, host_path: str, remote_path: str) -> bool:
"""Upload a file if its mtime/size changed since last sync."""
hp = Path(host_path)
@@ -176,111 +235,35 @@ class DaytonaEnvironment(BaseEnvironment):
self._sandbox.start()
logger.info("Daytona: restarted sandbox %s", self._sandbox.id)
def _exec_in_thread(self, exec_command: str, cwd: Optional[str], timeout: int) -> dict:
"""Run exec in a background thread with interrupt polling.
# ------------------------------------------------------------------
# Unified execution hooks
# ------------------------------------------------------------------
The Daytona SDK's exec(timeout=...) parameter is unreliable (the
server-side timeout is not enforced and the SDK has no client-side
fallback), so we wrap the command with the shell ``timeout`` utility
which reliably kills the process and returns exit code 124.
"""
# Wrap with shell `timeout` to enforce the deadline reliably.
# Add a small buffer so the shell timeout fires before any SDK-level
# timeout would, giving us a clean exit code 124.
timed_command = f"timeout {timeout} sh -c {shlex.quote(exec_command)}"
result_holder: dict = {"value": None, "error": None}
def _run():
try:
response = self._sandbox.process.exec(
timed_command, cwd=cwd,
)
result_holder["value"] = {
"output": response.result or "",
"returncode": response.exit_code,
}
except Exception as e:
result_holder["error"] = e
t = threading.Thread(target=_run, daemon=True)
t.start()
# Wait for timeout + generous buffer for network/SDK overhead
deadline = time.monotonic() + timeout + 10
while t.is_alive():
t.join(timeout=0.2)
if is_interrupted():
with self._lock:
try:
self._sandbox.stop()
except Exception:
pass
return {
"output": "[Command interrupted - Daytona sandbox stopped]",
"returncode": 130,
}
if time.monotonic() > deadline:
# Shell timeout didn't fire and SDK is hung — force stop
with self._lock:
try:
self._sandbox.stop()
except Exception:
pass
return self._timeout_result(timeout)
if result_holder["error"]:
return {"error": result_holder["error"]}
return result_holder["value"]
def execute(self, command: str, cwd: str = "", *,
timeout: Optional[int] = None,
stdin_data: Optional[str] = None) -> dict:
def _before_execute(self) -> None:
"""Ensure sandbox is ready and sync credentials before each command."""
with self._lock:
self._ensure_sandbox_ready()
# Incremental sync before each command so mid-session credential
# refreshes and skill updates are picked up.
self._sync_skills_and_credentials()
def _run_bash(self, cmd_string: str, *, stdin_data: str | None = None):
"""Spawn ``bash -c <cmd_string>`` inside the Daytona sandbox.
Returns a _DaytonaProcessHandle (satisfies the ProcessHandle protocol).
stdin_data is embedded as a heredoc since Daytona cannot pipe stdin.
"""
if stdin_data is not None:
marker = f"HERMES_EOF_{uuid.uuid4().hex[:8]}"
while marker in stdin_data:
marker = f"HERMES_EOF_{uuid.uuid4().hex[:8]}"
command = f"{command} << '{marker}'\n{stdin_data}\n{marker}"
cmd_string = f"{cmd_string} << '{marker}'\n{stdin_data}\n{marker}"
effective_cwd = self.cwd or None
return _DaytonaProcessHandle(
self._sandbox, cmd_string, effective_cwd, self.timeout,
)
exec_command, sudo_stdin = self._prepare_command(command)
# Daytona sandboxes execute commands via the Daytona SDK and cannot
# pipe subprocess stdin directly the way a local Popen can. When a
# sudo password is present, use a shell-level pipe from printf so that
# the password feeds sudo -S without appearing as an echo argument
# embedded in the shell string. The password is still visible in the
# remote sandbox's command line, but it is not exposed on the user's
# local machine — which is the primary threat being mitigated.
if sudo_stdin is not None:
import shlex
exec_command = (
f"printf '%s\\n' {shlex.quote(sudo_stdin.rstrip())} | {exec_command}"
)
effective_cwd = cwd or self.cwd or None
effective_timeout = timeout or self.timeout
result = self._exec_in_thread(exec_command, effective_cwd, effective_timeout)
if "error" in result:
from daytona import DaytonaError
err = result["error"]
if isinstance(err, DaytonaError):
with self._lock:
try:
self._ensure_sandbox_ready()
except Exception:
return {"output": f"Daytona execution error: {err}", "returncode": 1}
result = self._exec_in_thread(exec_command, effective_cwd, effective_timeout)
if "error" not in result:
return result
return {"output": f"Daytona execution error: {err}", "returncode": 1}
return result
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
def cleanup(self):
with self._lock:

View File

@@ -7,18 +7,16 @@ wrapper, while preserving Hermes' persistent snapshot behavior across sessions.
import asyncio
import json
import logging
import os
import shlex
import threading
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Optional
from hermes_constants import get_hermes_home
from tools.environments.modal_common import (
BaseModalExecutionEnvironment,
ModalExecStart,
PreparedModalExec,
)
from tools.environments.base import BaseEnvironment
logger = logging.getLogger(__name__)
@@ -138,19 +136,82 @@ class _AsyncWorker:
self._thread.join(timeout=10)
@dataclass
class _DirectModalExecHandle:
thread: threading.Thread
result_holder: Dict[str, Any]
class _ModalProcessHandle:
    """Adapter making Modal's async sandbox.exec look like Popen.

    Submits the async exec to the environment's worker event loop from a
    daemon thread, streaming combined stdout+stderr through an OS pipe
    exposed as a file-like ``stdout``, satisfying the ProcessHandle
    protocol (``poll`` / ``kill`` / ``wait`` / ``returncode`` / ``stdout``).
    """

    def __init__(self, worker, sandbox, cmd_string, timeout):
        self._done = threading.Event()
        self._returncode = None
        self._read_fd, self._write_fd = os.pipe()
        self.stdout = os.fdopen(self._read_fd, "r")
        self.stdin = None  # stdin is embedded as a heredoc by the caller

        def _run():
            # Open the write end of the pipe exactly once. Re-opening the
            # same fd as a second file object (as in an error fallback)
            # risks a double close / EBADF when both objects are finalized.
            writer = os.fdopen(self._write_fd, "w")
            try:
                async def _exec():
                    process = await sandbox.exec.aio(
                        "bash", "-c", cmd_string, timeout=timeout,
                    )
                    stdout = await process.stdout.read.aio()
                    stderr = await process.stderr.read.aio()
                    exit_code = await process.wait.aio()
                    return stdout, stderr, exit_code

                # Generous buffer past the command timeout for SDK/network
                # overhead before giving up on the coroutine.
                stdout, stderr, exit_code = worker.run_coroutine(
                    _exec(), timeout=timeout + 30,
                )
                # SDK may return str or bytes; normalize and concatenate
                # stdout followed by stderr into the single output stream.
                for chunk in (stdout, stderr):
                    if chunk:
                        writer.write(
                            chunk if isinstance(chunk, str)
                            else chunk.decode("utf-8", errors="replace")
                        )
                self._returncode = exit_code
            except Exception as e:
                self._returncode = 1
                try:
                    writer.write(f"Modal execution error: {e}")
                except Exception:
                    pass  # best effort — pipe may already be broken
            finally:
                try:
                    writer.close()
                except Exception:
                    pass
                self._done.set()

        self._thread = threading.Thread(target=_run, daemon=True)
        self._thread.start()

    def poll(self):
        """Return the exit code if finished, else None (Popen-compatible)."""
        return self._returncode if self._done.is_set() else None

    def kill(self):
        pass  # Handled by sandbox termination in environment

    def wait(self, timeout=None):
        """Block until the exec thread finishes (or *timeout* elapses)."""
        self._done.wait(timeout=timeout)
        return self._returncode

    @property
    def returncode(self):
        return self._returncode
class ModalEnvironment(BaseModalExecutionEnvironment):
"""Modal cloud execution via native Modal sandboxes."""
class ModalEnvironment(BaseEnvironment):
"""Modal cloud execution via native Modal sandboxes.
_stdin_mode = "heredoc"
_poll_interval_seconds = 0.2
_interrupt_output = "[Command interrupted - Modal sandbox terminated]"
_unexpected_error_prefix = "Modal execution error"
Uses the unified spawn-per-call model: _run_bash() returns a
_ModalProcessHandle that wraps Modal's async SDK in a thread + OS pipe,
satisfying the ProcessHandle protocol for BaseEnvironment._wait_for_process().
"""
def __init__(
self,
@@ -186,11 +247,7 @@ class ModalEnvironment(BaseModalExecutionEnvironment):
cred_mounts = []
try:
from tools.credential_files import (
get_credential_file_mounts,
iter_skills_files,
iter_cache_files,
)
from tools.credential_files import get_credential_file_mounts, iter_skills_files
for mount_entry in get_credential_file_mounts():
cred_mounts.append(
@@ -216,20 +273,6 @@ class ModalEnvironment(BaseModalExecutionEnvironment):
)
if skills_files:
logger.info("Modal: mounting %d skill files", len(skills_files))
# Mount host-side cache files (documents, images, audio,
# screenshots). New files arriving mid-session are picked up
# by _sync_files() before each command execution.
cache_files = iter_cache_files()
for entry in cache_files:
cred_mounts.append(
_modal.Mount.from_local_file(
entry["host_path"],
remote_path=entry["container_path"],
)
)
if cache_files:
logger.info("Modal: mounting %d cache files", len(cache_files))
except Exception as e:
logger.debug("Modal: could not load credential file mounts: %s", e)
@@ -292,6 +335,13 @@ class ModalEnvironment(BaseModalExecutionEnvironment):
logger.info("Modal: sandbox created (task=%s)", self._task_id)
# Capture login-shell environment into a snapshot for the unified model
self.init_session()
# ------------------------------------------------------------------
# File sync
# ------------------------------------------------------------------
def _push_file_to_sandbox(self, host_path: str, container_path: str) -> bool:
"""Push a single file into the sandbox if changed. Returns True if synced."""
hp = Path(host_path)
@@ -326,19 +376,13 @@ class ModalEnvironment(BaseModalExecutionEnvironment):
return True
def _sync_files(self) -> None:
"""Push credential, skill, and cache files into the running sandbox.
"""Push credential files and skill files into the running sandbox.
Runs before each command. Uses mtime+size caching so only changed
files are pushed (~13μs overhead in the no-op case). Cache files
are especially important here — new uploads/screenshots may appear
mid-session after sandbox creation.
files are pushed (~13us overhead in the no-op case).
"""
try:
from tools.credential_files import (
get_credential_file_mounts,
iter_skills_files,
iter_cache_files,
)
from tools.credential_files import get_credential_file_mounts, iter_skills_files
for entry in get_credential_file_mounts():
if self._push_file_to_sandbox(entry["host_path"], entry["container_path"]):
@@ -347,65 +391,35 @@ class ModalEnvironment(BaseModalExecutionEnvironment):
for entry in iter_skills_files():
if self._push_file_to_sandbox(entry["host_path"], entry["container_path"]):
logger.debug("Modal: synced skill file %s", entry["container_path"])
for entry in iter_cache_files():
if self._push_file_to_sandbox(entry["host_path"], entry["container_path"]):
logger.debug("Modal: synced cache file %s", entry["container_path"])
except Exception as e:
logger.debug("Modal: file sync failed: %s", e)
# ------------------------------------------------------------------
# Unified execution hooks
# ------------------------------------------------------------------
def _before_execute(self) -> None:
self._sync_files()
def _start_modal_exec(self, prepared: PreparedModalExec) -> ModalExecStart:
full_command = f"cd {shlex.quote(prepared.cwd)} && {prepared.command}"
result_holder = {"value": None, "error": None}
def _run_bash(self, cmd_string: str, *, stdin_data: str | None = None):
"""Spawn ``bash -c <cmd_string>`` inside the Modal sandbox.
def _run():
try:
async def _do_execute():
process = await self._sandbox.exec.aio(
"bash",
"-c",
full_command,
timeout=prepared.timeout,
)
stdout = await process.stdout.read.aio()
stderr = await process.stderr.read.aio()
exit_code = await process.wait.aio()
if isinstance(stdout, bytes):
stdout = stdout.decode("utf-8", errors="replace")
if isinstance(stderr, bytes):
stderr = stderr.decode("utf-8", errors="replace")
output = stdout
if stderr:
output = f"{stdout}\n{stderr}" if stdout else stderr
return self._result(output, exit_code)
result_holder["value"] = self._worker.run_coroutine(
_do_execute(),
timeout=prepared.timeout + 30,
)
except Exception as e:
result_holder["error"] = e
t = threading.Thread(target=_run, daemon=True)
t.start()
return ModalExecStart(handle=_DirectModalExecHandle(thread=t, result_holder=result_holder))
def _poll_modal_exec(self, handle: _DirectModalExecHandle) -> dict | None:
if handle.thread.is_alive():
return None
if handle.result_holder["error"]:
return self._error_result(f"Modal execution error: {handle.result_holder['error']}")
return handle.result_holder["value"]
def _cancel_modal_exec(self, handle: _DirectModalExecHandle) -> None:
self._worker.run_coroutine(
self._sandbox.terminate.aio(),
timeout=15,
Returns a _ModalProcessHandle (satisfies the ProcessHandle protocol).
stdin_data is embedded as a heredoc since Modal cannot pipe stdin.
"""
if stdin_data is not None:
marker = f"HERMES_EOF_{uuid.uuid4().hex[:8]}"
while marker in stdin_data:
marker = f"HERMES_EOF_{uuid.uuid4().hex[:8]}"
cmd_string = f"{cmd_string} << '{marker}'\n{stdin_data}\n{marker}"
return _ModalProcessHandle(
self._worker, self._sandbox, cmd_string, self.timeout,
)
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
def cleanup(self):
"""Snapshot the filesystem (if persistent) then stop the sandbox."""
if self._sandbox is None: