mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-29 07:21:37 +08:00
Compare commits
1 Commits
fix/plugin
...
hermes/her
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
603599e982 |
@@ -290,6 +290,16 @@ def build_welcome_banner(console: Console, model: str, cwd: str,
|
||||
left_lines.append(f"[dim {dim}]{cwd}[/]")
|
||||
if session_id:
|
||||
left_lines.append(f"[dim {session_color}]Session: {session_id}[/]")
|
||||
|
||||
# Show active profile if not default
|
||||
try:
|
||||
from hermes_cli.profiles import get_active_profile_name
|
||||
_profile_name = get_active_profile_name()
|
||||
if _profile_name != "default":
|
||||
left_lines.append(f"[dim {session_color}]Profile: {_profile_name}[/]")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
left_content = "\n".join(left_lines)
|
||||
|
||||
right_lines = [f"[bold {accent}]Available Tools[/]"]
|
||||
|
||||
@@ -26,7 +26,20 @@ from hermes_cli.colors import Colors, color
|
||||
# =============================================================================
|
||||
|
||||
def find_gateway_pids() -> list:
|
||||
"""Find PIDs of running gateway processes."""
|
||||
"""Find the PID of the gateway process for the current HERMES_HOME.
|
||||
|
||||
Uses the HERMES_HOME-scoped PID file (``{HERMES_HOME}/gateway.pid``)
|
||||
so that multiple profiles running gateways concurrently don't collide.
|
||||
Falls back to a global ``ps aux`` scan only if no PID file exists.
|
||||
"""
|
||||
from gateway.status import get_running_pid
|
||||
|
||||
# Primary: check the PID file scoped to this HERMES_HOME
|
||||
pid = get_running_pid()
|
||||
if pid is not None:
|
||||
return [pid]
|
||||
|
||||
# Fallback: global scan (legacy — covers cases where PID file wasn't written)
|
||||
pids = []
|
||||
patterns = [
|
||||
"hermes_cli.main gateway",
|
||||
@@ -36,12 +49,10 @@ def find_gateway_pids() -> list:
|
||||
|
||||
try:
|
||||
if is_windows():
|
||||
# Windows: use wmic to search command lines
|
||||
result = subprocess.run(
|
||||
["wmic", "process", "get", "ProcessId,CommandLine", "/FORMAT:LIST"],
|
||||
capture_output=True, text=True
|
||||
)
|
||||
# Parse WMIC LIST output: blocks of "CommandLine=...\nProcessId=...\n"
|
||||
current_cmd = ""
|
||||
for line in result.stdout.split('\n'):
|
||||
line = line.strip()
|
||||
@@ -64,7 +75,6 @@ def find_gateway_pids() -> list:
|
||||
text=True
|
||||
)
|
||||
for line in result.stdout.split('\n'):
|
||||
# Skip grep and current process
|
||||
if 'grep' in line or str(os.getpid()) in line:
|
||||
continue
|
||||
for pattern in patterns:
|
||||
@@ -85,7 +95,10 @@ def find_gateway_pids() -> list:
|
||||
|
||||
|
||||
def kill_gateway_processes(force: bool = False) -> int:
|
||||
"""Kill any running gateway processes. Returns count killed."""
|
||||
"""Kill the gateway process for the current HERMES_HOME. Returns count killed.
|
||||
|
||||
Uses the scoped PID file first (profile-safe), falling back to global scan.
|
||||
"""
|
||||
pids = find_gateway_pids()
|
||||
killed = 0
|
||||
|
||||
|
||||
@@ -54,6 +54,43 @@ from typing import Optional
|
||||
PROJECT_ROOT = Path(__file__).parent.parent.resolve()
|
||||
sys.path.insert(0, str(PROJECT_ROOT))
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Profile override — MUST happen before any hermes module import.
|
||||
#
|
||||
# Many modules cache HERMES_HOME at import time (module-level constants).
|
||||
# We intercept --profile/-p from sys.argv here and set the env var so that
|
||||
# every subsequent ``os.getenv("HERMES_HOME", ...)`` resolves correctly.
|
||||
# The flag is stripped from sys.argv so argparse never sees it.
|
||||
# ---------------------------------------------------------------------------
|
||||
def _apply_profile_override() -> None:
|
||||
"""Pre-parse --profile/-p and set HERMES_HOME before module imports."""
|
||||
argv = sys.argv[1:]
|
||||
for i, arg in enumerate(argv):
|
||||
profile_name = None
|
||||
consume = 0 # how many argv slots to remove
|
||||
|
||||
if arg in ("--profile", "-p") and i + 1 < len(argv):
|
||||
profile_name = argv[i + 1]
|
||||
consume = 2
|
||||
elif arg.startswith("--profile="):
|
||||
profile_name = arg.split("=", 1)[1]
|
||||
consume = 1
|
||||
|
||||
if profile_name is not None:
|
||||
from hermes_cli.profiles import resolve_profile_env
|
||||
try:
|
||||
hermes_home = resolve_profile_env(profile_name)
|
||||
except (ValueError, FileNotFoundError) as exc:
|
||||
print(f"Error: {exc}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
os.environ["HERMES_HOME"] = hermes_home
|
||||
# Strip the flag from argv so argparse/subcommands don't choke
|
||||
start = i + 1 # +1 because argv is sys.argv[1:]
|
||||
sys.argv = sys.argv[:start] + sys.argv[start + consume:]
|
||||
return
|
||||
|
||||
_apply_profile_override()
|
||||
|
||||
# Load .env from ~/.hermes/.env first, then project root as dev fallback.
|
||||
# User-managed env files should override stale shell exports on restart.
|
||||
from hermes_cli.config import get_hermes_home
|
||||
@@ -2510,7 +2547,7 @@ def _coalesce_session_name_args(argv: list) -> list:
|
||||
_SUBCOMMANDS = {
|
||||
"chat", "model", "gateway", "setup", "whatsapp", "login", "logout",
|
||||
"status", "cron", "doctor", "config", "pairing", "skills", "tools",
|
||||
"sessions", "insights", "version", "update", "uninstall",
|
||||
"sessions", "insights", "version", "update", "uninstall", "profile",
|
||||
}
|
||||
_SESSION_FLAGS = {"-c", "--continue", "-r", "--resume"}
|
||||
|
||||
@@ -2554,6 +2591,10 @@ Examples:
|
||||
hermes config edit Edit config in $EDITOR
|
||||
hermes config set model gpt-4 Set a config value
|
||||
hermes gateway Run messaging gateway
|
||||
hermes -p work Use the "work" profile
|
||||
hermes -p work gateway start Start gateway for "work" profile
|
||||
hermes profile create work Create a new profile
|
||||
hermes profile list List all profiles
|
||||
hermes -s hermes-agent-dev,github-auth
|
||||
hermes -w Start in isolated git worktree
|
||||
hermes gateway install Install gateway background service
|
||||
@@ -2605,6 +2646,15 @@ For more help on a command:
|
||||
default=False,
|
||||
help="Bypass all dangerous command approval prompts (use at your own risk)"
|
||||
)
|
||||
# NOTE: --profile/-p is pre-parsed before imports (see _apply_profile_override)
|
||||
# and stripped from sys.argv. We register it here only for --help visibility.
|
||||
parser.add_argument(
|
||||
"--profile", "-p",
|
||||
metavar="NAME",
|
||||
default=None,
|
||||
help="Use a named profile (isolated config, memory, gateway). "
|
||||
"See: hermes profile --help"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--pass-session-id",
|
||||
action="store_true",
|
||||
@@ -3586,7 +3636,170 @@ For more help on a command:
|
||||
sys.exit(1)
|
||||
|
||||
acp_parser.set_defaults(func=cmd_acp)
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# profile command
|
||||
# =========================================================================
|
||||
profile_parser = subparsers.add_parser(
|
||||
"profile",
|
||||
help="Manage isolated Hermes profiles",
|
||||
description=(
|
||||
"Create, list, and manage isolated Hermes profiles. "
|
||||
"Each profile has its own config, API keys, memory, sessions, "
|
||||
"skills, and gateway — fully independent from other profiles."
|
||||
),
|
||||
)
|
||||
profile_sub = profile_parser.add_subparsers(dest="profile_action")
|
||||
|
||||
# profile list
|
||||
profile_list_parser = profile_sub.add_parser(
|
||||
"list", help="List all profiles"
|
||||
)
|
||||
|
||||
# profile create
|
||||
profile_create_parser = profile_sub.add_parser(
|
||||
"create", help="Create a new profile"
|
||||
)
|
||||
profile_create_parser.add_argument(
|
||||
"name", help="Profile name (lowercase, alphanumeric, hyphens, underscores)"
|
||||
)
|
||||
profile_create_parser.add_argument(
|
||||
"--clone", metavar="SOURCE",
|
||||
help="Clone config and .env from an existing profile (e.g. 'default')"
|
||||
)
|
||||
profile_create_parser.add_argument(
|
||||
"--clone-data", action="store_true",
|
||||
help="Also clone memories, skills, and skins (requires --clone)"
|
||||
)
|
||||
|
||||
# profile delete
|
||||
profile_delete_parser = profile_sub.add_parser(
|
||||
"delete", help="Delete a profile"
|
||||
)
|
||||
profile_delete_parser.add_argument(
|
||||
"name", help="Profile name to delete"
|
||||
)
|
||||
profile_delete_parser.add_argument(
|
||||
"--yes", "-y", action="store_true",
|
||||
help="Skip confirmation prompt"
|
||||
)
|
||||
|
||||
# profile show
|
||||
profile_show_parser = profile_sub.add_parser(
|
||||
"show", help="Show details of a profile"
|
||||
)
|
||||
profile_show_parser.add_argument(
|
||||
"name", help="Profile name"
|
||||
)
|
||||
|
||||
def cmd_profile(args):
|
||||
"""Manage isolated Hermes profiles."""
|
||||
from hermes_cli.profiles import (
|
||||
create_profile, delete_profile, list_profiles,
|
||||
get_profile_dir, profile_exists, get_active_profile_name,
|
||||
)
|
||||
|
||||
action = args.profile_action
|
||||
|
||||
if action == "list" or action is None:
|
||||
profiles = list_profiles()
|
||||
active = get_active_profile_name()
|
||||
if not profiles:
|
||||
print("No profiles found.")
|
||||
return
|
||||
|
||||
print()
|
||||
print(" Profiles:")
|
||||
print()
|
||||
for p in profiles:
|
||||
marker = " ◆" if p.name == active else " "
|
||||
gw = " [gateway running]" if p.gateway_running else ""
|
||||
model_str = p.model or "(no model set)"
|
||||
env_str = "✓" if p.has_env else "✗"
|
||||
print(f" {marker} {p.name:<20s} {model_str:<35s} .env: {env_str}{gw}")
|
||||
print()
|
||||
if active != "default":
|
||||
print(f" Active profile: {active}")
|
||||
print()
|
||||
print(" Usage: hermes -p <name> [command]")
|
||||
print()
|
||||
return
|
||||
|
||||
if action == "create":
|
||||
name = args.name
|
||||
clone_from = args.clone
|
||||
clone_data = args.clone_data
|
||||
if clone_data and not clone_from:
|
||||
print("Error: --clone-data requires --clone <source>")
|
||||
sys.exit(1)
|
||||
try:
|
||||
profile_dir = create_profile(
|
||||
name, clone_from=clone_from, clone_data=clone_data,
|
||||
)
|
||||
except (ValueError, FileExistsError, FileNotFoundError) as exc:
|
||||
print(f"Error: {exc}")
|
||||
sys.exit(1)
|
||||
|
||||
print(f"\n ✓ Profile '{name}' created at {profile_dir}\n")
|
||||
if clone_from:
|
||||
print(f" Cloned config from '{clone_from}'")
|
||||
if clone_data:
|
||||
print(f" Cloned memories, skills, and skins")
|
||||
print()
|
||||
|
||||
print(" Next steps:")
|
||||
if not clone_from:
|
||||
print(f" hermes -p {name} setup # Configure API keys and model")
|
||||
print(f" hermes -p {name} # Start chatting")
|
||||
print(f" hermes -p {name} gateway start # Start a gateway for this profile")
|
||||
print()
|
||||
return
|
||||
|
||||
if action == "delete":
|
||||
name = args.name
|
||||
if not profile_exists(name):
|
||||
print(f"Error: Profile '{name}' does not exist.")
|
||||
sys.exit(1)
|
||||
if name == "default":
|
||||
print("Error: Cannot delete the default profile (~/.hermes).")
|
||||
sys.exit(1)
|
||||
|
||||
if not args.yes:
|
||||
profile_dir = get_profile_dir(name)
|
||||
print(f"\n This will permanently delete profile '{name}' at:")
|
||||
print(f" {profile_dir}")
|
||||
print(f"\n All config, memory, sessions, and skills for this profile will be lost.")
|
||||
confirm = input(f"\n Type '{name}' to confirm: ").strip()
|
||||
if confirm != name:
|
||||
print(" Cancelled.")
|
||||
return
|
||||
|
||||
try:
|
||||
path = delete_profile(name)
|
||||
except (ValueError, FileNotFoundError, RuntimeError) as exc:
|
||||
print(f"Error: {exc}")
|
||||
sys.exit(1)
|
||||
print(f"\n ✓ Profile '{name}' deleted.\n")
|
||||
return
|
||||
|
||||
if action == "show":
|
||||
name = args.name
|
||||
if not profile_exists(name):
|
||||
print(f"Error: Profile '{name}' does not exist.")
|
||||
sys.exit(1)
|
||||
profile_dir = get_profile_dir(name)
|
||||
print(f"\n Profile: {name}")
|
||||
print(f" Path: {profile_dir}")
|
||||
print()
|
||||
# Show what's inside
|
||||
for item in sorted(profile_dir.iterdir()):
|
||||
kind = "dir " if item.is_dir() else "file"
|
||||
print(f" {kind} {item.name}")
|
||||
print()
|
||||
return
|
||||
|
||||
profile_parser.set_defaults(func=cmd_profile)
|
||||
|
||||
# =========================================================================
|
||||
# Parse and execute
|
||||
# =========================================================================
|
||||
|
||||
321
hermes_cli/profiles.py
Normal file
321
hermes_cli/profiles.py
Normal file
@@ -0,0 +1,321 @@
|
||||
"""
|
||||
Profile management for multiple isolated Hermes instances.
|
||||
|
||||
Each profile is a fully independent HERMES_HOME directory with its own
|
||||
config.yaml, .env, memory, sessions, skills, gateway, cron, and logs.
|
||||
Profiles live under ``~/.hermes/profiles/<name>/`` by default.
|
||||
|
||||
The "default" profile is ``~/.hermes`` itself — backward compatible,
|
||||
zero migration needed.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import List, Optional
|
||||
|
||||
_PROFILE_ID_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,63}$")
|
||||
|
||||
# Directories bootstrapped inside every new profile
|
||||
_PROFILE_DIRS = [
|
||||
"memories",
|
||||
"sessions",
|
||||
"skills",
|
||||
"skins",
|
||||
"logs",
|
||||
"plans",
|
||||
"workspace",
|
||||
"audio_cache",
|
||||
"image_cache",
|
||||
]
|
||||
|
||||
# Files copied during clone (if they exist in the source)
|
||||
_CLONE_CONFIG_FILES = [
|
||||
"config.yaml",
|
||||
".env",
|
||||
"SOUL.md",
|
||||
]
|
||||
|
||||
# Optional data dirs to clone when --clone-data is requested
|
||||
_CLONE_DATA_DIRS = [
|
||||
"memories",
|
||||
"skills",
|
||||
"skins",
|
||||
]
|
||||
|
||||
|
||||
def _get_profiles_root() -> Path:
|
||||
"""Return the directory where profiles are stored.
|
||||
|
||||
Always ``~/.hermes/profiles/`` — anchored to the user's home,
|
||||
NOT to the current HERMES_HOME (which may itself be a profile).
|
||||
"""
|
||||
return Path.home() / ".hermes" / "profiles"
|
||||
|
||||
|
||||
def _get_default_hermes_home() -> Path:
|
||||
"""Return the default (pre-profile) HERMES_HOME path."""
|
||||
return Path.home() / ".hermes"
|
||||
|
||||
|
||||
def validate_profile_name(name: str) -> None:
|
||||
"""Raise ``ValueError`` if *name* is not a valid profile identifier."""
|
||||
if name == "default":
|
||||
return # special alias for ~/.hermes
|
||||
if not _PROFILE_ID_RE.match(name):
|
||||
raise ValueError(
|
||||
f"Invalid profile name {name!r}. Must match "
|
||||
f"[a-z0-9][a-z0-9_-]{{0,63}}"
|
||||
)
|
||||
|
||||
|
||||
def get_profile_dir(name: str) -> Path:
|
||||
"""Resolve a profile name to its HERMES_HOME directory."""
|
||||
if name == "default":
|
||||
return _get_default_hermes_home()
|
||||
return _get_profiles_root() / name
|
||||
|
||||
|
||||
def profile_exists(name: str) -> bool:
|
||||
"""Check whether a profile directory exists."""
|
||||
return get_profile_dir(name).is_dir()
|
||||
|
||||
|
||||
@dataclass
|
||||
class ProfileInfo:
|
||||
"""Summary information about a profile."""
|
||||
name: str
|
||||
path: Path
|
||||
is_default: bool
|
||||
gateway_running: bool
|
||||
model: Optional[str]
|
||||
provider: Optional[str]
|
||||
has_env: bool
|
||||
|
||||
|
||||
def _read_config_model(profile_dir: Path) -> tuple:
|
||||
"""Read model/provider from a profile's config.yaml. Returns (model, provider)."""
|
||||
config_path = profile_dir / "config.yaml"
|
||||
if not config_path.exists():
|
||||
return None, None
|
||||
try:
|
||||
import yaml
|
||||
with open(config_path, "r") as f:
|
||||
cfg = yaml.safe_load(f) or {}
|
||||
model_cfg = cfg.get("model", {})
|
||||
if isinstance(model_cfg, str):
|
||||
return model_cfg, None
|
||||
if isinstance(model_cfg, dict):
|
||||
return model_cfg.get("model"), model_cfg.get("provider")
|
||||
return None, None
|
||||
except Exception:
|
||||
return None, None
|
||||
|
||||
|
||||
def _check_gateway_running(profile_dir: Path) -> bool:
|
||||
"""Check if a gateway is running for a given profile directory."""
|
||||
pid_file = profile_dir / "gateway.pid"
|
||||
if not pid_file.exists():
|
||||
return False
|
||||
try:
|
||||
raw = pid_file.read_text().strip()
|
||||
if not raw:
|
||||
return False
|
||||
data = json.loads(raw) if raw.startswith("{") else {"pid": int(raw)}
|
||||
pid = int(data["pid"])
|
||||
os.kill(pid, 0) # existence check
|
||||
return True
|
||||
except (json.JSONDecodeError, KeyError, ValueError, TypeError,
|
||||
ProcessLookupError, PermissionError, OSError):
|
||||
return False
|
||||
|
||||
|
||||
def list_profiles() -> List[ProfileInfo]:
|
||||
"""Return info for all profiles, including the default."""
|
||||
profiles = []
|
||||
|
||||
# Default profile
|
||||
default_home = _get_default_hermes_home()
|
||||
if default_home.is_dir():
|
||||
model, provider = _read_config_model(default_home)
|
||||
profiles.append(ProfileInfo(
|
||||
name="default",
|
||||
path=default_home,
|
||||
is_default=True,
|
||||
gateway_running=_check_gateway_running(default_home),
|
||||
model=model,
|
||||
provider=provider,
|
||||
has_env=(default_home / ".env").exists(),
|
||||
))
|
||||
|
||||
# Named profiles
|
||||
profiles_root = _get_profiles_root()
|
||||
if profiles_root.is_dir():
|
||||
for entry in sorted(profiles_root.iterdir()):
|
||||
if not entry.is_dir():
|
||||
continue
|
||||
name = entry.name
|
||||
if not _PROFILE_ID_RE.match(name):
|
||||
continue
|
||||
model, provider = _read_config_model(entry)
|
||||
profiles.append(ProfileInfo(
|
||||
name=name,
|
||||
path=entry,
|
||||
is_default=False,
|
||||
gateway_running=_check_gateway_running(entry),
|
||||
model=model,
|
||||
provider=provider,
|
||||
has_env=(entry / ".env").exists(),
|
||||
))
|
||||
|
||||
return profiles
|
||||
|
||||
|
||||
def create_profile(
|
||||
name: str,
|
||||
clone_from: Optional[str] = None,
|
||||
clone_data: bool = False,
|
||||
) -> Path:
|
||||
"""Create a new profile directory with bootstrapped structure.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
name:
|
||||
Profile identifier (lowercase, alphanumeric, hyphens, underscores).
|
||||
clone_from:
|
||||
If set, copy config files from this existing profile.
|
||||
Use ``"default"`` to clone from the main ``~/.hermes``.
|
||||
clone_data:
|
||||
If True (and clone_from is set), also copy memories, skills, skins.
|
||||
|
||||
Returns
|
||||
-------
|
||||
Path
|
||||
The newly created profile directory.
|
||||
"""
|
||||
validate_profile_name(name)
|
||||
|
||||
if name == "default":
|
||||
raise ValueError("Cannot create a profile named 'default' — it is the built-in profile (~/.hermes).")
|
||||
|
||||
profile_dir = get_profile_dir(name)
|
||||
if profile_dir.exists():
|
||||
raise FileExistsError(f"Profile '{name}' already exists at {profile_dir}")
|
||||
|
||||
# Bootstrap directory structure
|
||||
profile_dir.mkdir(parents=True, exist_ok=True)
|
||||
for subdir in _PROFILE_DIRS:
|
||||
(profile_dir / subdir).mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Clone from source profile
|
||||
if clone_from is not None:
|
||||
validate_profile_name(clone_from)
|
||||
source_dir = get_profile_dir(clone_from)
|
||||
if not source_dir.is_dir():
|
||||
raise FileNotFoundError(f"Source profile '{clone_from}' does not exist at {source_dir}")
|
||||
|
||||
# Copy config files
|
||||
for filename in _CLONE_CONFIG_FILES:
|
||||
src = source_dir / filename
|
||||
if src.exists():
|
||||
shutil.copy2(src, profile_dir / filename)
|
||||
|
||||
# Copy data directories
|
||||
if clone_data:
|
||||
for dirname in _CLONE_DATA_DIRS:
|
||||
src = source_dir / dirname
|
||||
if src.is_dir() and any(src.iterdir()):
|
||||
dst = profile_dir / dirname
|
||||
# Remove the empty bootstrapped dir, copy the full tree
|
||||
if dst.exists():
|
||||
shutil.rmtree(dst)
|
||||
shutil.copytree(src, dst)
|
||||
|
||||
return profile_dir
|
||||
|
||||
|
||||
def delete_profile(name: str) -> Path:
|
||||
"""Delete a profile directory.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
name:
|
||||
Profile identifier.
|
||||
|
||||
Returns
|
||||
-------
|
||||
Path
|
||||
The path that was removed.
|
||||
|
||||
Raises
|
||||
------
|
||||
ValueError
|
||||
If trying to delete the default profile.
|
||||
FileNotFoundError
|
||||
If the profile does not exist.
|
||||
"""
|
||||
validate_profile_name(name)
|
||||
|
||||
if name == "default":
|
||||
raise ValueError("Cannot delete the default profile (~/.hermes).")
|
||||
|
||||
profile_dir = get_profile_dir(name)
|
||||
if not profile_dir.is_dir():
|
||||
raise FileNotFoundError(f"Profile '{name}' does not exist.")
|
||||
|
||||
# Safety: check if gateway is running
|
||||
if _check_gateway_running(profile_dir):
|
||||
raise RuntimeError(
|
||||
f"Profile '{name}' has a running gateway. "
|
||||
f"Stop it first: hermes -p {name} gateway stop"
|
||||
)
|
||||
|
||||
shutil.rmtree(profile_dir)
|
||||
return profile_dir
|
||||
|
||||
|
||||
def resolve_profile_env(profile_name: str) -> str:
|
||||
"""Resolve a profile name to a HERMES_HOME path string.
|
||||
|
||||
This is called early in the CLI entry point, before any hermes modules
|
||||
are imported, to set the HERMES_HOME environment variable.
|
||||
"""
|
||||
validate_profile_name(profile_name)
|
||||
profile_dir = get_profile_dir(profile_name)
|
||||
|
||||
if profile_name != "default" and not profile_dir.is_dir():
|
||||
raise FileNotFoundError(
|
||||
f"Profile '{profile_name}' does not exist. "
|
||||
f"Create it with: hermes profile create {profile_name}"
|
||||
)
|
||||
|
||||
return str(profile_dir)
|
||||
|
||||
|
||||
def get_active_profile_name() -> str:
|
||||
"""Infer the current profile name from HERMES_HOME.
|
||||
|
||||
Returns ``"default"`` if HERMES_HOME is not set or points to ``~/.hermes``.
|
||||
Returns the profile name if HERMES_HOME points into ``~/.hermes/profiles/<name>``.
|
||||
Returns ``"custom"`` if HERMES_HOME is set to an unrecognized path.
|
||||
"""
|
||||
hermes_home = Path(os.getenv("HERMES_HOME", str(_get_default_hermes_home())))
|
||||
resolved = hermes_home.resolve()
|
||||
|
||||
default_resolved = _get_default_hermes_home().resolve()
|
||||
if resolved == default_resolved:
|
||||
return "default"
|
||||
|
||||
profiles_root = _get_profiles_root().resolve()
|
||||
try:
|
||||
rel = resolved.relative_to(profiles_root)
|
||||
parts = rel.parts
|
||||
if len(parts) == 1 and _PROFILE_ID_RE.match(parts[0]):
|
||||
return parts[0]
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
return "custom"
|
||||
73
tests/hermes_cli/test_gateway_pid_scoping.py
Normal file
73
tests/hermes_cli/test_gateway_pid_scoping.py
Normal file
@@ -0,0 +1,73 @@
|
||||
"""Tests for HERMES_HOME-scoped gateway PID lookup.
|
||||
|
||||
Verifies that find_gateway_pids() uses the PID file scoped to the current
|
||||
HERMES_HOME, preventing multi-profile gateway collisions.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def fake_hermes_homes(tmp_path):
|
||||
"""Create two fake HERMES_HOME directories with different PID files."""
|
||||
home_a = tmp_path / "profile-a"
|
||||
home_b = tmp_path / "profile-b"
|
||||
home_a.mkdir()
|
||||
home_b.mkdir()
|
||||
return home_a, home_b
|
||||
|
||||
|
||||
class TestFindGatewayPidsScoping:
|
||||
"""find_gateway_pids should only return PIDs for the current HERMES_HOME."""
|
||||
|
||||
def test_returns_pid_from_scoped_file(self, fake_hermes_homes):
|
||||
"""When a PID file exists, find_gateway_pids should read from it."""
|
||||
home_a, _ = fake_hermes_homes
|
||||
|
||||
with patch.dict(os.environ, {"HERMES_HOME": str(home_a)}):
|
||||
# Write a PID file for profile A
|
||||
pid_data = {"pid": os.getpid(), "kind": "hermes-gateway",
|
||||
"argv": ["hermes", "gateway", "run"]}
|
||||
(home_a / "gateway.pid").write_text(json.dumps(pid_data))
|
||||
|
||||
from hermes_cli.gateway import find_gateway_pids
|
||||
pids = find_gateway_pids()
|
||||
assert os.getpid() in pids
|
||||
|
||||
def test_does_not_see_other_profile_pid(self, fake_hermes_homes):
|
||||
"""Profile B's gateway PID should not appear when HERMES_HOME points to A."""
|
||||
home_a, home_b = fake_hermes_homes
|
||||
|
||||
# Write PID file only in profile B
|
||||
pid_data = {"pid": os.getpid(), "kind": "hermes-gateway",
|
||||
"argv": ["hermes", "gateway", "run"]}
|
||||
(home_b / "gateway.pid").write_text(json.dumps(pid_data))
|
||||
|
||||
with patch.dict(os.environ, {"HERMES_HOME": str(home_a)}):
|
||||
from hermes_cli.gateway import find_gateway_pids
|
||||
# get_running_pid is imported locally from gateway.status,
|
||||
# so we patch at the source
|
||||
with patch("gateway.status.get_running_pid", return_value=None):
|
||||
# With no PID file in home_a, and mocking out the global scan,
|
||||
# we should get no PIDs
|
||||
with patch("subprocess.run") as mock_run:
|
||||
mock_run.return_value = MagicMock(stdout="", returncode=0)
|
||||
pids = find_gateway_pids()
|
||||
assert pids == []
|
||||
|
||||
def test_empty_when_no_pid_file_and_no_processes(self, fake_hermes_homes):
|
||||
"""When no PID file exists and no gateway processes are found, returns empty."""
|
||||
home_a, _ = fake_hermes_homes
|
||||
|
||||
with patch.dict(os.environ, {"HERMES_HOME": str(home_a)}):
|
||||
from hermes_cli.gateway import find_gateway_pids
|
||||
with patch("gateway.status.get_running_pid", return_value=None):
|
||||
with patch("subprocess.run") as mock_run:
|
||||
mock_run.return_value = MagicMock(stdout="", returncode=0)
|
||||
pids = find_gateway_pids()
|
||||
assert pids == []
|
||||
379
tests/hermes_cli/test_profiles.py
Normal file
379
tests/hermes_cli/test_profiles.py
Normal file
@@ -0,0 +1,379 @@
|
||||
"""Tests for the profile management system (hermes_cli/profiles.py)."""
|
||||
|
||||
import json
|
||||
import os
|
||||
import textwrap
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixtures
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.fixture
|
||||
def profiles_home(tmp_path):
|
||||
"""Create a fake ~/.hermes tree and patch Path.home() to use it."""
|
||||
fake_home = tmp_path / "fakehome"
|
||||
fake_home.mkdir()
|
||||
hermes_default = fake_home / ".hermes"
|
||||
hermes_default.mkdir()
|
||||
(hermes_default / "config.yaml").write_text("model:\n model: anthropic/claude-sonnet-4\n provider: openrouter\n")
|
||||
(hermes_default / ".env").write_text("OPENROUTER_API_KEY=sk-test-123\n")
|
||||
(hermes_default / "memories").mkdir()
|
||||
(hermes_default / "sessions").mkdir()
|
||||
(hermes_default / "skills").mkdir()
|
||||
|
||||
with patch("hermes_cli.profiles.Path.home", return_value=fake_home):
|
||||
# Also clear HERMES_HOME so get_active_profile_name sees the default
|
||||
old = os.environ.pop("HERMES_HOME", None)
|
||||
yield fake_home
|
||||
if old is not None:
|
||||
os.environ["HERMES_HOME"] = old
|
||||
else:
|
||||
os.environ.pop("HERMES_HOME", None)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def profiles_mod():
|
||||
"""Import the profiles module (deferred to avoid import-time side effects)."""
|
||||
import hermes_cli.profiles as mod
|
||||
return mod
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# validate_profile_name
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestValidateProfileName:
|
||||
def test_valid_names(self, profiles_mod):
|
||||
for name in ["work", "personal", "bot-1", "my_agent", "a", "x" * 64]:
|
||||
profiles_mod.validate_profile_name(name) # should not raise
|
||||
|
||||
def test_default_is_valid(self, profiles_mod):
|
||||
profiles_mod.validate_profile_name("default")
|
||||
|
||||
def test_invalid_names(self, profiles_mod):
|
||||
for name in ["", "Work", "has space", "-starts-hyphen", "_starts-under",
|
||||
"has.dot", "x" * 65, "UPPER"]:
|
||||
with pytest.raises(ValueError):
|
||||
profiles_mod.validate_profile_name(name)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# get_profile_dir
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestGetProfileDir:
|
||||
def test_default_returns_hermes_home(self, profiles_home, profiles_mod):
|
||||
result = profiles_mod.get_profile_dir("default")
|
||||
assert result == profiles_home / ".hermes"
|
||||
|
||||
def test_named_returns_profiles_subdir(self, profiles_home, profiles_mod):
|
||||
result = profiles_mod.get_profile_dir("work")
|
||||
assert result == profiles_home / ".hermes" / "profiles" / "work"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# create_profile
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestCreateProfile:
|
||||
def test_basic_create(self, profiles_home, profiles_mod):
|
||||
path = profiles_mod.create_profile("mybot")
|
||||
assert path.is_dir()
|
||||
assert path == profiles_home / ".hermes" / "profiles" / "mybot"
|
||||
# Check bootstrapped directories
|
||||
for subdir in ["memories", "sessions", "skills", "skins", "logs",
|
||||
"plans", "workspace", "audio_cache", "image_cache"]:
|
||||
assert (path / subdir).is_dir(), f"Missing subdir: {subdir}"
|
||||
|
||||
def test_cannot_create_default(self, profiles_home, profiles_mod):
|
||||
with pytest.raises(ValueError, match="default"):
|
||||
profiles_mod.create_profile("default")
|
||||
|
||||
def test_duplicate_raises(self, profiles_home, profiles_mod):
|
||||
profiles_mod.create_profile("dup")
|
||||
with pytest.raises(FileExistsError):
|
||||
profiles_mod.create_profile("dup")
|
||||
|
||||
def test_invalid_name_raises(self, profiles_home, profiles_mod):
|
||||
with pytest.raises(ValueError):
|
||||
profiles_mod.create_profile("Bad Name")
|
||||
|
||||
def test_clone_from_default(self, profiles_home, profiles_mod):
|
||||
path = profiles_mod.create_profile("cloned", clone_from="default")
|
||||
assert (path / "config.yaml").exists()
|
||||
assert (path / ".env").exists()
|
||||
# Verify content was actually copied
|
||||
assert "anthropic/claude-sonnet-4" in (path / "config.yaml").read_text()
|
||||
assert "sk-test-123" in (path / ".env").read_text()
|
||||
|
||||
def test_clone_from_nonexistent_raises(self, profiles_home, profiles_mod):
|
||||
with pytest.raises(FileNotFoundError):
|
||||
profiles_mod.create_profile("bad", clone_from="nonexistent")
|
||||
|
||||
def test_clone_with_data(self, profiles_home, profiles_mod):
|
||||
# Put some data in default profile
|
||||
default_home = profiles_home / ".hermes"
|
||||
(default_home / "memories" / "memory.md").write_text("I remember things")
|
||||
(default_home / "skills" / "test-skill").mkdir(parents=True)
|
||||
(default_home / "skills" / "test-skill" / "SKILL.md").write_text("---\nname: test\n---\n# Test")
|
||||
|
||||
path = profiles_mod.create_profile("full-clone", clone_from="default", clone_data=True)
|
||||
assert (path / "memories" / "memory.md").exists()
|
||||
assert (path / "skills" / "test-skill" / "SKILL.md").exists()
|
||||
|
||||
def test_clone_without_data_skips_memories(self, profiles_home, profiles_mod):
|
||||
default_home = profiles_home / ".hermes"
|
||||
(default_home / "memories" / "memory.md").write_text("secret")
|
||||
|
||||
path = profiles_mod.create_profile("config-only", clone_from="default")
|
||||
# memories dir exists (bootstrapped) but should be empty
|
||||
assert (path / "memories").is_dir()
|
||||
assert not (path / "memories" / "memory.md").exists()
|
||||
|
||||
def test_clone_from_named_profile(self, profiles_home, profiles_mod):
|
||||
# Create source profile first
|
||||
src = profiles_mod.create_profile("source")
|
||||
(src / "config.yaml").write_text("model:\n model: openai/gpt-4\n")
|
||||
(src / ".env").write_text("OPENAI_API_KEY=sk-source\n")
|
||||
|
||||
# Clone from it
|
||||
dst = profiles_mod.create_profile("derived", clone_from="source")
|
||||
assert "gpt-4" in (dst / "config.yaml").read_text()
|
||||
assert "sk-source" in (dst / ".env").read_text()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# delete_profile
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestDeleteProfile:
|
||||
def test_delete_existing(self, profiles_home, profiles_mod):
|
||||
profiles_mod.create_profile("doomed")
|
||||
path = profiles_mod.delete_profile("doomed")
|
||||
assert not path.exists()
|
||||
|
||||
def test_cannot_delete_default(self, profiles_home, profiles_mod):
|
||||
with pytest.raises(ValueError, match="default"):
|
||||
profiles_mod.delete_profile("default")
|
||||
|
||||
def test_delete_nonexistent_raises(self, profiles_home, profiles_mod):
|
||||
with pytest.raises(FileNotFoundError):
|
||||
profiles_mod.delete_profile("ghost")
|
||||
|
||||
def test_delete_with_running_gateway_raises(self, profiles_home, profiles_mod):
|
||||
path = profiles_mod.create_profile("running")
|
||||
# Write a fake PID file with our own PID (so os.kill(pid, 0) succeeds)
|
||||
pid_data = {"pid": os.getpid(), "kind": "hermes-gateway"}
|
||||
(path / "gateway.pid").write_text(json.dumps(pid_data))
|
||||
|
||||
with pytest.raises(RuntimeError, match="running gateway"):
|
||||
profiles_mod.delete_profile("running")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# list_profiles
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestListProfiles:
|
||||
def test_default_only(self, profiles_home, profiles_mod):
|
||||
profiles = profiles_mod.list_profiles()
|
||||
assert len(profiles) == 1
|
||||
assert profiles[0].name == "default"
|
||||
assert profiles[0].is_default
|
||||
assert profiles[0].model == "anthropic/claude-sonnet-4"
|
||||
assert profiles[0].has_env
|
||||
|
||||
def test_with_named_profiles(self, profiles_home, profiles_mod):
|
||||
profiles_mod.create_profile("alpha")
|
||||
profiles_mod.create_profile("beta")
|
||||
profiles = profiles_mod.list_profiles()
|
||||
names = [p.name for p in profiles]
|
||||
assert "default" in names
|
||||
assert "alpha" in names
|
||||
assert "beta" in names
|
||||
assert len(profiles) == 3
|
||||
|
||||
def test_profiles_sorted(self, profiles_home, profiles_mod):
|
||||
profiles_mod.create_profile("zebra")
|
||||
profiles_mod.create_profile("alpha")
|
||||
profiles = profiles_mod.list_profiles()
|
||||
named = [p.name for p in profiles if not p.is_default]
|
||||
assert named == ["alpha", "zebra"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# resolve_profile_env
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestResolveProfileEnv:
|
||||
def test_default_returns_hermes_home(self, profiles_home, profiles_mod):
|
||||
result = profiles_mod.resolve_profile_env("default")
|
||||
assert result == str(profiles_home / ".hermes")
|
||||
|
||||
def test_existing_named_profile(self, profiles_home, profiles_mod):
|
||||
profiles_mod.create_profile("work")
|
||||
result = profiles_mod.resolve_profile_env("work")
|
||||
assert result == str(profiles_home / ".hermes" / "profiles" / "work")
|
||||
|
||||
def test_nonexistent_raises(self, profiles_home, profiles_mod):
|
||||
with pytest.raises(FileNotFoundError, match="does not exist"):
|
||||
profiles_mod.resolve_profile_env("missing")
|
||||
|
||||
def test_invalid_name_raises(self, profiles_home, profiles_mod):
|
||||
with pytest.raises(ValueError):
|
||||
profiles_mod.resolve_profile_env("Bad Name!")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# get_active_profile_name
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestGetActiveProfileName:
|
||||
def test_default_when_no_env(self, profiles_home, profiles_mod):
|
||||
assert profiles_mod.get_active_profile_name() == "default"
|
||||
|
||||
def test_named_profile_from_env(self, profiles_home, profiles_mod):
|
||||
profiles_mod.create_profile("test-profile")
|
||||
profile_path = str(profiles_home / ".hermes" / "profiles" / "test-profile")
|
||||
with patch.dict(os.environ, {"HERMES_HOME": profile_path}):
|
||||
assert profiles_mod.get_active_profile_name() == "test-profile"
|
||||
|
||||
def test_custom_when_unrecognized_path(self, profiles_home, profiles_mod):
|
||||
with patch.dict(os.environ, {"HERMES_HOME": "/opt/custom-hermes"}):
|
||||
assert profiles_mod.get_active_profile_name() == "custom"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# profile_exists
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestProfileExists:
|
||||
def test_default_exists(self, profiles_home, profiles_mod):
|
||||
assert profiles_mod.profile_exists("default")
|
||||
|
||||
def test_created_exists(self, profiles_home, profiles_mod):
|
||||
profiles_mod.create_profile("new")
|
||||
assert profiles_mod.profile_exists("new")
|
||||
|
||||
def test_uncreated_does_not_exist(self, profiles_home, profiles_mod):
|
||||
assert not profiles_mod.profile_exists("nope")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Profile isolation: verify each profile is a full HERMES_HOME
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestProfileIsolation:
|
||||
"""Verify that setting HERMES_HOME to a profile dir gives full isolation."""
|
||||
|
||||
def test_config_isolation(self, profiles_home, profiles_mod):
|
||||
"""Two profiles should have independent config.yaml files."""
|
||||
p1 = profiles_mod.create_profile("iso1", clone_from="default")
|
||||
p2 = profiles_mod.create_profile("iso2", clone_from="default")
|
||||
|
||||
# Modify p1's config
|
||||
(p1 / "config.yaml").write_text("model:\n model: openai/gpt-4\n")
|
||||
# p2 should still have the original
|
||||
assert "claude" in (p2 / "config.yaml").read_text()
|
||||
assert "gpt-4" in (p1 / "config.yaml").read_text()
|
||||
|
||||
def test_env_isolation(self, profiles_home, profiles_mod):
|
||||
"""Two profiles should have independent .env files."""
|
||||
p1 = profiles_mod.create_profile("env1", clone_from="default")
|
||||
p2 = profiles_mod.create_profile("env2", clone_from="default")
|
||||
|
||||
(p1 / ".env").write_text("OPENROUTER_API_KEY=sk-work\n")
|
||||
(p2 / ".env").write_text("OPENROUTER_API_KEY=sk-personal\n")
|
||||
|
||||
assert "sk-work" in (p1 / ".env").read_text()
|
||||
assert "sk-personal" in (p2 / ".env").read_text()
|
||||
|
||||
def test_memory_isolation(self, profiles_home, profiles_mod):
|
||||
"""Two profiles should have independent memory directories."""
|
||||
p1 = profiles_mod.create_profile("mem1")
|
||||
p2 = profiles_mod.create_profile("mem2")
|
||||
|
||||
(p1 / "memories" / "memory.md").write_text("Profile 1 memory")
|
||||
assert not (p2 / "memories" / "memory.md").exists()
|
||||
|
||||
def test_session_isolation(self, profiles_home, profiles_mod):
|
||||
"""Two profiles should have independent session directories."""
|
||||
p1 = profiles_mod.create_profile("ses1")
|
||||
p2 = profiles_mod.create_profile("ses2")
|
||||
|
||||
(p1 / "sessions" / "test.json").write_text("{}")
|
||||
assert not (p2 / "sessions" / "test.json").exists()
|
||||
|
||||
def test_skills_isolation(self, profiles_home, profiles_mod):
|
||||
"""Two profiles should have independent skill directories."""
|
||||
p1 = profiles_mod.create_profile("sk1")
|
||||
p2 = profiles_mod.create_profile("sk2")
|
||||
|
||||
skill_dir = p1 / "skills" / "custom-skill"
|
||||
skill_dir.mkdir(parents=True)
|
||||
(skill_dir / "SKILL.md").write_text("# Custom")
|
||||
assert not (p2 / "skills" / "custom-skill").exists()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Gateway collision prevention
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestGatewayIsolation:
|
||||
def test_pid_files_are_separate(self, profiles_home, profiles_mod):
|
||||
"""Each profile should have its own gateway.pid path."""
|
||||
p1 = profiles_mod.create_profile("gw1")
|
||||
p2 = profiles_mod.create_profile("gw2")
|
||||
|
||||
pid1_path = p1 / "gateway.pid"
|
||||
pid2_path = p2 / "gateway.pid"
|
||||
|
||||
# They should be different paths
|
||||
assert pid1_path != pid2_path
|
||||
|
||||
# Writing to one doesn't affect the other
|
||||
pid1_path.write_text(json.dumps({"pid": 12345}))
|
||||
assert not pid2_path.exists()
|
||||
|
||||
def test_systemd_service_names_differ(self, profiles_home, profiles_mod):
|
||||
"""Different profiles should get different systemd service names."""
|
||||
p1 = profiles_mod.create_profile("svc1")
|
||||
p2 = profiles_mod.create_profile("svc2")
|
||||
|
||||
from hermes_cli.gateway import get_service_name
|
||||
|
||||
with patch.dict(os.environ, {"HERMES_HOME": str(p1)}):
|
||||
name1 = get_service_name()
|
||||
with patch.dict(os.environ, {"HERMES_HOME": str(p2)}):
|
||||
name2 = get_service_name()
|
||||
|
||||
assert name1 != name2
|
||||
# Both should start with hermes-gateway
|
||||
assert name1.startswith("hermes-gateway")
|
||||
assert name2.startswith("hermes-gateway")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _apply_profile_override (from main.py)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestApplyProfileOverride:
|
||||
"""Test that --profile/-p pre-parsing sets HERMES_HOME correctly."""
|
||||
|
||||
def test_profile_flag_sets_env(self, profiles_home, profiles_mod):
|
||||
profiles_mod.create_profile("test-pre")
|
||||
expected = str(profiles_home / ".hermes" / "profiles" / "test-pre")
|
||||
|
||||
from hermes_cli.profiles import resolve_profile_env
|
||||
result = resolve_profile_env("test-pre")
|
||||
assert result == expected
|
||||
|
||||
def test_default_profile_resolves_to_hermes_home(self, profiles_home, profiles_mod):
|
||||
from hermes_cli.profiles import resolve_profile_env
|
||||
result = resolve_profile_env("default")
|
||||
assert result == str(profiles_home / ".hermes")
|
||||
Reference in New Issue
Block a user