mirror of https://github.com/NousResearch/hermes-agent.git
synced 2026-04-30 16:01:49 +08:00

Compare commits: 3 commits, fix/plugin ... hermes/her

| Author | SHA1 | Date |
|---|---|---|
|  | 7d91228c2a |  |
|  | 6eba279556 |  |
|  | 517ea7ed45 |  |
257  tests/tools/test_file_sync.py  Normal file

@@ -0,0 +1,257 @@
"""Tests for FileSyncManager — mtime tracking, deletion detection, transactional rollback."""

import os
import time
from pathlib import Path
from unittest.mock import MagicMock, patch

import pytest

from tools.environments.file_sync import FileSyncManager, _FORCE_SYNC_ENV


@pytest.fixture
def tmp_files(tmp_path):
    """Create a few temp files to use as sync sources."""
    files = {}
    for name in ("cred_a.json", "cred_b.json", "skill_main.py"):
        p = tmp_path / name
        p.write_text(f"content of {name}")
        files[name] = str(p)
    return files


def _make_get_files(tmp_files, remote_base="/root/.hermes"):
    """Return a get_files_fn that maps local files to remote paths."""
    mapping = [(hp, f"{remote_base}/{name}") for name, hp in tmp_files.items()]

    def get_files():
        return [(hp, rp) for hp, rp in mapping if Path(hp).exists()]

    return get_files


def _make_manager(tmp_files, remote_base="/root/.hermes", upload=None, delete=None):
    """Create a FileSyncManager with test callbacks."""
    return FileSyncManager(
        get_files_fn=_make_get_files(tmp_files, remote_base),
        upload_fn=upload or MagicMock(),
        delete_fn=delete or MagicMock(),
    )


class TestMtimeSkip:
    def test_unchanged_files_not_re_uploaded(self, tmp_files):
        upload = MagicMock()
        mgr = _make_manager(tmp_files, upload=upload)

        mgr.sync(force=True)
        assert upload.call_count == 3

        upload.reset_mock()
        mgr.sync(force=True)
        assert upload.call_count == 0, "unchanged files should not be re-uploaded"

    def test_changed_file_re_uploaded(self, tmp_files):
        upload = MagicMock()
        mgr = _make_manager(tmp_files, upload=upload)

        mgr.sync(force=True)
        upload.reset_mock()

        # Touch one file
        time.sleep(0.05)
        Path(tmp_files["cred_a.json"]).write_text("updated content")

        mgr.sync(force=True)
        assert upload.call_count == 1
        assert tmp_files["cred_a.json"] in upload.call_args[0][0]

    def test_new_file_detected(self, tmp_files, tmp_path):
        upload = MagicMock()
        mgr = FileSyncManager(
            get_files_fn=_make_get_files(tmp_files),
            upload_fn=upload,
            delete_fn=MagicMock(),
        )

        mgr.sync(force=True)
        assert upload.call_count == 3

        # Add a new file
        new_file = tmp_path / "new_skill.py"
        new_file.write_text("new content")
        tmp_files["new_skill.py"] = str(new_file)
        # Point the manager at the updated file list
        mgr._get_files_fn = _make_get_files(tmp_files)

        upload.reset_mock()
        mgr.sync(force=True)
        assert upload.call_count == 1


class TestDeletion:
    def test_removed_file_triggers_delete(self, tmp_files):
        upload = MagicMock()
        delete = MagicMock()
        mgr = _make_manager(tmp_files, upload=upload, delete=delete)

        mgr.sync(force=True)
        delete.assert_not_called()

        # Remove a file locally
        os.unlink(tmp_files["cred_b.json"])
        del tmp_files["cred_b.json"]
        mgr._get_files_fn = _make_get_files(tmp_files)

        mgr.sync(force=True)
        delete.assert_called_once()
        deleted_paths = delete.call_args[0][0]
        assert any("cred_b.json" in p for p in deleted_paths)

    def test_no_delete_when_no_removals(self, tmp_files):
        delete = MagicMock()
        mgr = _make_manager(tmp_files, delete=delete)

        mgr.sync(force=True)
        mgr.sync(force=True)
        delete.assert_not_called()


class TestTransactionalRollback:
    def test_upload_failure_rolls_back(self, tmp_files):
        call_count = 0

        def failing_upload(host_path, remote_path):
            nonlocal call_count
            call_count += 1
            if call_count == 2:
                raise RuntimeError("upload failed")

        mgr = _make_manager(tmp_files, upload=failing_upload)

        # First sync fails (swallowed, logged, state rolled back)
        mgr.sync(force=True)

        # State should be empty (rolled back) — next sync retries all files
        good_upload = MagicMock()
        mgr._upload_fn = good_upload
        mgr.sync(force=True)
        assert good_upload.call_count == 3, "all files should be retried after rollback"

    def test_delete_failure_rolls_back(self, tmp_files):
        upload = MagicMock()
        mgr = _make_manager(tmp_files, upload=upload)

        # Initial sync
        mgr.sync(force=True)

        # Remove a file
        os.unlink(tmp_files["skill_main.py"])
        del tmp_files["skill_main.py"]
        mgr._get_files_fn = _make_get_files(tmp_files)

        # Delete fails (swallowed, state rolled back)
        mgr._delete_fn = MagicMock(side_effect=RuntimeError("delete failed"))
        mgr.sync(force=True)

        # Next sync should retry the delete
        good_delete = MagicMock()
        mgr._delete_fn = good_delete
        upload.reset_mock()
        mgr.sync(force=True)
        good_delete.assert_called_once()


class TestRateLimiting:
    def test_sync_skipped_within_interval(self, tmp_files):
        upload = MagicMock()
        mgr = FileSyncManager(
            get_files_fn=_make_get_files(tmp_files),
            upload_fn=upload,
            delete_fn=MagicMock(),
            sync_interval=10.0,
        )

        mgr.sync(force=True)
        assert upload.call_count == 3

        upload.reset_mock()
        # Without force, should skip due to rate limit
        mgr.sync()
        assert upload.call_count == 0

    def test_force_bypasses_rate_limit(self, tmp_files, tmp_path):
        upload = MagicMock()
        mgr = FileSyncManager(
            get_files_fn=_make_get_files(tmp_files),
            upload_fn=upload,
            delete_fn=MagicMock(),
            sync_interval=10.0,
        )

        mgr.sync(force=True)
        upload.reset_mock()

        # Add a new file and force sync
        new_file = tmp_path / "forced.txt"
        new_file.write_text("forced")
        tmp_files["forced.txt"] = str(new_file)
        mgr._get_files_fn = _make_get_files(tmp_files)

        mgr.sync(force=True)
        assert upload.call_count == 1

    def test_env_var_forces_sync(self, tmp_files, tmp_path):
        upload = MagicMock()
        mgr = FileSyncManager(
            get_files_fn=_make_get_files(tmp_files),
            upload_fn=upload,
            delete_fn=MagicMock(),
            sync_interval=10.0,
        )

        mgr.sync(force=True)
        upload.reset_mock()

        new_file = tmp_path / "env_forced.txt"
        new_file.write_text("env forced")
        tmp_files["env_forced.txt"] = str(new_file)
        mgr._get_files_fn = _make_get_files(tmp_files)

        with patch.dict(os.environ, {_FORCE_SYNC_ENV: "1"}):
            mgr.sync()
        assert upload.call_count == 1


class TestEdgeCases:
    def test_empty_file_list(self):
        upload = MagicMock()
        delete = MagicMock()
        mgr = FileSyncManager(
            get_files_fn=lambda: [],
            upload_fn=upload,
            delete_fn=delete,
        )

        mgr.sync(force=True)
        upload.assert_not_called()
        delete.assert_not_called()

    def test_file_disappears_between_list_and_upload(self, tmp_path):
        """File listed by get_files but deleted before _file_mtime_key reads it."""
        f = tmp_path / "ephemeral.txt"
        f.write_text("here now")

        upload = MagicMock()
        mgr = FileSyncManager(
            get_files_fn=lambda: [(str(f), "/root/.hermes/ephemeral.txt")],
            upload_fn=upload,
            delete_fn=MagicMock(),
        )

        # Delete the file before sync can stat it
        os.unlink(str(f))

        mgr.sync(force=True)
        upload.assert_not_called()  # _file_mtime_key returns None, skipped
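The last edge case depends on `_file_mtime_key` (imported from `tools.environments.base`, not shown in this diff) returning `None` for a file that vanishes between listing and stat. A minimal sketch of the semantics these tests assume; the actual helper in `base.py` may differ in detail:

import os

def _file_mtime_key(path: str) -> tuple[float, int] | None:
    # Sketch: build an (mtime, size) change-detection key for a local file.
    # Returning None for an unstat-able path is what lets FileSyncManager
    # silently skip files deleted mid-cycle, as the test above verifies.
    try:
        st = os.stat(path)
    except OSError:
        return None
    return (st.st_mtime, st.st_size)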
127  tests/tools/test_file_sync_perf.py  Normal file

@@ -0,0 +1,127 @@
"""Reproducible perf benchmark for file sync overhead.

Measures actual env.execute() wall-clock time, no LLM in the loop.
Run with: uv run pytest tests/tools/test_file_sync_perf.py -v -o "addopts=" -s

Requires backends to be configured (SSH host, Modal creds, etc).
Skip markers gate each backend.
"""

import statistics
import time

import pytest

# ---------------------------------------------------------------------------
# Backend fixtures
# ---------------------------------------------------------------------------


@pytest.fixture
def local_env():
    from tools.environments.local import LocalEnvironment
    env = LocalEnvironment(cwd="/tmp", timeout=30)
    yield env
    env.cleanup()


@pytest.fixture
def ssh_env():
    import os
    host = os.environ.get("TERMINAL_SSH_HOST")
    user = os.environ.get("TERMINAL_SSH_USER")
    if not host or not user:
        pytest.skip("TERMINAL_SSH_HOST and TERMINAL_SSH_USER required")
    from tools.environments.ssh import SSHEnvironment
    env = SSHEnvironment(host=host, user=user, cwd="/tmp", timeout=30)
    yield env
    env.cleanup()


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _time_executions(env, command: str, n: int = 10) -> list[float]:
    """Run *command* n times and return per-call wall-clock durations."""
    durations = []
    for _ in range(n):
        t0 = time.monotonic()
        result = env.execute(command, timeout=10)
        elapsed = time.monotonic() - t0
        durations.append(elapsed)
        assert result.get("returncode", result.get("exit_code", -1)) == 0, \
            f"command failed: {result}"
    return durations


def _report(label: str, durations: list[float]):
    """Print timing stats."""
    med = statistics.median(durations)
    mean = statistics.mean(durations)
    p95 = sorted(durations)[int(len(durations) * 0.95)]
    print(f"\n {label}:")
    print(f" n={len(durations)} median={med*1000:.0f}ms mean={mean*1000:.0f}ms p95={p95*1000:.0f}ms")
    print(f" raw: {[f'{d*1000:.0f}ms' for d in durations]}")
    return med


# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------


class TestLocalPerf:
    """Local baseline — no file sync, no network. Sets the floor."""

    def test_echo_latency(self, local_env):
        durations = _time_executions(local_env, "echo hello", n=20)
        med = _report("local echo", durations)
        # Spawn-per-call overhead should be < 500ms
        assert med < 0.5, f"local echo median {med*1000:.0f}ms exceeds 500ms"


@pytest.mark.ssh
class TestSSHPerf:
    """SSH with FileSyncManager — mtime skip should make sync ~0ms."""

    def test_echo_latency(self, ssh_env):
        """Sequential echo commands — measures per-command overhead including sync check."""
        durations = _time_executions(ssh_env, "echo hello", n=20)
        med = _report("ssh echo (with sync check)", durations)
        # SSH round-trip + spawn-per-call, but sync should be ~0ms (rate limited)
        assert med < 2.0, f"ssh echo median {med*1000:.0f}ms exceeds 2000ms"

    def test_sync_overhead_after_interval(self, ssh_env):
        """Measure sync cost when the rate-limit window has expired.

        Sleep past the 5s interval, then time the next command, which
        triggers a real sync cycle (but with mtime skip, should be fast).
        """
        # Warm up
        ssh_env.execute("echo warmup", timeout=10)

        # Wait for sync interval to expire
        time.sleep(6)

        # This command will trigger a real sync cycle
        t0 = time.monotonic()
        result = ssh_env.execute("echo after-interval", timeout=10)
        elapsed = time.monotonic() - t0

        print(f"\n ssh echo after 6s wait (sync triggered): {elapsed*1000:.0f}ms")
        assert result.get("returncode", result.get("exit_code", -1)) == 0

        # Even with sync triggered, mtime skip should keep it fast.
        # Old rsync approach: ~2-3s. New mtime skip: should be < 1.5s.
        assert elapsed < 1.5, f"sync-triggered command took {elapsed*1000:.0f}ms (expected < 1500ms)"

    def test_no_sync_within_interval(self, ssh_env):
        """Rapid sequential commands within 5s window — no sync at all."""
        # First command triggers sync
        ssh_env.execute("echo prime", timeout=10)

        # Immediately run 10 more — all within rate-limit window
        durations = _time_executions(ssh_env, "echo rapid", n=10)
        med = _report("ssh echo (within interval, no sync)", durations)

        # Should be pure SSH overhead, no sync
        assert med < 1.5, f"within-interval median {med*1000:.0f}ms exceeds 1500ms"
@@ -124,8 +124,8 @@ def _install_modal_test_modules(
     sys.modules["tools.interrupt"] = types.SimpleNamespace(is_interrupted=lambda: False)
     sys.modules["tools.credential_files"] = types.SimpleNamespace(
         get_credential_file_mounts=lambda: [],
-        iter_skills_files=lambda: [],
-        iter_cache_files=lambda: [],
+        iter_skills_files=lambda **kw: [],
+        iter_cache_files=lambda **kw: [],
     )

     from_id_calls: list[str] = []
@@ -121,6 +121,10 @@ class TestSSHPreflight:
             called["count"] += 1

         monkeypatch.setattr(ssh_env.SSHEnvironment, "_establish_connection", _fake_establish)
+        monkeypatch.setattr(ssh_env.SSHEnvironment, "_detect_remote_home", lambda self: "/home/alice")
+        monkeypatch.setattr(ssh_env.SSHEnvironment, "_ensure_remote_dirs", lambda self: None)
+        monkeypatch.setattr(ssh_env.SSHEnvironment, "init_session", lambda self: None)
+        monkeypatch.setattr(ssh_env, "FileSyncManager", lambda **kw: type("M", (), {"sync": lambda self, **k: None})())

         env = ssh_env.SSHEnvironment(host="example.com", user="alice")
@@ -43,8 +43,6 @@ def get_sandbox_dir() -> Path:
 # Shared constants and utilities
 # ---------------------------------------------------------------------------

-_SYNC_INTERVAL_SECONDS = 5.0
-
-
 def _pipe_stdin(proc: subprocess.Popen, data: str) -> None:
     """Write *data* to proc.stdin on a daemon thread to avoid pipe-buffer deadlocks."""

@@ -246,9 +244,6 @@ class BaseEnvironment(ABC):
         self._cwd_file = f"{temp_dir}/hermes-cwd-{self._session_id}.txt"
         self._cwd_marker = _cwd_marker(self._session_id)
         self._snapshot_ready = False
-        self._last_sync_time: float | None = (
-            None  # set to 0 by backends that need file sync
-        )

     # ------------------------------------------------------------------
     # Abstract methods

@@ -477,22 +472,14 @@ class BaseEnvironment(ABC):
     # Hooks
     # ------------------------------------------------------------------

-    def _before_execute(self):
-        """Rate-limited file sync before each command.
+    def _before_execute(self) -> None:
+        """Hook called before each command execution.

-        Backends that need pre-command sync set ``self._last_sync_time = 0``
-        in ``__init__`` and override :meth:`_sync_files`. Backends needing
-        extra pre-exec logic (e.g. Daytona sandbox restart check) override
-        this method and call ``super()._before_execute()``.
+        Remote backends (SSH, Modal, Daytona) override this to trigger
+        their FileSyncManager. Bind-mount backends (Docker, Singularity)
+        and Local don't need file sync — the host filesystem is directly
+        visible inside the container/process.
         """
-        if self._last_sync_time is not None:
-            now = time.monotonic()
-            if now - self._last_sync_time >= _SYNC_INTERVAL_SECONDS:
-                self._sync_files()
-                self._last_sync_time = now
-
-    def _sync_files(self):
-        """Push files to remote environment. Called rate-limited by _before_execute."""
         pass

     # ------------------------------------------------------------------
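The override pattern the new docstring describes, a remote backend owning a FileSyncManager and pumping it from `_before_execute`, appears concretely in the SSH/Modal/Daytona hunks below. As an illustrative sketch (hypothetical backend name, transport callables elided):

class SomeRemoteEnvironment(BaseEnvironment):  # hypothetical backend
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._sync_manager = FileSyncManager(
            get_files_fn=lambda: iter_sync_files("/root/.hermes"),
            upload_fn=self._upload,    # backend-specific transport, raises on failure
            delete_fn=self._delete,    # backend-specific transport, raises on failure
        )
        self._sync_manager.sync(force=True)  # initial full sync at startup

    def _before_execute(self) -> None:
        # No rate limiting here: FileSyncManager.sync() rate-limits itself.
        self._sync_manager.sync()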
@@ -11,13 +11,12 @@ import shlex
 import threading
 import warnings
 from pathlib import Path
-from typing import Dict, Optional

 from tools.environments.base import (
     BaseEnvironment,
     _ThreadedProcessHandle,
-    _file_mtime_key,
 )
+from tools.environments.file_sync import FileSyncManager, iter_sync_files, quoted_rm_command

 logger = logging.getLogger(__name__)

@@ -61,7 +60,6 @@ class DaytonaEnvironment(BaseEnvironment):
         self._daytona = Daytona()
         self._sandbox = None
         self._lock = threading.Lock()
-        self._last_sync_time: float = 0

         memory_gib = max(1, math.ceil(memory / 1024))
         disk_gib = max(1, math.ceil(disk / 1024))

@@ -128,50 +126,40 @@ class DaytonaEnvironment(BaseEnvironment):
             pass
         logger.info("Daytona: resolved home to %s, cwd to %s", self._remote_home, self.cwd)

-        self._synced_files: Dict[str, tuple] = {}
-        self._sync_files()
+        self._sync_manager = FileSyncManager(
+            get_files_fn=lambda: iter_sync_files(f"{self._remote_home}/.hermes"),
+            upload_fn=self._daytona_upload,
+            delete_fn=self._daytona_delete,
+        )
+        self._sync_manager.sync(force=True)
         self.init_session()

-    def _upload_if_changed(self, host_path: str, remote_path: str) -> bool:
-        file_key = _file_mtime_key(host_path)
-        if file_key is None:
-            return False
-        if self._synced_files.get(remote_path) == file_key:
-            return False
-        try:
-            parent = str(Path(remote_path).parent)
-            self._sandbox.process.exec(f"mkdir -p {parent}")
-            self._sandbox.fs.upload_file(host_path, remote_path)
-            self._synced_files[remote_path] = file_key
-            return True
-        except Exception as e:
-            logger.debug("Daytona: upload failed %s: %s", host_path, e)
-            return False
+    def _daytona_upload(self, host_path: str, remote_path: str) -> None:
+        """Upload a single file via Daytona SDK."""
+        parent = str(Path(remote_path).parent)
+        self._sandbox.process.exec(f"mkdir -p {parent}")
+        self._sandbox.fs.upload_file(host_path, remote_path)

-    def _sync_files(self) -> None:
-        container_base = f"{self._remote_home}/.hermes"
-        try:
-            from tools.credential_files import get_credential_file_mounts, iter_skills_files
-            for mount_entry in get_credential_file_mounts():
-                remote_path = mount_entry["container_path"].replace("/root/.hermes", container_base, 1)
-                self._upload_if_changed(mount_entry["host_path"], remote_path)
-            for entry in iter_skills_files(container_base=container_base):
-                self._upload_if_changed(entry["host_path"], entry["container_path"])
-        except Exception as e:
-            logger.debug("Daytona: could not sync skills/credentials: %s", e)
+    def _daytona_delete(self, remote_paths: list[str]) -> None:
+        """Batch-delete remote files via SDK exec."""
+        self._sandbox.process.exec(quoted_rm_command(remote_paths))

-    def _ensure_sandbox_ready(self):
+    # ------------------------------------------------------------------
+    # Sandbox lifecycle
+    # ------------------------------------------------------------------
+
+    def _ensure_sandbox_ready(self) -> None:
         """Restart sandbox if it was stopped (e.g., by a previous interrupt)."""
         self._sandbox.refresh_data()
         if self._sandbox.state in (self._SandboxState.STOPPED, self._SandboxState.ARCHIVED):
             self._sandbox.start()
             logger.info("Daytona: restarted sandbox %s", self._sandbox.id)

-    def _before_execute(self):
-        """Ensure sandbox is ready, then rate-limited file sync via base class."""
+    def _before_execute(self) -> None:
+        """Ensure sandbox is ready, then sync files via FileSyncManager."""
         with self._lock:
             self._ensure_sandbox_ready()
-            super()._before_execute()
+            self._sync_manager.sync()

     def _run_bash(self, cmd_string: str, *, login: bool = False,
                   timeout: int = 120,
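Note that `_daytona_upload` drops the old `_upload_if_changed` try/except: transport callbacks are now expected to raise on failure (see the `UploadFn`/`DeleteFn` contracts in `file_sync.py` below), which is what lets FileSyncManager roll back its state and retry everything on the next cycle.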
150  tools/environments/file_sync.py  Normal file

@@ -0,0 +1,150 @@
"""Shared file sync manager for remote execution backends.

Tracks local file changes via mtime+size, detects deletions, and
syncs to remote environments transactionally. Used by SSH, Modal,
and Daytona. Docker and Singularity use bind mounts (live host FS
view) and don't need this.
"""

import logging
import os
import shlex
import time
from typing import Callable

from tools.environments.base import _file_mtime_key

logger = logging.getLogger(__name__)

_SYNC_INTERVAL_SECONDS = 5.0
_FORCE_SYNC_ENV = "HERMES_FORCE_FILE_SYNC"

# Transport callbacks provided by each backend
UploadFn = Callable[[str, str], None]  # (host_path, remote_path) -> raises on failure
DeleteFn = Callable[[list[str]], None]  # (remote_paths) -> raises on failure
GetFilesFn = Callable[[], list[tuple[str, str]]]  # () -> [(host_path, remote_path), ...]


def iter_sync_files(container_base: str = "/root/.hermes") -> list[tuple[str, str]]:
    """Enumerate all files that should be synced to a remote environment.

    Combines credentials, skills, and cache into a single flat list of
    (host_path, remote_path) pairs. Credential paths are remapped from
    the hardcoded /root/.hermes to *container_base* because the remote
    user's home may differ (e.g. /home/daytona, /home/user).
    """
    # Late import: credential_files imports agent modules that create
    # circular dependencies if loaded at file_sync module level.
    from tools.credential_files import (
        get_credential_file_mounts,
        iter_cache_files,
        iter_skills_files,
    )

    files: list[tuple[str, str]] = []
    for entry in get_credential_file_mounts():
        remote = entry["container_path"].replace(
            "/root/.hermes", container_base, 1
        )
        files.append((entry["host_path"], remote))
    for entry in iter_skills_files(container_base=container_base):
        files.append((entry["host_path"], entry["container_path"]))
    for entry in iter_cache_files(container_base=container_base):
        files.append((entry["host_path"], entry["container_path"]))
    return files


def quoted_rm_command(remote_paths: list[str]) -> str:
    """Build a shell ``rm -f`` command for a batch of remote paths."""
    return "rm -f " + " ".join(shlex.quote(p) for p in remote_paths)


class FileSyncManager:
    """Tracks local file changes and syncs to a remote environment.

    Backends instantiate this with transport callbacks (upload, delete)
    and a file-source callable. The manager handles mtime-based change
    detection, deletion tracking, rate limiting, and transactional state.

    Not used by bind-mount backends (Docker, Singularity) — those get
    live host FS views and don't need file sync.
    """

    def __init__(
        self,
        get_files_fn: GetFilesFn,
        upload_fn: UploadFn,
        delete_fn: DeleteFn,
        sync_interval: float = _SYNC_INTERVAL_SECONDS,
    ):
        self._get_files_fn = get_files_fn
        self._upload_fn = upload_fn
        self._delete_fn = delete_fn
        self._synced_files: dict[str, tuple[float, int]] = {}  # remote_path -> (mtime, size)
        self._last_sync_time: float = 0.0  # monotonic; 0 ensures first sync runs
        self._sync_interval = sync_interval

    def sync(self, *, force: bool = False) -> None:
        """Run a sync cycle: upload changed files, delete removed files.

        Rate-limited to once per ``sync_interval`` unless *force* is True
        or ``HERMES_FORCE_FILE_SYNC=1`` is set.

        Transactional: state only committed if ALL operations succeed.
        On failure, state rolls back so the next cycle retries everything.
        """
        if not force and not os.environ.get(_FORCE_SYNC_ENV):
            now = time.monotonic()
            if now - self._last_sync_time < self._sync_interval:
                return

        current_files = self._get_files_fn()
        current_remote_paths = {remote for _, remote in current_files}

        # --- Uploads: new or changed files ---
        to_upload: list[tuple[str, str]] = []
        new_files = dict(self._synced_files)
        for host_path, remote_path in current_files:
            file_key = _file_mtime_key(host_path)
            if file_key is None:
                continue
            if self._synced_files.get(remote_path) == file_key:
                continue
            to_upload.append((host_path, remote_path))
            new_files[remote_path] = file_key

        # --- Deletes: synced paths no longer in current set ---
        to_delete = [p for p in self._synced_files if p not in current_remote_paths]

        if not to_upload and not to_delete:
            self._last_sync_time = time.monotonic()
            return

        # Snapshot for rollback (only when there's work to do)
        prev_files = dict(self._synced_files)

        if to_upload:
            logger.debug("file_sync: uploading %d file(s)", len(to_upload))
        if to_delete:
            logger.debug("file_sync: deleting %d stale remote file(s)", len(to_delete))

        try:
            for host_path, remote_path in to_upload:
                self._upload_fn(host_path, remote_path)
                logger.debug("file_sync: uploaded %s -> %s", host_path, remote_path)

            if to_delete:
                self._delete_fn(to_delete)
                logger.debug("file_sync: deleted %s", to_delete)

            # --- Commit (all succeeded) ---
            for p in to_delete:
                new_files.pop(p, None)

            self._synced_files = new_files
            self._last_sync_time = time.monotonic()

        except Exception as exc:
            self._synced_files = prev_files
            self._last_sync_time = time.monotonic()
            logger.warning("file_sync: sync failed, rolled back state: %s", exc)
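For a feel of the manager's contract in isolation, a minimal standalone sketch (hypothetical paths, mock transports) mirroring what the unit tests above exercise:

from unittest.mock import MagicMock

from tools.environments.file_sync import FileSyncManager

# Hypothetical (host_path, remote_path) pair; transports stubbed with mocks.
pairs = [("/home/me/.hermes/credentials/cred.json",
          "/root/.hermes/credentials/cred.json")]

mgr = FileSyncManager(
    get_files_fn=lambda: pairs,
    upload_fn=MagicMock(),  # a real backend raises on transport failure
    delete_fn=MagicMock(),
)

mgr.sync(force=True)  # first cycle uploads the file
mgr.sync(force=True)  # second cycle is a no-op: (mtime, size) key unchanged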
@@ -9,16 +9,16 @@ import logging
 import shlex
 import threading
 from pathlib import Path
-from typing import Any, Dict, Optional
+from typing import Any, Optional

 from hermes_constants import get_hermes_home
 from tools.environments.base import (
     BaseEnvironment,
     _ThreadedProcessHandle,
-    _file_mtime_key,
     _load_json_store,
     _save_json_store,
 )
+from tools.environments.file_sync import FileSyncManager, iter_sync_files, quoted_rm_command

 logger = logging.getLogger(__name__)

@@ -150,7 +150,7 @@ class ModalEnvironment(BaseEnvironment):
         image: str,
         cwd: str = "/root",
         timeout: int = 60,
-        modal_sandbox_kwargs: Optional[Dict[str, Any]] = None,
+        modal_sandbox_kwargs: Optional[dict[str, Any]] = None,
         persistent_filesystem: bool = True,
         task_id: str = "default",
     ):

@@ -162,8 +162,7 @@ class ModalEnvironment(BaseEnvironment):
         self._sandbox = None
         self._app = None
         self._worker = _AsyncWorker()
-        self._synced_files: Dict[str, tuple] = {}
-        self._last_sync_time: float = 0
+        self._sync_manager: FileSyncManager | None = None  # initialized after sandbox creation

         sandbox_kwargs = dict(modal_sandbox_kwargs or {})

@@ -256,26 +255,24 @@ class ModalEnvironment(BaseEnvironment):
             raise

         logger.info("Modal: sandbox created (task=%s)", self._task_id)

+        self._sync_manager = FileSyncManager(
+            get_files_fn=lambda: iter_sync_files("/root/.hermes"),
+            upload_fn=self._modal_upload,
+            delete_fn=self._modal_delete,
+        )
+        self._sync_manager.sync(force=True)
         self.init_session()

-    def _push_file_to_sandbox(self, host_path: str, container_path: str) -> bool:
-        """Push a single file into the sandbox if changed."""
-        file_key = _file_mtime_key(host_path)
-        if file_key is None:
-            return False
-        if self._synced_files.get(container_path) == file_key:
-            return False
-        try:
-            content = Path(host_path).read_bytes()
-        except Exception:
-            return False
-
+    def _modal_upload(self, host_path: str, remote_path: str) -> None:
+        """Upload a single file via base64-over-exec."""
         import base64
+        content = Path(host_path).read_bytes()
         b64 = base64.b64encode(content).decode("ascii")
-        container_dir = str(Path(container_path).parent)
+        container_dir = str(Path(remote_path).parent)
         cmd = (
             f"mkdir -p {shlex.quote(container_dir)} && "
-            f"echo {shlex.quote(b64)} | base64 -d > {shlex.quote(container_path)}"
+            f"echo {shlex.quote(b64)} | base64 -d > {shlex.quote(remote_path)}"
         )

         async def _write():

@@ -283,25 +280,24 @@ class ModalEnvironment(BaseEnvironment):
             await proc.wait.aio()

         self._worker.run_coroutine(_write(), timeout=15)
-        self._synced_files[container_path] = file_key
-        return True

-    def _sync_files(self) -> None:
-        """Push credential, skill, and cache files into the running sandbox."""
-        try:
-            from tools.credential_files import (
-                get_credential_file_mounts,
-                iter_skills_files,
-                iter_cache_files,
-            )
-            for entry in get_credential_file_mounts():
-                self._push_file_to_sandbox(entry["host_path"], entry["container_path"])
-            for entry in iter_skills_files():
-                self._push_file_to_sandbox(entry["host_path"], entry["container_path"])
-            for entry in iter_cache_files():
-                self._push_file_to_sandbox(entry["host_path"], entry["container_path"])
-        except Exception as e:
-            logger.debug("Modal: file sync failed: %s", e)
+    def _modal_delete(self, remote_paths: list[str]) -> None:
+        """Batch-delete remote files via exec."""
+        rm_cmd = quoted_rm_command(remote_paths)
+
+        async def _rm():
+            proc = await self._sandbox.exec.aio("bash", "-c", rm_cmd)
+            await proc.wait.aio()
+
+        self._worker.run_coroutine(_rm(), timeout=15)
+
+    def _before_execute(self) -> None:
+        """Sync files to sandbox via FileSyncManager (rate-limited internally)."""
+        self._sync_manager.sync()
+
+    # ------------------------------------------------------------------
+    # Execution
+    # ------------------------------------------------------------------

     def _run_bash(self, cmd_string: str, *, login: bool = False,
                   timeout: int = 120,
@@ -8,6 +8,7 @@ import tempfile
 from pathlib import Path

 from tools.environments.base import BaseEnvironment, _popen_bash
+from tools.environments.file_sync import FileSyncManager, iter_sync_files, quoted_rm_command

 logger = logging.getLogger(__name__)

@@ -43,8 +44,14 @@ class SSHEnvironment(BaseEnvironment):
         _ensure_ssh_available()
         self._establish_connection()
         self._remote_home = self._detect_remote_home()
-        self._last_sync_time: float = 0  # guarantees first _before_execute syncs
-        self._sync_files()
+        self._ensure_remote_dirs()
+        self._sync_manager = FileSyncManager(
+            get_files_fn=lambda: iter_sync_files(f"{self._remote_home}/.hermes"),
+            upload_fn=self._scp_upload,
+            delete_fn=self._ssh_delete,
+        )
+        self._sync_manager.sync(force=True)

         self.init_session()

@@ -92,50 +99,53 @@ class SSHEnvironment(BaseEnvironment):
             return "/root"
         return f"/home/{self.user}"

-    def _sync_files(self) -> None:
-        """Rsync skills directory and credential files to the remote host."""
-        try:
-            container_base = f"{self._remote_home}/.hermes"
-            from tools.credential_files import get_credential_file_mounts, get_skills_directory_mount
-
-            rsync_base = ["rsync", "-az", "--timeout=30", "--safe-links"]
-            ssh_opts = f"ssh -o ControlPath={self.control_socket} -o ControlMaster=auto"
-            if self.port != 22:
-                ssh_opts += f" -p {self.port}"
-            if self.key_path:
-                ssh_opts += f" -i {self.key_path}"
-            rsync_base.extend(["-e", ssh_opts])
-            dest_prefix = f"{self.user}@{self.host}"
-
-            for mount_entry in get_credential_file_mounts():
-                remote_path = mount_entry["container_path"].replace("/root/.hermes", container_base, 1)
-                parent_dir = str(Path(remote_path).parent)
-                mkdir_cmd = self._build_ssh_command()
-                mkdir_cmd.append(f"mkdir -p {parent_dir}")
-                subprocess.run(mkdir_cmd, capture_output=True, text=True, timeout=10)
-                cmd = rsync_base + [mount_entry["host_path"], f"{dest_prefix}:{remote_path}"]
-                result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
-                if result.returncode == 0:
-                    logger.info("SSH: synced credential %s -> %s", mount_entry["host_path"], remote_path)
-                else:
-                    logger.debug("SSH: rsync credential failed: %s", result.stderr.strip())
-
-            for skills_mount in get_skills_directory_mount(container_base=container_base):
-                remote_path = skills_mount["container_path"]
-                mkdir_cmd = self._build_ssh_command()
-                mkdir_cmd.append(f"mkdir -p {remote_path}")
-                subprocess.run(mkdir_cmd, capture_output=True, text=True, timeout=10)
-                cmd = rsync_base + [
-                    skills_mount["host_path"].rstrip("/") + "/",
-                    f"{dest_prefix}:{remote_path}/",
-                ]
-                result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
-                if result.returncode == 0:
-                    logger.info("SSH: synced skills dir %s -> %s", skills_mount["host_path"], remote_path)
-                else:
-                    logger.debug("SSH: rsync skills dir failed: %s", result.stderr.strip())
-        except Exception as e:
-            logger.debug("SSH: could not sync skills/credentials: %s", e)
+    # ------------------------------------------------------------------
+    # File sync (via FileSyncManager)
+    # ------------------------------------------------------------------
+
+    def _ensure_remote_dirs(self) -> None:
+        """Create base ~/.hermes directory tree on remote in one SSH call."""
+        base = f"{self._remote_home}/.hermes"
+        dirs = [base, f"{base}/skills", f"{base}/credentials", f"{base}/cache"]
+        mkdir_cmd = "mkdir -p " + " ".join(shlex.quote(d) for d in dirs)
+        cmd = self._build_ssh_command()
+        cmd.append(mkdir_cmd)
+        subprocess.run(cmd, capture_output=True, text=True, timeout=10)
+
+    # _get_sync_files provided via iter_sync_files in FileSyncManager init
+
+    def _scp_upload(self, host_path: str, remote_path: str) -> None:
+        """Upload a single file via scp over ControlMaster."""
+        parent = str(Path(remote_path).parent)
+        mkdir_cmd = self._build_ssh_command()
+        mkdir_cmd.append(f"mkdir -p {shlex.quote(parent)}")
+        subprocess.run(mkdir_cmd, capture_output=True, text=True, timeout=10)
+
+        scp_cmd = ["scp", "-o", f"ControlPath={self.control_socket}"]
+        if self.port != 22:
+            scp_cmd.extend(["-P", str(self.port)])
+        if self.key_path:
+            scp_cmd.extend(["-i", self.key_path])
+        scp_cmd.extend([host_path, f"{self.user}@{self.host}:{remote_path}"])
+        result = subprocess.run(scp_cmd, capture_output=True, text=True, timeout=30)
+        if result.returncode != 0:
+            raise RuntimeError(f"scp failed: {result.stderr.strip()}")
+
+    def _ssh_delete(self, remote_paths: list[str]) -> None:
+        """Batch-delete remote files in one SSH call."""
+        cmd = self._build_ssh_command()
+        cmd.append(quoted_rm_command(remote_paths))
+        result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
+        if result.returncode != 0:
+            raise RuntimeError(f"remote rm failed: {result.stderr.strip()}")
+
+    def _before_execute(self) -> None:
+        """Sync files to remote via FileSyncManager (rate-limited internally)."""
+        self._sync_manager.sync()
+
+    # ------------------------------------------------------------------
+    # Execution
+    # ------------------------------------------------------------------

     def _run_bash(self, cmd_string: str, *, login: bool = False,
                   timeout: int = 120,