mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-29 23:41:35 +08:00
Compare commits
3 Commits
fix/plugin
...
hermes/her
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7d91228c2a | ||
|
|
6eba279556 | ||
|
|
517ea7ed45 |
257
tests/tools/test_file_sync.py
Normal file
257
tests/tools/test_file_sync.py
Normal file
@@ -0,0 +1,257 @@
|
||||
"""Tests for FileSyncManager — mtime tracking, deletion detection, transactional rollback."""
|
||||
|
||||
import os
|
||||
import time
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from tools.environments.file_sync import FileSyncManager, _FORCE_SYNC_ENV
|
||||
|
||||
|
||||
@pytest.fixture
def tmp_files(tmp_path):
    """Write three sample sync-source files into *tmp_path*.

    Returns a ``{filename: absolute-path-string}`` mapping that the sync
    helpers treat as the set of local files to mirror remotely.
    """
    created = {}
    for filename in ("cred_a.json", "cred_b.json", "skill_main.py"):
        target = tmp_path / filename
        target.write_text(f"content of {filename}")
        created[filename] = str(target)
    return created
|
||||
|
||||
|
||||
def _make_get_files(tmp_files, remote_base="/root/.hermes"):
|
||||
"""Return a get_files_fn that maps local files to remote paths."""
|
||||
mapping = [(hp, f"{remote_base}/{name}") for name, hp in tmp_files.items()]
|
||||
|
||||
def get_files():
|
||||
return [(hp, rp) for hp, rp in mapping if Path(hp).exists()]
|
||||
|
||||
return get_files
|
||||
|
||||
|
||||
def _make_manager(tmp_files, remote_base="/root/.hermes", upload=None, delete=None):
    """Construct a FileSyncManager wired to test doubles.

    *upload* / *delete* default to fresh MagicMocks so callers that only
    care about one side of the transport can ignore the other.
    """
    if not upload:
        upload = MagicMock()
    if not delete:
        delete = MagicMock()
    return FileSyncManager(
        get_files_fn=_make_get_files(tmp_files, remote_base),
        upload_fn=upload,
        delete_fn=delete,
    )
|
||||
|
||||
|
||||
class TestMtimeSkip:
    """mtime+size change detection: only new/modified files are uploaded."""

    def test_unchanged_files_not_re_uploaded(self, tmp_files):
        """A second sync with no local changes performs zero uploads."""
        upload = MagicMock()
        mgr = _make_manager(tmp_files, upload=upload)

        mgr.sync(force=True)
        # All three fixture files are new on the first cycle.
        assert upload.call_count == 3

        upload.reset_mock()
        mgr.sync(force=True)
        assert upload.call_count == 0, "unchanged files should not be re-uploaded"

    def test_changed_file_re_uploaded(self, tmp_files):
        """Rewriting one file re-uploads exactly that file."""
        upload = MagicMock()
        mgr = _make_manager(tmp_files, upload=upload)

        mgr.sync(force=True)
        upload.reset_mock()

        # Touch one file
        # NOTE(review): the sleep guards against filesystems with coarse
        # mtime resolution; the size change alone may also flip the key.
        time.sleep(0.05)
        Path(tmp_files["cred_a.json"]).write_text("updated content")

        mgr.sync(force=True)
        assert upload.call_count == 1
        # upload_fn is called as (host_path, remote_path); [0][0] is host_path.
        assert tmp_files["cred_a.json"] in upload.call_args[0][0]

    def test_new_file_detected(self, tmp_files, tmp_path):
        """A file added after the first sync is uploaded on the next cycle."""
        upload = MagicMock()
        mgr = FileSyncManager(
            get_files_fn=_make_get_files(tmp_files),
            upload_fn=upload,
            delete_fn=MagicMock(),
        )

        mgr.sync(force=True)
        assert upload.call_count == 3

        # Add a new file
        new_file = tmp_path / "new_skill.py"
        new_file.write_text("new content")
        tmp_files["new_skill.py"] = str(new_file)
        # Recreate manager with updated file list
        mgr._get_files_fn = _make_get_files(tmp_files)

        upload.reset_mock()
        mgr.sync(force=True)
        # Only the new file is uploaded; the original three are unchanged.
        assert upload.call_count == 1
|
||||
|
||||
|
||||
class TestDeletion:
    """Remote cleanup: files removed locally are deleted remotely."""

    def test_removed_file_triggers_delete(self, tmp_files):
        """Unlinking a synced file makes the next cycle call delete_fn once."""
        upload = MagicMock()
        delete = MagicMock()
        mgr = _make_manager(tmp_files, upload=upload, delete=delete)

        mgr.sync(force=True)
        delete.assert_not_called()

        # Remove a file locally
        os.unlink(tmp_files["cred_b.json"])
        del tmp_files["cred_b.json"]
        mgr._get_files_fn = _make_get_files(tmp_files)

        mgr.sync(force=True)
        delete.assert_called_once()
        # delete_fn receives the batch of remote paths as its single argument.
        deleted_paths = delete.call_args[0][0]
        assert any("cred_b.json" in p for p in deleted_paths)

    def test_no_delete_when_no_removals(self, tmp_files):
        """Stable file set across cycles never invokes delete_fn."""
        delete = MagicMock()
        mgr = _make_manager(tmp_files, delete=delete)

        mgr.sync(force=True)
        mgr.sync(force=True)
        delete.assert_not_called()
|
||||
|
||||
|
||||
class TestTransactionalRollback:
    """Failures must not commit partial state: the next cycle retries everything."""

    def test_upload_failure_rolls_back(self, tmp_files):
        """A mid-batch upload failure leaves state rolled back for a full retry."""
        call_count = 0

        def failing_upload(host_path, remote_path):
            # Succeed on the first call, fail on the second, so the batch
            # is partially applied when the exception fires.
            nonlocal call_count
            call_count += 1
            if call_count == 2:
                raise RuntimeError("upload failed")

        mgr = _make_manager(tmp_files, upload=failing_upload)

        # First sync fails (swallowed, logged, state rolled back)
        mgr.sync(force=True)

        # State should be empty (rolled back) — next sync retries all files
        good_upload = MagicMock()
        mgr._upload_fn = good_upload
        mgr.sync(force=True)
        assert good_upload.call_count == 3, "all files should be retried after rollback"

    def test_delete_failure_rolls_back(self, tmp_files):
        """A failed delete is retried on the next cycle after rollback."""
        upload = MagicMock()
        mgr = _make_manager(tmp_files, upload=upload)

        # Initial sync
        mgr.sync(force=True)

        # Remove a file
        os.unlink(tmp_files["skill_main.py"])
        del tmp_files["skill_main.py"]
        mgr._get_files_fn = _make_get_files(tmp_files)

        # Delete fails (swallowed, state rolled back)
        mgr._delete_fn = MagicMock(side_effect=RuntimeError("delete failed"))
        mgr.sync(force=True)

        # Next sync should retry the delete
        good_delete = MagicMock()
        mgr._delete_fn = good_delete
        upload.reset_mock()
        mgr.sync(force=True)
        good_delete.assert_called_once()
|
||||
|
||||
|
||||
class TestRateLimiting:
    """Rate-limit window: unforced syncs inside ``sync_interval`` are no-ops."""

    def test_sync_skipped_within_interval(self, tmp_files):
        """An unforced sync immediately after a forced one does nothing."""
        upload = MagicMock()
        mgr = FileSyncManager(
            get_files_fn=_make_get_files(tmp_files),
            upload_fn=upload,
            delete_fn=MagicMock(),
            sync_interval=10.0,  # long window so the second call is definitely inside it
        )

        mgr.sync(force=True)
        assert upload.call_count == 3

        upload.reset_mock()
        # Without force, should skip due to rate limit
        mgr.sync()
        assert upload.call_count == 0

    def test_force_bypasses_rate_limit(self, tmp_files, tmp_path):
        """``force=True`` runs a full cycle even inside the window."""
        upload = MagicMock()
        mgr = FileSyncManager(
            get_files_fn=_make_get_files(tmp_files),
            upload_fn=upload,
            delete_fn=MagicMock(),
            sync_interval=10.0,
        )

        mgr.sync(force=True)
        upload.reset_mock()

        # Add a new file and force sync
        new_file = tmp_path / "forced.txt"
        new_file.write_text("forced")
        tmp_files["forced.txt"] = str(new_file)
        mgr._get_files_fn = _make_get_files(tmp_files)

        mgr.sync(force=True)
        assert upload.call_count == 1

    def test_env_var_forces_sync(self, tmp_files, tmp_path):
        """Setting the force-sync env var bypasses the window like force=True."""
        upload = MagicMock()
        mgr = FileSyncManager(
            get_files_fn=_make_get_files(tmp_files),
            upload_fn=upload,
            delete_fn=MagicMock(),
            sync_interval=10.0,
        )

        mgr.sync(force=True)
        upload.reset_mock()

        new_file = tmp_path / "env_forced.txt"
        new_file.write_text("env forced")
        tmp_files["env_forced.txt"] = str(new_file)
        mgr._get_files_fn = _make_get_files(tmp_files)

        # patch.dict restores os.environ after the block, keeping tests isolated.
        with patch.dict(os.environ, {_FORCE_SYNC_ENV: "1"}):
            mgr.sync()
        assert upload.call_count == 1
|
||||
|
||||
|
||||
class TestEdgeCases:
    """Degenerate inputs: empty file sets and races with local deletion."""

    def test_empty_file_list(self):
        """No source files: a sync cycle touches neither transport callback."""
        upload = MagicMock()
        delete = MagicMock()
        mgr = FileSyncManager(
            get_files_fn=lambda: [],
            upload_fn=upload,
            delete_fn=delete,
        )

        mgr.sync(force=True)
        upload.assert_not_called()
        delete.assert_not_called()

    def test_file_disappears_between_list_and_upload(self, tmp_path):
        """File listed by get_files but deleted before _file_mtime_key reads it."""
        f = tmp_path / "ephemeral.txt"
        f.write_text("here now")

        upload = MagicMock()
        mgr = FileSyncManager(
            # get_files_fn keeps listing the path even after it is unlinked,
            # simulating a race between enumeration and stat.
            get_files_fn=lambda: [(str(f), "/root/.hermes/ephemeral.txt")],
            upload_fn=upload,
            delete_fn=MagicMock(),
        )

        # Delete the file before sync can stat it
        os.unlink(str(f))

        mgr.sync(force=True)
        upload.assert_not_called()  # _file_mtime_key returns None, skipped
|
||||
127
tests/tools/test_file_sync_perf.py
Normal file
127
tests/tools/test_file_sync_perf.py
Normal file
@@ -0,0 +1,127 @@
|
||||
"""Reproducible perf benchmark for file sync overhead.
|
||||
|
||||
Measures actual env.execute() wall-clock time, no LLM in the loop.
|
||||
Run with: uv run pytest tests/tools/test_file_sync_perf.py -v -o "addopts=" -s
|
||||
|
||||
Requires backends to be configured (SSH host, Modal creds, etc).
|
||||
Skip markers gate each backend.
|
||||
"""
|
||||
|
||||
import statistics
|
||||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Backend fixtures
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.fixture
def local_env():
    """Yield a LocalEnvironment (plain subprocess backend) and clean it up after the test."""
    # Late import keeps collection cheap when this backend isn't exercised.
    from tools.environments.local import LocalEnvironment
    env = LocalEnvironment(cwd="/tmp", timeout=30)
    yield env
    # Teardown runs after the test body; releases whatever the backend holds.
    env.cleanup()
|
||||
|
||||
|
||||
@pytest.fixture
def ssh_env():
    """Yield an SSHEnvironment, skipping when the SSH target env vars are unset."""
    import os
    host = os.environ.get("TERMINAL_SSH_HOST")
    user = os.environ.get("TERMINAL_SSH_USER")
    if not host or not user:
        # Skip (not fail) so the perf suite is safe to run without an SSH target.
        pytest.skip("TERMINAL_SSH_HOST and TERMINAL_SSH_USER required")
    from tools.environments.ssh import SSHEnvironment
    env = SSHEnvironment(host=host, user=user, cwd="/tmp", timeout=30)
    yield env
    env.cleanup()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _time_executions(env, command: str, n: int = 10) -> list[float]:
|
||||
"""Run *command* n times and return per-call wall-clock durations."""
|
||||
durations = []
|
||||
for _ in range(n):
|
||||
t0 = time.monotonic()
|
||||
result = env.execute(command, timeout=10)
|
||||
elapsed = time.monotonic() - t0
|
||||
durations.append(elapsed)
|
||||
assert result.get("returncode", result.get("exit_code", -1)) == 0, \
|
||||
f"command failed: {result}"
|
||||
return durations
|
||||
|
||||
|
||||
def _report(label: str, durations: list[float]):
    """Print timing stats (median/mean/p95 plus raw samples); return the median."""
    med = statistics.median(durations)
    mean = statistics.mean(durations)
    # NOTE(review): for small n this index selects the max sample rather than an
    # interpolated 95th percentile (e.g. n=20 -> index 19); fine for a smoke benchmark.
    p95 = sorted(durations)[int(len(durations) * 0.95)]
    print(f"\n {label}:")
    print(f" n={len(durations)} median={med*1000:.0f}ms mean={mean*1000:.0f}ms p95={p95*1000:.0f}ms")
    print(f" raw: {[f'{d*1000:.0f}ms' for d in durations]}")
    return med
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestLocalPerf:
    """Local baseline — no file sync, no network. Sets the floor."""

    def test_echo_latency(self, local_env):
        """Median latency of 20 sequential echos against the local backend."""
        durations = _time_executions(local_env, "echo hello", n=20)
        med = _report("local echo", durations)
        # Spawn-per-call overhead should be < 500ms
        assert med < 0.5, f"local echo median {med*1000:.0f}ms exceeds 500ms"
|
||||
|
||||
|
||||
@pytest.mark.ssh
class TestSSHPerf:
    """SSH with FileSyncManager — mtime skip should make sync ~0ms."""

    def test_echo_latency(self, ssh_env):
        """Sequential echo commands — measures per-command overhead including sync check."""
        durations = _time_executions(ssh_env, "echo hello", n=20)
        med = _report("ssh echo (with sync check)", durations)
        # SSH round-trip + spawn-per-call, but sync should be ~0ms (rate limited)
        assert med < 2.0, f"ssh echo median {med*1000:.0f}ms exceeds 2000ms"

    def test_sync_overhead_after_interval(self, ssh_env):
        """Measure sync cost when the rate-limit window has expired.

        Sleep past the 5s interval, then time the next command which
        triggers a real sync cycle (but with mtime skip, should be fast).
        """
        # Warm up
        ssh_env.execute("echo warmup", timeout=10)

        # Wait for sync interval to expire
        time.sleep(6)

        # This command will trigger a real sync cycle
        t0 = time.monotonic()
        result = ssh_env.execute("echo after-interval", timeout=10)
        elapsed = time.monotonic() - t0

        print(f"\n ssh echo after 6s wait (sync triggered): {elapsed*1000:.0f}ms")
        assert result.get("returncode", result.get("exit_code", -1)) == 0

        # Even with sync triggered, mtime skip should keep it fast
        # Old rsync approach: ~2-3s. New mtime skip: should be < 1.5s
        assert elapsed < 1.5, f"sync-triggered command took {elapsed*1000:.0f}ms (expected < 1500ms)"

    def test_no_sync_within_interval(self, ssh_env):
        """Rapid sequential commands within 5s window — no sync at all."""
        # First command triggers sync
        ssh_env.execute("echo prime", timeout=10)

        # Immediately run 10 more — all within rate-limit window
        durations = _time_executions(ssh_env, "echo rapid", n=10)
        med = _report("ssh echo (within interval, no sync)", durations)

        # Should be pure SSH overhead, no sync
        assert med < 1.5, f"within-interval median {med*1000:.0f}ms exceeds 1500ms"
