diff --git a/tools/environments/docker.py b/tools/environments/docker.py index 1d2d325cbae..be66e228c22 100644 --- a/tools/environments/docker.py +++ b/tools/environments/docker.py @@ -8,17 +8,13 @@ persistence via bind mounts. import logging import os import re -import shlex import shutil import subprocess import sys -import threading -import time import uuid from typing import Optional from tools.environments.base import BaseEnvironment -from tools.interrupt import is_interrupted logger = logging.getLogger(__name__) @@ -61,36 +57,6 @@ def _normalize_forward_env_names(forward_env: list[str] | None) -> list[str]: return normalized -def _normalize_env_dict(env: dict | None) -> dict[str, str]: - """Validate and normalize a docker_env dict to {str: str}. - - Filters out entries with invalid variable names or non-string values. - """ - if not env: - return {} - if not isinstance(env, dict): - logger.warning("docker_env is not a dict: %r", env) - return {} - - normalized: dict[str, str] = {} - for key, value in env.items(): - if not isinstance(key, str) or not _ENV_VAR_NAME_RE.match(key.strip()): - logger.warning("Ignoring invalid docker_env key: %r", key) - continue - key = key.strip() - if not isinstance(value, str): - # Coerce simple scalar types (int, bool, float) to string; - # reject complex types. - if isinstance(value, (int, float, bool)): - value = str(value) - else: - logger.warning("Ignoring non-string docker_env value for %r: %r", key, value) - continue - normalized[key] = value - - return normalized - - def _load_hermes_env_vars() -> dict[str, str]: """Load ~/.hermes/.env values without failing Docker command execution.""" try: @@ -241,7 +207,6 @@ class DockerEnvironment(BaseEnvironment): task_id: str = "default", volumes: list = None, forward_env: list[str] | None = None, - env: dict | None = None, network: bool = True, host_cwd: str = None, auto_mount_cwd: bool = False, @@ -253,7 +218,6 @@ class DockerEnvironment(BaseEnvironment): self._persistent = persistent_filesystem self._task_id = task_id self._forward_env = _normalize_forward_env_names(forward_env) - self._env = _normalize_env_dict(env) self._container_id: Optional[str] = None logger.info(f"DockerEnvironment volumes: {volumes}") # Ensure volumes is a list (config.yaml could be malformed) @@ -348,11 +312,7 @@ class DockerEnvironment(BaseEnvironment): # Mount credential files (OAuth tokens, etc.) declared by skills. # Read-only so the container can authenticate but not modify host creds. try: - from tools.credential_files import ( - get_credential_file_mounts, - get_skills_directory_mount, - get_cache_directory_mounts, - ) + from tools.credential_files import get_credential_file_mounts, get_skills_directory_mount for mount_entry in get_credential_file_mounts(): volume_args.extend([ @@ -365,9 +325,10 @@ class DockerEnvironment(BaseEnvironment): mount_entry["container_path"], ) - # Mount skill directories (local + external) so skill - # scripts/templates are available inside the container. - for skills_mount in get_skills_directory_mount(): + # Mount the skills directory so skill scripts/templates are + # available inside the container at the same relative path. + skills_mount = get_skills_directory_mount() + if skills_mount: volume_args.extend([ "-v", f"{skills_mount['host_path']}:{skills_mount['container_path']}:ro", @@ -377,32 +338,11 @@ class DockerEnvironment(BaseEnvironment): skills_mount["host_path"], skills_mount["container_path"], ) - - # Mount host-side cache directories (documents, images, audio, - # screenshots) so the agent can access uploaded files and other - # cached media from inside the container. Read-only — the - # container reads these but the host gateway manages writes. - for cache_mount in get_cache_directory_mounts(): - volume_args.extend([ - "-v", - f"{cache_mount['host_path']}:{cache_mount['container_path']}:ro", - ]) - logger.info( - "Docker: mounting cache dir %s -> %s", - cache_mount["host_path"], - cache_mount["container_path"], - ) except Exception as e: logger.debug("Docker: could not load credential file mounts: %s", e) - # Explicit environment variables (docker_env config) — set at container - # creation so they're available to all processes (including entrypoint). - env_args = [] - for key in sorted(self._env): - env_args.extend(["-e", f"{key}={self._env[key]}"]) - logger.info(f"Docker volume_args: {volume_args}") - all_run_args = list(_SECURITY_ARGS) + writable_args + resource_args + volume_args + env_args + all_run_args = list(_SECURITY_ARGS) + writable_args + resource_args + volume_args logger.info(f"Docker run_args: {all_run_args}") # Resolve the docker executable once so it works even when @@ -430,6 +370,9 @@ class DockerEnvironment(BaseEnvironment): self._container_id = result.stdout.strip() logger.info(f"Started container {container_name} ({self._container_id[:12]})") + # Capture login-shell environment into a snapshot for the unified model + self.init_session() + @staticmethod def _storage_opt_supported() -> bool: """Check if Docker's storage driver supports --storage-opt size=. @@ -470,110 +413,86 @@ class DockerEnvironment(BaseEnvironment): logger.debug("Docker --storage-opt support: %s", _storage_opt_ok) return _storage_opt_ok - def execute(self, command: str, cwd: str = "", *, - timeout: int | None = None, - stdin_data: str | None = None) -> dict: - exec_command, sudo_stdin = self._prepare_command(command) - work_dir = cwd or self.cwd - effective_timeout = timeout or self.timeout + # ------------------------------------------------------------------ + # Unified execution primitives + # ------------------------------------------------------------------ - # Merge sudo password (if any) with caller-supplied stdin_data. - if sudo_stdin is not None and stdin_data is not None: - effective_stdin = sudo_stdin + stdin_data - elif sudo_stdin is not None: - effective_stdin = sudo_stdin - else: - effective_stdin = stdin_data - - # docker exec -w doesn't expand ~, so prepend a cd into the command. - # Keep ~ unquoted (for shell expansion) and quote only the subpath. - if work_dir == "~": - exec_command = f"cd ~ && {exec_command}" - work_dir = "/" - elif work_dir.startswith("~/"): - exec_command = f"cd ~/{shlex.quote(work_dir[2:])} && {exec_command}" - work_dir = "/" - - assert self._container_id, "Container not started" - cmd = [self._docker_exe, "exec"] - if effective_stdin is not None: - cmd.append("-i") - cmd.extend(["-w", work_dir]) - # Build the per-exec environment: start with explicit docker_env values - # (static config), then overlay docker_forward_env / skill env_passthrough - # (dynamic from host process). Forward values take precedence. - exec_env: dict[str, str] = dict(self._env) + def _build_forward_env_args(self) -> list[str]: + """Build ``-e KEY=VALUE`` arguments for docker exec env forwarding. + Combines explicit ``docker_forward_env`` with skill-declared + ``env_passthrough`` vars so skills that declare + ``required_environment_variables`` (e.g. Notion) have their keys + forwarded into the container automatically. + """ forward_keys = set(self._forward_env) try: from tools.env_passthrough import get_all_passthrough forward_keys |= get_all_passthrough() except Exception: pass + hermes_env = _load_hermes_env_vars() if forward_keys else {} + args: list[str] = [] for key in sorted(forward_keys): value = os.getenv(key) if value is None: value = hermes_env.get(key) if value is not None: - exec_env[key] = value + args.extend(["-e", f"{key}={value}"]) + return args - for key in sorted(exec_env): - cmd.extend(["-e", f"{key}={exec_env[key]}"]) - cmd.extend([self._container_id, "bash", "-lc", exec_command]) + def _run_bash(self, cmd_string: str, *, + stdin_data: str | None = None) -> subprocess.Popen: + """Spawn ``bash -c `` inside the Docker container.""" + assert self._container_id, "Container not started" + cmd = [self._docker_exe, "exec"] + if stdin_data is not None: + cmd.append("-i") + cmd.extend(self._build_forward_env_args()) + cmd.extend([self._container_id, "bash", "-c", cmd_string]) + proc = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + stdin=subprocess.PIPE if stdin_data is not None else subprocess.DEVNULL, + text=True, + ) + if stdin_data: + try: + proc.stdin.write(stdin_data) + proc.stdin.close() + except (BrokenPipeError, OSError): + pass + return proc - try: - _output_chunks = [] - proc = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, stderr=subprocess.STDOUT, - stdin=subprocess.PIPE if effective_stdin else subprocess.DEVNULL, - text=True, - ) - if effective_stdin: - try: - proc.stdin.write(effective_stdin) - proc.stdin.close() - except Exception: - pass - - def _drain(): - try: - for line in proc.stdout: - _output_chunks.append(line) - except Exception: - pass - - reader = threading.Thread(target=_drain, daemon=True) - reader.start() - deadline = time.monotonic() + effective_timeout - - while proc.poll() is None: - if is_interrupted(): - proc.terminate() - try: - proc.wait(timeout=1) - except subprocess.TimeoutExpired: - proc.kill() - reader.join(timeout=2) - return { - "output": "".join(_output_chunks) + "\n[Command interrupted]", - "returncode": 130, - } - if time.monotonic() > deadline: - proc.kill() - reader.join(timeout=2) - return self._timeout_result(effective_timeout) - time.sleep(0.2) - - reader.join(timeout=5) - return {"output": "".join(_output_chunks), "returncode": proc.returncode} - except Exception as e: - return {"output": f"Docker execution error: {e}", "returncode": 1} + def _run_bash_login(self, cmd_string: str) -> subprocess.Popen: + """Spawn ``bash -l -c `` for snapshot creation.""" + assert self._container_id, "Container not started" + cmd = [self._docker_exe, "exec", self._container_id, + "bash", "-l", "-c", cmd_string] + return subprocess.Popen( + cmd, + stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + stdin=subprocess.DEVNULL, text=True, + ) def cleanup(self): """Stop and remove the container. Bind-mount dirs persist if persistent=True.""" if self._container_id: + # Clean up snapshot and cwdfile inside the container + paths_to_rm = " ".join( + p for p in (self._snapshot_path, self._cwdfile_path) if p + ) + if paths_to_rm: + try: + subprocess.run( + [self._docker_exe, "exec", self._container_id, + "rm", "-f", *paths_to_rm.split()], + capture_output=True, timeout=5, + ) + except Exception: + pass + try: # Stop in background so cleanup doesn't block stop_cmd = ( diff --git a/tools/environments/ssh.py b/tools/environments/ssh.py index afd28c4affa..3badab03b98 100644 --- a/tools/environments/ssh.py +++ b/tools/environments/ssh.py @@ -5,13 +5,9 @@ import shlex import shutil import subprocess import tempfile -import threading -import time from pathlib import Path from tools.environments.base import BaseEnvironment -from tools.environments.persistent_shell import PersistentShellMixin -from tools.interrupt import is_interrupted logger = logging.getLogger(__name__) @@ -24,7 +20,7 @@ def _ensure_ssh_available() -> None: ) -class SSHEnvironment(PersistentShellMixin, BaseEnvironment): +class SSHEnvironment(BaseEnvironment): """Run commands on a remote machine over SSH. Uses SSH ControlMaster for connection persistence so subsequent @@ -34,22 +30,20 @@ class SSHEnvironment(PersistentShellMixin, BaseEnvironment): Foreground commands are interruptible: the local ssh process is killed and a remote kill is attempted over the ControlMaster socket. - When ``persistent=True``, a single long-lived bash shell is kept alive - over SSH and state (cwd, env vars, shell variables) persists across - ``execute()`` calls. Output capture uses file-based IPC on the remote - host (stdout/stderr/exit-code written to temp files, polled via fast - ControlMaster one-shot reads). + Uses the unified spawn-per-call model: + - bash -l once at session start to capture env snapshot on the remote + - bash -c for every subsequent command (fast, no shell init overhead) + - CWD tracked via cwdfile written after each command on the remote host """ def __init__(self, host: str, user: str, cwd: str = "~", timeout: int = 60, port: int = 22, key_path: str = "", - persistent: bool = False): + **kwargs): super().__init__(cwd=cwd, timeout=timeout) self.host = host self.user = user self.port = port self.key_path = key_path - self.persistent = persistent self.control_dir = Path(tempfile.gettempdir()) / "hermes-ssh" self.control_dir.mkdir(parents=True, exist_ok=True) @@ -58,9 +52,7 @@ class SSHEnvironment(PersistentShellMixin, BaseEnvironment): self._establish_connection() self._remote_home = self._detect_remote_home() self._sync_skills_and_credentials() - - if self.persistent: - self._init_persistent_shell() + self.init_session() def _build_ssh_command(self, extra_args: list | None = None) -> list: cmd = ["ssh"] @@ -136,8 +128,9 @@ class SSHEnvironment(PersistentShellMixin, BaseEnvironment): else: logger.debug("SSH: rsync credential failed: %s", result.stderr.strip()) - # Sync skill directories (local + external, remap to detected home) - for skills_mount in get_skills_directory_mount(container_base=container_base): + # Sync skills directory (remap to detected home) + skills_mount = get_skills_directory_mount(container_base=container_base) + if skills_mount: remote_path = skills_mount["container_path"] mkdir_cmd = self._build_ssh_command() mkdir_cmd.append(f"mkdir -p {remote_path}") @@ -154,151 +147,67 @@ class SSHEnvironment(PersistentShellMixin, BaseEnvironment): except Exception as e: logger.debug("SSH: could not sync skills/credentials: %s", e) - def execute(self, command: str, cwd: str = "", *, - timeout: int | None = None, - stdin_data: str | None = None) -> dict: - # Incremental sync before each command so mid-session credential - # refreshes and skill updates are picked up. + # ------------------------------------------------------------------ + # Unified execution hooks + # ------------------------------------------------------------------ + + def _before_execute(self): + """Incremental sync before each command so mid-session credential + refreshes and skill updates are picked up.""" self._sync_skills_and_credentials() - return super().execute(command, cwd, timeout=timeout, stdin_data=stdin_data) - _poll_interval_start: float = 0.15 # SSH: higher initial interval (150ms) for network latency - - @property - def _temp_prefix(self) -> str: - return f"/tmp/hermes-ssh-{self._session_id}" - - def _spawn_shell_process(self) -> subprocess.Popen: + def _run_bash(self, cmd_string, *, stdin_data=None): cmd = self._build_ssh_command() - cmd.append("bash -l") - return subprocess.Popen( + cmd.extend(["bash", "-c", shlex.quote(cmd_string)]) + proc = subprocess.Popen( cmd, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.DEVNULL, + stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + stdin=subprocess.PIPE if stdin_data is not None else subprocess.DEVNULL, text=True, ) - - def _read_temp_files(self, *paths: str) -> list[str]: - if len(paths) == 1: - cmd = self._build_ssh_command() - cmd.append(f"cat {paths[0]} 2>/dev/null") + if stdin_data: try: - result = subprocess.run( - cmd, capture_output=True, text=True, timeout=10, - ) - return [result.stdout] - except (subprocess.TimeoutExpired, OSError): - return [""] + proc.stdin.write(stdin_data) + proc.stdin.close() + except (BrokenPipeError, OSError): + pass + return proc - delim = f"__HERMES_SEP_{self._session_id}__" - script = "; ".join( - f"cat {p} 2>/dev/null; echo '{delim}'" for p in paths - ) + def _run_bash_login(self, cmd_string): cmd = self._build_ssh_command() - cmd.append(script) + cmd.extend(["bash", "-l", "-c", shlex.quote(cmd_string)]) + return subprocess.Popen( + cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + stdin=subprocess.DEVNULL, text=True, + ) + + def _read_file_in_env(self, path: str) -> str: + """SSH override: use subprocess.run for single-shot cat, suppress stderr. + + SSH connection warnings (post-quantum, etc.) must not pollute + the cwdfile read — use separate stderr to discard them. + """ + cmd = self._build_ssh_command() + cmd.append(f"cat {shlex.quote(path)} 2>/dev/null") try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=10, ) - parts = result.stdout.split(delim + "\n") - return [parts[i] if i < len(parts) else "" for i in range(len(paths))] + return result.stdout except (subprocess.TimeoutExpired, OSError): - return [""] * len(paths) + return "" - def _kill_shell_children(self): - if self._shell_pid is None: - return - cmd = self._build_ssh_command() - cmd.append(f"pkill -P {self._shell_pid} 2>/dev/null; true") - try: - subprocess.run(cmd, capture_output=True, timeout=5) - except (subprocess.TimeoutExpired, OSError): - pass - - def _cleanup_temp_files(self): - cmd = self._build_ssh_command() - cmd.append(f"rm -f {self._temp_prefix}-*") - try: - subprocess.run(cmd, capture_output=True, timeout=5) - except (subprocess.TimeoutExpired, OSError): - pass - - def _execute_oneshot(self, command: str, cwd: str = "", *, - timeout: int | None = None, - stdin_data: str | None = None) -> dict: - work_dir = cwd or self.cwd - exec_command, sudo_stdin = self._prepare_command(command) - # Keep ~ unquoted (for shell expansion) and quote only the subpath. - if work_dir == "~": - wrapped = f'cd ~ && {exec_command}' - elif work_dir.startswith("~/"): - wrapped = f'cd ~/{shlex.quote(work_dir[2:])} && {exec_command}' - else: - wrapped = f'cd {shlex.quote(work_dir)} && {exec_command}' - effective_timeout = timeout or self.timeout - - if sudo_stdin is not None and stdin_data is not None: - effective_stdin = sudo_stdin + stdin_data - elif sudo_stdin is not None: - effective_stdin = sudo_stdin - else: - effective_stdin = stdin_data - - cmd = self._build_ssh_command() - cmd.append(wrapped) - - kwargs = self._build_run_kwargs(timeout, effective_stdin) - kwargs.pop("timeout", None) - _output_chunks = [] - proc = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - stdin=subprocess.PIPE if effective_stdin else subprocess.DEVNULL, - text=True, - ) - - if effective_stdin: + def cleanup(self): + # Clean up remote snapshot and cwdfile before closing ControlMaster + if self._snapshot_path or self._cwdfile_path: + paths = " ".join(p for p in (self._snapshot_path, self._cwdfile_path) if p) try: - proc.stdin.write(effective_stdin) - proc.stdin.close() - except (BrokenPipeError, OSError): - pass - - def _drain(): - try: - for line in proc.stdout: - _output_chunks.append(line) + cmd = self._build_ssh_command() + cmd.append(f"rm -f {paths}") + subprocess.run(cmd, capture_output=True, timeout=5) except Exception: pass - reader = threading.Thread(target=_drain, daemon=True) - reader.start() - deadline = time.monotonic() + effective_timeout - - while proc.poll() is None: - if is_interrupted(): - proc.terminate() - try: - proc.wait(timeout=1) - except subprocess.TimeoutExpired: - proc.kill() - reader.join(timeout=2) - return { - "output": "".join(_output_chunks) + "\n[Command interrupted]", - "returncode": 130, - } - if time.monotonic() > deadline: - proc.kill() - reader.join(timeout=2) - return self._timeout_result(effective_timeout) - time.sleep(0.2) - - reader.join(timeout=5) - return {"output": "".join(_output_chunks), "returncode": proc.returncode} - - def cleanup(self): super().cleanup() if self.control_socket.exists(): try: