mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-14 06:09:11 +08:00
Compare commits
1 Commits
dependabot
...
fix/worktr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d4a5919f25 |
187
cli.py
187
cli.py
@@ -1267,6 +1267,11 @@ def _setup_worktree(repo_root: str = None) -> Optional[Dict[str, str]]:
|
||||
print(f"\033[31m✗ Failed to create worktree: {e}\033[0m")
|
||||
return None
|
||||
|
||||
# Lock the worktree so concurrent/later hermes processes' pruning
|
||||
# leaves this session's work alone (locks survive crashes too).
|
||||
# Lock failure is non-fatal — _lock_worktree logs at debug level.
|
||||
_lock_worktree(repo_root, str(wt_path))
|
||||
|
||||
# Copy files listed in .worktreeinclude (gitignored files the agent needs)
|
||||
include_file = Path(repo_root) / ".worktreeinclude"
|
||||
if include_file.exists():
|
||||
@@ -1377,13 +1382,109 @@ def _worktree_has_unpushed_commits(worktree_path: str, timeout: int = 10) -> boo
|
||||
return True
|
||||
|
||||
|
||||
def _cleanup_worktree(info: Dict[str, str] = None) -> None:
|
||||
"""Remove a worktree and its branch on exit.
|
||||
def _lock_worktree(repo_root: str, wt_path: str, timeout: int = 10) -> bool:
|
||||
"""Lock a worktree using git's native lock mechanism.
|
||||
|
||||
Preserves the worktree only if it has unpushed commits (real work
|
||||
that hasn't been pushed to any remote). Uncommitted changes alone
|
||||
(untracked files, test artifacts) are not enough to keep it — agent
|
||||
work lives in commits/PRs, not the working tree.
|
||||
The lock marks the worktree as in-use by a live (or crashed) hermes
|
||||
session so that other hermes processes' pruning leaves it alone.
|
||||
Never raises; returns whether the lock was taken.
|
||||
"""
|
||||
import subprocess
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["git", "worktree", "lock",
|
||||
"--reason", f"hermes session pid={os.getpid()}", str(wt_path)],
|
||||
capture_output=True, text=True, timeout=timeout, cwd=repo_root,
|
||||
)
|
||||
if result.returncode != 0:
|
||||
logger.debug(
|
||||
"Failed to lock worktree %s: %s", wt_path, result.stderr.strip()
|
||||
)
|
||||
return False
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.debug("Failed to lock worktree %s: %s", wt_path, e)
|
||||
return False
|
||||
|
||||
|
||||
def _unlock_worktree(repo_root: str, wt_path: str, timeout: int = 10) -> bool:
|
||||
"""Release a git worktree lock. Never raises."""
|
||||
import subprocess
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["git", "worktree", "unlock", str(wt_path)],
|
||||
capture_output=True, text=True, timeout=timeout, cwd=repo_root,
|
||||
)
|
||||
if result.returncode != 0:
|
||||
logger.debug(
|
||||
"Failed to unlock worktree %s: %s", wt_path, result.stderr.strip()
|
||||
)
|
||||
return False
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.debug("Failed to unlock worktree %s: %s", wt_path, e)
|
||||
return False
|
||||
|
||||
|
||||
def _worktree_is_locked(repo_root: str, wt_path: str, timeout: int = 10) -> bool:
|
||||
"""Return whether a worktree is locked (per ``git worktree list --porcelain``).
|
||||
|
||||
Fails SAFE: on any error (bad repo_root, git failure, timeout) returns
|
||||
True so callers treat the worktree as in-use and do not delete it.
|
||||
"""
|
||||
import subprocess
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["git", "worktree", "list", "--porcelain"],
|
||||
capture_output=True, text=True, timeout=timeout, cwd=repo_root,
|
||||
)
|
||||
if result.returncode != 0:
|
||||
return True
|
||||
target = Path(wt_path).resolve()
|
||||
current_path: Optional[Path] = None
|
||||
for line in result.stdout.splitlines():
|
||||
if line.startswith("worktree "):
|
||||
current_path = Path(line[len("worktree "):].strip()).resolve()
|
||||
elif line == "locked" or line.startswith("locked "):
|
||||
if current_path == target:
|
||||
return True
|
||||
return False
|
||||
except Exception:
|
||||
return True
|
||||
|
||||
|
||||
def _worktree_is_dirty(wt_path: str, timeout: int = 10) -> bool:
|
||||
"""Return whether a worktree has uncommitted changes (staged, unstaged,
|
||||
or untracked).
|
||||
|
||||
Fails SAFE: on any error returns True so callers do not delete a
|
||||
worktree whose state they cannot determine.
|
||||
"""
|
||||
import subprocess
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["git", "status", "--porcelain"],
|
||||
capture_output=True, text=True, timeout=timeout, cwd=wt_path,
|
||||
)
|
||||
if result.returncode != 0:
|
||||
return True
|
||||
return bool(result.stdout.strip())
|
||||
except Exception:
|
||||
return True
|
||||
|
||||
|
||||
def _cleanup_worktree(info: Dict[str, str] = None) -> None:
|
||||
"""Remove a worktree and its branch on graceful exit.
|
||||
|
||||
Preserves the worktree (along with its branch and lock) if it has
|
||||
unpushed commits OR uncommitted changes — either may be work the user
|
||||
has not retrieved yet. Only clean, fully-pushed worktrees are
|
||||
removed, and the branch is only deleted after ``git worktree remove``
|
||||
actually succeeded.
|
||||
"""
|
||||
global _active_worktree
|
||||
info = info or _active_worktree
|
||||
@@ -1400,24 +1501,41 @@ def _cleanup_worktree(info: Dict[str, str] = None) -> None:
|
||||
return
|
||||
|
||||
has_unpushed = _worktree_has_unpushed_commits(wt_path, timeout=10)
|
||||
is_dirty = _worktree_is_dirty(wt_path)
|
||||
|
||||
if has_unpushed:
|
||||
print(f"\n\033[33m⚠ Worktree has unpushed commits, keeping: {wt_path}\033[0m")
|
||||
print(f" To clean up manually: git worktree remove --force {wt_path}")
|
||||
if has_unpushed or is_dirty:
|
||||
reason = "unpushed commits" if has_unpushed else "uncommitted changes"
|
||||
print(f"\n\033[33m⚠ Worktree has {reason}, keeping: {wt_path}\033[0m")
|
||||
print(f" To clean up manually: git worktree unlock {wt_path}")
|
||||
print(f" then: git worktree remove --force {wt_path}")
|
||||
_active_worktree = None
|
||||
return
|
||||
|
||||
# Remove worktree (even if working tree is dirty — uncommitted
|
||||
# changes without unpushed commits are just artifacts)
|
||||
# Clean and fully pushed — release our lock, then remove.
|
||||
_unlock_worktree(repo_root, wt_path)
|
||||
|
||||
removed = False
|
||||
try:
|
||||
subprocess.run(
|
||||
result = subprocess.run(
|
||||
["git", "worktree", "remove", wt_path, "--force"],
|
||||
capture_output=True, text=True, timeout=15, cwd=repo_root,
|
||||
)
|
||||
removed = result.returncode == 0
|
||||
if not removed:
|
||||
logger.debug(
|
||||
"Failed to remove worktree %s: %s", wt_path, result.stderr.strip()
|
||||
)
|
||||
except Exception as e:
|
||||
logger.debug("Failed to remove worktree: %s", e)
|
||||
|
||||
# Delete the branch
|
||||
if not removed:
|
||||
# Removal failed — keep the branch so the commits stay reachable.
|
||||
print(f"\033[33m⚠ Could not remove worktree, keeping it (and branch "
|
||||
f"{branch}): {wt_path}\033[0m")
|
||||
_active_worktree = None
|
||||
return
|
||||
|
||||
# Delete the branch only now that the worktree is actually gone.
|
||||
try:
|
||||
subprocess.run(
|
||||
["git", "branch", "-D", branch],
|
||||
@@ -1511,10 +1629,14 @@ def _run_checkpoint_auto_maintenance() -> None:
|
||||
def _prune_stale_worktrees(repo_root: str, max_age_hours: int = 24) -> None:
|
||||
"""Remove stale worktrees and orphaned branches on startup.
|
||||
|
||||
Age-based tiers:
|
||||
Pruning may only ever delete clean, unlocked, fully-pushed worktrees:
|
||||
- Under max_age_hours (24h): skip — session may still be active.
|
||||
- 24h–72h: remove if no unpushed commits.
|
||||
- Over 72h: force remove regardless (nothing should sit this long).
|
||||
- Locked (a live or crashed hermes session): skip at ANY age.
|
||||
- Dirty working tree (uncommitted changes): skip at ANY age.
|
||||
- Unpushed commits: skip at ANY age.
|
||||
|
||||
The branch is only deleted after ``git worktree remove`` actually
|
||||
succeeded, so commits never lose their easy reachability.
|
||||
|
||||
Also prunes orphaned ``hermes/*`` and ``pr-*`` local branches that
|
||||
have no corresponding worktree.
|
||||
@@ -1529,7 +1651,6 @@ def _prune_stale_worktrees(repo_root: str, max_age_hours: int = 24) -> None:
|
||||
|
||||
now = time.time()
|
||||
soft_cutoff = now - (max_age_hours * 3600) # 24h default
|
||||
hard_cutoff = now - (max_age_hours * 3 * 3600) # 72h default
|
||||
|
||||
for entry in worktrees_dir.iterdir():
|
||||
if not entry.is_dir() or not entry.name.startswith("hermes-"):
|
||||
@@ -1543,14 +1664,22 @@ def _prune_stale_worktrees(repo_root: str, max_age_hours: int = 24) -> None:
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
force = mtime <= hard_cutoff # Over 72h — force remove
|
||||
# A lock means a session (live, or crashed mid-work) owns this
|
||||
# worktree — never touch it, regardless of age.
|
||||
if _worktree_is_locked(repo_root, str(entry)):
|
||||
logger.debug("Skipping locked worktree: %s", entry.name)
|
||||
continue
|
||||
|
||||
if not force:
|
||||
# 24h–72h tier: only remove if no unpushed commits
|
||||
if _worktree_has_unpushed_commits(str(entry), timeout=5):
|
||||
continue # Has unpushed commits or can't check — skip
|
||||
# Uncommitted changes may be work the user hasn't retrieved.
|
||||
if _worktree_is_dirty(str(entry)):
|
||||
logger.debug("Skipping dirty worktree: %s", entry.name)
|
||||
continue
|
||||
|
||||
# Safe to remove
|
||||
# Unpushed commits are definitely work — keep at any age.
|
||||
if _worktree_has_unpushed_commits(str(entry), timeout=5):
|
||||
continue
|
||||
|
||||
# Safe to remove: clean, unlocked, fully pushed.
|
||||
try:
|
||||
branch_result = subprocess.run(
|
||||
["git", "branch", "--show-current"],
|
||||
@@ -1558,16 +1687,24 @@ def _prune_stale_worktrees(repo_root: str, max_age_hours: int = 24) -> None:
|
||||
)
|
||||
branch = branch_result.stdout.strip()
|
||||
|
||||
subprocess.run(
|
||||
remove_result = subprocess.run(
|
||||
["git", "worktree", "remove", str(entry), "--force"],
|
||||
capture_output=True, text=True, timeout=15, cwd=repo_root,
|
||||
)
|
||||
if remove_result.returncode != 0:
|
||||
# Removal failed — keep the branch so the commits stay
|
||||
# reachable.
|
||||
logger.debug(
|
||||
"Failed to remove worktree %s: %s",
|
||||
entry.name, remove_result.stderr.strip(),
|
||||
)
|
||||
continue
|
||||
if branch:
|
||||
subprocess.run(
|
||||
["git", "branch", "-D", branch],
|
||||
capture_output=True, text=True, timeout=10, cwd=repo_root,
|
||||
)
|
||||
logger.debug("Pruned stale worktree: %s (force=%s)", entry.name, force)
|
||||
logger.debug("Pruned stale worktree: %s", entry.name)
|
||||
except Exception as e:
|
||||
logger.debug("Failed to prune worktree %s: %s", entry.name, e)
|
||||
|
||||
|
||||
@@ -162,11 +162,26 @@ def _has_unpushed_commits(worktree_path, timeout=10):
|
||||
return True
|
||||
|
||||
|
||||
def _is_dirty(wt_path, timeout=10):
|
||||
"""Test version of the worktree dirty-check helper (fail-safe True)."""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["git", "status", "--porcelain"],
|
||||
capture_output=True, text=True, timeout=timeout, cwd=wt_path,
|
||||
)
|
||||
if result.returncode != 0:
|
||||
return True
|
||||
return bool(result.stdout.strip())
|
||||
except Exception:
|
||||
return True
|
||||
|
||||
|
||||
def _cleanup_worktree(info):
|
||||
"""Test version of _cleanup_worktree.
|
||||
|
||||
Preserves the worktree only if it has unpushed commits.
|
||||
Dirty working tree alone is not enough to keep it.
|
||||
Mirrors the cli.py contract: preserves the worktree if it has
|
||||
unpushed commits OR uncommitted changes; only deletes the branch
|
||||
after ``git worktree remove`` succeeded.
|
||||
"""
|
||||
wt_path = info["path"]
|
||||
branch = info["branch"]
|
||||
@@ -178,10 +193,16 @@ def _cleanup_worktree(info):
|
||||
if _has_unpushed_commits(wt_path, timeout=10):
|
||||
return False # Did not clean up — has unpushed commits
|
||||
|
||||
subprocess.run(
|
||||
if _is_dirty(wt_path):
|
||||
return False # Did not clean up — uncommitted changes
|
||||
|
||||
result = subprocess.run(
|
||||
["git", "worktree", "remove", wt_path, "--force"],
|
||||
capture_output=True, text=True, timeout=15, cwd=repo_root,
|
||||
)
|
||||
if result.returncode != 0:
|
||||
return False # Removal failed — keep the branch
|
||||
|
||||
subprocess.run(
|
||||
["git", "branch", "-D", branch],
|
||||
capture_output=True, text=True, timeout=10, cwd=repo_root,
|
||||
@@ -283,17 +304,18 @@ class TestWorktreeCleanup:
|
||||
assert result is True
|
||||
assert not Path(info["path"]).exists()
|
||||
|
||||
def test_dirty_worktree_cleaned_when_no_unpushed(self, git_repo):
|
||||
"""Dirty working tree without unpushed commits is cleaned up.
|
||||
def test_dirty_worktree_preserved_on_cleanup(self, git_repo):
|
||||
"""Dirty working tree is preserved even without unpushed commits.
|
||||
|
||||
Agent sessions typically leave untracked files / artifacts behind.
|
||||
Since all real work is in pushed commits, these don't warrant
|
||||
keeping the worktree.
|
||||
Uncommitted changes may be work the user has not retrieved yet —
|
||||
cleanup must never destroy them.
|
||||
"""
|
||||
info = _setup_worktree(str(git_repo))
|
||||
import cli as cli_mod
|
||||
|
||||
info = cli_mod._setup_worktree(str(git_repo))
|
||||
assert info is not None
|
||||
|
||||
# Make uncommitted changes (untracked file)
|
||||
# Make uncommitted changes (staged but uncommitted file)
|
||||
(Path(info["path"]) / "new-file.txt").write_text("uncommitted")
|
||||
subprocess.run(
|
||||
["git", "add", "new-file.txt"],
|
||||
@@ -301,10 +323,17 @@ class TestWorktreeCleanup:
|
||||
)
|
||||
|
||||
# The git_repo fixture already has a fake remote ref so the initial
|
||||
# commit is seen as "pushed". No unpushed commits → cleanup proceeds.
|
||||
result = _cleanup_worktree(info)
|
||||
assert result is True # Cleaned up despite dirty working tree
|
||||
assert not Path(info["path"]).exists()
|
||||
# commit is seen as "pushed" — only the dirty tree protects it.
|
||||
cli_mod._cleanup_worktree(info)
|
||||
assert Path(info["path"]).exists() # Preserved despite no unpushed commits
|
||||
|
||||
# Branch and lock are kept too
|
||||
result = subprocess.run(
|
||||
["git", "branch", "--list", info["branch"]],
|
||||
capture_output=True, text=True, cwd=str(git_repo),
|
||||
)
|
||||
assert info["branch"] in result.stdout
|
||||
assert cli_mod._worktree_is_locked(str(git_repo), info["path"]) is True
|
||||
|
||||
def test_worktree_with_unpushed_commits_kept(self, git_repo):
|
||||
"""Worktree with unpushed commits is preserved."""
|
||||
@@ -728,47 +757,224 @@ class TestStaleWorktreePruning:
|
||||
assert not Path(info["path"]).exists()
|
||||
|
||||
def test_force_prunes_very_old_worktree(self, git_repo):
|
||||
"""Worktrees older than 72h should be force-pruned regardless."""
|
||||
"""Very old (>72h) CLEAN, unlocked, fully-pushed worktrees are pruned."""
|
||||
import time
|
||||
import cli as cli_mod
|
||||
|
||||
info = _setup_worktree(str(git_repo))
|
||||
info = cli_mod._setup_worktree(str(git_repo))
|
||||
assert info is not None
|
||||
|
||||
# Make an unpushed commit (would normally protect it)
|
||||
(Path(info["path"]) / "work.txt").write_text("stale work")
|
||||
subprocess.run(["git", "add", "work.txt"], cwd=info["path"], capture_output=True)
|
||||
subprocess.run(
|
||||
["git", "commit", "-m", "old agent work"],
|
||||
cwd=info["path"], capture_output=True,
|
||||
)
|
||||
# _setup_worktree locks the worktree; unlock to simulate a worktree
|
||||
# whose owning session released it (clean + unlocked + pushed).
|
||||
assert cli_mod._unlock_worktree(str(git_repo), info["path"]) is True
|
||||
|
||||
# Make it very old (73h — beyond the 72h hard threshold)
|
||||
# Make it very old (73h)
|
||||
old_time = time.time() - (73 * 3600)
|
||||
os.utime(info["path"], (old_time, old_time))
|
||||
|
||||
# Simulate the force-prune tier check
|
||||
hard_cutoff = time.time() - (72 * 3600)
|
||||
mtime = Path(info["path"]).stat().st_mtime
|
||||
assert mtime <= hard_cutoff # Should qualify for force removal
|
||||
|
||||
# Actually remove it (simulates _prune_stale_worktrees force path)
|
||||
branch_result = subprocess.run(
|
||||
["git", "branch", "--show-current"],
|
||||
capture_output=True, text=True, timeout=5, cwd=info["path"],
|
||||
)
|
||||
branch = branch_result.stdout.strip()
|
||||
|
||||
subprocess.run(
|
||||
["git", "worktree", "remove", info["path"], "--force"],
|
||||
capture_output=True, text=True, timeout=15, cwd=str(git_repo),
|
||||
)
|
||||
if branch:
|
||||
subprocess.run(
|
||||
["git", "branch", "-D", branch],
|
||||
capture_output=True, text=True, timeout=10, cwd=str(git_repo),
|
||||
)
|
||||
cli_mod._prune_stale_worktrees(str(git_repo))
|
||||
|
||||
assert not Path(info["path"]).exists()
|
||||
# Branch should be gone too
|
||||
result = subprocess.run(
|
||||
["git", "branch", "--list", info["branch"]],
|
||||
capture_output=True, text=True, cwd=str(git_repo),
|
||||
)
|
||||
assert info["branch"] not in result.stdout
|
||||
|
||||
|
||||
class TestWorktreeLocking:
|
||||
"""Test git-native worktree locks and the preserve-work contracts.
|
||||
|
||||
These tests exercise the REAL cli.py implementations (not the local
|
||||
reimplementations above), matching the pattern in
|
||||
test_worktree_security.py.
|
||||
"""
|
||||
|
||||
def test_setup_worktree_locks(self, git_repo):
|
||||
"""_setup_worktree leaves the new worktree locked."""
|
||||
import cli as cli_mod
|
||||
|
||||
info = cli_mod._setup_worktree(str(git_repo))
|
||||
assert info is not None
|
||||
|
||||
# Verify via git worktree list --porcelain: the stanza for this
|
||||
# worktree must contain a "locked" line.
|
||||
result = subprocess.run(
|
||||
["git", "worktree", "list", "--porcelain"],
|
||||
capture_output=True, text=True, cwd=str(git_repo),
|
||||
)
|
||||
target = Path(info["path"]).resolve()
|
||||
current = None
|
||||
locked = False
|
||||
for line in result.stdout.splitlines():
|
||||
if line.startswith("worktree "):
|
||||
current = Path(line[len("worktree "):].strip()).resolve()
|
||||
elif line == "locked" or line.startswith("locked "):
|
||||
if current == target:
|
||||
locked = True
|
||||
assert locked
|
||||
assert cli_mod._worktree_is_locked(str(git_repo), info["path"]) is True
|
||||
|
||||
def test_unlock_worktree(self, git_repo):
|
||||
"""_unlock_worktree releases the lock taken by _setup_worktree."""
|
||||
import cli as cli_mod
|
||||
|
||||
info = cli_mod._setup_worktree(str(git_repo))
|
||||
assert info is not None
|
||||
assert cli_mod._worktree_is_locked(str(git_repo), info["path"]) is True
|
||||
|
||||
assert cli_mod._unlock_worktree(str(git_repo), info["path"]) is True
|
||||
assert cli_mod._worktree_is_locked(str(git_repo), info["path"]) is False
|
||||
|
||||
def test_prune_skips_locked_very_old_clean_worktree(self, git_repo):
|
||||
"""A locked worktree is never pruned, even >72h old and clean."""
|
||||
import time
|
||||
import cli as cli_mod
|
||||
|
||||
info = cli_mod._setup_worktree(str(git_repo))
|
||||
assert info is not None
|
||||
# Still locked from _setup_worktree; clean; fully pushed.
|
||||
|
||||
old_time = time.time() - (80 * 3600)
|
||||
os.utime(info["path"], (old_time, old_time))
|
||||
|
||||
cli_mod._prune_stale_worktrees(str(git_repo))
|
||||
|
||||
assert Path(info["path"]).exists()
|
||||
|
||||
def test_prune_skips_old_dirty_unlocked_worktree(self, git_repo):
|
||||
"""An old dirty worktree is not pruned even when unlocked."""
|
||||
import time
|
||||
import cli as cli_mod
|
||||
|
||||
info = cli_mod._setup_worktree(str(git_repo))
|
||||
assert info is not None
|
||||
assert cli_mod._unlock_worktree(str(git_repo), info["path"]) is True
|
||||
|
||||
# Uncommitted change (untracked file)
|
||||
(Path(info["path"]) / "wip.txt").write_text("uncommitted work")
|
||||
|
||||
old_time = time.time() - (25 * 3600)
|
||||
os.utime(info["path"], (old_time, old_time))
|
||||
|
||||
cli_mod._prune_stale_worktrees(str(git_repo))
|
||||
|
||||
assert Path(info["path"]).exists()
|
||||
assert (Path(info["path"]) / "wip.txt").exists()
|
||||
|
||||
def test_prune_preserves_very_old_worktree_with_unpushed_commits(self, git_repo):
|
||||
"""Unpushed commits protect a worktree at ANY age — the old >72h
|
||||
force-remove tier is gone."""
|
||||
import time
|
||||
import cli as cli_mod
|
||||
|
||||
info = cli_mod._setup_worktree(str(git_repo))
|
||||
assert info is not None
|
||||
assert cli_mod._unlock_worktree(str(git_repo), info["path"]) is True
|
||||
|
||||
# Unpushed commit (clean tree afterwards)
|
||||
(Path(info["path"]) / "work.txt").write_text("real work")
|
||||
subprocess.run(["git", "add", "work.txt"], cwd=info["path"], capture_output=True)
|
||||
subprocess.run(
|
||||
["git", "commit", "-m", "agent work"],
|
||||
cwd=info["path"], capture_output=True,
|
||||
)
|
||||
|
||||
old_time = time.time() - (80 * 3600)
|
||||
os.utime(info["path"], (old_time, old_time))
|
||||
|
||||
cli_mod._prune_stale_worktrees(str(git_repo))
|
||||
|
||||
assert Path(info["path"]).exists()
|
||||
result = subprocess.run(
|
||||
["git", "branch", "--list", info["branch"]],
|
||||
capture_output=True, text=True, cwd=str(git_repo),
|
||||
)
|
||||
assert info["branch"] in result.stdout
|
||||
|
||||
def test_cleanup_preserves_dirty_worktree(self, git_repo):
|
||||
"""_cleanup_worktree keeps a dirty worktree (untracked file)."""
|
||||
import cli as cli_mod
|
||||
|
||||
info = cli_mod._setup_worktree(str(git_repo))
|
||||
assert info is not None
|
||||
|
||||
(Path(info["path"]) / "scratch.txt").write_text("not yet committed")
|
||||
|
||||
cli_mod._cleanup_worktree(info)
|
||||
|
||||
assert Path(info["path"]).exists()
|
||||
assert (Path(info["path"]) / "scratch.txt").exists()
|
||||
|
||||
def test_cleanup_removes_clean_locked_worktree(self, git_repo):
|
||||
"""_cleanup_worktree unlocks then removes a clean, pushed worktree."""
|
||||
import cli as cli_mod
|
||||
|
||||
info = cli_mod._setup_worktree(str(git_repo))
|
||||
assert info is not None
|
||||
assert cli_mod._worktree_is_locked(str(git_repo), info["path"]) is True
|
||||
|
||||
cli_mod._cleanup_worktree(info)
|
||||
|
||||
assert not Path(info["path"]).exists()
|
||||
result = subprocess.run(
|
||||
["git", "branch", "--list", info["branch"]],
|
||||
capture_output=True, text=True, cwd=str(git_repo),
|
||||
)
|
||||
assert info["branch"] not in result.stdout
|
||||
|
||||
def test_branch_kept_when_worktree_remove_fails(self, git_repo, monkeypatch):
|
||||
"""If `git worktree remove` fails, the branch must NOT be deleted."""
|
||||
import subprocess as sp
|
||||
import cli as cli_mod
|
||||
|
||||
info = cli_mod._setup_worktree(str(git_repo))
|
||||
assert info is not None
|
||||
|
||||
real_run = sp.run
|
||||
|
||||
def fake_run(cmd, *args, **kwargs):
|
||||
if (
|
||||
isinstance(cmd, (list, tuple))
|
||||
and list(cmd[:3]) == ["git", "worktree", "remove"]
|
||||
):
|
||||
return sp.CompletedProcess(
|
||||
cmd, returncode=1, stdout="", stderr="simulated removal failure"
|
||||
)
|
||||
return real_run(cmd, *args, **kwargs)
|
||||
|
||||
monkeypatch.setattr(sp, "run", fake_run)
|
||||
|
||||
cli_mod._cleanup_worktree(info)
|
||||
|
||||
monkeypatch.undo()
|
||||
|
||||
# Worktree dir still present, branch NOT deleted
|
||||
assert Path(info["path"]).exists()
|
||||
result = subprocess.run(
|
||||
["git", "branch", "--list", info["branch"]],
|
||||
capture_output=True, text=True, cwd=str(git_repo),
|
||||
)
|
||||
assert info["branch"] in result.stdout
|
||||
|
||||
def test_worktree_is_locked_fail_safe(self, tmp_path):
|
||||
"""_worktree_is_locked returns True (fail safe) on a bogus repo_root."""
|
||||
import cli as cli_mod
|
||||
|
||||
bogus = tmp_path / "does-not-exist"
|
||||
assert cli_mod._worktree_is_locked(str(bogus), str(bogus / "wt")) is True
|
||||
|
||||
# An existing directory that is not a git repo is also an error case
|
||||
not_repo = tmp_path / "not-a-repo"
|
||||
not_repo.mkdir()
|
||||
assert cli_mod._worktree_is_locked(str(not_repo), str(not_repo / "wt")) is True
|
||||
|
||||
def test_worktree_is_dirty_fail_safe(self, tmp_path):
|
||||
"""_worktree_is_dirty returns True (fail safe) on a bogus path."""
|
||||
import cli as cli_mod
|
||||
|
||||
assert cli_mod._worktree_is_dirty(str(tmp_path / "missing")) is True
|
||||
|
||||
|
||||
class TestEdgeCases:
|
||||
|
||||
Reference in New Issue
Block a user