Compare commits

...

3 Commits

Author SHA1 Message Date
alt-glitch
7d91228c2a fix(tests): update mocks for file sync changes
- Modal snapshot tests: accept **kw in iter_skills_files/iter_cache_files
  mock lambdas to match new container_base kwarg
- SSH preflight test: mock _detect_remote_home, _ensure_remote_dirs,
  init_session, and FileSyncManager added in file sync PR
2026-04-10 02:53:51 -07:00
alt-glitch
6eba279556 test: add reproducible perf benchmark for file sync overhead
Direct env.execute() timing — no LLM in the loop.
Measures per-command wall-clock including sync check.

Results on SSH:
- echo median: 617ms (pure SSH round-trip + spawn overhead)
- sync-triggered after 6s wait: 621ms (mtime skip adds ~0ms)
- within-interval (no sync): 618ms

Confirms mtime skip makes sync overhead unmeasurable.
2026-04-10 02:53:51 -07:00
alt-glitch
517ea7ed45 feat(environments): unified file sync with change tracking and deletion
Replace per-backend ad-hoc file sync with a shared FileSyncManager
that handles mtime-based change detection, remote deletion of
locally-removed files, and transactional state updates.

- New FileSyncManager class (tools/environments/file_sync.py)
  with callbacks for upload/delete, rate limiting, and rollback
- Shared iter_sync_files() eliminates 3 duplicate implementations
- SSH: replace unconditional rsync with scp + mtime skip
- Modal/Daytona: replace inline _synced_files dict with manager
- All 3 backends now sync credentials + skills + cache uniformly
- Remote deletion: files removed locally are cleaned from remote
- HERMES_FORCE_FILE_SYNC=1 env var for debugging
- Base class _before_execute() simplified to empty hook
- 12 unit tests covering mtime skip, deletion, rollback, rate limiting
2026-04-10 02:53:51 -07:00
9 changed files with 655 additions and 136 deletions
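
The wiring pattern this PR introduces is repeated by each backend in the per-file diffs below, and is small enough to sketch here. This is an illustrative sketch, not code from the PR; my_upload and my_delete are placeholder transports (the real ones are scp, base64-over-exec, and the Daytona SDK, shown below):

from tools.environments.file_sync import FileSyncManager, iter_sync_files

def my_upload(host_path: str, remote_path: str) -> None:
    """Placeholder transport: copy one file to the remote; must raise on failure."""
    ...

def my_delete(remote_paths: list[str]) -> None:
    """Placeholder transport: batch-remove remote paths; must raise on failure."""
    ...

manager = FileSyncManager(
    get_files_fn=lambda: iter_sync_files("/root/.hermes"),
    upload_fn=my_upload,
    delete_fn=my_delete,
)
manager.sync(force=True)  # initial full sync at environment startup
manager.sync()            # per-command call: rate-limited and mtime-skipped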

View File

@@ -0,0 +1,257 @@
"""Tests for FileSyncManager — mtime tracking, deletion detection, transactional rollback."""
import os
import time
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from tools.environments.file_sync import FileSyncManager, _FORCE_SYNC_ENV
@pytest.fixture
def tmp_files(tmp_path):
"""Create a few temp files to use as sync sources."""
files = {}
for name in ("cred_a.json", "cred_b.json", "skill_main.py"):
p = tmp_path / name
p.write_text(f"content of {name}")
files[name] = str(p)
return files
def _make_get_files(tmp_files, remote_base="/root/.hermes"):
"""Return a get_files_fn that maps local files to remote paths."""
mapping = [(hp, f"{remote_base}/{name}") for name, hp in tmp_files.items()]
def get_files():
return [(hp, rp) for hp, rp in mapping if Path(hp).exists()]
return get_files
def _make_manager(tmp_files, remote_base="/root/.hermes", upload=None, delete=None):
"""Create a FileSyncManager with test callbacks."""
return FileSyncManager(
get_files_fn=_make_get_files(tmp_files, remote_base),
upload_fn=upload or MagicMock(),
delete_fn=delete or MagicMock(),
)
class TestMtimeSkip:
def test_unchanged_files_not_re_uploaded(self, tmp_files):
upload = MagicMock()
mgr = _make_manager(tmp_files, upload=upload)
mgr.sync(force=True)
assert upload.call_count == 3
upload.reset_mock()
mgr.sync(force=True)
assert upload.call_count == 0, "unchanged files should not be re-uploaded"
def test_changed_file_re_uploaded(self, tmp_files):
upload = MagicMock()
mgr = _make_manager(tmp_files, upload=upload)
mgr.sync(force=True)
upload.reset_mock()
# Touch one file
time.sleep(0.05)
Path(tmp_files["cred_a.json"]).write_text("updated content")
mgr.sync(force=True)
assert upload.call_count == 1
assert tmp_files["cred_a.json"] in upload.call_args[0][0]
def test_new_file_detected(self, tmp_files, tmp_path):
upload = MagicMock()
mgr = FileSyncManager(
get_files_fn=_make_get_files(tmp_files),
upload_fn=upload,
delete_fn=MagicMock(),
)
mgr.sync(force=True)
assert upload.call_count == 3
# Add a new file
new_file = tmp_path / "new_skill.py"
new_file.write_text("new content")
tmp_files["new_skill.py"] = str(new_file)
# Recreate manager with updated file list
mgr._get_files_fn = _make_get_files(tmp_files)
upload.reset_mock()
mgr.sync(force=True)
assert upload.call_count == 1
class TestDeletion:
def test_removed_file_triggers_delete(self, tmp_files):
upload = MagicMock()
delete = MagicMock()
mgr = _make_manager(tmp_files, upload=upload, delete=delete)
mgr.sync(force=True)
delete.assert_not_called()
# Remove a file locally
os.unlink(tmp_files["cred_b.json"])
del tmp_files["cred_b.json"]
mgr._get_files_fn = _make_get_files(tmp_files)
mgr.sync(force=True)
delete.assert_called_once()
deleted_paths = delete.call_args[0][0]
assert any("cred_b.json" in p for p in deleted_paths)
def test_no_delete_when_no_removals(self, tmp_files):
delete = MagicMock()
mgr = _make_manager(tmp_files, delete=delete)
mgr.sync(force=True)
mgr.sync(force=True)
delete.assert_not_called()
class TestTransactionalRollback:
def test_upload_failure_rolls_back(self, tmp_files):
call_count = 0
def failing_upload(host_path, remote_path):
nonlocal call_count
call_count += 1
if call_count == 2:
raise RuntimeError("upload failed")
mgr = _make_manager(tmp_files, upload=failing_upload)
# First sync fails (swallowed, logged, state rolled back)
mgr.sync(force=True)
# State should be empty (rolled back) — next sync retries all files
good_upload = MagicMock()
mgr._upload_fn = good_upload
mgr.sync(force=True)
assert good_upload.call_count == 3, "all files should be retried after rollback"
def test_delete_failure_rolls_back(self, tmp_files):
upload = MagicMock()
mgr = _make_manager(tmp_files, upload=upload)
# Initial sync
mgr.sync(force=True)
# Remove a file
os.unlink(tmp_files["skill_main.py"])
del tmp_files["skill_main.py"]
mgr._get_files_fn = _make_get_files(tmp_files)
# Delete fails (swallowed, state rolled back)
mgr._delete_fn = MagicMock(side_effect=RuntimeError("delete failed"))
mgr.sync(force=True)
# Next sync should retry the delete
good_delete = MagicMock()
mgr._delete_fn = good_delete
upload.reset_mock()
mgr.sync(force=True)
good_delete.assert_called_once()
class TestRateLimiting:
def test_sync_skipped_within_interval(self, tmp_files):
upload = MagicMock()
mgr = FileSyncManager(
get_files_fn=_make_get_files(tmp_files),
upload_fn=upload,
delete_fn=MagicMock(),
sync_interval=10.0,
)
mgr.sync(force=True)
assert upload.call_count == 3
upload.reset_mock()
# Without force, should skip due to rate limit
mgr.sync()
assert upload.call_count == 0
def test_force_bypasses_rate_limit(self, tmp_files, tmp_path):
upload = MagicMock()
mgr = FileSyncManager(
get_files_fn=_make_get_files(tmp_files),
upload_fn=upload,
delete_fn=MagicMock(),
sync_interval=10.0,
)
mgr.sync(force=True)
upload.reset_mock()
# Add a new file and force sync
new_file = tmp_path / "forced.txt"
new_file.write_text("forced")
tmp_files["forced.txt"] = str(new_file)
mgr._get_files_fn = _make_get_files(tmp_files)
mgr.sync(force=True)
assert upload.call_count == 1
def test_env_var_forces_sync(self, tmp_files, tmp_path):
upload = MagicMock()
mgr = FileSyncManager(
get_files_fn=_make_get_files(tmp_files),
upload_fn=upload,
delete_fn=MagicMock(),
sync_interval=10.0,
)
mgr.sync(force=True)
upload.reset_mock()
new_file = tmp_path / "env_forced.txt"
new_file.write_text("env forced")
tmp_files["env_forced.txt"] = str(new_file)
mgr._get_files_fn = _make_get_files(tmp_files)
with patch.dict(os.environ, {_FORCE_SYNC_ENV: "1"}):
mgr.sync()
assert upload.call_count == 1
class TestEdgeCases:
def test_empty_file_list(self):
upload = MagicMock()
delete = MagicMock()
mgr = FileSyncManager(
get_files_fn=lambda: [],
upload_fn=upload,
delete_fn=delete,
)
mgr.sync(force=True)
upload.assert_not_called()
delete.assert_not_called()
def test_file_disappears_between_list_and_upload(self, tmp_path):
"""File listed by get_files but deleted before _file_mtime_key reads it."""
f = tmp_path / "ephemeral.txt"
f.write_text("here now")
upload = MagicMock()
mgr = FileSyncManager(
get_files_fn=lambda: [(str(f), "/root/.hermes/ephemeral.txt")],
upload_fn=upload,
delete_fn=MagicMock(),
)
# Delete the file before sync can stat it
os.unlink(str(f))
mgr.sync(force=True)
upload.assert_not_called() # _file_mtime_key returns None, skipped
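
The rollback contract those tests pin down can be restated as a compact standalone snippet. This is a sketch using only the public constructor from this PR; the temp files and paths are illustrative:

import pathlib
import tempfile
from unittest.mock import MagicMock

from tools.environments.file_sync import FileSyncManager

tmp = pathlib.Path(tempfile.mkdtemp())
(tmp / "a.txt").write_text("a")
(tmp / "b.txt").write_text("b")

# Second upload raises; the manager swallows it, logs a warning, rolls back.
upload = MagicMock(side_effect=[None, RuntimeError("boom"), None, None])
mgr = FileSyncManager(
    get_files_fn=lambda: [(str(tmp / n), f"/root/.hermes/{n}") for n in ("a.txt", "b.txt")],
    upload_fn=upload,
    delete_fn=MagicMock(),
)
mgr.sync(force=True)  # fails mid-cycle: no state committed
mgr.sync(force=True)  # both files retried, cycle commits
assert upload.call_count == 4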

View File

@@ -0,0 +1,127 @@
"""Reproducible perf benchmark for file sync overhead.
Measures actual env.execute() wall-clock time, no LLM in the loop.
Run with: uv run pytest tests/tools/test_file_sync_perf.py -v -o "addopts=" -s
Requires backends to be configured (SSH host, Modal creds, etc).
Skip markers gate each backend.
"""
import statistics
import time
import pytest
# ---------------------------------------------------------------------------
# Backend fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def local_env():
from tools.environments.local import LocalEnvironment
env = LocalEnvironment(cwd="/tmp", timeout=30)
yield env
env.cleanup()
@pytest.fixture
def ssh_env():
import os
host = os.environ.get("TERMINAL_SSH_HOST")
user = os.environ.get("TERMINAL_SSH_USER")
if not host or not user:
pytest.skip("TERMINAL_SSH_HOST and TERMINAL_SSH_USER required")
from tools.environments.ssh import SSHEnvironment
env = SSHEnvironment(host=host, user=user, cwd="/tmp", timeout=30)
yield env
env.cleanup()
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _time_executions(env, command: str, n: int = 10) -> list[float]:
"""Run *command* n times and return per-call wall-clock durations."""
durations = []
for _ in range(n):
t0 = time.monotonic()
result = env.execute(command, timeout=10)
elapsed = time.monotonic() - t0
durations.append(elapsed)
assert result.get("returncode", result.get("exit_code", -1)) == 0, \
f"command failed: {result}"
return durations
def _report(label: str, durations: list[float]):
"""Print timing stats."""
med = statistics.median(durations)
mean = statistics.mean(durations)
p95 = sorted(durations)[int(len(durations) * 0.95)]
print(f"\n {label}:")
print(f" n={len(durations)} median={med*1000:.0f}ms mean={mean*1000:.0f}ms p95={p95*1000:.0f}ms")
print(f" raw: {[f'{d*1000:.0f}ms' for d in durations]}")
return med
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
class TestLocalPerf:
"""Local baseline — no file sync, no network. Sets the floor."""
def test_echo_latency(self, local_env):
durations = _time_executions(local_env, "echo hello", n=20)
med = _report("local echo", durations)
# Spawn-per-call overhead should be < 500ms
assert med < 0.5, f"local echo median {med*1000:.0f}ms exceeds 500ms"
@pytest.mark.ssh
class TestSSHPerf:
"""SSH with FileSyncManager — mtime skip should make sync ~0ms."""
def test_echo_latency(self, ssh_env):
"""Sequential echo commands — measures per-command overhead including sync check."""
durations = _time_executions(ssh_env, "echo hello", n=20)
med = _report("ssh echo (with sync check)", durations)
# SSH round-trip + spawn-per-call, but sync should be ~0ms (rate limited)
assert med < 2.0, f"ssh echo median {med*1000:.0f}ms exceeds 2000ms"
def test_sync_overhead_after_interval(self, ssh_env):
"""Measure sync cost when the rate-limit window has expired.
Sleep past the 5s interval, then time the next command which
triggers a real sync cycle (but with mtime skip, should be fast).
"""
# Warm up
ssh_env.execute("echo warmup", timeout=10)
# Wait for sync interval to expire
time.sleep(6)
# This command will trigger a real sync cycle
t0 = time.monotonic()
result = ssh_env.execute("echo after-interval", timeout=10)
elapsed = time.monotonic() - t0
print(f"\n ssh echo after 6s wait (sync triggered): {elapsed*1000:.0f}ms")
assert result.get("returncode", result.get("exit_code", -1)) == 0
# Even with sync triggered, mtime skip should keep it fast
# Old rsync approach: ~2-3s. New mtime skip: should be < 1.5s
assert elapsed < 1.5, f"sync-triggered command took {elapsed*1000:.0f}ms (expected < 1500ms)"
def test_no_sync_within_interval(self, ssh_env):
"""Rapid sequential commands within 5s window — no sync at all."""
# First command triggers sync
ssh_env.execute("echo prime", timeout=10)
# Immediately run 10 more — all within rate-limit window
durations = _time_executions(ssh_env, "echo rapid", n=10)
med = _report("ssh echo (within interval, no sync)", durations)
# Should be pure SSH overhead, no sync
assert med < 1.5, f"within-interval median {med*1000:.0f}ms exceeds 1500ms"
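
One assumption worth flagging: the custom ssh marker used above must be registered somewhere for pytest not to emit unknown-marker warnings. If the repo does not already declare it in its pytest config, a conftest.py hook along these lines would do (hypothetical, not part of this diff):

def pytest_configure(config):
    # Register the custom marker used by TestSSHPerf.
    config.addinivalue_line("markers", "ssh: requires a configured SSH host")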

View File

@@ -124,8 +124,8 @@ def _install_modal_test_modules(
     sys.modules["tools.interrupt"] = types.SimpleNamespace(is_interrupted=lambda: False)
     sys.modules["tools.credential_files"] = types.SimpleNamespace(
         get_credential_file_mounts=lambda: [],
-        iter_skills_files=lambda: [],
-        iter_cache_files=lambda: [],
+        iter_skills_files=lambda **kw: [],
+        iter_cache_files=lambda **kw: [],
     )

     from_id_calls: list[str] = []

View File

@@ -121,6 +121,10 @@ class TestSSHPreflight:
             called["count"] += 1

         monkeypatch.setattr(ssh_env.SSHEnvironment, "_establish_connection", _fake_establish)
+        monkeypatch.setattr(ssh_env.SSHEnvironment, "_detect_remote_home", lambda self: "/home/alice")
+        monkeypatch.setattr(ssh_env.SSHEnvironment, "_ensure_remote_dirs", lambda self: None)
+        monkeypatch.setattr(ssh_env.SSHEnvironment, "init_session", lambda self: None)
+        monkeypatch.setattr(ssh_env, "FileSyncManager", lambda **kw: type("M", (), {"sync": lambda self, **k: None})())
         env = ssh_env.SSHEnvironment(host="example.com", user="alice")

View File

@@ -43,8 +43,6 @@ def get_sandbox_dir() -> Path:
 # Shared constants and utilities
 # ---------------------------------------------------------------------------

-_SYNC_INTERVAL_SECONDS = 5.0
-

 def _pipe_stdin(proc: subprocess.Popen, data: str) -> None:
     """Write *data* to proc.stdin on a daemon thread to avoid pipe-buffer deadlocks."""
@@ -246,9 +244,6 @@ class BaseEnvironment(ABC):
         self._cwd_file = f"{temp_dir}/hermes-cwd-{self._session_id}.txt"
         self._cwd_marker = _cwd_marker(self._session_id)
         self._snapshot_ready = False
-        self._last_sync_time: float | None = (
-            None  # set to 0 by backends that need file sync
-        )

     # ------------------------------------------------------------------
     # Abstract methods
@@ -477,22 +472,14 @@
     # Hooks
     # ------------------------------------------------------------------

-    def _before_execute(self):
-        """Rate-limited file sync before each command.
-
-        Backends that need pre-command sync set ``self._last_sync_time = 0``
-        in ``__init__`` and override :meth:`_sync_files`. Backends needing
-        extra pre-exec logic (e.g. Daytona sandbox restart check) override
-        this method and call ``super()._before_execute()``.
-        """
-        if self._last_sync_time is not None:
-            now = time.monotonic()
-            if now - self._last_sync_time >= _SYNC_INTERVAL_SECONDS:
-                self._sync_files()
-                self._last_sync_time = now
-
-    def _sync_files(self):
-        """Push files to remote environment. Called rate-limited by _before_execute."""
+    def _before_execute(self) -> None:
+        """Hook called before each command execution.
+
+        Remote backends (SSH, Modal, Daytona) override this to trigger
+        their FileSyncManager. Bind-mount backends (Docker, Singularity)
+        and Local don't need file sync — the host filesystem is directly
+        visible inside the container/process.
+        """
         pass

     # ------------------------------------------------------------------

View File

@@ -11,13 +11,12 @@ import shlex
 import threading
 import warnings
 from pathlib import Path
-from typing import Dict, Optional

 from tools.environments.base import (
     BaseEnvironment,
     _ThreadedProcessHandle,
-    _file_mtime_key,
 )
+from tools.environments.file_sync import FileSyncManager, iter_sync_files, quoted_rm_command

 logger = logging.getLogger(__name__)
@@ -61,7 +60,6 @@ class DaytonaEnvironment(BaseEnvironment):
         self._daytona = Daytona()
         self._sandbox = None
         self._lock = threading.Lock()
-        self._last_sync_time: float = 0

         memory_gib = max(1, math.ceil(memory / 1024))
         disk_gib = max(1, math.ceil(disk / 1024))
@@ -128,50 +126,40 @@
                 pass
         logger.info("Daytona: resolved home to %s, cwd to %s", self._remote_home, self.cwd)

-        self._synced_files: Dict[str, tuple] = {}
-        self._sync_files()
+        self._sync_manager = FileSyncManager(
+            get_files_fn=lambda: iter_sync_files(f"{self._remote_home}/.hermes"),
+            upload_fn=self._daytona_upload,
+            delete_fn=self._daytona_delete,
+        )
+        self._sync_manager.sync(force=True)
         self.init_session()

-    def _upload_if_changed(self, host_path: str, remote_path: str) -> bool:
-        file_key = _file_mtime_key(host_path)
-        if file_key is None:
-            return False
-        if self._synced_files.get(remote_path) == file_key:
-            return False
-        try:
-            parent = str(Path(remote_path).parent)
-            self._sandbox.process.exec(f"mkdir -p {parent}")
-            self._sandbox.fs.upload_file(host_path, remote_path)
-            self._synced_files[remote_path] = file_key
-            return True
-        except Exception as e:
-            logger.debug("Daytona: upload failed %s: %s", host_path, e)
-            return False
+    def _daytona_upload(self, host_path: str, remote_path: str) -> None:
+        """Upload a single file via Daytona SDK."""
+        parent = str(Path(remote_path).parent)
+        self._sandbox.process.exec(f"mkdir -p {parent}")
+        self._sandbox.fs.upload_file(host_path, remote_path)

-    def _sync_files(self) -> None:
-        container_base = f"{self._remote_home}/.hermes"
-        try:
-            from tools.credential_files import get_credential_file_mounts, iter_skills_files
-            for mount_entry in get_credential_file_mounts():
-                remote_path = mount_entry["container_path"].replace("/root/.hermes", container_base, 1)
-                self._upload_if_changed(mount_entry["host_path"], remote_path)
-            for entry in iter_skills_files(container_base=container_base):
-                self._upload_if_changed(entry["host_path"], entry["container_path"])
-        except Exception as e:
-            logger.debug("Daytona: could not sync skills/credentials: %s", e)
+    def _daytona_delete(self, remote_paths: list[str]) -> None:
+        """Batch-delete remote files via SDK exec."""
+        self._sandbox.process.exec(quoted_rm_command(remote_paths))

-    def _ensure_sandbox_ready(self):
+    # ------------------------------------------------------------------
+    # Sandbox lifecycle
+    # ------------------------------------------------------------------
+    def _ensure_sandbox_ready(self) -> None:
         """Restart sandbox if it was stopped (e.g., by a previous interrupt)."""
         self._sandbox.refresh_data()
         if self._sandbox.state in (self._SandboxState.STOPPED, self._SandboxState.ARCHIVED):
             self._sandbox.start()
             logger.info("Daytona: restarted sandbox %s", self._sandbox.id)

-    def _before_execute(self):
-        """Ensure sandbox is ready, then rate-limited file sync via base class."""
+    def _before_execute(self) -> None:
+        """Ensure sandbox is ready, then sync files via FileSyncManager."""
         with self._lock:
             self._ensure_sandbox_ready()
-        super()._before_execute()
+        self._sync_manager.sync()

     def _run_bash(self, cmd_string: str, *, login: bool = False,
                   timeout: int = 120,

View File

@@ -0,0 +1,150 @@
"""Shared file sync manager for remote execution backends.
Tracks local file changes via mtime+size, detects deletions, and
syncs to remote environments transactionally. Used by SSH, Modal,
and Daytona. Docker and Singularity use bind mounts (live host FS
view) and don't need this.
"""
import logging
import os
import shlex
import time
from typing import Callable
from tools.environments.base import _file_mtime_key
logger = logging.getLogger(__name__)
_SYNC_INTERVAL_SECONDS = 5.0
_FORCE_SYNC_ENV = "HERMES_FORCE_FILE_SYNC"
# Transport callbacks provided by each backend
UploadFn = Callable[[str, str], None] # (host_path, remote_path) -> raises on failure
DeleteFn = Callable[[list[str]], None] # (remote_paths) -> raises on failure
GetFilesFn = Callable[[], list[tuple[str, str]]] # () -> [(host_path, remote_path), ...]
def iter_sync_files(container_base: str = "/root/.hermes") -> list[tuple[str, str]]:
"""Enumerate all files that should be synced to a remote environment.
Combines credentials, skills, and cache into a single flat list of
(host_path, remote_path) pairs. Credential paths are remapped from
the hardcoded /root/.hermes to *container_base* because the remote
user's home may differ (e.g. /home/daytona, /home/user).
"""
# Late import: credential_files imports agent modules that create
# circular dependencies if loaded at file_sync module level.
from tools.credential_files import (
get_credential_file_mounts,
iter_cache_files,
iter_skills_files,
)
files: list[tuple[str, str]] = []
for entry in get_credential_file_mounts():
remote = entry["container_path"].replace(
"/root/.hermes", container_base, 1
)
files.append((entry["host_path"], remote))
for entry in iter_skills_files(container_base=container_base):
files.append((entry["host_path"], entry["container_path"]))
for entry in iter_cache_files(container_base=container_base):
files.append((entry["host_path"], entry["container_path"]))
return files
def quoted_rm_command(remote_paths: list[str]) -> str:
"""Build a shell ``rm -f`` command for a batch of remote paths."""
return "rm -f " + " ".join(shlex.quote(p) for p in remote_paths)
class FileSyncManager:
"""Tracks local file changes and syncs to a remote environment.
Backends instantiate this with transport callbacks (upload, delete)
and a file-source callable. The manager handles mtime-based change
detection, deletion tracking, rate limiting, and transactional state.
Not used by bind-mount backends (Docker, Singularity) — those get
live host FS views and don't need file sync.
"""
def __init__(
self,
get_files_fn: GetFilesFn,
upload_fn: UploadFn,
delete_fn: DeleteFn,
sync_interval: float = _SYNC_INTERVAL_SECONDS,
):
self._get_files_fn = get_files_fn
self._upload_fn = upload_fn
self._delete_fn = delete_fn
self._synced_files: dict[str, tuple[float, int]] = {} # remote_path -> (mtime, size)
self._last_sync_time: float = 0.0 # monotonic; 0 ensures first sync runs
self._sync_interval = sync_interval
def sync(self, *, force: bool = False) -> None:
"""Run a sync cycle: upload changed files, delete removed files.
Rate-limited to once per ``sync_interval`` unless *force* is True
or ``HERMES_FORCE_FILE_SYNC=1`` is set.
Transactional: state only committed if ALL operations succeed.
On failure, state rolls back so the next cycle retries everything.
"""
if not force and not os.environ.get(_FORCE_SYNC_ENV):
now = time.monotonic()
if now - self._last_sync_time < self._sync_interval:
return
current_files = self._get_files_fn()
current_remote_paths = {remote for _, remote in current_files}
# --- Uploads: new or changed files ---
to_upload: list[tuple[str, str]] = []
new_files = dict(self._synced_files)
for host_path, remote_path in current_files:
file_key = _file_mtime_key(host_path)
if file_key is None:
continue
if self._synced_files.get(remote_path) == file_key:
continue
to_upload.append((host_path, remote_path))
new_files[remote_path] = file_key
# --- Deletes: synced paths no longer in current set ---
to_delete = [p for p in self._synced_files if p not in current_remote_paths]
if not to_upload and not to_delete:
self._last_sync_time = time.monotonic()
return
# Snapshot for rollback (only when there's work to do)
prev_files = dict(self._synced_files)
if to_upload:
logger.debug("file_sync: uploading %d file(s)", len(to_upload))
if to_delete:
logger.debug("file_sync: deleting %d stale remote file(s)", len(to_delete))
try:
for host_path, remote_path in to_upload:
self._upload_fn(host_path, remote_path)
logger.debug("file_sync: uploaded %s -> %s", host_path, remote_path)
if to_delete:
self._delete_fn(to_delete)
logger.debug("file_sync: deleted %s", to_delete)
# --- Commit (all succeeded) ---
for p in to_delete:
new_files.pop(p, None)
self._synced_files = new_files
self._last_sync_time = time.monotonic()
except Exception as exc:
self._synced_files = prev_files
self._last_sync_time = time.monotonic()
logger.warning("file_sync: sync failed, rolled back state: %s", exc)

View File

@@ -9,16 +9,16 @@ import logging
 import shlex
 import threading
 from pathlib import Path
-from typing import Any, Dict, Optional
+from typing import Any, Optional

 from hermes_constants import get_hermes_home
 from tools.environments.base import (
     BaseEnvironment,
     _ThreadedProcessHandle,
-    _file_mtime_key,
     _load_json_store,
     _save_json_store,
 )
+from tools.environments.file_sync import FileSyncManager, iter_sync_files, quoted_rm_command

 logger = logging.getLogger(__name__)
@@ -150,7 +150,7 @@
         image: str,
         cwd: str = "/root",
         timeout: int = 60,
-        modal_sandbox_kwargs: Optional[Dict[str, Any]] = None,
+        modal_sandbox_kwargs: Optional[dict[str, Any]] = None,
         persistent_filesystem: bool = True,
         task_id: str = "default",
     ):
@@ -162,8 +162,7 @@
         self._sandbox = None
         self._app = None
         self._worker = _AsyncWorker()
-        self._synced_files: Dict[str, tuple] = {}
-        self._last_sync_time: float = 0
+        self._sync_manager: FileSyncManager | None = None  # initialized after sandbox creation

         sandbox_kwargs = dict(modal_sandbox_kwargs or {})
@@ -256,26 +255,24 @@
             raise
         logger.info("Modal: sandbox created (task=%s)", self._task_id)

+        self._sync_manager = FileSyncManager(
+            get_files_fn=lambda: iter_sync_files("/root/.hermes"),
+            upload_fn=self._modal_upload,
+            delete_fn=self._modal_delete,
+        )
+        self._sync_manager.sync(force=True)
         self.init_session()

-    def _push_file_to_sandbox(self, host_path: str, container_path: str) -> bool:
-        """Push a single file into the sandbox if changed."""
-        file_key = _file_mtime_key(host_path)
-        if file_key is None:
-            return False
-        if self._synced_files.get(container_path) == file_key:
-            return False
-        try:
-            content = Path(host_path).read_bytes()
-        except Exception:
-            return False
+    def _modal_upload(self, host_path: str, remote_path: str) -> None:
+        """Upload a single file via base64-over-exec."""
         import base64
+
+        content = Path(host_path).read_bytes()
         b64 = base64.b64encode(content).decode("ascii")
-        container_dir = str(Path(container_path).parent)
+        container_dir = str(Path(remote_path).parent)
         cmd = (
             f"mkdir -p {shlex.quote(container_dir)} && "
-            f"echo {shlex.quote(b64)} | base64 -d > {shlex.quote(container_path)}"
+            f"echo {shlex.quote(b64)} | base64 -d > {shlex.quote(remote_path)}"
         )

         async def _write():
@@ -283,25 +280,24 @@
             await proc.wait.aio()

         self._worker.run_coroutine(_write(), timeout=15)
-        self._synced_files[container_path] = file_key
-        return True

-    def _sync_files(self) -> None:
-        """Push credential, skill, and cache files into the running sandbox."""
-        try:
-            from tools.credential_files import (
-                get_credential_file_mounts,
-                iter_skills_files,
-                iter_cache_files,
-            )
-            for entry in get_credential_file_mounts():
-                self._push_file_to_sandbox(entry["host_path"], entry["container_path"])
-            for entry in iter_skills_files():
-                self._push_file_to_sandbox(entry["host_path"], entry["container_path"])
-            for entry in iter_cache_files():
-                self._push_file_to_sandbox(entry["host_path"], entry["container_path"])
-        except Exception as e:
-            logger.debug("Modal: file sync failed: %s", e)
+    def _modal_delete(self, remote_paths: list[str]) -> None:
+        """Batch-delete remote files via exec."""
+        rm_cmd = quoted_rm_command(remote_paths)
+
+        async def _rm():
+            proc = await self._sandbox.exec.aio("bash", "-c", rm_cmd)
+            await proc.wait.aio()
+
+        self._worker.run_coroutine(_rm(), timeout=15)
+
+    def _before_execute(self) -> None:
+        """Sync files to sandbox via FileSyncManager (rate-limited internally)."""
+        self._sync_manager.sync()
+
+    # ------------------------------------------------------------------
+    # Execution
+    # ------------------------------------------------------------------

     def _run_bash(self, cmd_string: str, *, login: bool = False,
                   timeout: int = 120,

View File

@@ -8,6 +8,7 @@ import tempfile
 from pathlib import Path

 from tools.environments.base import BaseEnvironment, _popen_bash
+from tools.environments.file_sync import FileSyncManager, iter_sync_files, quoted_rm_command

 logger = logging.getLogger(__name__)
@@ -43,8 +44,14 @@
         _ensure_ssh_available()
         self._establish_connection()
         self._remote_home = self._detect_remote_home()
-        self._last_sync_time: float = 0  # guarantees first _before_execute syncs
-        self._sync_files()
+        self._ensure_remote_dirs()
+        self._sync_manager = FileSyncManager(
+            get_files_fn=lambda: iter_sync_files(f"{self._remote_home}/.hermes"),
+            upload_fn=self._scp_upload,
+            delete_fn=self._ssh_delete,
+        )
+        self._sync_manager.sync(force=True)
         self.init_session()
@@ -92,50 +99,53 @@
             return "/root"
         return f"/home/{self.user}"

-    def _sync_files(self) -> None:
-        """Rsync skills directory and credential files to the remote host."""
-        try:
-            container_base = f"{self._remote_home}/.hermes"
-            from tools.credential_files import get_credential_file_mounts, get_skills_directory_mount
-            rsync_base = ["rsync", "-az", "--timeout=30", "--safe-links"]
-            ssh_opts = f"ssh -o ControlPath={self.control_socket} -o ControlMaster=auto"
-            if self.port != 22:
-                ssh_opts += f" -p {self.port}"
-            if self.key_path:
-                ssh_opts += f" -i {self.key_path}"
-            rsync_base.extend(["-e", ssh_opts])
-            dest_prefix = f"{self.user}@{self.host}"
-            for mount_entry in get_credential_file_mounts():
-                remote_path = mount_entry["container_path"].replace("/root/.hermes", container_base, 1)
-                parent_dir = str(Path(remote_path).parent)
-                mkdir_cmd = self._build_ssh_command()
-                mkdir_cmd.append(f"mkdir -p {parent_dir}")
-                subprocess.run(mkdir_cmd, capture_output=True, text=True, timeout=10)
-                cmd = rsync_base + [mount_entry["host_path"], f"{dest_prefix}:{remote_path}"]
-                result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
-                if result.returncode == 0:
-                    logger.info("SSH: synced credential %s -> %s", mount_entry["host_path"], remote_path)
-                else:
-                    logger.debug("SSH: rsync credential failed: %s", result.stderr.strip())
-            for skills_mount in get_skills_directory_mount(container_base=container_base):
-                remote_path = skills_mount["container_path"]
-                mkdir_cmd = self._build_ssh_command()
-                mkdir_cmd.append(f"mkdir -p {remote_path}")
-                subprocess.run(mkdir_cmd, capture_output=True, text=True, timeout=10)
-                cmd = rsync_base + [
-                    skills_mount["host_path"].rstrip("/") + "/",
-                    f"{dest_prefix}:{remote_path}/",
-                ]
-                result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
-                if result.returncode == 0:
-                    logger.info("SSH: synced skills dir %s -> %s", skills_mount["host_path"], remote_path)
-                else:
-                    logger.debug("SSH: rsync skills dir failed: %s", result.stderr.strip())
-        except Exception as e:
-            logger.debug("SSH: could not sync skills/credentials: %s", e)
+    # ------------------------------------------------------------------
+    # File sync (via FileSyncManager)
+    # ------------------------------------------------------------------
+
+    def _ensure_remote_dirs(self) -> None:
+        """Create base ~/.hermes directory tree on remote in one SSH call."""
+        base = f"{self._remote_home}/.hermes"
+        dirs = [base, f"{base}/skills", f"{base}/credentials", f"{base}/cache"]
+        mkdir_cmd = "mkdir -p " + " ".join(shlex.quote(d) for d in dirs)
+        cmd = self._build_ssh_command()
+        cmd.append(mkdir_cmd)
+        subprocess.run(cmd, capture_output=True, text=True, timeout=10)
+
+    # _get_sync_files provided via iter_sync_files in FileSyncManager init
+
+    def _scp_upload(self, host_path: str, remote_path: str) -> None:
+        """Upload a single file via scp over ControlMaster."""
+        parent = str(Path(remote_path).parent)
+        mkdir_cmd = self._build_ssh_command()
+        mkdir_cmd.append(f"mkdir -p {shlex.quote(parent)}")
+        subprocess.run(mkdir_cmd, capture_output=True, text=True, timeout=10)
+        scp_cmd = ["scp", "-o", f"ControlPath={self.control_socket}"]
+        if self.port != 22:
+            scp_cmd.extend(["-P", str(self.port)])
+        if self.key_path:
+            scp_cmd.extend(["-i", self.key_path])
+        scp_cmd.extend([host_path, f"{self.user}@{self.host}:{remote_path}"])
+        result = subprocess.run(scp_cmd, capture_output=True, text=True, timeout=30)
+        if result.returncode != 0:
+            raise RuntimeError(f"scp failed: {result.stderr.strip()}")
+
+    def _ssh_delete(self, remote_paths: list[str]) -> None:
+        """Batch-delete remote files in one SSH call."""
+        cmd = self._build_ssh_command()
+        cmd.append(quoted_rm_command(remote_paths))
+        result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
+        if result.returncode != 0:
+            raise RuntimeError(f"remote rm failed: {result.stderr.strip()}")
+
+    def _before_execute(self) -> None:
+        """Sync files to remote via FileSyncManager (rate-limited internally)."""
+        self._sync_manager.sync()
+
+    # ------------------------------------------------------------------
+    # Execution
+    # ------------------------------------------------------------------

     def _run_bash(self, cmd_string: str, *, login: bool = False,
                   timeout: int = 120,
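
Finally, the HERMES_FORCE_FILE_SYNC escape hatch introduced in this PR makes every sync() call run a full cycle, which is handy when debugging stale-file issues. A minimal driver, assuming the SSHEnvironment constructor used in the tests above (host and user are placeholders):

import os

os.environ["HERMES_FORCE_FILE_SYNC"] = "1"  # bypass the 5s rate limit entirely

from tools.environments.ssh import SSHEnvironment

env = SSHEnvironment(host="example.com", user="alice", cwd="/tmp", timeout=30)
env.execute("ls ~/.hermes/skills")  # a sync cycle now runs before every command
env.cleanup()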