|
||||
@@ -124,8 +124,8 @@ def _install_modal_test_modules(
|
||||
sys.modules["tools.interrupt"] = types.SimpleNamespace(is_interrupted=lambda: False)
|
||||
sys.modules["tools.credential_files"] = types.SimpleNamespace(
|
||||
get_credential_file_mounts=lambda: [],
|
||||
iter_skills_files=lambda: [],
|
||||
iter_cache_files=lambda: [],
|
||||
iter_skills_files=lambda **kw: [],
|
||||
iter_cache_files=lambda **kw: [],
|
||||
)
|
||||
|
||||
from_id_calls: list[str] = []
|
||||
|
||||
@@ -121,6 +121,10 @@ class TestSSHPreflight:
|
||||
called["count"] += 1
|
||||
|
||||
monkeypatch.setattr(ssh_env.SSHEnvironment, "_establish_connection", _fake_establish)
|
||||
monkeypatch.setattr(ssh_env.SSHEnvironment, "_detect_remote_home", lambda self: "/home/alice")
|
||||
monkeypatch.setattr(ssh_env.SSHEnvironment, "_ensure_remote_dirs", lambda self: None)
|
||||
monkeypatch.setattr(ssh_env.SSHEnvironment, "init_session", lambda self: None)
|
||||
monkeypatch.setattr(ssh_env, "FileSyncManager", lambda **kw: type("M", (), {"sync": lambda self, **k: None})())
|
||||
|
||||
env = ssh_env.SSHEnvironment(host="example.com", user="alice")
|
||||
|
||||
|
||||
@@ -43,8 +43,6 @@ def get_sandbox_dir() -> Path:
|
||||
# Shared constants and utilities
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_SYNC_INTERVAL_SECONDS = 5.0
|
||||
|
||||
|
||||
def _pipe_stdin(proc: subprocess.Popen, data: str) -> None:
|
||||
"""Write *data* to proc.stdin on a daemon thread to avoid pipe-buffer deadlocks."""
|
||||
@@ -246,9 +244,6 @@ class BaseEnvironment(ABC):
|
||||
self._cwd_file = f"{temp_dir}/hermes-cwd-{self._session_id}.txt"
|
||||
self._cwd_marker = _cwd_marker(self._session_id)
|
||||
self._snapshot_ready = False
|
||||
self._last_sync_time: float | None = (
|
||||
None # set to 0 by backends that need file sync
|
||||
)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Abstract methods
|
||||
@@ -477,22 +472,14 @@ class BaseEnvironment(ABC):
|
||||
# Hooks
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _before_execute(self):
|
||||
"""Rate-limited file sync before each command.
|
||||
def _before_execute(self) -> None:
|
||||
"""Hook called before each command execution.
|
||||
|
||||
Backends that need pre-command sync set ``self._last_sync_time = 0``
|
||||
in ``__init__`` and override :meth:`_sync_files`. Backends needing
|
||||
extra pre-exec logic (e.g. Daytona sandbox restart check) override
|
||||
this method and call ``super()._before_execute()``.
|
||||
Remote backends (SSH, Modal, Daytona) override this to trigger
|
||||
their FileSyncManager. Bind-mount backends (Docker, Singularity)
|
||||
and Local don't need file sync — the host filesystem is directly
|
||||
visible inside the container/process.
|
||||
"""
|
||||
if self._last_sync_time is not None:
|
||||
now = time.monotonic()
|
||||
if now - self._last_sync_time >= _SYNC_INTERVAL_SECONDS:
|
||||
self._sync_files()
|
||||
self._last_sync_time = now
|
||||
|
||||
def _sync_files(self):
|
||||
"""Push files to remote environment. Called rate-limited by _before_execute."""
|
||||
pass
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
@@ -11,13 +11,12 @@ import shlex
|
||||
import threading
|
||||
import warnings
|
||||
from pathlib import Path
|
||||
from typing import Dict, Optional
|
||||
|
||||
from tools.environments.base import (
|
||||
BaseEnvironment,
|
||||
_ThreadedProcessHandle,
|
||||
_file_mtime_key,
|
||||
)
|
||||
from tools.environments.file_sync import FileSyncManager, iter_sync_files, quoted_rm_command
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -61,7 +60,6 @@ class DaytonaEnvironment(BaseEnvironment):
|
||||
self._daytona = Daytona()
|
||||
self._sandbox = None
|
||||
self._lock = threading.Lock()
|
||||
self._last_sync_time: float = 0
|
||||
|
||||
memory_gib = max(1, math.ceil(memory / 1024))
|
||||
disk_gib = max(1, math.ceil(disk / 1024))
|
||||
@@ -128,50 +126,40 @@ class DaytonaEnvironment(BaseEnvironment):
|
||||
pass
|
||||
logger.info("Daytona: resolved home to %s, cwd to %s", self._remote_home, self.cwd)
|
||||
|
||||
self._synced_files: Dict[str, tuple] = {}
|
||||
self._sync_files()
|
||||
self._sync_manager = FileSyncManager(
|
||||
get_files_fn=lambda: iter_sync_files(f"{self._remote_home}/.hermes"),
|
||||
upload_fn=self._daytona_upload,
|
||||
delete_fn=self._daytona_delete,
|
||||
)
|
||||
self._sync_manager.sync(force=True)
|
||||
self.init_session()
|
||||
|
||||
def _upload_if_changed(self, host_path: str, remote_path: str) -> bool:
|
||||
file_key = _file_mtime_key(host_path)
|
||||
if file_key is None:
|
||||
return False
|
||||
if self._synced_files.get(remote_path) == file_key:
|
||||
return False
|
||||
try:
|
||||
parent = str(Path(remote_path).parent)
|
||||
self._sandbox.process.exec(f"mkdir -p {parent}")
|
||||
self._sandbox.fs.upload_file(host_path, remote_path)
|
||||
self._synced_files[remote_path] = file_key
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.debug("Daytona: upload failed %s: %s", host_path, e)
|
||||
return False
|
||||
def _daytona_upload(self, host_path: str, remote_path: str) -> None:
|
||||
"""Upload a single file via Daytona SDK."""
|
||||
parent = str(Path(remote_path).parent)
|
||||
self._sandbox.process.exec(f"mkdir -p {parent}")
|
||||
self._sandbox.fs.upload_file(host_path, remote_path)
|
||||
|
||||
def _sync_files(self) -> None:
|
||||
container_base = f"{self._remote_home}/.hermes"
|
||||
try:
|
||||
from tools.credential_files import get_credential_file_mounts, iter_skills_files
|
||||
for mount_entry in get_credential_file_mounts():
|
||||
remote_path = mount_entry["container_path"].replace("/root/.hermes", container_base, 1)
|
||||
self._upload_if_changed(mount_entry["host_path"], remote_path)
|
||||
for entry in iter_skills_files(container_base=container_base):
|
||||
self._upload_if_changed(entry["host_path"], entry["container_path"])
|
||||
except Exception as e:
|
||||
logger.debug("Daytona: could not sync skills/credentials: %s", e)
|
||||
def _daytona_delete(self, remote_paths: list[str]) -> None:
|
||||
"""Batch-delete remote files via SDK exec."""
|
||||
self._sandbox.process.exec(quoted_rm_command(remote_paths))
|
||||
|
||||
def _ensure_sandbox_ready(self):
|
||||
# ------------------------------------------------------------------
|
||||
# Sandbox lifecycle
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _ensure_sandbox_ready(self) -> None:
|
||||
"""Restart sandbox if it was stopped (e.g., by a previous interrupt)."""
|
||||
self._sandbox.refresh_data()
|
||||
if self._sandbox.state in (self._SandboxState.STOPPED, self._SandboxState.ARCHIVED):
|
||||
self._sandbox.start()
|
||||
logger.info("Daytona: restarted sandbox %s", self._sandbox.id)
|
||||
|
||||
def _before_execute(self):
|
||||
"""Ensure sandbox is ready, then rate-limited file sync via base class."""
|
||||
def _before_execute(self) -> None:
|
||||
"""Ensure sandbox is ready, then sync files via FileSyncManager."""
|
||||
with self._lock:
|
||||
self._ensure_sandbox_ready()
|
||||
super()._before_execute()
|
||||
self._sync_manager.sync()
|
||||
|
||||
def _run_bash(self, cmd_string: str, *, login: bool = False,
|
||||
timeout: int = 120,
|
||||
|
||||
150
tools/environments/file_sync.py
Normal file
150
tools/environments/file_sync.py
Normal file
@@ -0,0 +1,150 @@
|
||||
"""Shared file sync manager for remote execution backends.
|
||||
|
||||
Tracks local file changes via mtime+size, detects deletions, and
|
||||
syncs to remote environments transactionally. Used by SSH, Modal,
|
||||
and Daytona. Docker and Singularity use bind mounts (live host FS
|
||||
view) and don't need this.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import shlex
|
||||
import time
|
||||
from typing import Callable
|
||||
|
||||
from tools.environments.base import _file_mtime_key
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_SYNC_INTERVAL_SECONDS = 5.0
|
||||
_FORCE_SYNC_ENV = "HERMES_FORCE_FILE_SYNC"
|
||||
|
||||
# Transport callbacks provided by each backend
|
||||
UploadFn = Callable[[str, str], None] # (host_path, remote_path) -> raises on failure
|
||||
DeleteFn = Callable[[list[str]], None] # (remote_paths) -> raises on failure
|
||||
GetFilesFn = Callable[[], list[tuple[str, str]]] # () -> [(host_path, remote_path), ...]
|
||||
|
||||
|
||||
def iter_sync_files(container_base: str = "/root/.hermes") -> list[tuple[str, str]]:
    """Collect every (host_path, remote_path) pair that must be mirrored remotely.

    Merges three sources — credential mounts, skills files, and cache
    files — into one flat list. Credential container paths are rewritten
    from the hardcoded ``/root/.hermes`` prefix to *container_base*,
    since the remote user's home directory may differ.
    """
    # Imported lazily: pulling in credential_files at module import time
    # would create a circular dependency through the agent modules.
    from tools.credential_files import (
        get_credential_file_mounts,
        iter_cache_files,
        iter_skills_files,
    )

    pairs: list[tuple[str, str]] = [
        (
            mount["host_path"],
            mount["container_path"].replace("/root/.hermes", container_base, 1),
        )
        for mount in get_credential_file_mounts()
    ]
    pairs += [
        (item["host_path"], item["container_path"])
        for item in iter_skills_files(container_base=container_base)
    ]
    pairs += [
        (item["host_path"], item["container_path"])
        for item in iter_cache_files(container_base=container_base)
    ]
    return pairs
|
||||
|
||||
|
||||
def quoted_rm_command(remote_paths: list[str]) -> str:
    """Compose one ``rm -f`` shell command covering all *remote_paths*, each shell-escaped."""
    escaped = " ".join(shlex.quote(path) for path in remote_paths)
    return f"rm -f {escaped}"
|
||||
|
||||
|
||||
class FileSyncManager:
    """Tracks local file changes and syncs to a remote environment.

    Backends instantiate this with transport callbacks (upload, delete)
    and a file-source callable. The manager handles mtime-based change
    detection, deletion tracking, rate limiting, and transactional state.

    Not used by bind-mount backends (Docker, Singularity) — those get
    live host FS views and don't need file sync.
    """

    def __init__(
        self,
        get_files_fn: GetFilesFn,
        upload_fn: UploadFn,
        delete_fn: DeleteFn,
        sync_interval: float = _SYNC_INTERVAL_SECONDS,
    ):
        self._get_files_fn = get_files_fn
        self._upload_fn = upload_fn
        self._delete_fn = delete_fn
        self._synced_files: dict[str, tuple[float, int]] = {}  # remote_path -> (mtime, size)
        self._last_sync_time: float = 0.0  # monotonic; 0 ensures first sync runs
        self._sync_interval = sync_interval

    def sync(self, *, force: bool = False) -> None:
        """Run a sync cycle: upload changed files, delete removed files.

        Rate-limited to once per ``sync_interval`` unless *force* is True
        or ``HERMES_FORCE_FILE_SYNC=1`` is set.

        Transactional: state only committed if ALL operations succeed.
        On failure, state rolls back so the next cycle retries everything.
        """
        if not force and not os.environ.get(_FORCE_SYNC_ENV):
            now = time.monotonic()
            if now - self._last_sync_time < self._sync_interval:
                return

        current_files = self._get_files_fn()
        current_remote_paths = {remote for _, remote in current_files}

        # --- Uploads: new or changed files ---
        to_upload: list[tuple[str, str]] = []
        new_files = dict(self._synced_files)
        for host_path, remote_path in current_files:
            file_key = _file_mtime_key(host_path)
            if file_key is None:
                # File vanished between listing and stat; skip it this cycle.
                continue
            if self._synced_files.get(remote_path) == file_key:
                # Unchanged since last committed sync.
                continue
            to_upload.append((host_path, remote_path))
            new_files[remote_path] = file_key

        # --- Deletes: synced paths no longer in current set ---
        to_delete = [p for p in self._synced_files if p not in current_remote_paths]

        if not to_upload and not to_delete:
            # Nothing to do — still stamp the clock so the rate limiter
            # doesn't re-enumerate files on every subsequent command.
            self._last_sync_time = time.monotonic()
            return

        # Snapshot for rollback (only when there's work to do)
        prev_files = dict(self._synced_files)

        if to_upload:
            logger.debug("file_sync: uploading %d file(s)", len(to_upload))
        if to_delete:
            logger.debug("file_sync: deleting %d stale remote file(s)", len(to_delete))

        try:
            for host_path, remote_path in to_upload:
                self._upload_fn(host_path, remote_path)
                logger.debug("file_sync: uploaded %s -> %s", host_path, remote_path)

            if to_delete:
                self._delete_fn(to_delete)
                logger.debug("file_sync: deleted %s", to_delete)

            # --- Commit (all succeeded) ---
            for p in to_delete:
                new_files.pop(p, None)

            self._synced_files = new_files
            self._last_sync_time = time.monotonic()

        except Exception as exc:
            # Roll back to the pre-cycle snapshot; any partial uploads are
            # harmlessly redone on the next cycle.
            self._synced_files = prev_files
            # NOTE(review): the clock is stamped even on failure — presumably so a
            # broken transport isn't retried on every single command; confirm intent.
            self._last_sync_time = time.monotonic()
            logger.warning("file_sync: sync failed, rolled back state: %s", exc)
|
||||
@@ -9,16 +9,16 @@ import logging
|
||||
import shlex
|
||||
import threading
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Optional
|
||||
from typing import Any, Optional
|
||||
|
||||
from hermes_constants import get_hermes_home
|
||||
from tools.environments.base import (
|
||||
BaseEnvironment,
|
||||
_ThreadedProcessHandle,
|
||||
_file_mtime_key,
|
||||
_load_json_store,
|
||||
_save_json_store,
|
||||
)
|
||||
from tools.environments.file_sync import FileSyncManager, iter_sync_files, quoted_rm_command
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -150,7 +150,7 @@ class ModalEnvironment(BaseEnvironment):
|
||||
image: str,
|
||||
cwd: str = "/root",
|
||||
timeout: int = 60,
|
||||
modal_sandbox_kwargs: Optional[Dict[str, Any]] = None,
|
||||
modal_sandbox_kwargs: Optional[dict[str, Any]] = None,
|
||||
persistent_filesystem: bool = True,
|
||||
task_id: str = "default",
|
||||
):
|
||||
@@ -162,8 +162,7 @@ class ModalEnvironment(BaseEnvironment):
|
||||
self._sandbox = None
|
||||
self._app = None
|
||||
self._worker = _AsyncWorker()
|
||||
self._synced_files: Dict[str, tuple] = {}
|
||||
self._last_sync_time: float = 0
|
||||
self._sync_manager: FileSyncManager | None = None # initialized after sandbox creation
|
||||
|
||||
sandbox_kwargs = dict(modal_sandbox_kwargs or {})
|
||||
|
||||
@@ -256,26 +255,24 @@ class ModalEnvironment(BaseEnvironment):
|
||||
raise
|
||||
|
||||
logger.info("Modal: sandbox created (task=%s)", self._task_id)
|
||||
|
||||
self._sync_manager = FileSyncManager(
|
||||
get_files_fn=lambda: iter_sync_files("/root/.hermes"),
|
||||
upload_fn=self._modal_upload,
|
||||
delete_fn=self._modal_delete,
|
||||
)
|
||||
self._sync_manager.sync(force=True)
|
||||
self.init_session()
|
||||
|
||||
def _push_file_to_sandbox(self, host_path: str, container_path: str) -> bool:
|
||||
"""Push a single file into the sandbox if changed."""
|
||||
file_key = _file_mtime_key(host_path)
|
||||
if file_key is None:
|
||||
return False
|
||||
if self._synced_files.get(container_path) == file_key:
|
||||
return False
|
||||
try:
|
||||
content = Path(host_path).read_bytes()
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
def _modal_upload(self, host_path: str, remote_path: str) -> None:
|
||||
"""Upload a single file via base64-over-exec."""
|
||||
import base64
|
||||
content = Path(host_path).read_bytes()
|
||||
b64 = base64.b64encode(content).decode("ascii")
|
||||
container_dir = str(Path(container_path).parent)
|
||||
container_dir = str(Path(remote_path).parent)
|
||||
cmd = (
|
||||
f"mkdir -p {shlex.quote(container_dir)} && "
|
||||
f"echo {shlex.quote(b64)} | base64 -d > {shlex.quote(container_path)}"
|
||||
f"echo {shlex.quote(b64)} | base64 -d > {shlex.quote(remote_path)}"
|
||||
)
|
||||
|
||||
async def _write():
|
||||
@@ -283,25 +280,24 @@ class ModalEnvironment(BaseEnvironment):
|
||||
await proc.wait.aio()
|
||||
|
||||
self._worker.run_coroutine(_write(), timeout=15)
|
||||
self._synced_files[container_path] = file_key
|
||||
return True
|
||||
|
||||
def _sync_files(self) -> None:
|
||||
"""Push credential, skill, and cache files into the running sandbox."""
|
||||
try:
|
||||
from tools.credential_files import (
|
||||
get_credential_file_mounts,
|
||||
iter_skills_files,
|
||||
iter_cache_files,
|
||||
)
|
||||
for entry in get_credential_file_mounts():
|
||||
self._push_file_to_sandbox(entry["host_path"], entry["container_path"])
|
||||
for entry in iter_skills_files():
|
||||
self._push_file_to_sandbox(entry["host_path"], entry["container_path"])
|
||||
for entry in iter_cache_files():
|
||||
self._push_file_to_sandbox(entry["host_path"], entry["container_path"])
|
||||
except Exception as e:
|
||||
logger.debug("Modal: file sync failed: %s", e)
|
||||
def _modal_delete(self, remote_paths: list[str]) -> None:
|
||||
"""Batch-delete remote files via exec."""
|
||||
rm_cmd = quoted_rm_command(remote_paths)
|
||||
|
||||
async def _rm():
|
||||
proc = await self._sandbox.exec.aio("bash", "-c", rm_cmd)
|
||||
await proc.wait.aio()
|
||||
|
||||
self._worker.run_coroutine(_rm(), timeout=15)
|
||||
|
||||
def _before_execute(self) -> None:
|
||||
"""Sync files to sandbox via FileSyncManager (rate-limited internally)."""
|
||||
self._sync_manager.sync()
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Execution
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _run_bash(self, cmd_string: str, *, login: bool = False,
|
||||
timeout: int = 120,
|
||||
|
||||
@@ -8,6 +8,7 @@ import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
from tools.environments.base import BaseEnvironment, _popen_bash
|
||||
from tools.environments.file_sync import FileSyncManager, iter_sync_files, quoted_rm_command
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -43,8 +44,14 @@ class SSHEnvironment(BaseEnvironment):
|
||||
_ensure_ssh_available()
|
||||
self._establish_connection()
|
||||
self._remote_home = self._detect_remote_home()
|
||||
self._last_sync_time: float = 0 # guarantees first _before_execute syncs
|
||||
self._sync_files()
|
||||
|
||||
self._ensure_remote_dirs()
|
||||
self._sync_manager = FileSyncManager(
|
||||
get_files_fn=lambda: iter_sync_files(f"{self._remote_home}/.hermes"),
|
||||
upload_fn=self._scp_upload,
|
||||
delete_fn=self._ssh_delete,
|
||||
)
|
||||
self._sync_manager.sync(force=True)
|
||||
|
||||
self.init_session()
|
||||
|
||||
@@ -92,50 +99,53 @@ class SSHEnvironment(BaseEnvironment):
|
||||
return "/root"
|
||||
return f"/home/{self.user}"
|
||||
|
||||
def _sync_files(self) -> None:
|
||||
"""Rsync skills directory and credential files to the remote host."""
|
||||
try:
|
||||
container_base = f"{self._remote_home}/.hermes"
|
||||
from tools.credential_files import get_credential_file_mounts, get_skills_directory_mount
|
||||
# ------------------------------------------------------------------
|
||||
# File sync (via FileSyncManager)
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
rsync_base = ["rsync", "-az", "--timeout=30", "--safe-links"]
|
||||
ssh_opts = f"ssh -o ControlPath={self.control_socket} -o ControlMaster=auto"
|
||||
if self.port != 22:
|
||||
ssh_opts += f" -p {self.port}"
|
||||
if self.key_path:
|
||||
ssh_opts += f" -i {self.key_path}"
|
||||
rsync_base.extend(["-e", ssh_opts])
|
||||
dest_prefix = f"{self.user}@{self.host}"
|
||||
def _ensure_remote_dirs(self) -> None:
|
||||
"""Create base ~/.hermes directory tree on remote in one SSH call."""
|
||||
base = f"{self._remote_home}/.hermes"
|
||||
dirs = [base, f"{base}/skills", f"{base}/credentials", f"{base}/cache"]
|
||||
mkdir_cmd = "mkdir -p " + " ".join(shlex.quote(d) for d in dirs)
|
||||
cmd = self._build_ssh_command()
|
||||
cmd.append(mkdir_cmd)
|
||||
subprocess.run(cmd, capture_output=True, text=True, timeout=10)
|
||||
|
||||
for mount_entry in get_credential_file_mounts():
|
||||
remote_path = mount_entry["container_path"].replace("/root/.hermes", container_base, 1)
|
||||
parent_dir = str(Path(remote_path).parent)
|
||||
mkdir_cmd = self._build_ssh_command()
|
||||
mkdir_cmd.append(f"mkdir -p {parent_dir}")
|
||||
subprocess.run(mkdir_cmd, capture_output=True, text=True, timeout=10)
|
||||
cmd = rsync_base + [mount_entry["host_path"], f"{dest_prefix}:{remote_path}"]
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
|
||||
if result.returncode == 0:
|
||||
logger.info("SSH: synced credential %s -> %s", mount_entry["host_path"], remote_path)
|
||||
else:
|
||||
logger.debug("SSH: rsync credential failed: %s", result.stderr.strip())
|
||||
# _get_sync_files provided via iter_sync_files in FileSyncManager init
|
||||
|
||||
for skills_mount in get_skills_directory_mount(container_base=container_base):
|
||||
remote_path = skills_mount["container_path"]
|
||||
mkdir_cmd = self._build_ssh_command()
|
||||
mkdir_cmd.append(f"mkdir -p {remote_path}")
|
||||
subprocess.run(mkdir_cmd, capture_output=True, text=True, timeout=10)
|
||||
cmd = rsync_base + [
|
||||
skills_mount["host_path"].rstrip("/") + "/",
|
||||
f"{dest_prefix}:{remote_path}/",
|
||||
]
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
|
||||
if result.returncode == 0:
|
||||
logger.info("SSH: synced skills dir %s -> %s", skills_mount["host_path"], remote_path)
|
||||
else:
|
||||
logger.debug("SSH: rsync skills dir failed: %s", result.stderr.strip())
|
||||
except Exception as e:
|
||||
logger.debug("SSH: could not sync skills/credentials: %s", e)
|
||||
def _scp_upload(self, host_path: str, remote_path: str) -> None:
|
||||
"""Upload a single file via scp over ControlMaster."""
|
||||
parent = str(Path(remote_path).parent)
|
||||
mkdir_cmd = self._build_ssh_command()
|
||||
mkdir_cmd.append(f"mkdir -p {shlex.quote(parent)}")
|
||||
subprocess.run(mkdir_cmd, capture_output=True, text=True, timeout=10)
|
||||
|
||||
scp_cmd = ["scp", "-o", f"ControlPath={self.control_socket}"]
|
||||
if self.port != 22:
|
||||
scp_cmd.extend(["-P", str(self.port)])
|
||||
if self.key_path:
|
||||
scp_cmd.extend(["-i", self.key_path])
|
||||
scp_cmd.extend([host_path, f"{self.user}@{self.host}:{remote_path}"])
|
||||
result = subprocess.run(scp_cmd, capture_output=True, text=True, timeout=30)
|
||||
if result.returncode != 0:
|
||||
raise RuntimeError(f"scp failed: {result.stderr.strip()}")
|
||||
|
||||
def _ssh_delete(self, remote_paths: list[str]) -> None:
|
||||
"""Batch-delete remote files in one SSH call."""
|
||||
cmd = self._build_ssh_command()
|
||||
cmd.append(quoted_rm_command(remote_paths))
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
|
||||
if result.returncode != 0:
|
||||
raise RuntimeError(f"remote rm failed: {result.stderr.strip()}")
|
||||
|
||||
def _before_execute(self) -> None:
|
||||
"""Sync files to remote via FileSyncManager (rate-limited internally)."""
|
||||
self._sync_manager.sync()
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Execution
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _run_bash(self, cmd_string: str, *, login: bool = False,
|
||||
timeout: int = 120,
|
||||
|
||||
Reference in New Issue
Block a user