mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-28 06:51:16 +08:00
feat: add hermes backup and hermes import commands (#7997)
* feat: add `hermes backup` and `hermes import` commands hermes backup — creates a zip of ~/.hermes/ (config, skills, sessions, profiles, memories, skins, cron jobs, etc.) excluding the hermes-agent codebase, __pycache__, and runtime PID files. Defaults to ~/hermes-backup-<timestamp>.zip, customizable with -o. hermes import <zipfile> — restores from a backup zip, validating it looks like a hermes backup before extracting. Handles .hermes/ prefix stripping, path traversal protection, and confirmation prompts (skip with --force). 29 tests covering exclusion rules, backup creation, import validation, prefix detection, path traversal blocking, confirmation flow, and a full round-trip test. * test: improve backup/import coverage to 97% Add 17 additional tests covering: - _format_size helper (bytes through terabytes) - Nonexistent hermes home error exit - Output path is a directory (auto-names inside it) - Output without .zip suffix (auto-appends) - Empty hermes home (all files excluded) - Permission errors during backup and import - Output zip inside hermes root (skips itself) - Not-a-zip file rejection - EOFError and KeyboardInterrupt during confirmation - 500+ file progress display - Directory-only zip prefix detection Remove dead code branch in _detect_prefix (unreachable guard). * feat: auto-restore profile wrapper scripts on import After extracting backup files, hermes import now scans profiles/ for subdirectories with config.yaml or .env and recreates the ~/.local/bin wrapper scripts so profile aliases (e.g. 'coder chat') work immediately. Also prints guidance for re-installing gateway services per profile. Handles edge cases: - Skips profile dirs without config (not real profiles) - Skips aliases that collide with existing commands - Gracefully degrades if hermes_cli.profiles isn't available (fresh install) - Shows PATH hint if ~/.local/bin isn't in PATH 3 new profile restoration tests (49 total).
This commit is contained in:
399
hermes_cli/backup.py
Normal file
399
hermes_cli/backup.py
Normal file
@@ -0,0 +1,399 @@
|
||||
"""
|
||||
Backup and import commands for hermes CLI.
|
||||
|
||||
`hermes backup` creates a zip archive of the entire ~/.hermes/ directory
|
||||
(excluding the hermes-agent repo and transient files).
|
||||
|
||||
`hermes import` restores from a backup zip, overlaying onto the current
|
||||
HERMES_HOME root.
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import zipfile
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
from hermes_constants import get_default_hermes_root, display_hermes_home
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Exclusion rules
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Directory names to skip entirely (matched against each path component)
|
||||
_EXCLUDED_DIRS = {
|
||||
"hermes-agent", # the codebase repo — re-clone instead
|
||||
"__pycache__", # bytecode caches — regenerated on import
|
||||
".git", # nested git dirs (profiles shouldn't have these, but safety)
|
||||
"node_modules", # js deps if website/ somehow leaks in
|
||||
}
|
||||
|
||||
# File-name suffixes to skip
|
||||
_EXCLUDED_SUFFIXES = (
|
||||
".pyc",
|
||||
".pyo",
|
||||
)
|
||||
|
||||
# File names to skip (runtime state that's meaningless on another machine)
|
||||
_EXCLUDED_NAMES = {
|
||||
"gateway.pid",
|
||||
"cron.pid",
|
||||
}
|
||||
|
||||
|
||||
def _should_exclude(rel_path: Path) -> bool:
|
||||
"""Return True if *rel_path* (relative to hermes root) should be skipped."""
|
||||
parts = rel_path.parts
|
||||
|
||||
# Any path component matches an excluded dir name
|
||||
for part in parts:
|
||||
if part in _EXCLUDED_DIRS:
|
||||
return True
|
||||
|
||||
name = rel_path.name
|
||||
|
||||
if name in _EXCLUDED_NAMES:
|
||||
return True
|
||||
|
||||
if name.endswith(_EXCLUDED_SUFFIXES):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Backup
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _format_size(nbytes: int) -> str:
|
||||
"""Human-readable file size."""
|
||||
for unit in ("B", "KB", "MB", "GB"):
|
||||
if nbytes < 1024:
|
||||
return f"{nbytes:.1f} {unit}" if unit != "B" else f"{nbytes} {unit}"
|
||||
nbytes /= 1024
|
||||
return f"{nbytes:.1f} TB"
|
||||
|
||||
|
||||
def run_backup(args) -> None:
|
||||
"""Create a zip backup of the Hermes home directory."""
|
||||
hermes_root = get_default_hermes_root()
|
||||
|
||||
if not hermes_root.is_dir():
|
||||
print(f"Error: Hermes home directory not found at {hermes_root}")
|
||||
sys.exit(1)
|
||||
|
||||
# Determine output path
|
||||
if args.output:
|
||||
out_path = Path(args.output).expanduser().resolve()
|
||||
# If user gave a directory, put the zip inside it
|
||||
if out_path.is_dir():
|
||||
stamp = datetime.now().strftime("%Y-%m-%d-%H%M%S")
|
||||
out_path = out_path / f"hermes-backup-{stamp}.zip"
|
||||
else:
|
||||
stamp = datetime.now().strftime("%Y-%m-%d-%H%M%S")
|
||||
out_path = Path.home() / f"hermes-backup-{stamp}.zip"
|
||||
|
||||
# Ensure the suffix is .zip
|
||||
if out_path.suffix.lower() != ".zip":
|
||||
out_path = out_path.with_suffix(out_path.suffix + ".zip")
|
||||
|
||||
# Ensure parent directory exists
|
||||
out_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Collect files
|
||||
print(f"Scanning {display_hermes_home()} ...")
|
||||
files_to_add: list[tuple[Path, Path]] = [] # (absolute, relative)
|
||||
skipped_dirs = set()
|
||||
|
||||
for dirpath, dirnames, filenames in os.walk(hermes_root, followlinks=False):
|
||||
dp = Path(dirpath)
|
||||
rel_dir = dp.relative_to(hermes_root)
|
||||
|
||||
# Prune excluded directories in-place so os.walk doesn't descend
|
||||
orig_dirnames = dirnames[:]
|
||||
dirnames[:] = [
|
||||
d for d in dirnames
|
||||
if d not in _EXCLUDED_DIRS
|
||||
]
|
||||
for removed in set(orig_dirnames) - set(dirnames):
|
||||
skipped_dirs.add(str(rel_dir / removed))
|
||||
|
||||
for fname in filenames:
|
||||
fpath = dp / fname
|
||||
rel = fpath.relative_to(hermes_root)
|
||||
|
||||
if _should_exclude(rel):
|
||||
continue
|
||||
|
||||
# Skip the output zip itself if it happens to be inside hermes root
|
||||
try:
|
||||
if fpath.resolve() == out_path.resolve():
|
||||
continue
|
||||
except (OSError, ValueError):
|
||||
pass
|
||||
|
||||
files_to_add.append((fpath, rel))
|
||||
|
||||
if not files_to_add:
|
||||
print("No files to back up.")
|
||||
return
|
||||
|
||||
# Create the zip
|
||||
file_count = len(files_to_add)
|
||||
print(f"Backing up {file_count} files ...")
|
||||
|
||||
total_bytes = 0
|
||||
errors = []
|
||||
t0 = time.monotonic()
|
||||
|
||||
with zipfile.ZipFile(out_path, "w", zipfile.ZIP_DEFLATED, compresslevel=6) as zf:
|
||||
for i, (abs_path, rel_path) in enumerate(files_to_add, 1):
|
||||
try:
|
||||
zf.write(abs_path, arcname=str(rel_path))
|
||||
total_bytes += abs_path.stat().st_size
|
||||
except (PermissionError, OSError) as exc:
|
||||
errors.append(f" {rel_path}: {exc}")
|
||||
continue
|
||||
|
||||
# Progress every 500 files
|
||||
if i % 500 == 0:
|
||||
print(f" {i}/{file_count} files ...")
|
||||
|
||||
elapsed = time.monotonic() - t0
|
||||
zip_size = out_path.stat().st_size
|
||||
|
||||
# Summary
|
||||
print()
|
||||
print(f"Backup complete: {out_path}")
|
||||
print(f" Files: {file_count}")
|
||||
print(f" Original: {_format_size(total_bytes)}")
|
||||
print(f" Compressed: {_format_size(zip_size)}")
|
||||
print(f" Time: {elapsed:.1f}s")
|
||||
|
||||
if skipped_dirs:
|
||||
print(f"\n Excluded directories:")
|
||||
for d in sorted(skipped_dirs):
|
||||
print(f" {d}/")
|
||||
|
||||
if errors:
|
||||
print(f"\n Warnings ({len(errors)} files skipped):")
|
||||
for e in errors[:10]:
|
||||
print(e)
|
||||
if len(errors) > 10:
|
||||
print(f" ... and {len(errors) - 10} more")
|
||||
|
||||
print(f"\nRestore with: hermes import {out_path.name}")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Import
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _validate_backup_zip(zf: zipfile.ZipFile) -> tuple[bool, str]:
|
||||
"""Check that a zip looks like a Hermes backup.
|
||||
|
||||
Returns (ok, reason).
|
||||
"""
|
||||
names = zf.namelist()
|
||||
if not names:
|
||||
return False, "zip archive is empty"
|
||||
|
||||
# Look for telltale files that a hermes home would have
|
||||
markers = {"config.yaml", ".env", "hermes_state.db", "memory_store.db"}
|
||||
found = set()
|
||||
for n in names:
|
||||
# Could be at the root or one level deep (if someone zipped the directory)
|
||||
basename = Path(n).name
|
||||
if basename in markers:
|
||||
found.add(basename)
|
||||
|
||||
if not found:
|
||||
return False, (
|
||||
"zip does not appear to be a Hermes backup "
|
||||
"(no config.yaml, .env, or state databases found)"
|
||||
)
|
||||
|
||||
return True, ""
|
||||
|
||||
|
||||
def _detect_prefix(zf: zipfile.ZipFile) -> str:
|
||||
"""Detect if the zip has a common directory prefix wrapping all entries.
|
||||
|
||||
Some tools zip as `.hermes/config.yaml` instead of `config.yaml`.
|
||||
Returns the prefix to strip (empty string if none).
|
||||
"""
|
||||
names = [n for n in zf.namelist() if not n.endswith("/")]
|
||||
if not names:
|
||||
return ""
|
||||
|
||||
# Find common prefix
|
||||
parts_list = [Path(n).parts for n in names]
|
||||
|
||||
# Check if all entries share a common first directory
|
||||
first_parts = {p[0] for p in parts_list if len(p) > 1}
|
||||
if len(first_parts) == 1:
|
||||
prefix = first_parts.pop()
|
||||
# Only strip if it looks like a hermes dir name
|
||||
if prefix in (".hermes", "hermes"):
|
||||
return prefix + "/"
|
||||
|
||||
return ""
|
||||
|
||||
|
||||
def run_import(args) -> None:
|
||||
"""Restore a Hermes backup from a zip file."""
|
||||
zip_path = Path(args.zipfile).expanduser().resolve()
|
||||
|
||||
if not zip_path.is_file():
|
||||
print(f"Error: File not found: {zip_path}")
|
||||
sys.exit(1)
|
||||
|
||||
if not zipfile.is_zipfile(zip_path):
|
||||
print(f"Error: Not a valid zip file: {zip_path}")
|
||||
sys.exit(1)
|
||||
|
||||
hermes_root = get_default_hermes_root()
|
||||
|
||||
with zipfile.ZipFile(zip_path, "r") as zf:
|
||||
# Validate
|
||||
ok, reason = _validate_backup_zip(zf)
|
||||
if not ok:
|
||||
print(f"Error: {reason}")
|
||||
sys.exit(1)
|
||||
|
||||
prefix = _detect_prefix(zf)
|
||||
members = [n for n in zf.namelist() if not n.endswith("/")]
|
||||
file_count = len(members)
|
||||
|
||||
print(f"Backup contains {file_count} files")
|
||||
print(f"Target: {display_hermes_home()}")
|
||||
|
||||
if prefix:
|
||||
print(f"Detected archive prefix: {prefix!r} (will be stripped)")
|
||||
|
||||
# Check for existing installation
|
||||
has_config = (hermes_root / "config.yaml").exists()
|
||||
has_env = (hermes_root / ".env").exists()
|
||||
|
||||
if (has_config or has_env) and not args.force:
|
||||
print()
|
||||
print("Warning: Target directory already has Hermes configuration.")
|
||||
print("Importing will overwrite existing files with backup contents.")
|
||||
print()
|
||||
try:
|
||||
answer = input("Continue? [y/N] ").strip().lower()
|
||||
except (EOFError, KeyboardInterrupt):
|
||||
print("\nAborted.")
|
||||
sys.exit(1)
|
||||
if answer not in ("y", "yes"):
|
||||
print("Aborted.")
|
||||
return
|
||||
|
||||
# Extract
|
||||
print(f"\nImporting {file_count} files ...")
|
||||
hermes_root.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
errors = []
|
||||
restored = 0
|
||||
t0 = time.monotonic()
|
||||
|
||||
for member in members:
|
||||
# Strip prefix if detected
|
||||
if prefix and member.startswith(prefix):
|
||||
rel = member[len(prefix):]
|
||||
else:
|
||||
rel = member
|
||||
|
||||
if not rel:
|
||||
continue
|
||||
|
||||
target = hermes_root / rel
|
||||
|
||||
# Security: reject absolute paths and traversals
|
||||
try:
|
||||
target.resolve().relative_to(hermes_root.resolve())
|
||||
except ValueError:
|
||||
errors.append(f" {rel}: path traversal blocked")
|
||||
continue
|
||||
|
||||
try:
|
||||
target.parent.mkdir(parents=True, exist_ok=True)
|
||||
with zf.open(member) as src, open(target, "wb") as dst:
|
||||
dst.write(src.read())
|
||||
restored += 1
|
||||
except (PermissionError, OSError) as exc:
|
||||
errors.append(f" {rel}: {exc}")
|
||||
|
||||
if restored % 500 == 0:
|
||||
print(f" {restored}/{file_count} files ...")
|
||||
|
||||
elapsed = time.monotonic() - t0
|
||||
|
||||
# Summary
|
||||
print()
|
||||
print(f"Import complete: {restored} files restored in {elapsed:.1f}s")
|
||||
print(f" Target: {display_hermes_home()}")
|
||||
|
||||
if errors:
|
||||
print(f"\n Warnings ({len(errors)} files skipped):")
|
||||
for e in errors[:10]:
|
||||
print(e)
|
||||
if len(errors) > 10:
|
||||
print(f" ... and {len(errors) - 10} more")
|
||||
|
||||
# Post-import: restore profile wrapper scripts
|
||||
profiles_dir = hermes_root / "profiles"
|
||||
restored_profiles = []
|
||||
if profiles_dir.is_dir():
|
||||
try:
|
||||
from hermes_cli.profiles import (
|
||||
create_wrapper_script, check_alias_collision,
|
||||
_is_wrapper_dir_in_path, _get_wrapper_dir,
|
||||
)
|
||||
for entry in sorted(profiles_dir.iterdir()):
|
||||
if not entry.is_dir():
|
||||
continue
|
||||
profile_name = entry.name
|
||||
# Only create wrappers for directories with config
|
||||
if not (entry / "config.yaml").exists() and not (entry / ".env").exists():
|
||||
continue
|
||||
collision = check_alias_collision(profile_name)
|
||||
if collision:
|
||||
print(f" Skipped alias '{profile_name}': {collision}")
|
||||
restored_profiles.append((profile_name, False))
|
||||
else:
|
||||
wrapper = create_wrapper_script(profile_name)
|
||||
restored_profiles.append((profile_name, wrapper is not None))
|
||||
|
||||
if restored_profiles:
|
||||
created = [n for n, ok in restored_profiles if ok]
|
||||
skipped = [n for n, ok in restored_profiles if not ok]
|
||||
if created:
|
||||
print(f"\n Profile aliases restored: {', '.join(created)}")
|
||||
if skipped:
|
||||
print(f" Profile aliases skipped: {', '.join(skipped)}")
|
||||
if not _is_wrapper_dir_in_path():
|
||||
print(f"\n Note: {_get_wrapper_dir()} is not in your PATH.")
|
||||
print(' Add to your shell config (~/.bashrc or ~/.zshrc):')
|
||||
print(' export PATH="$HOME/.local/bin:$PATH"')
|
||||
except ImportError:
|
||||
# hermes_cli.profiles might not be available (fresh install)
|
||||
if any(profiles_dir.iterdir()):
|
||||
print(f"\n Profiles detected but aliases could not be created.")
|
||||
print(f" Run: hermes profile list (after installing hermes)")
|
||||
|
||||
# Guidance
|
||||
print()
|
||||
if not (hermes_root / "hermes-agent").is_dir():
|
||||
print("Note: The hermes-agent codebase was not included in the backup.")
|
||||
print(" If this is a fresh install, run: hermes update")
|
||||
|
||||
if restored_profiles:
|
||||
gw_profiles = [n for n, _ in restored_profiles]
|
||||
print("\nTo re-enable gateway services for profiles:")
|
||||
for pname in gw_profiles:
|
||||
print(f" hermes -p {pname} gateway install")
|
||||
|
||||
print("Done. Your Hermes configuration has been restored.")
|
||||
Reference in New Issue
Block a user