mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-30 16:01:49 +08:00
Compare commits
1 Commits
fix/plugin
...
hermes/her
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
74d4944010 |
@@ -1,9 +1,6 @@
|
|||||||
"""Tests for tools/checkpoint_manager.py — CheckpointManager."""
|
"""Tests for tools/checkpoint_manager.py — CheckpointManager."""
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
import os
|
|
||||||
import json
|
|
||||||
import shutil
|
|
||||||
import subprocess
|
import subprocess
|
||||||
import pytest
|
import pytest
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
@@ -42,6 +39,19 @@ def checkpoint_base(tmp_path):
|
|||||||
return tmp_path / "checkpoints"
|
return tmp_path / "checkpoints"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture()
|
||||||
|
def fake_home(tmp_path, monkeypatch):
|
||||||
|
"""Set a deterministic fake home for expanduser/path-home behavior."""
|
||||||
|
home = tmp_path / "home"
|
||||||
|
home.mkdir()
|
||||||
|
monkeypatch.setenv("HOME", str(home))
|
||||||
|
monkeypatch.setenv("USERPROFILE", str(home))
|
||||||
|
monkeypatch.delenv("HOMEDRIVE", raising=False)
|
||||||
|
monkeypatch.delenv("HOMEPATH", raising=False)
|
||||||
|
monkeypatch.setattr(Path, "home", classmethod(lambda cls: home))
|
||||||
|
return home
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture()
|
@pytest.fixture()
|
||||||
def mgr(work_dir, checkpoint_base, monkeypatch):
|
def mgr(work_dir, checkpoint_base, monkeypatch):
|
||||||
"""CheckpointManager with redirected checkpoint base."""
|
"""CheckpointManager with redirected checkpoint base."""
|
||||||
@@ -78,6 +88,16 @@ class TestShadowRepoPath:
|
|||||||
p = _shadow_repo_path(str(work_dir))
|
p = _shadow_repo_path(str(work_dir))
|
||||||
assert str(p).startswith(str(checkpoint_base))
|
assert str(p).startswith(str(checkpoint_base))
|
||||||
|
|
||||||
|
def test_tilde_and_expanded_home_share_shadow_repo(self, fake_home, checkpoint_base, monkeypatch):
|
||||||
|
monkeypatch.setattr("tools.checkpoint_manager.CHECKPOINT_BASE", checkpoint_base)
|
||||||
|
project = fake_home / "project"
|
||||||
|
project.mkdir()
|
||||||
|
|
||||||
|
tilde_path = f"~/{project.name}"
|
||||||
|
expanded_path = str(project)
|
||||||
|
|
||||||
|
assert _shadow_repo_path(tilde_path) == _shadow_repo_path(expanded_path)
|
||||||
|
|
||||||
|
|
||||||
# =========================================================================
|
# =========================================================================
|
||||||
# Shadow repo init
|
# Shadow repo init
|
||||||
@@ -221,6 +241,20 @@ class TestListCheckpoints:
|
|||||||
assert result[0]["reason"] == "third"
|
assert result[0]["reason"] == "third"
|
||||||
assert result[2]["reason"] == "first"
|
assert result[2]["reason"] == "first"
|
||||||
|
|
||||||
|
def test_tilde_path_lists_same_checkpoints_as_expanded_path(self, checkpoint_base, fake_home, monkeypatch):
|
||||||
|
monkeypatch.setattr("tools.checkpoint_manager.CHECKPOINT_BASE", checkpoint_base)
|
||||||
|
mgr = CheckpointManager(enabled=True, max_snapshots=50)
|
||||||
|
project = fake_home / "project"
|
||||||
|
project.mkdir()
|
||||||
|
(project / "main.py").write_text("v1\n")
|
||||||
|
|
||||||
|
tilde_path = f"~/{project.name}"
|
||||||
|
assert mgr.ensure_checkpoint(tilde_path, "initial") is True
|
||||||
|
|
||||||
|
listed = mgr.list_checkpoints(str(project))
|
||||||
|
assert len(listed) == 1
|
||||||
|
assert listed[0]["reason"] == "initial"
|
||||||
|
|
||||||
|
|
||||||
# =========================================================================
|
# =========================================================================
|
||||||
# CheckpointManager — restoring
|
# CheckpointManager — restoring
|
||||||
@@ -271,6 +305,28 @@ class TestRestore:
|
|||||||
assert len(all_cps) >= 2
|
assert len(all_cps) >= 2
|
||||||
assert "pre-rollback" in all_cps[0]["reason"]
|
assert "pre-rollback" in all_cps[0]["reason"]
|
||||||
|
|
||||||
|
def test_tilde_path_supports_diff_and_restore_flow(self, checkpoint_base, fake_home, monkeypatch):
|
||||||
|
monkeypatch.setattr("tools.checkpoint_manager.CHECKPOINT_BASE", checkpoint_base)
|
||||||
|
mgr = CheckpointManager(enabled=True, max_snapshots=50)
|
||||||
|
project = fake_home / "project"
|
||||||
|
project.mkdir()
|
||||||
|
file_path = project / "main.py"
|
||||||
|
file_path.write_text("original\n")
|
||||||
|
|
||||||
|
tilde_path = f"~/{project.name}"
|
||||||
|
assert mgr.ensure_checkpoint(tilde_path, "initial") is True
|
||||||
|
mgr.new_turn()
|
||||||
|
|
||||||
|
file_path.write_text("changed\n")
|
||||||
|
checkpoints = mgr.list_checkpoints(str(project))
|
||||||
|
diff_result = mgr.diff(tilde_path, checkpoints[0]["hash"])
|
||||||
|
assert diff_result["success"] is True
|
||||||
|
assert "main.py" in diff_result["diff"]
|
||||||
|
|
||||||
|
restore_result = mgr.restore(tilde_path, checkpoints[0]["hash"])
|
||||||
|
assert restore_result["success"] is True
|
||||||
|
assert file_path.read_text() == "original\n"
|
||||||
|
|
||||||
|
|
||||||
# =========================================================================
|
# =========================================================================
|
||||||
# CheckpointManager — working dir resolution
|
# CheckpointManager — working dir resolution
|
||||||
@@ -310,6 +366,19 @@ class TestWorkingDirResolution:
|
|||||||
result = mgr.get_working_dir_for_path(str(filepath))
|
result = mgr.get_working_dir_for_path(str(filepath))
|
||||||
assert result == str(filepath.parent)
|
assert result == str(filepath.parent)
|
||||||
|
|
||||||
|
def test_resolves_tilde_path_to_project_root(self, fake_home):
|
||||||
|
mgr = CheckpointManager(enabled=True)
|
||||||
|
project = fake_home / "myproject"
|
||||||
|
project.mkdir()
|
||||||
|
(project / "pyproject.toml").write_text("[project]\n")
|
||||||
|
subdir = project / "src"
|
||||||
|
subdir.mkdir()
|
||||||
|
filepath = subdir / "main.py"
|
||||||
|
filepath.write_text("x\n")
|
||||||
|
|
||||||
|
result = mgr.get_working_dir_for_path(f"~/{project.name}/src/main.py")
|
||||||
|
assert result == str(project)
|
||||||
|
|
||||||
|
|
||||||
# =========================================================================
|
# =========================================================================
|
||||||
# Git env isolation
|
# Git env isolation
|
||||||
@@ -333,6 +402,14 @@ class TestGitEnvIsolation:
|
|||||||
env = _git_env(shadow, str(tmp_path))
|
env = _git_env(shadow, str(tmp_path))
|
||||||
assert "GIT_INDEX_FILE" not in env
|
assert "GIT_INDEX_FILE" not in env
|
||||||
|
|
||||||
|
def test_expands_tilde_in_work_tree(self, fake_home, tmp_path):
|
||||||
|
shadow = tmp_path / "shadow"
|
||||||
|
work = fake_home / "work"
|
||||||
|
work.mkdir()
|
||||||
|
|
||||||
|
env = _git_env(shadow, f"~/{work.name}")
|
||||||
|
assert env["GIT_WORK_TREE"] == str(work.resolve())
|
||||||
|
|
||||||
|
|
||||||
# =========================================================================
|
# =========================================================================
|
||||||
# format_checkpoint_list
|
# format_checkpoint_list
|
||||||
@@ -384,6 +461,8 @@ class TestErrorResilience:
|
|||||||
assert result is False
|
assert result is False
|
||||||
|
|
||||||
def test_run_git_allows_expected_nonzero_without_error_log(self, tmp_path, caplog):
|
def test_run_git_allows_expected_nonzero_without_error_log(self, tmp_path, caplog):
|
||||||
|
work = tmp_path / "work"
|
||||||
|
work.mkdir()
|
||||||
completed = subprocess.CompletedProcess(
|
completed = subprocess.CompletedProcess(
|
||||||
args=["git", "diff", "--cached", "--quiet"],
|
args=["git", "diff", "--cached", "--quiet"],
|
||||||
returncode=1,
|
returncode=1,
|
||||||
@@ -395,7 +474,7 @@ class TestErrorResilience:
|
|||||||
ok, stdout, stderr = _run_git(
|
ok, stdout, stderr = _run_git(
|
||||||
["diff", "--cached", "--quiet"],
|
["diff", "--cached", "--quiet"],
|
||||||
tmp_path / "shadow",
|
tmp_path / "shadow",
|
||||||
str(tmp_path / "work"),
|
str(work),
|
||||||
allowed_returncodes={1},
|
allowed_returncodes={1},
|
||||||
)
|
)
|
||||||
assert ok is False
|
assert ok is False
|
||||||
@@ -403,6 +482,38 @@ class TestErrorResilience:
|
|||||||
assert stderr == ""
|
assert stderr == ""
|
||||||
assert not caplog.records
|
assert not caplog.records
|
||||||
|
|
||||||
|
def test_run_git_invalid_working_dir_reports_path_error(self, tmp_path, caplog):
|
||||||
|
missing = tmp_path / "missing"
|
||||||
|
with caplog.at_level(logging.ERROR, logger="tools.checkpoint_manager"):
|
||||||
|
ok, stdout, stderr = _run_git(
|
||||||
|
["status"],
|
||||||
|
tmp_path / "shadow",
|
||||||
|
str(missing),
|
||||||
|
)
|
||||||
|
assert ok is False
|
||||||
|
assert stdout == ""
|
||||||
|
assert "working directory not found" in stderr
|
||||||
|
assert not any("Git executable not found" in r.getMessage() for r in caplog.records)
|
||||||
|
|
||||||
|
def test_run_git_missing_git_reports_git_not_found(self, tmp_path, monkeypatch, caplog):
|
||||||
|
work = tmp_path / "work"
|
||||||
|
work.mkdir()
|
||||||
|
|
||||||
|
def raise_missing_git(*args, **kwargs):
|
||||||
|
raise FileNotFoundError(2, "No such file or directory", "git")
|
||||||
|
|
||||||
|
monkeypatch.setattr("tools.checkpoint_manager.subprocess.run", raise_missing_git)
|
||||||
|
with caplog.at_level(logging.ERROR, logger="tools.checkpoint_manager"):
|
||||||
|
ok, stdout, stderr = _run_git(
|
||||||
|
["status"],
|
||||||
|
tmp_path / "shadow",
|
||||||
|
str(work),
|
||||||
|
)
|
||||||
|
assert ok is False
|
||||||
|
assert stdout == ""
|
||||||
|
assert stderr == "git not found"
|
||||||
|
assert any("Git executable not found" in r.getMessage() for r in caplog.records)
|
||||||
|
|
||||||
def test_checkpoint_failure_does_not_raise(self, mgr, work_dir, monkeypatch):
|
def test_checkpoint_failure_does_not_raise(self, mgr, work_dir, monkeypatch):
|
||||||
"""Checkpoint failures should never raise — they're silently logged."""
|
"""Checkpoint failures should never raise — they're silently logged."""
|
||||||
def broken_run_git(*args, **kwargs):
|
def broken_run_git(*args, **kwargs):
|
||||||
|
|||||||
@@ -100,7 +100,7 @@ def _validate_file_path(file_path: str, working_dir: str) -> Optional[str]:
|
|||||||
if os.path.isabs(file_path):
|
if os.path.isabs(file_path):
|
||||||
return f"File path must be relative, got absolute path: {file_path!r}"
|
return f"File path must be relative, got absolute path: {file_path!r}"
|
||||||
# Resolve and check containment within working_dir
|
# Resolve and check containment within working_dir
|
||||||
abs_workdir = Path(working_dir).resolve()
|
abs_workdir = _normalize_path(working_dir)
|
||||||
resolved = (abs_workdir / file_path).resolve()
|
resolved = (abs_workdir / file_path).resolve()
|
||||||
try:
|
try:
|
||||||
resolved.relative_to(abs_workdir)
|
resolved.relative_to(abs_workdir)
|
||||||
@@ -113,18 +113,24 @@ def _validate_file_path(file_path: str, working_dir: str) -> Optional[str]:
|
|||||||
# Shadow repo helpers
|
# Shadow repo helpers
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def _normalize_path(path_value: str) -> Path:
|
||||||
|
"""Return a canonical absolute path for checkpoint operations."""
|
||||||
|
return Path(path_value).expanduser().resolve()
|
||||||
|
|
||||||
|
|
||||||
def _shadow_repo_path(working_dir: str) -> Path:
|
def _shadow_repo_path(working_dir: str) -> Path:
|
||||||
"""Deterministic shadow repo path: sha256(abs_path)[:16]."""
|
"""Deterministic shadow repo path: sha256(abs_path)[:16]."""
|
||||||
abs_path = str(Path(working_dir).resolve())
|
abs_path = str(_normalize_path(working_dir))
|
||||||
dir_hash = hashlib.sha256(abs_path.encode()).hexdigest()[:16]
|
dir_hash = hashlib.sha256(abs_path.encode()).hexdigest()[:16]
|
||||||
return CHECKPOINT_BASE / dir_hash
|
return CHECKPOINT_BASE / dir_hash
|
||||||
|
|
||||||
|
|
||||||
def _git_env(shadow_repo: Path, working_dir: str) -> dict:
|
def _git_env(shadow_repo: Path, working_dir: str) -> dict:
|
||||||
"""Build env dict that redirects git to the shadow repo."""
|
"""Build env dict that redirects git to the shadow repo."""
|
||||||
|
normalized_working_dir = _normalize_path(working_dir)
|
||||||
env = os.environ.copy()
|
env = os.environ.copy()
|
||||||
env["GIT_DIR"] = str(shadow_repo)
|
env["GIT_DIR"] = str(shadow_repo)
|
||||||
env["GIT_WORK_TREE"] = str(Path(working_dir).resolve())
|
env["GIT_WORK_TREE"] = str(normalized_working_dir)
|
||||||
env.pop("GIT_INDEX_FILE", None)
|
env.pop("GIT_INDEX_FILE", None)
|
||||||
env.pop("GIT_NAMESPACE", None)
|
env.pop("GIT_NAMESPACE", None)
|
||||||
env.pop("GIT_ALTERNATE_OBJECT_DIRECTORIES", None)
|
env.pop("GIT_ALTERNATE_OBJECT_DIRECTORIES", None)
|
||||||
@@ -144,7 +150,17 @@ def _run_git(
|
|||||||
exits while preserving the normal ``ok = (returncode == 0)`` contract.
|
exits while preserving the normal ``ok = (returncode == 0)`` contract.
|
||||||
Example: ``git diff --cached --quiet`` returns 1 when changes exist.
|
Example: ``git diff --cached --quiet`` returns 1 when changes exist.
|
||||||
"""
|
"""
|
||||||
env = _git_env(shadow_repo, working_dir)
|
normalized_working_dir = _normalize_path(working_dir)
|
||||||
|
if not normalized_working_dir.exists():
|
||||||
|
msg = f"working directory not found: {normalized_working_dir}"
|
||||||
|
logger.error("Git command skipped: %s (%s)", " ".join(["git"] + list(args)), msg)
|
||||||
|
return False, "", msg
|
||||||
|
if not normalized_working_dir.is_dir():
|
||||||
|
msg = f"working directory is not a directory: {normalized_working_dir}"
|
||||||
|
logger.error("Git command skipped: %s (%s)", " ".join(["git"] + list(args)), msg)
|
||||||
|
return False, "", msg
|
||||||
|
|
||||||
|
env = _git_env(shadow_repo, str(normalized_working_dir))
|
||||||
cmd = ["git"] + list(args)
|
cmd = ["git"] + list(args)
|
||||||
allowed_returncodes = allowed_returncodes or set()
|
allowed_returncodes = allowed_returncodes or set()
|
||||||
try:
|
try:
|
||||||
@@ -154,7 +170,7 @@ def _run_git(
|
|||||||
text=True,
|
text=True,
|
||||||
timeout=timeout,
|
timeout=timeout,
|
||||||
env=env,
|
env=env,
|
||||||
cwd=str(Path(working_dir).resolve()),
|
cwd=str(normalized_working_dir),
|
||||||
)
|
)
|
||||||
ok = result.returncode == 0
|
ok = result.returncode == 0
|
||||||
stdout = result.stdout.strip()
|
stdout = result.stdout.strip()
|
||||||
@@ -169,9 +185,14 @@ def _run_git(
|
|||||||
msg = f"git timed out after {timeout}s: {' '.join(cmd)}"
|
msg = f"git timed out after {timeout}s: {' '.join(cmd)}"
|
||||||
logger.error(msg, exc_info=True)
|
logger.error(msg, exc_info=True)
|
||||||
return False, "", msg
|
return False, "", msg
|
||||||
except FileNotFoundError:
|
except FileNotFoundError as exc:
|
||||||
|
missing_target = getattr(exc, "filename", None)
|
||||||
|
if missing_target == "git":
|
||||||
logger.error("Git executable not found: %s", " ".join(cmd), exc_info=True)
|
logger.error("Git executable not found: %s", " ".join(cmd), exc_info=True)
|
||||||
return False, "", "git not found"
|
return False, "", "git not found"
|
||||||
|
msg = f"working directory not found: {normalized_working_dir}"
|
||||||
|
logger.error("Git command failed before execution: %s (%s)", " ".join(cmd), msg, exc_info=True)
|
||||||
|
return False, "", msg
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
logger.error("Unexpected git error running %s: %s", " ".join(cmd), exc, exc_info=True)
|
logger.error("Unexpected git error running %s: %s", " ".join(cmd), exc, exc_info=True)
|
||||||
return False, "", str(exc)
|
return False, "", str(exc)
|
||||||
@@ -198,7 +219,7 @@ def _init_shadow_repo(shadow_repo: Path, working_dir: str) -> Optional[str]:
|
|||||||
)
|
)
|
||||||
|
|
||||||
(shadow_repo / "HERMES_WORKDIR").write_text(
|
(shadow_repo / "HERMES_WORKDIR").write_text(
|
||||||
str(Path(working_dir).resolve()) + "\n", encoding="utf-8"
|
str(_normalize_path(working_dir)) + "\n", encoding="utf-8"
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.debug("Initialised checkpoint repo at %s for %s", shadow_repo, working_dir)
|
logger.debug("Initialised checkpoint repo at %s for %s", shadow_repo, working_dir)
|
||||||
@@ -273,7 +294,7 @@ class CheckpointManager:
|
|||||||
if not self._git_available:
|
if not self._git_available:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
abs_dir = str(Path(working_dir).resolve())
|
abs_dir = str(_normalize_path(working_dir))
|
||||||
|
|
||||||
# Skip root, home, and other overly broad directories
|
# Skip root, home, and other overly broad directories
|
||||||
if abs_dir in ("/", str(Path.home())):
|
if abs_dir in ("/", str(Path.home())):
|
||||||
@@ -298,7 +319,7 @@ class CheckpointManager:
|
|||||||
Returns a list of dicts with keys: hash, short_hash, timestamp, reason,
|
Returns a list of dicts with keys: hash, short_hash, timestamp, reason,
|
||||||
files_changed, insertions, deletions. Most recent first.
|
files_changed, insertions, deletions. Most recent first.
|
||||||
"""
|
"""
|
||||||
abs_dir = str(Path(working_dir).resolve())
|
abs_dir = str(_normalize_path(working_dir))
|
||||||
shadow = _shadow_repo_path(abs_dir)
|
shadow = _shadow_repo_path(abs_dir)
|
||||||
|
|
||||||
if not (shadow / "HEAD").exists():
|
if not (shadow / "HEAD").exists():
|
||||||
@@ -360,7 +381,7 @@ class CheckpointManager:
|
|||||||
if hash_err:
|
if hash_err:
|
||||||
return {"success": False, "error": hash_err}
|
return {"success": False, "error": hash_err}
|
||||||
|
|
||||||
abs_dir = str(Path(working_dir).resolve())
|
abs_dir = str(_normalize_path(working_dir))
|
||||||
shadow = _shadow_repo_path(abs_dir)
|
shadow = _shadow_repo_path(abs_dir)
|
||||||
|
|
||||||
if not (shadow / "HEAD").exists():
|
if not (shadow / "HEAD").exists():
|
||||||
@@ -418,7 +439,7 @@ class CheckpointManager:
|
|||||||
if hash_err:
|
if hash_err:
|
||||||
return {"success": False, "error": hash_err}
|
return {"success": False, "error": hash_err}
|
||||||
|
|
||||||
abs_dir = str(Path(working_dir).resolve())
|
abs_dir = str(_normalize_path(working_dir))
|
||||||
|
|
||||||
# Validate file_path to prevent path traversal outside the working dir
|
# Validate file_path to prevent path traversal outside the working dir
|
||||||
if file_path:
|
if file_path:
|
||||||
@@ -474,7 +495,7 @@ class CheckpointManager:
|
|||||||
(directory containing .git, pyproject.toml, package.json, etc.).
|
(directory containing .git, pyproject.toml, package.json, etc.).
|
||||||
Falls back to the file's parent directory.
|
Falls back to the file's parent directory.
|
||||||
"""
|
"""
|
||||||
path = Path(file_path).resolve()
|
path = _normalize_path(file_path)
|
||||||
if path.is_dir():
|
if path.is_dir():
|
||||||
candidate = path
|
candidate = path
|
||||||
else:
|
else:
|
||||||
|
|||||||
Reference in New Issue
Block a user