diff --git a/tests/tools/test_file_sync.py b/tests/tools/test_file_sync.py
new file mode 100644
index 00000000000..283b192e0b1
--- /dev/null
+++ b/tests/tools/test_file_sync.py
@@ -0,0 +1,257 @@
+"""Tests for FileSyncManager — mtime tracking, deletion detection, transactional rollback."""
+
+import os
+import time
+from pathlib import Path
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from tools.environments.file_sync import FileSyncManager, _FORCE_SYNC_ENV
+
+
+@pytest.fixture
+def tmp_files(tmp_path):
+    """Create a few temp files to use as sync sources."""
+    files = {}
+    for name in ("cred_a.json", "cred_b.json", "skill_main.py"):
+        p = tmp_path / name
+        p.write_text(f"content of {name}")
+        files[name] = str(p)
+    return files
+
+
+def _make_get_files(tmp_files, remote_base="/root/.hermes"):
+    """Return a get_files_fn that maps local files to remote paths."""
+    mapping = [(hp, f"{remote_base}/{name}") for name, hp in tmp_files.items()]
+
+    def get_files():
+        return [(hp, rp) for hp, rp in mapping if Path(hp).exists()]
+
+    return get_files
+
+
+def _make_manager(tmp_files, remote_base="/root/.hermes", upload=None, delete=None):
+    """Create a FileSyncManager with test callbacks."""
+    return FileSyncManager(
+        get_files_fn=_make_get_files(tmp_files, remote_base),
+        upload_fn=upload or MagicMock(),
+        delete_fn=delete or MagicMock(),
+    )
+
+
+class TestMtimeSkip:
+    def test_unchanged_files_not_re_uploaded(self, tmp_files):
+        upload = MagicMock()
+        mgr = _make_manager(tmp_files, upload=upload)
+
+        mgr.sync(force=True)
+        assert upload.call_count == 3
+
+        upload.reset_mock()
+        mgr.sync(force=True)
+        assert upload.call_count == 0, "unchanged files should not be re-uploaded"
+
+    def test_changed_file_re_uploaded(self, tmp_files):
+        upload = MagicMock()
+        mgr = _make_manager(tmp_files, upload=upload)
+
+        mgr.sync(force=True)
+        upload.reset_mock()
+
+        # Touch one file
+        time.sleep(0.05)
+        Path(tmp_files["cred_a.json"]).write_text("updated content")
+
+        mgr.sync(force=True)
+        assert upload.call_count == 1
+        assert tmp_files["cred_a.json"] in upload.call_args[0][0]
+
+    def test_new_file_detected(self, tmp_files, tmp_path):
+        upload = MagicMock()
+        mgr = FileSyncManager(
+            get_files_fn=_make_get_files(tmp_files),
+            upload_fn=upload,
+            delete_fn=MagicMock(),
+        )
+
+        mgr.sync(force=True)
+        assert upload.call_count == 3
+
+        # Add a new file
+        new_file = tmp_path / "new_skill.py"
+        new_file.write_text("new content")
+        tmp_files["new_skill.py"] = str(new_file)
+        # Swap in a file-source callback that sees the updated file list
+        mgr._get_files_fn = _make_get_files(tmp_files)
+
+        upload.reset_mock()
+        mgr.sync(force=True)
+        assert upload.call_count == 1
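+
+    def test_same_size_mtime_bump_re_uploaded(self, tmp_files):
+        # Sketch: assumes _file_mtime_key incorporates mtime as well as size
+        # (per the (mtime, size) state annotation in FileSyncManager), so a
+        # same-size rewrite with a bumped mtime is still re-uploaded.
+        upload = MagicMock()
+        mgr = _make_manager(tmp_files, upload=upload)
+
+        mgr.sync(force=True)
+        upload.reset_mock()
+
+        path = Path(tmp_files["cred_a.json"])
+        path.write_text(path.read_text())  # same content, same size
+        bumped = time.time() + 10
+        os.utime(path, (bumped, bumped))
+
+        mgr.sync(force=True)
+        assert upload.call_count == 1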
+
+
+class TestDeletion:
+    def test_removed_file_triggers_delete(self, tmp_files):
+        upload = MagicMock()
+        delete = MagicMock()
+        mgr = _make_manager(tmp_files, upload=upload, delete=delete)
+
+        mgr.sync(force=True)
+        delete.assert_not_called()
+
+        # Remove a file locally
+        os.unlink(tmp_files["cred_b.json"])
+        del tmp_files["cred_b.json"]
+        mgr._get_files_fn = _make_get_files(tmp_files)
+
+        mgr.sync(force=True)
+        delete.assert_called_once()
+        deleted_paths = delete.call_args[0][0]
+        assert any("cred_b.json" in p for p in deleted_paths)
+
+    def test_no_delete_when_no_removals(self, tmp_files):
+        delete = MagicMock()
+        mgr = _make_manager(tmp_files, delete=delete)
+
+        mgr.sync(force=True)
+        mgr.sync(force=True)
+        delete.assert_not_called()
+
+
+class TestTransactionalRollback:
+    def test_upload_failure_rolls_back(self, tmp_files):
+        call_count = 0
+
+        def failing_upload(host_path, remote_path):
+            nonlocal call_count
+            call_count += 1
+            if call_count == 2:
+                raise RuntimeError("upload failed")
+
+        mgr = _make_manager(tmp_files, upload=failing_upload)
+
+        # First sync fails (swallowed, logged, state rolled back)
+        mgr.sync(force=True)
+
+        # State should be empty (rolled back) — next sync retries all files
+        good_upload = MagicMock()
+        mgr._upload_fn = good_upload
+        mgr.sync(force=True)
+        assert good_upload.call_count == 3, "all files should be retried after rollback"
+
+    def test_delete_failure_rolls_back(self, tmp_files):
+        upload = MagicMock()
+        mgr = _make_manager(tmp_files, upload=upload)
+
+        # Initial sync
+        mgr.sync(force=True)
+
+        # Remove a file
+        os.unlink(tmp_files["skill_main.py"])
+        del tmp_files["skill_main.py"]
+        mgr._get_files_fn = _make_get_files(tmp_files)
+
+        # Delete fails (swallowed, state rolled back)
+        mgr._delete_fn = MagicMock(side_effect=RuntimeError("delete failed"))
+        mgr.sync(force=True)
+
+        # Next sync should retry the delete
+        good_delete = MagicMock()
+        mgr._delete_fn = good_delete
+        upload.reset_mock()
+        mgr.sync(force=True)
+        good_delete.assert_called_once()
+
+
+class TestRateLimiting:
+    def test_sync_skipped_within_interval(self, tmp_files):
+        upload = MagicMock()
+        mgr = FileSyncManager(
+            get_files_fn=_make_get_files(tmp_files),
+            upload_fn=upload,
+            delete_fn=MagicMock(),
+            sync_interval=10.0,
+        )
+
+        mgr.sync(force=True)
+        assert upload.call_count == 3
+
+        upload.reset_mock()
+        # Without force, should skip due to rate limit
+        mgr.sync()
+        assert upload.call_count == 0
+
+    def test_force_bypasses_rate_limit(self, tmp_files, tmp_path):
+        upload = MagicMock()
+        mgr = FileSyncManager(
+            get_files_fn=_make_get_files(tmp_files),
+            upload_fn=upload,
+            delete_fn=MagicMock(),
+            sync_interval=10.0,
+        )
+
+        mgr.sync(force=True)
+        upload.reset_mock()
+
+        # Add a new file and force sync
+        new_file = tmp_path / "forced.txt"
+        new_file.write_text("forced")
+        tmp_files["forced.txt"] = str(new_file)
+        mgr._get_files_fn = _make_get_files(tmp_files)
+
+        mgr.sync(force=True)
+        assert upload.call_count == 1
+
+    def test_env_var_forces_sync(self, tmp_files, tmp_path):
+        upload = MagicMock()
+        mgr = FileSyncManager(
+            get_files_fn=_make_get_files(tmp_files),
+            upload_fn=upload,
+            delete_fn=MagicMock(),
+            sync_interval=10.0,
+        )
+
+        mgr.sync(force=True)
+        upload.reset_mock()
+
+        new_file = tmp_path / "env_forced.txt"
+        new_file.write_text("env forced")
+        tmp_files["env_forced.txt"] = str(new_file)
+        mgr._get_files_fn = _make_get_files(tmp_files)
+
+        with patch.dict(os.environ, {_FORCE_SYNC_ENV: "1"}):
+            mgr.sync()
+        assert upload.call_count == 1
+
+
+class TestEdgeCases:
+    def test_empty_file_list(self):
+        upload = MagicMock()
+        delete = MagicMock()
+        mgr = FileSyncManager(
+            get_files_fn=lambda: [],
+            upload_fn=upload,
+            delete_fn=delete,
+        )
+
+        mgr.sync(force=True)
+        upload.assert_not_called()
+        delete.assert_not_called()
+
+    def test_file_disappears_between_list_and_upload(self, tmp_path):
+        """File listed by get_files but deleted before _file_mtime_key reads it."""
+        f = tmp_path / "ephemeral.txt"
+        f.write_text("here now")
+
+        upload = MagicMock()
+        mgr = FileSyncManager(
+            get_files_fn=lambda: [(str(f), "/root/.hermes/ephemeral.txt")],
+            upload_fn=upload,
+            delete_fn=MagicMock(),
+        )
+
+        # Delete the file before sync can stat it
+        os.unlink(str(f))
+
+        mgr.sync(force=True)
+        upload.assert_not_called()  # _file_mtime_key returns None, so the file is skipped
diff --git a/tools/environments/base.py b/tools/environments/base.py
index d2963e4acc1..42d4bdc9970 100644
--- a/tools/environments/base.py
+++ b/tools/environments/base.py
@@ -43,8 +43,6 @@ def get_sandbox_dir() -> Path:
 # Shared constants and utilities
 # ---------------------------------------------------------------------------
 
-_SYNC_INTERVAL_SECONDS = 5.0
-
 
 def _pipe_stdin(proc: subprocess.Popen, data: str) -> None:
     """Write *data* to proc.stdin on a daemon thread to avoid pipe-buffer deadlocks."""
@@ -246,9 +244,6 @@ class BaseEnvironment(ABC):
         self._cwd_file = f"{temp_dir}/hermes-cwd-{self._session_id}.txt"
         self._cwd_marker = _cwd_marker(self._session_id)
         self._snapshot_ready = False
-        self._last_sync_time: float | None = (
-            None  # set to 0 by backends that need file sync
-        )
 
     # ------------------------------------------------------------------
     # Abstract methods
@@ -477,22 +472,14 @@ class BaseEnvironment(ABC):
     # Hooks
    # ------------------------------------------------------------------
 
-    def _before_execute(self):
-        """Rate-limited file sync before each command.
+    def _before_execute(self) -> None:
+        """Hook called before each command execution.
 
-        Backends that need pre-command sync set ``self._last_sync_time = 0``
-        in ``__init__`` and override :meth:`_sync_files`. Backends needing
-        extra pre-exec logic (e.g. Daytona sandbox restart check) override
-        this method and call ``super()._before_execute()``.
+        Remote backends (SSH, Modal, Daytona) override this to trigger
+        their FileSyncManager. Bind-mount backends (Docker, Singularity)
+        and Local don't need file sync — the host filesystem is directly
+        visible inside the container/process.
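+
+        A typical override is a one-liner (sketch; see the SSH, Modal,
+        and Daytona backends in this change):
+
+            def _before_execute(self) -> None:
+                self._sync_manager.sync()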
         """
-        if self._last_sync_time is not None:
-            now = time.monotonic()
-            if now - self._last_sync_time >= _SYNC_INTERVAL_SECONDS:
-                self._sync_files()
-                self._last_sync_time = now
-
-    def _sync_files(self):
-        """Push files to remote environment. Called rate-limited by _before_execute."""
         pass
 
     # ------------------------------------------------------------------
diff --git a/tools/environments/daytona.py b/tools/environments/daytona.py
index 60958fd353e..1a84ce0aa4e 100644
--- a/tools/environments/daytona.py
+++ b/tools/environments/daytona.py
@@ -11,13 +11,12 @@ import shlex
 import threading
 import warnings
 from pathlib import Path
-from typing import Dict, Optional
 
 from tools.environments.base import (
     BaseEnvironment,
     _ThreadedProcessHandle,
-    _file_mtime_key,
 )
+from tools.environments.file_sync import FileSyncManager, iter_sync_files, quoted_rm_command
 
 logger = logging.getLogger(__name__)
@@ -61,7 +60,6 @@ class DaytonaEnvironment(BaseEnvironment):
         self._daytona = Daytona()
         self._sandbox = None
         self._lock = threading.Lock()
-        self._last_sync_time: float = 0
 
         memory_gib = max(1, math.ceil(memory / 1024))
         disk_gib = max(1, math.ceil(disk / 1024))
@@ -128,50 +126,40 @@ class DaytonaEnvironment(BaseEnvironment):
             pass
         logger.info("Daytona: resolved home to %s, cwd to %s", self._remote_home, self.cwd)
 
-        self._synced_files: Dict[str, tuple] = {}
-        self._sync_files()
+        self._sync_manager = FileSyncManager(
+            get_files_fn=lambda: iter_sync_files(f"{self._remote_home}/.hermes"),
+            upload_fn=self._daytona_upload,
+            delete_fn=self._daytona_delete,
+        )
+        self._sync_manager.sync(force=True)
         self.init_session()
 
-    def _upload_if_changed(self, host_path: str, remote_path: str) -> bool:
-        file_key = _file_mtime_key(host_path)
-        if file_key is None:
-            return False
-        if self._synced_files.get(remote_path) == file_key:
-            return False
-        try:
-            parent = str(Path(remote_path).parent)
-            self._sandbox.process.exec(f"mkdir -p {parent}")
-            self._sandbox.fs.upload_file(host_path, remote_path)
-            self._synced_files[remote_path] = file_key
-            return True
-        except Exception as e:
-            logger.debug("Daytona: upload failed %s: %s", host_path, e)
-            return False
+    def _daytona_upload(self, host_path: str, remote_path: str) -> None:
+        """Upload a single file via Daytona SDK."""
+        parent = str(Path(remote_path).parent)
+        self._sandbox.process.exec(f"mkdir -p {shlex.quote(parent)}")
+        self._sandbox.fs.upload_file(host_path, remote_path)
 
-    def _sync_files(self) -> None:
-        container_base = f"{self._remote_home}/.hermes"
-        try:
-            from tools.credential_files import get_credential_file_mounts, iter_skills_files
-            for mount_entry in get_credential_file_mounts():
-                remote_path = mount_entry["container_path"].replace("/root/.hermes", container_base, 1)
-                self._upload_if_changed(mount_entry["host_path"], remote_path)
-            for entry in iter_skills_files(container_base=container_base):
-                self._upload_if_changed(entry["host_path"], entry["container_path"])
-        except Exception as e:
-            logger.debug("Daytona: could not sync skills/credentials: %s", e)
+    def _daytona_delete(self, remote_paths: list[str]) -> None:
+        """Batch-delete remote files via SDK exec."""
+        self._sandbox.process.exec(quoted_rm_command(remote_paths))
 
-    def _ensure_sandbox_ready(self):
+    # ------------------------------------------------------------------
+    # Sandbox lifecycle
+    # ------------------------------------------------------------------
+
+    def _ensure_sandbox_ready(self) -> None:
         """Restart sandbox if it was stopped (e.g., by a previous interrupt)."""
         self._sandbox.refresh_data()
         if self._sandbox.state in (self._SandboxState.STOPPED, self._SandboxState.ARCHIVED):
             self._sandbox.start()
             logger.info("Daytona: restarted sandbox %s", self._sandbox.id)
 
-    def _before_execute(self):
-        """Ensure sandbox is ready, then rate-limited file sync via base class."""
+    def _before_execute(self) -> None:
+        """Ensure sandbox is ready, then sync files via FileSyncManager."""
         with self._lock:
             self._ensure_sandbox_ready()
-            super()._before_execute()
+            self._sync_manager.sync()
 
     def _run_bash(self, cmd_string: str, *, login: bool = False, timeout: int = 120,
diff --git a/tools/environments/file_sync.py b/tools/environments/file_sync.py
new file mode 100644
index 00000000000..fb5559a93ad
--- /dev/null
+++ b/tools/environments/file_sync.py
@@ -0,0 +1,150 @@
+"""Shared file sync manager for remote execution backends.
+
+Tracks local file changes via mtime+size, detects deletions, and
+syncs to remote environments transactionally. Used by SSH, Modal,
+and Daytona. Docker and Singularity use bind mounts (live host FS
+view) and don't need this.
+"""
+
+import logging
+import os
+import shlex
+import time
+from typing import Callable
+
+from tools.environments.base import _file_mtime_key
+
+logger = logging.getLogger(__name__)
+
+_SYNC_INTERVAL_SECONDS = 5.0
+_FORCE_SYNC_ENV = "HERMES_FORCE_FILE_SYNC"
+
+# Transport callbacks provided by each backend
+UploadFn = Callable[[str, str], None]  # (host_path, remote_path) -> raises on failure
+DeleteFn = Callable[[list[str]], None]  # (remote_paths) -> raises on failure
+GetFilesFn = Callable[[], list[tuple[str, str]]]  # () -> [(host_path, remote_path), ...]
+
+
+def iter_sync_files(container_base: str = "/root/.hermes") -> list[tuple[str, str]]:
+    """Enumerate all files that should be synced to a remote environment.
+
+    Combines credentials, skills, and cache into a single flat list of
+    (host_path, remote_path) pairs. Credential paths are remapped from
+    the hardcoded /root/.hermes to *container_base* because the remote
+    user's home may differ (e.g. /home/daytona, /home/user).
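+
+    Illustrative result shape (hypothetical paths):
+
+        [("/home/me/.hermes/credentials/github.json",
+          "/home/daytona/.hermes/credentials/github.json"),
+         ...]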
file sync via base class.""" + def _before_execute(self) -> None: + """Ensure sandbox is ready, then sync files via FileSyncManager.""" with self._lock: self._ensure_sandbox_ready() - super()._before_execute() + self._sync_manager.sync() def _run_bash(self, cmd_string: str, *, login: bool = False, timeout: int = 120, diff --git a/tools/environments/file_sync.py b/tools/environments/file_sync.py new file mode 100644 index 00000000000..fb5559a93ad --- /dev/null +++ b/tools/environments/file_sync.py @@ -0,0 +1,150 @@ +"""Shared file sync manager for remote execution backends. + +Tracks local file changes via mtime+size, detects deletions, and +syncs to remote environments transactionally. Used by SSH, Modal, +and Daytona. Docker and Singularity use bind mounts (live host FS +view) and don't need this. +""" + +import logging +import os +import shlex +import time +from typing import Callable + +from tools.environments.base import _file_mtime_key + +logger = logging.getLogger(__name__) + +_SYNC_INTERVAL_SECONDS = 5.0 +_FORCE_SYNC_ENV = "HERMES_FORCE_FILE_SYNC" + +# Transport callbacks provided by each backend +UploadFn = Callable[[str, str], None] # (host_path, remote_path) -> raises on failure +DeleteFn = Callable[[list[str]], None] # (remote_paths) -> raises on failure +GetFilesFn = Callable[[], list[tuple[str, str]]] # () -> [(host_path, remote_path), ...] + + +def iter_sync_files(container_base: str = "/root/.hermes") -> list[tuple[str, str]]: + """Enumerate all files that should be synced to a remote environment. + + Combines credentials, skills, and cache into a single flat list of + (host_path, remote_path) pairs. Credential paths are remapped from + the hardcoded /root/.hermes to *container_base* because the remote + user's home may differ (e.g. /home/daytona, /home/user). + """ + # Late import: credential_files imports agent modules that create + # circular dependencies if loaded at file_sync module level. + from tools.credential_files import ( + get_credential_file_mounts, + iter_cache_files, + iter_skills_files, + ) + + files: list[tuple[str, str]] = [] + for entry in get_credential_file_mounts(): + remote = entry["container_path"].replace( + "/root/.hermes", container_base, 1 + ) + files.append((entry["host_path"], remote)) + for entry in iter_skills_files(container_base=container_base): + files.append((entry["host_path"], entry["container_path"])) + for entry in iter_cache_files(container_base=container_base): + files.append((entry["host_path"], entry["container_path"])) + return files + + +def quoted_rm_command(remote_paths: list[str]) -> str: + """Build a shell ``rm -f`` command for a batch of remote paths.""" + return "rm -f " + " ".join(shlex.quote(p) for p in remote_paths) + + +class FileSyncManager: + """Tracks local file changes and syncs to a remote environment. + + Backends instantiate this with transport callbacks (upload, delete) + and a file-source callable. The manager handles mtime-based change + detection, deletion tracking, rate limiting, and transactional state. + + Not used by bind-mount backends (Docker, Singularity) — those get + live host FS views and don't need file sync. 
+ """ + + def __init__( + self, + get_files_fn: GetFilesFn, + upload_fn: UploadFn, + delete_fn: DeleteFn, + sync_interval: float = _SYNC_INTERVAL_SECONDS, + ): + self._get_files_fn = get_files_fn + self._upload_fn = upload_fn + self._delete_fn = delete_fn + self._synced_files: dict[str, tuple[float, int]] = {} # remote_path -> (mtime, size) + self._last_sync_time: float = 0.0 # monotonic; 0 ensures first sync runs + self._sync_interval = sync_interval + + def sync(self, *, force: bool = False) -> None: + """Run a sync cycle: upload changed files, delete removed files. + + Rate-limited to once per ``sync_interval`` unless *force* is True + or ``HERMES_FORCE_FILE_SYNC=1`` is set. + + Transactional: state only committed if ALL operations succeed. + On failure, state rolls back so the next cycle retries everything. + """ + if not force and not os.environ.get(_FORCE_SYNC_ENV): + now = time.monotonic() + if now - self._last_sync_time < self._sync_interval: + return + + current_files = self._get_files_fn() + current_remote_paths = {remote for _, remote in current_files} + + # --- Uploads: new or changed files --- + to_upload: list[tuple[str, str]] = [] + new_files = dict(self._synced_files) + for host_path, remote_path in current_files: + file_key = _file_mtime_key(host_path) + if file_key is None: + continue + if self._synced_files.get(remote_path) == file_key: + continue + to_upload.append((host_path, remote_path)) + new_files[remote_path] = file_key + + # --- Deletes: synced paths no longer in current set --- + to_delete = [p for p in self._synced_files if p not in current_remote_paths] + + if not to_upload and not to_delete: + self._last_sync_time = time.monotonic() + return + + # Snapshot for rollback (only when there's work to do) + prev_files = dict(self._synced_files) + + if to_upload: + logger.debug("file_sync: uploading %d file(s)", len(to_upload)) + if to_delete: + logger.debug("file_sync: deleting %d stale remote file(s)", len(to_delete)) + + try: + for host_path, remote_path in to_upload: + self._upload_fn(host_path, remote_path) + logger.debug("file_sync: uploaded %s -> %s", host_path, remote_path) + + if to_delete: + self._delete_fn(to_delete) + logger.debug("file_sync: deleted %s", to_delete) + + # --- Commit (all succeeded) --- + for p in to_delete: + new_files.pop(p, None) + + self._synced_files = new_files + self._last_sync_time = time.monotonic() + + except Exception as exc: + self._synced_files = prev_files + self._last_sync_time = time.monotonic() + logger.warning("file_sync: sync failed, rolled back state: %s", exc) diff --git a/tools/environments/modal.py b/tools/environments/modal.py index 1cb8e47969e..c002c7333b2 100644 --- a/tools/environments/modal.py +++ b/tools/environments/modal.py @@ -9,16 +9,16 @@ import logging import shlex import threading from pathlib import Path -from typing import Any, Dict, Optional +from typing import Any, Optional from hermes_constants import get_hermes_home from tools.environments.base import ( BaseEnvironment, _ThreadedProcessHandle, - _file_mtime_key, _load_json_store, _save_json_store, ) +from tools.environments.file_sync import FileSyncManager, iter_sync_files, quoted_rm_command logger = logging.getLogger(__name__) @@ -150,7 +150,7 @@ class ModalEnvironment(BaseEnvironment): image: str, cwd: str = "/root", timeout: int = 60, - modal_sandbox_kwargs: Optional[Dict[str, Any]] = None, + modal_sandbox_kwargs: Optional[dict[str, Any]] = None, persistent_filesystem: bool = True, task_id: str = "default", ): @@ -162,8 +162,7 @@ class 
+    """
+
+    def __init__(
+        self,
+        get_files_fn: GetFilesFn,
+        upload_fn: UploadFn,
+        delete_fn: DeleteFn,
+        sync_interval: float = _SYNC_INTERVAL_SECONDS,
+    ):
+        self._get_files_fn = get_files_fn
+        self._upload_fn = upload_fn
+        self._delete_fn = delete_fn
+        self._synced_files: dict[str, tuple[float, int]] = {}  # remote_path -> (mtime, size)
+        self._last_sync_time: float = 0.0  # monotonic; 0 ensures the first sync runs
+        self._sync_interval = sync_interval
+
+    def sync(self, *, force: bool = False) -> None:
+        """Run a sync cycle: upload changed files, delete removed files.
+
+        Rate-limited to once per ``sync_interval`` unless *force* is True
+        or the ``HERMES_FORCE_FILE_SYNC`` environment variable is set.
+
+        Transactional: state is committed only if ALL operations succeed.
+        On failure, state rolls back so the next cycle retries everything.
+        """
+        if not force and not os.environ.get(_FORCE_SYNC_ENV):
+            now = time.monotonic()
+            if now - self._last_sync_time < self._sync_interval:
+                return
+
+        current_files = self._get_files_fn()
+        current_remote_paths = {remote for _, remote in current_files}
+
+        # --- Uploads: new or changed files ---
+        to_upload: list[tuple[str, str]] = []
+        new_files = dict(self._synced_files)
+        for host_path, remote_path in current_files:
+            file_key = _file_mtime_key(host_path)
+            if file_key is None:
+                continue
+            if self._synced_files.get(remote_path) == file_key:
+                continue
+            to_upload.append((host_path, remote_path))
+            new_files[remote_path] = file_key
+
+        # --- Deletes: synced paths no longer in the current set ---
+        to_delete = [p for p in self._synced_files if p not in current_remote_paths]
+
+        if not to_upload and not to_delete:
+            self._last_sync_time = time.monotonic()
+            return
+
+        # Snapshot for rollback (only taken when there's work to do)
+        prev_files = dict(self._synced_files)
+
+        if to_upload:
+            logger.debug("file_sync: uploading %d file(s)", len(to_upload))
+        if to_delete:
+            logger.debug("file_sync: deleting %d stale remote file(s)", len(to_delete))
+
+        try:
+            for host_path, remote_path in to_upload:
+                self._upload_fn(host_path, remote_path)
+                logger.debug("file_sync: uploaded %s -> %s", host_path, remote_path)
+
+            if to_delete:
+                self._delete_fn(to_delete)
+                logger.debug("file_sync: deleted %s", to_delete)
+
+            # --- Commit (all operations succeeded) ---
+            for p in to_delete:
+                new_files.pop(p, None)
+
+            self._synced_files = new_files
+            self._last_sync_time = time.monotonic()
+
+        except Exception as exc:
+            self._synced_files = prev_files
+            self._last_sync_time = time.monotonic()
+            logger.warning("file_sync: sync failed, rolled back state: %s", exc)
diff --git a/tools/environments/modal.py b/tools/environments/modal.py
index 1cb8e47969e..c002c7333b2 100644
--- a/tools/environments/modal.py
+++ b/tools/environments/modal.py
@@ -9,16 +9,16 @@ import logging
 import shlex
 import threading
 from pathlib import Path
-from typing import Any, Dict, Optional
+from typing import Any, Optional
 
 from hermes_constants import get_hermes_home
 from tools.environments.base import (
     BaseEnvironment,
     _ThreadedProcessHandle,
-    _file_mtime_key,
     _load_json_store,
     _save_json_store,
 )
+from tools.environments.file_sync import FileSyncManager, iter_sync_files, quoted_rm_command
 
 logger = logging.getLogger(__name__)
@@ -150,7 +150,7 @@ class ModalEnvironment(BaseEnvironment):
         image: str,
         cwd: str = "/root",
         timeout: int = 60,
-        modal_sandbox_kwargs: Optional[Dict[str, Any]] = None,
+        modal_sandbox_kwargs: Optional[dict[str, Any]] = None,
         persistent_filesystem: bool = True,
         task_id: str = "default",
     ):
@@ -162,8 +162,7 @@ class ModalEnvironment(BaseEnvironment):
         self._sandbox = None
         self._app = None
         self._worker = _AsyncWorker()
-        self._synced_files: Dict[str, tuple] = {}
-        self._last_sync_time: float = 0
+        self._sync_manager: FileSyncManager | None = None  # initialized after sandbox creation
 
         sandbox_kwargs = dict(modal_sandbox_kwargs or {})
@@ -256,26 +255,24 @@ class ModalEnvironment(BaseEnvironment):
             raise
 
         logger.info("Modal: sandbox created (task=%s)", self._task_id)
+
+        self._sync_manager = FileSyncManager(
+            get_files_fn=lambda: iter_sync_files("/root/.hermes"),
+            upload_fn=self._modal_upload,
+            delete_fn=self._modal_delete,
+        )
+        self._sync_manager.sync(force=True)
         self.init_session()
 
-    def _push_file_to_sandbox(self, host_path: str, container_path: str) -> bool:
-        """Push a single file into the sandbox if changed."""
-        file_key = _file_mtime_key(host_path)
-        if file_key is None:
-            return False
-        if self._synced_files.get(container_path) == file_key:
-            return False
-        try:
-            content = Path(host_path).read_bytes()
-        except Exception:
-            return False
-
+    def _modal_upload(self, host_path: str, remote_path: str) -> None:
+        """Upload a single file via base64-over-exec.
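+
+        The generated remote command has this shape (illustrative paths;
+        the real values are shell-quoted via shlex.quote below):
+
+            mkdir -p '<dir>' && echo '<b64>' | base64 -d > '<remote_path>'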
+        """
         import base64
+        content = Path(host_path).read_bytes()
         b64 = base64.b64encode(content).decode("ascii")
-        container_dir = str(Path(container_path).parent)
+        container_dir = str(Path(remote_path).parent)
         cmd = (
             f"mkdir -p {shlex.quote(container_dir)} && "
-            f"echo {shlex.quote(b64)} | base64 -d > {shlex.quote(container_path)}"
+            f"echo {shlex.quote(b64)} | base64 -d > {shlex.quote(remote_path)}"
         )
 
         async def _write():
@@ -283,25 +280,24 @@ class ModalEnvironment(BaseEnvironment):
             await proc.wait.aio()
 
         self._worker.run_coroutine(_write(), timeout=15)
-        self._synced_files[container_path] = file_key
-        return True
 
-    def _sync_files(self) -> None:
-        """Push credential, skill, and cache files into the running sandbox."""
-        try:
-            from tools.credential_files import (
-                get_credential_file_mounts,
-                iter_skills_files,
-                iter_cache_files,
-            )
-            for entry in get_credential_file_mounts():
-                self._push_file_to_sandbox(entry["host_path"], entry["container_path"])
-            for entry in iter_skills_files():
-                self._push_file_to_sandbox(entry["host_path"], entry["container_path"])
-            for entry in iter_cache_files():
-                self._push_file_to_sandbox(entry["host_path"], entry["container_path"])
-        except Exception as e:
-            logger.debug("Modal: file sync failed: %s", e)
+    def _modal_delete(self, remote_paths: list[str]) -> None:
+        """Batch-delete remote files via exec."""
+        rm_cmd = quoted_rm_command(remote_paths)
+
+        async def _rm():
+            proc = await self._sandbox.exec.aio("bash", "-c", rm_cmd)
+            await proc.wait.aio()
+
+        self._worker.run_coroutine(_rm(), timeout=15)
+
+    def _before_execute(self) -> None:
+        """Sync files to the sandbox via FileSyncManager (rate-limited internally)."""
+        self._sync_manager.sync()
+
+    # ------------------------------------------------------------------
+    # Execution
+    # ------------------------------------------------------------------
 
     def _run_bash(self, cmd_string: str, *, login: bool = False, timeout: int = 120,
diff --git a/tools/environments/ssh.py b/tools/environments/ssh.py
index a77eb5c9f40..8cb1b0c570f 100644
--- a/tools/environments/ssh.py
+++ b/tools/environments/ssh.py
@@ -8,6 +8,7 @@ import tempfile
 from pathlib import Path
 
 from tools.environments.base import BaseEnvironment, _popen_bash
+from tools.environments.file_sync import FileSyncManager, iter_sync_files, quoted_rm_command
 
 logger = logging.getLogger(__name__)
@@ -43,8 +44,14 @@ class SSHEnvironment(BaseEnvironment):
         _ensure_ssh_available()
         self._establish_connection()
         self._remote_home = self._detect_remote_home()
-        self._last_sync_time: float = 0  # guarantees first _before_execute syncs
-        self._sync_files()
+
+        self._ensure_remote_dirs()
+        self._sync_manager = FileSyncManager(
+            get_files_fn=lambda: iter_sync_files(f"{self._remote_home}/.hermes"),
+            upload_fn=self._scp_upload,
+            delete_fn=self._ssh_delete,
+        )
+        self._sync_manager.sync(force=True)
         self.init_session()
 
@@ -92,50 +99,53 @@ class SSHEnvironment(BaseEnvironment):
             return "/root"
         return f"/home/{self.user}"
 
-    def _sync_files(self) -> None:
-        """Rsync skills directory and credential files to the remote host."""
-        try:
-            container_base = f"{self._remote_home}/.hermes"
-            from tools.credential_files import get_credential_file_mounts, get_skills_directory_mount
+    # ------------------------------------------------------------------
+    # File sync (via FileSyncManager)
+    # ------------------------------------------------------------------
 
-            rsync_base = ["rsync", "-az", "--timeout=30", "--safe-links"]
-            ssh_opts = f"ssh -o ControlPath={self.control_socket} -o ControlMaster=auto"
-            if self.port != 22:
-                ssh_opts += f" -p {self.port}"
-            if self.key_path:
-                ssh_opts += f" -i {self.key_path}"
-            rsync_base.extend(["-e", ssh_opts])
-            dest_prefix = f"{self.user}@{self.host}"
+    def _ensure_remote_dirs(self) -> None:
+        """Create the base ~/.hermes directory tree on the remote in one SSH call."""
+        base = f"{self._remote_home}/.hermes"
+        dirs = [base, f"{base}/skills", f"{base}/credentials", f"{base}/cache"]
+        mkdir_cmd = "mkdir -p " + " ".join(shlex.quote(d) for d in dirs)
+        cmd = self._build_ssh_command()
+        cmd.append(mkdir_cmd)
+        subprocess.run(cmd, capture_output=True, text=True, timeout=10)
 
-            for mount_entry in get_credential_file_mounts():
-                remote_path = mount_entry["container_path"].replace("/root/.hermes", container_base, 1)
-                parent_dir = str(Path(remote_path).parent)
-                mkdir_cmd = self._build_ssh_command()
-                mkdir_cmd.append(f"mkdir -p {parent_dir}")
-                subprocess.run(mkdir_cmd, capture_output=True, text=True, timeout=10)
-                cmd = rsync_base + [mount_entry["host_path"], f"{dest_prefix}:{remote_path}"]
-                result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
-                if result.returncode == 0:
-                    logger.info("SSH: synced credential %s -> %s", mount_entry["host_path"], remote_path)
-                else:
-                    logger.debug("SSH: rsync credential failed: %s", result.stderr.strip())
+    def _scp_upload(self, host_path: str, remote_path: str) -> None:
+        """Upload a single file via scp over the ControlMaster connection.
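+
+        Roughly the following command line (bracketed parts depend on the
+        configured port and key path):
+
+            scp -o ControlPath=<socket> [-P <port>] [-i <key>] <host_path> <user>@<host>:<remote_path>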
{shlex.quote(parent)}") + subprocess.run(mkdir_cmd, capture_output=True, text=True, timeout=10) + + scp_cmd = ["scp", "-o", f"ControlPath={self.control_socket}"] + if self.port != 22: + scp_cmd.extend(["-P", str(self.port)]) + if self.key_path: + scp_cmd.extend(["-i", self.key_path]) + scp_cmd.extend([host_path, f"{self.user}@{self.host}:{remote_path}"]) + result = subprocess.run(scp_cmd, capture_output=True, text=True, timeout=30) + if result.returncode != 0: + raise RuntimeError(f"scp failed: {result.stderr.strip()}") + + def _ssh_delete(self, remote_paths: list[str]) -> None: + """Batch-delete remote files in one SSH call.""" + cmd = self._build_ssh_command() + cmd.append(quoted_rm_command(remote_paths)) + result = subprocess.run(cmd, capture_output=True, text=True, timeout=10) + if result.returncode != 0: + raise RuntimeError(f"remote rm failed: {result.stderr.strip()}") + + def _before_execute(self) -> None: + """Sync files to remote via FileSyncManager (rate-limited internally).""" + self._sync_manager.sync() + + # ------------------------------------------------------------------ + # Execution + # ------------------------------------------------------------------ def _run_bash(self, cmd_string: str, *, login: bool = False, timeout: int = 120,