mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-02 08:47:26 +08:00
Compare commits
6 Commits
fix/analyt
...
hermes/cur
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
13ca9ee665 | ||
|
|
83b22637af | ||
|
|
0be51452fa | ||
|
|
76df76477f | ||
|
|
f40ccece11 | ||
|
|
9dd59cb637 |
456
agent/curator.py
Normal file
456
agent/curator.py
Normal file
@@ -0,0 +1,456 @@
|
||||
"""Curator — background skill maintenance orchestrator.
|
||||
|
||||
The curator is an auxiliary-model task that periodically reviews agent-created
|
||||
skills and maintains the collection. It runs inactivity-triggered (no cron
|
||||
daemon): when the agent is idle and the last curator run was longer than
|
||||
``interval_hours`` ago, ``maybe_run_curator()`` spawns a forked AIAgent to do
|
||||
the review.
|
||||
|
||||
Responsibilities:
|
||||
- Auto-transition lifecycle states based on last_used_at timestamps
|
||||
- Spawn a background review agent that can pin / archive / consolidate /
|
||||
patch agent-created skills via skill_manage
|
||||
- Persist curator state (last_run_at, paused, etc.) in .curator_state
|
||||
|
||||
Strict invariants:
|
||||
- Only touches agent-created skills (see tools/skill_usage.is_agent_created)
|
||||
- Never auto-deletes — only archives. Archive is recoverable.
|
||||
- Pinned skills bypass all auto-transitions
|
||||
- Uses the auxiliary client; never touches the main session's prompt cache
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import tempfile
|
||||
import threading
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Callable, Dict, Optional
|
||||
|
||||
from hermes_constants import get_hermes_home
|
||||
from tools import skill_usage
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Scheduler defaults — each is overridable via the curator.* section of
# ~/.hermes/config.yaml (see the get_* accessors below).
DEFAULT_INTERVAL_HOURS = 168  # one week between curator runs (24 * 7)
DEFAULT_MIN_IDLE_HOURS = 2  # agent must be idle this long before a run
DEFAULT_STALE_AFTER_DAYS = 30  # unused this long -> marked stale
DEFAULT_ARCHIVE_AFTER_DAYS = 90  # unused this long -> archived
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# .curator_state — persistent scheduler + status
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _state_file() -> Path:
    """Location of the persistent curator state file under the skills dir."""
    home = get_hermes_home()
    return home.joinpath("skills", ".curator_state")
|
||||
|
||||
|
||||
def _default_state() -> Dict[str, Any]:
|
||||
return {
|
||||
"last_run_at": None,
|
||||
"last_run_duration_seconds": None,
|
||||
"last_run_summary": None,
|
||||
"paused": False,
|
||||
"run_count": 0,
|
||||
}
|
||||
|
||||
|
||||
def load_state() -> Dict[str, Any]:
    """Load persisted curator state, falling back to defaults on any problem.

    Unknown keys are dropped unless they start with an underscore, which is
    reserved for forward-compatible private extensions.
    """
    path = _state_file()
    if path.exists():
        try:
            raw = json.loads(path.read_text(encoding="utf-8"))
        except (OSError, json.JSONDecodeError) as e:
            logger.debug("Failed to read curator state: %s", e)
        else:
            if isinstance(raw, dict):
                merged = _default_state()
                for key, value in raw.items():
                    if key in merged or key.startswith("_"):
                        merged[key] = value
                return merged
    return _default_state()
|
||||
|
||||
|
||||
def save_state(data: Dict[str, Any]) -> None:
    """Atomically persist *data* to .curator_state. Best-effort: never raises.

    Writes to a temp file in the same directory, flushes + fsyncs, then
    os.replace()s over the real file so readers never see a half-written
    state. Any failure is logged at debug level and otherwise swallowed.
    """
    target = _state_file()
    try:
        target.parent.mkdir(parents=True, exist_ok=True)
        fd, tmp_path = tempfile.mkstemp(
            dir=str(target.parent), prefix=".curator_state_", suffix=".tmp"
        )
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as handle:
                json.dump(data, handle, indent=2, sort_keys=True, ensure_ascii=False)
                handle.flush()
                os.fsync(handle.fileno())
            os.replace(tmp_path, target)
        except BaseException:
            # Remove the orphan temp file, then let the outer handler log.
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
            raise
    except Exception as e:
        logger.debug("Failed to save curator state: %s", e, exc_info=True)
|
||||
|
||||
|
||||
def set_paused(paused: bool) -> None:
    """Persist the paused flag without disturbing the rest of the state."""
    current = load_state()
    current["paused"] = bool(paused)
    save_state(current)
|
||||
|
||||
|
||||
def is_paused() -> bool:
    """True when the operator has paused curation via `curator pause`."""
    state = load_state()
    return bool(state.get("paused"))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Config access
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _load_config() -> Dict[str, Any]:
    """Read curator.* config from ~/.hermes/config.yaml. Tolerates missing file.

    Returns an empty dict whenever the config cannot be loaded or the
    curator section is absent or malformed.
    """
    try:
        from hermes_cli.config import load_config
        cfg = load_config()
    except Exception as e:
        logger.debug("Failed to load config for curator: %s", e)
        return {}

    if isinstance(cfg, dict):
        section = cfg.get("curator") or {}
        if isinstance(section, dict):
            return section
    return {}
|
||||
|
||||
|
||||
def is_enabled() -> bool:
    """Curator on/off switch — defaults to enabled when config is silent."""
    return bool(_load_config().get("enabled", True))
|
||||
|
||||
|
||||
def get_interval_hours() -> int:
    """Hours between curator runs (config key: curator.interval_hours)."""
    raw = _load_config().get("interval_hours", DEFAULT_INTERVAL_HOURS)
    try:
        return int(raw)
    except (TypeError, ValueError):
        return DEFAULT_INTERVAL_HOURS
|
||||
|
||||
|
||||
def get_min_idle_hours() -> float:
    """Minimum agent idle time (hours) before a curator run may start.

    Reads curator.min_idle_hours from config; falls back to
    DEFAULT_MIN_IDLE_HOURS when the value is missing or non-numeric.

    Returns:
        The idle threshold in hours, always as a float so callers can
        multiply by 3600.0 without type surprises.
    """
    cfg = _load_config()
    try:
        return float(cfg.get("min_idle_hours", DEFAULT_MIN_IDLE_HOURS))
    except (TypeError, ValueError):
        # Coerce the fallback too so the annotated `-> float` always holds
        # (DEFAULT_MIN_IDLE_HOURS is an int).
        return float(DEFAULT_MIN_IDLE_HOURS)
|
||||
|
||||
|
||||
def get_stale_after_days() -> int:
    """Days without use before a skill is marked stale (curator.stale_after_days)."""
    raw = _load_config().get("stale_after_days", DEFAULT_STALE_AFTER_DAYS)
    try:
        return int(raw)
    except (TypeError, ValueError):
        return DEFAULT_STALE_AFTER_DAYS
|
||||
|
||||
|
||||
def get_archive_after_days() -> int:
    """Days without use before a skill is archived (curator.archive_after_days)."""
    raw = _load_config().get("archive_after_days", DEFAULT_ARCHIVE_AFTER_DAYS)
    try:
        return int(raw)
    except (TypeError, ValueError):
        return DEFAULT_ARCHIVE_AFTER_DAYS
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Idle / interval check
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _parse_iso(ts: Optional[str]) -> Optional[datetime]:
|
||||
if not ts:
|
||||
return None
|
||||
try:
|
||||
return datetime.fromisoformat(ts)
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
|
||||
|
||||
def should_run_now(now: Optional[datetime] = None) -> bool:
    """Return True if the curator should run immediately.

    Static gates checked here:
      - curator.enabled is True
      - not paused
      - last_run_at is missing, or at least interval_hours old

    The dynamic idle gate (min_idle_hours) is enforced by the caller, which
    is the only place that knows whether an agent is actively running.
    """
    if not is_enabled() or is_paused():
        return False

    last = _parse_iso(load_state().get("last_run_at"))
    if last is None:
        return True  # never ran -> due immediately

    reference = now if now is not None else datetime.now(timezone.utc)
    if last.tzinfo is None:
        # Legacy naive timestamps are treated as UTC.
        last = last.replace(tzinfo=timezone.utc)
    return reference - last >= timedelta(hours=get_interval_hours())
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Automatic state transitions (pure function, no LLM)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def apply_automatic_transitions(now: Optional[datetime] = None) -> Dict[str, int]:
    """Apply time-based lifecycle transitions to every agent-created skill.

    Rules (pinned skills are exempt from all of them):
      - unused longer than archive_after_days -> archived
      - unused longer than stale_after_days   -> stale
      - used again while marked stale         -> reactivated

    Returns a counter dict with keys checked / marked_stale / archived /
    reactivated describing what changed.
    """
    from tools import skill_usage as _u

    if now is None:
        now = datetime.now(timezone.utc)
    stale_cutoff = now - timedelta(days=get_stale_after_days())
    archive_cutoff = now - timedelta(days=get_archive_after_days())

    counts = {"marked_stale": 0, "archived": 0, "reactivated": 0, "checked": 0}

    for row in _u.agent_created_report():
        counts["checked"] += 1
        name = row["name"]
        if row.get("pinned"):
            continue  # pinned skills bypass every auto-transition

        # Never-used skills anchor on created_at so a fresh skill doesn't
        # archive itself immediately; `now` is the last-resort anchor.
        anchor = (
            _parse_iso(row.get("last_used_at"))
            or _parse_iso(row.get("created_at"))
            or now
        )
        if anchor.tzinfo is None:
            anchor = anchor.replace(tzinfo=timezone.utc)

        state = row.get("state", _u.STATE_ACTIVE)
        if anchor <= archive_cutoff and state != _u.STATE_ARCHIVED:
            archived_ok, _ = _u.archive_skill(name)
            if archived_ok:
                counts["archived"] += 1
        elif anchor <= stale_cutoff and state == _u.STATE_ACTIVE:
            _u.set_state(name, _u.STATE_STALE)
            counts["marked_stale"] += 1
        elif anchor > stale_cutoff and state == _u.STATE_STALE:
            # Skill got used again after being marked stale — reactivate.
            _u.set_state(name, _u.STATE_ACTIVE)
            counts["reactivated"] += 1

    return counts
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Review prompt for the forked agent
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# System instructions handed to the forked review agent. Kept as adjacent
# string literals so individual rules are easy to diff and edit; the rendered
# candidate list is appended by run_curator_review() before dispatch.
CURATOR_REVIEW_PROMPT = (
    "You are running as Hermes' background skill CURATOR.\n\n"
    "Your job is to maintain the collection of AGENT-CREATED skills. Review "
    "each candidate below and decide what to do.\n\n"
    "Rules — all load-bearing, do not violate:\n"
    "1. You MUST NOT touch bundled or hub-installed skills. The candidate list "
    "below is already filtered to agent-created skills only.\n"
    "2. You MUST NOT delete any skill. Archiving (moving the skill's directory "
    "into ~/.hermes/skills/.archive/) is the maximum action. Archives are "
    "recoverable; deletion is not.\n"
    "3. You MUST NOT touch skills shown as pinned=yes. Skip them.\n"
    "4. Prefer GENERALIZING overlapping skills by patching the stronger one "
    "and archiving the weaker, rather than leaving two narrow skills in the "
    "collection.\n\n"
    "Your toolset:\n"
    " - skills_list, skill_view — read the current landscape\n"
    " - skill_manage action=patch — fix stale commands, wrong paths, or "
    "merge two overlapping skills by broadening the stronger one\n"
    " - terminal — move a skill directory into the archive, "
    "e.g. mv ~/.hermes/skills/<skill-dir> ~/.hermes/skills/.archive/\n\n"
    "For each candidate, decide one of:\n"
    " keep — leave as-is (most common default; don't over-curate)\n"
    " patch — skill_manage action=patch to fix stale commands, wrong "
    "paths, or env-specific claims that are no longer true\n"
    " consolidate — two skills overlap: patch the stronger one to absorb "
    "the weaker (skill_manage), then mv the weaker directory to .archive/\n"
    " archive — the skill is genuinely obsolete and has not been used "
    "recently: mv its directory to ~/.hermes/skills/.archive/\n\n"
    "Start by calling skills_list and skill_view on anything you consider "
    "patching or consolidating. Be conservative — if in doubt, keep. "
    "When you are done, write a one-sentence summary of what you changed."
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Orchestrator — spawn a forked AIAgent for the LLM review pass
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _render_candidate_list() -> str:
    """Render agent-created skills plus usage stats as a plain-text list."""
    rows = skill_usage.agent_created_report()
    if not rows:
        return "No agent-created skills to review."

    out = [f"Agent-created skills ({len(rows)}):\n"]
    for row in rows:
        pinned_label = "yes" if row.get("pinned") else "no"
        out.append(
            f"- {row['name']} "
            f"state={row['state']} "
            f"pinned={pinned_label} "
            f"use={row.get('use_count', 0)} "
            f"view={row.get('view_count', 0)} "
            f"patches={row.get('patch_count', 0)} "
            f"last_used={row.get('last_used_at') or 'never'}"
        )
    return "\n".join(out)
|
||||
|
||||
|
||||
def run_curator_review(
    on_summary: Optional[Callable[[str], None]] = None,
    synchronous: bool = False,
) -> Dict[str, Any]:
    """Execute a single curator review pass.

    Steps:
        1. Apply automatic state transitions (pure, no LLM).
        2. If there are agent-created skills, spawn a forked AIAgent that runs
           the LLM review prompt against the current candidate list.
        3. Update .curator_state with last_run_at and a one-line summary.
        4. Invoke *on_summary* with a user-visible description.

    Args:
        on_summary: Optional callback invoked with a one-line, user-visible
            description once the LLM stage finishes. Exceptions it raises are
            swallowed.
        synchronous: If True, the LLM review runs in the calling thread; the
            default spawns a daemon thread so the caller returns immediately.

    Returns:
        Dict with ``started_at``, ``auto_transitions`` counters, and
        ``summary_so_far`` (the LLM stage may still be running).
    """
    start = datetime.now(timezone.utc)
    counts = apply_automatic_transitions(now=start)

    parts = []
    if counts["marked_stale"]:
        parts.append(f"{counts['marked_stale']} marked stale")
    if counts["archived"]:
        parts.append(f"{counts['archived']} archived")
    if counts["reactivated"]:
        parts.append(f"{counts['reactivated']} reactivated")
    auto_summary = ", ".join(parts) if parts else "no changes"

    # Persist state before the LLM pass so a crash mid-review still records
    # the run and doesn't immediately re-trigger.
    state = load_state()
    state["last_run_at"] = start.isoformat()
    state["run_count"] = int(state.get("run_count", 0)) + 1
    state["last_run_summary"] = f"auto: {auto_summary}"
    save_state(state)

    def _llm_pass():
        # `auto_summary` is only read here, so no `nonlocal` declaration is
        # needed (the original carried a dead one).
        try:
            candidate_list = _render_candidate_list()
            if "No agent-created skills" in candidate_list:
                final_summary = f"auto: {auto_summary}; llm: skipped (no candidates)"
            else:
                prompt = f"{CURATOR_REVIEW_PROMPT}\n\n{candidate_list}"
                llm_summary = _run_llm_review(prompt)
                final_summary = f"auto: {auto_summary}; llm: {llm_summary}"
        except Exception as e:
            logger.debug("Curator LLM pass failed: %s", e, exc_info=True)
            final_summary = f"auto: {auto_summary}; llm: error ({e})"

        elapsed = (datetime.now(timezone.utc) - start).total_seconds()
        state2 = load_state()
        state2["last_run_duration_seconds"] = elapsed
        state2["last_run_summary"] = final_summary
        save_state(state2)

        if on_summary:
            try:
                on_summary(f"curator: {final_summary}")
            except Exception:
                # User-supplied display callback — never let it kill the
                # review thread.
                pass

    if synchronous:
        _llm_pass()
    else:
        t = threading.Thread(target=_llm_pass, daemon=True, name="curator-review")
        t.start()

    return {
        "started_at": start.isoformat(),
        "auto_transitions": counts,
        "summary_so_far": auto_summary,
    }
|
||||
|
||||
|
||||
def _run_llm_review(prompt: str) -> str:
|
||||
"""Spawn an AIAgent fork to run the curator review prompt. Returns a short
|
||||
summary of what the model said in its final response."""
|
||||
import contextlib
|
||||
try:
|
||||
from run_agent import AIAgent
|
||||
except Exception as e:
|
||||
return f"AIAgent import failed: {e}"
|
||||
|
||||
review_agent = None
|
||||
try:
|
||||
with open(os.devnull, "w") as _devnull, \
|
||||
contextlib.redirect_stdout(_devnull), \
|
||||
contextlib.redirect_stderr(_devnull):
|
||||
review_agent = AIAgent(
|
||||
max_iterations=8,
|
||||
quiet_mode=True,
|
||||
platform="curator",
|
||||
skip_context_files=True,
|
||||
skip_memory=True,
|
||||
)
|
||||
# Disable recursive nudges — the curator must never spawn its own review.
|
||||
review_agent._memory_nudge_interval = 0
|
||||
review_agent._skill_nudge_interval = 0
|
||||
|
||||
result = review_agent.run_conversation(user_message=prompt)
|
||||
|
||||
final = ""
|
||||
if isinstance(result, dict):
|
||||
final = str(result.get("final_response") or "").strip()
|
||||
return (final[:240] + "…") if len(final) > 240 else (final or "no change")
|
||||
except Exception as e:
|
||||
return f"error: {e}"
|
||||
finally:
|
||||
if review_agent is not None:
|
||||
try:
|
||||
review_agent.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public entrypoint for the session-start hook
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def maybe_run_curator(
    *,
    idle_for_seconds: Optional[float] = None,
    on_summary: Optional[Callable[[str], None]] = None,
) -> Optional[Dict[str, Any]]:
    """Best-effort curator entrypoint for session-start / ticker hooks.

    Starts a review pass only when every gate passes. Returns the result
    dict when a pass was started, else None. Never raises.
    """
    try:
        if not should_run_now():
            return None
        # Idle gating: only enforced when the caller measured idleness.
        if idle_for_seconds is not None:
            if idle_for_seconds < get_min_idle_hours() * 3600.0:
                return None
        return run_curator_review(on_summary=on_summary)
    except Exception as e:
        logger.debug("maybe_run_curator failed: %s", e, exc_info=True)
        return None
|
||||
41
cli.py
41
cli.py
@@ -5818,7 +5818,29 @@ class HermesCLI:
|
||||
|
||||
print(f"(._.) Unknown cron command: {subcommand}")
|
||||
print(" Available: list, add, edit, pause, resume, run, remove")
|
||||
|
||||
|
||||
def _handle_curator_command(self, cmd: str):
|
||||
"""Handle /curator slash command.
|
||||
|
||||
Delegates to hermes_cli.curator so the CLI and the `hermes curator`
|
||||
subcommand share the same handler set.
|
||||
"""
|
||||
import shlex
|
||||
|
||||
tokens = shlex.split(cmd)[1:] if cmd else []
|
||||
if not tokens:
|
||||
tokens = ["status"]
|
||||
|
||||
try:
|
||||
from hermes_cli.curator import cli_main
|
||||
cli_main(tokens)
|
||||
except SystemExit:
|
||||
# argparse calls sys.exit() on --help or errors; swallow so we
|
||||
# don't kill the interactive session.
|
||||
pass
|
||||
except Exception as exc:
|
||||
print(f"(._.) curator: {exc}")
|
||||
|
||||
def _handle_skills_command(self, cmd: str):
|
||||
"""Handle /skills slash command — delegates to hermes_cli.skills_hub."""
|
||||
from hermes_cli.skills_hub import handle_skills_slash
|
||||
@@ -6055,6 +6077,8 @@ class HermesCLI:
|
||||
self.save_conversation()
|
||||
elif canonical == "cron":
|
||||
self._handle_cron_command(cmd_original)
|
||||
elif canonical == "curator":
|
||||
self._handle_curator_command(cmd_original)
|
||||
elif canonical == "skills":
|
||||
with self._busy_command(self._slow_command_status(cmd_original)):
|
||||
self._handle_skills_command(cmd_original)
|
||||
@@ -9002,6 +9026,21 @@ class HermesCLI:
|
||||
self._console_print(f"[dim {_tip_color}]✦ Tip: {_tip}[/]")
|
||||
except Exception:
|
||||
pass # Tips are non-critical — never break startup
|
||||
|
||||
# Curator — kick off a background skill-maintenance pass on startup
|
||||
# if the schedule says we're due. Runs in a daemon thread so it
|
||||
# never blocks the interactive loop. Best-effort; any failure is
|
||||
# swallowed to avoid breaking session startup.
|
||||
try:
|
||||
from agent.curator import maybe_run_curator
|
||||
maybe_run_curator(
|
||||
idle_for_seconds=float("inf"), # CLI startup = fully idle
|
||||
on_summary=lambda msg: self._console_print(
|
||||
f"[dim #6b7684]💾 {msg}[/]"
|
||||
),
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
if self.preloaded_skills and not self._startup_skills_line_shown:
|
||||
skills_label = ", ".join(self.preloaded_skills)
|
||||
self._console_print(
|
||||
|
||||
@@ -2054,6 +2054,7 @@ class GatewayRunner:
|
||||
|
||||
# Discover and load event hooks
|
||||
self.hooks.discover_and_load()
|
||||
|
||||
|
||||
# Recover background processes from checkpoint (crash recovery)
|
||||
try:
|
||||
@@ -10834,6 +10835,7 @@ def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, in
|
||||
|
||||
IMAGE_CACHE_EVERY = 60 # ticks — once per hour at default 60s interval
|
||||
CHANNEL_DIR_EVERY = 5 # ticks — every 5 minutes
|
||||
CURATOR_EVERY = 60 # ticks — poll hourly (inner gate handles the real cadence)
|
||||
|
||||
logger.info("Cron ticker started (interval=%ds)", interval)
|
||||
tick_count = 0
|
||||
@@ -10866,6 +10868,21 @@ def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, in
|
||||
except Exception as e:
|
||||
logger.debug("Document cache cleanup error: %s", e)
|
||||
|
||||
# Curator — piggy-back on the existing cron ticker so long-running
|
||||
# gateways get weekly skill maintenance without needing restarts.
|
||||
# maybe_run_curator() is internally gated by config.interval_hours
|
||||
# (7 days by default), so CURATOR_EVERY is just the poll rate — the
|
||||
# real work only fires once per config interval.
|
||||
if tick_count % CURATOR_EVERY == 0:
|
||||
try:
|
||||
from agent.curator import maybe_run_curator
|
||||
maybe_run_curator(
|
||||
idle_for_seconds=float("inf"),
|
||||
on_summary=lambda msg: logger.info("curator: %s", msg),
|
||||
)
|
||||
except Exception as e:
|
||||
logger.debug("Curator tick error: %s", e)
|
||||
|
||||
stop_event.wait(timeout=interval)
|
||||
logger.info("Cron ticker stopped")
|
||||
|
||||
|
||||
@@ -140,6 +140,9 @@ COMMAND_REGISTRY: list[CommandDef] = [
|
||||
CommandDef("cron", "Manage scheduled tasks", "Tools & Skills",
|
||||
cli_only=True, args_hint="[subcommand]",
|
||||
subcommands=("list", "add", "create", "edit", "pause", "resume", "run", "remove")),
|
||||
CommandDef("curator", "Background skill maintenance (status, run, pin, archive)",
|
||||
"Tools & Skills", args_hint="[subcommand]",
|
||||
subcommands=("status", "run", "pause", "resume", "pin", "unpin", "restore")),
|
||||
CommandDef("reload", "Reload .env variables into the running session", "Tools & Skills",
|
||||
cli_only=True),
|
||||
CommandDef("reload-mcp", "Reload MCP servers from config", "Tools & Skills",
|
||||
|
||||
@@ -823,6 +823,35 @@ DEFAULT_CONFIG = {
|
||||
"guard_agent_created": False,
|
||||
},
|
||||
|
||||
# Curator — background skill maintenance.
|
||||
#
|
||||
# Periodically reviews AGENT-CREATED skills (never bundled or
|
||||
# hub-installed) and keeps the collection tidy: marks long-unused skills
|
||||
# as stale, archives genuinely obsolete ones (archive only, never
|
||||
# deletes), and spawns a forked aux-model agent to consolidate overlaps
|
||||
# and patch drift. Runs inactivity-triggered from session start — no
|
||||
# cron daemon.
|
||||
#
|
||||
# See `hermes curator status` for the last run summary.
|
||||
"curator": {
|
||||
"enabled": True,
|
||||
# How long to wait between curator runs (hours). Default: 7 days.
|
||||
"interval_hours": 24 * 7,
|
||||
# Only run when the agent has been idle at least this long (hours).
|
||||
"min_idle_hours": 2,
|
||||
# Mark a skill as "stale" after this many days without use.
|
||||
"stale_after_days": 30,
|
||||
# Archive a skill (move to skills/.archive/) after this many days
|
||||
# without use. Archived skills are recoverable — no auto-deletion.
|
||||
"archive_after_days": 90,
|
||||
# Optional per-task override for the curator's aux model. Leave null
|
||||
# to use Hermes' main auxiliary client resolution.
|
||||
"auxiliary": {
|
||||
"provider": None,
|
||||
"model": None,
|
||||
},
|
||||
},
|
||||
|
||||
# Honcho AI-native memory -- reads ~/.honcho/config.json as single source of truth.
|
||||
# This section is only needed for hermes-specific overrides; everything else
|
||||
# (apiKey, workspace, peerName, sessions, enabled) comes from the global config.
|
||||
|
||||
232
hermes_cli/curator.py
Normal file
232
hermes_cli/curator.py
Normal file
@@ -0,0 +1,232 @@
|
||||
"""CLI subcommand: `hermes curator <subcommand>`.
|
||||
|
||||
Thin shell around agent/curator.py and tools/skill_usage.py. Renders a status
|
||||
table, triggers a run, pauses/resumes, and pins/unpins skills.
|
||||
|
||||
This module intentionally has no side effects at import time — main.py wires
|
||||
the argparse subparsers on demand.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
from datetime import datetime, timezone
|
||||
from typing import Optional
|
||||
|
||||
|
||||
def _fmt_ts(ts: Optional[str]) -> str:
|
||||
if not ts:
|
||||
return "never"
|
||||
try:
|
||||
dt = datetime.fromisoformat(ts)
|
||||
except (TypeError, ValueError):
|
||||
return str(ts)
|
||||
if dt.tzinfo is None:
|
||||
dt = dt.replace(tzinfo=timezone.utc)
|
||||
delta = datetime.now(timezone.utc) - dt
|
||||
secs = int(delta.total_seconds())
|
||||
if secs < 60:
|
||||
return f"{secs}s ago"
|
||||
if secs < 3600:
|
||||
return f"{secs // 60}m ago"
|
||||
if secs < 86400:
|
||||
return f"{secs // 3600}h ago"
|
||||
return f"{secs // 86400}d ago"
|
||||
|
||||
|
||||
def _cmd_status(args) -> int:
    """Print curator scheduler status, config, and agent-created skill stats."""
    from agent import curator
    from tools import skill_usage

    state = curator.load_state()
    paused = state.get("paused", False)

    if paused:
        status_line = "PAUSED"
    elif curator.is_enabled():
        status_line = "ENABLED"
    else:
        status_line = "DISABLED"
    print(f"curator: {status_line}")
    print(f" runs: {state.get('run_count', 0)}")
    print(f" last run: {_fmt_ts(state.get('last_run_at'))}")
    print(f" last summary: {state.get('last_run_summary') or '(none)'}")

    interval_h = curator.get_interval_hours()
    if interval_h >= 24 and interval_h % 24 == 0:
        interval_label = f"{interval_h // 24}d"
    else:
        interval_label = f"{interval_h}h"
    print(f" interval: every {interval_label}")
    print(f" stale after: {curator.get_stale_after_days()}d unused")
    print(f" archive after: {curator.get_archive_after_days()}d unused")

    rows = skill_usage.agent_created_report()
    if not rows:
        print("\nno agent-created skills")
        return 0

    by_state = {"active": [], "stale": [], "archived": []}
    pinned_names = []
    for row in rows:
        by_state.setdefault(row.get("state", "active"), []).append(row)
        if row.get("pinned"):
            pinned_names.append(row["name"])

    print(f"\nagent-created skills: {len(rows)} total")
    for bucket_name in ("active", "stale", "archived"):
        print(f" {bucket_name:10s} {len(by_state.get(bucket_name, []))}")

    if pinned_names:
        print(f"\npinned ({len(pinned_names)}): {', '.join(pinned_names)}")

    # Surface the five least-recently-used active skills as likely
    # curation candidates.
    lru = sorted(
        by_state.get("active", []),
        key=lambda row: row.get("last_used_at") or row.get("created_at") or "",
    )[:5]
    if lru:
        print("\nleast recently used (top 5):")
        for row in lru:
            print(
                f" {row['name']:40s} "
                f"use={row.get('use_count', 0):3d} "
                f"last_used={_fmt_ts(row.get('last_used_at'))}"
            )

    return 0
|
||||
|
||||
|
||||
def _cmd_run(args) -> int:
    """Trigger a curator review pass (LLM stage backgrounds unless --sync)."""
    from agent import curator

    if not curator.is_enabled():
        print("curator: disabled via config; enable with `curator.enabled: true`")
        return 1

    print("curator: running review pass...")

    result = curator.run_curator_review(
        on_summary=print,
        synchronous=bool(args.synchronous),
    )

    auto = result.get("auto_transitions", {})
    if auto:
        print(
            f"auto: checked={auto.get('checked', 0)} "
            f"stale={auto.get('marked_stale', 0)} "
            f"archived={auto.get('archived', 0)} "
            f"reactivated={auto.get('reactivated', 0)}"
        )
    if not args.synchronous:
        print("llm pass running in background — check `hermes curator status` later")
    return 0
|
||||
|
||||
|
||||
def _cmd_pause(args) -> int:
    """Pause curation until `curator resume` is issued."""
    from agent import curator

    curator.set_paused(True)
    print("curator: paused")
    return 0
|
||||
|
||||
|
||||
def _cmd_resume(args) -> int:
    """Clear the paused flag so scheduled curation resumes."""
    from agent import curator

    curator.set_paused(False)
    print("curator: resumed")
    return 0
|
||||
|
||||
|
||||
def _cmd_pin(args) -> int:
    """Pin an agent-created skill so the curator never auto-transitions it."""
    from tools import skill_usage

    if skill_usage.is_agent_created(args.skill):
        skill_usage.set_pinned(args.skill, True)
        print(f"curator: pinned '{args.skill}' (will bypass auto-transitions)")
        return 0
    print(
        f"curator: '{args.skill}' is bundled or hub-installed — cannot pin "
        "(only agent-created skills participate in curation)"
    )
    return 1
|
||||
|
||||
|
||||
def _cmd_unpin(args) -> int:
    """Unpin an agent-created skill, re-enabling auto-transitions for it."""
    from tools import skill_usage

    if skill_usage.is_agent_created(args.skill):
        skill_usage.set_pinned(args.skill, False)
        print(f"curator: unpinned '{args.skill}'")
        return 0
    print(
        f"curator: '{args.skill}' is bundled or hub-installed — "
        "there's nothing to unpin (curator only tracks agent-created skills)"
    )
    return 1
|
||||
|
||||
|
||||
def _cmd_restore(args) -> int:
    """Restore a previously archived skill out of skills/.archive/."""
    from tools import skill_usage

    restored, message = skill_usage.restore_skill(args.skill)
    print(f"curator: {message}")
    return 0 if restored else 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# argparse wiring (called from hermes_cli.main)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def register_cli(parent: argparse.ArgumentParser) -> None:
    """Attach the `curator` subcommand tree to *parent*.

    main.py calls this with the ArgumentParser returned by
    ``subparsers.add_parser("curator", ...)``. With no subcommand the
    default handler prints help and returns 0.
    """
    def _print_help(_args) -> int:
        parent.print_help()
        return 0

    parent.set_defaults(func=_print_help)
    subs = parent.add_subparsers(dest="curator_command")

    subs.add_parser(
        "status", help="Show curator status and skill stats"
    ).set_defaults(func=_cmd_status)

    run_parser = subs.add_parser("run", help="Trigger a curator review now")
    run_parser.add_argument(
        "--sync", "--synchronous", dest="synchronous", action="store_true",
        help="Wait for the LLM review pass to finish (default: background thread)",
    )
    run_parser.set_defaults(func=_cmd_run)

    subs.add_parser(
        "pause", help="Pause the curator until resumed"
    ).set_defaults(func=_cmd_pause)

    subs.add_parser(
        "resume", help="Resume a paused curator"
    ).set_defaults(func=_cmd_resume)

    pin_parser = subs.add_parser(
        "pin", help="Pin a skill so the curator never auto-transitions it"
    )
    pin_parser.add_argument("skill", help="Skill name")
    pin_parser.set_defaults(func=_cmd_pin)

    unpin_parser = subs.add_parser("unpin", help="Unpin a skill")
    unpin_parser.add_argument("skill", help="Skill name")
    unpin_parser.set_defaults(func=_cmd_unpin)

    restore_parser = subs.add_parser("restore", help="Restore an archived skill")
    restore_parser.add_argument("skill", help="Skill name")
    restore_parser.set_defaults(func=_cmd_restore)
|
||||
|
||||
|
||||
def cli_main(argv=None) -> int:
|
||||
"""Standalone entry (also usable by hermes_cli.main fallthrough)."""
|
||||
parser = argparse.ArgumentParser(prog="hermes curator")
|
||||
register_cli(parser)
|
||||
args = parser.parse_args(argv)
|
||||
fn = getattr(args, "func", None)
|
||||
if fn is None:
|
||||
parser.print_help()
|
||||
return 0
|
||||
return int(fn(args) or 0)
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
sys.exit(cli_main())
|
||||
@@ -8644,6 +8644,26 @@ Examples:
|
||||
except Exception as _exc:
|
||||
logging.getLogger(__name__).debug("Plugin CLI discovery failed: %s", _exc)
|
||||
|
||||
# =========================================================================
|
||||
# curator command — background skill maintenance
|
||||
# =========================================================================
|
||||
curator_parser = subparsers.add_parser(
|
||||
"curator",
|
||||
help="Background skill maintenance (curator) — status, run, pause, pin",
|
||||
description=(
|
||||
"The curator is an auxiliary-model background task that "
|
||||
"periodically reviews agent-created skills, prunes stale ones, "
|
||||
"consolidates overlaps, and archives obsolete skills. "
|
||||
"Bundled and hub-installed skills are never touched. "
|
||||
"Archives are recoverable; auto-deletion never happens."
|
||||
),
|
||||
)
|
||||
try:
|
||||
from hermes_cli.curator import register_cli as _register_curator_cli
|
||||
_register_curator_cli(curator_parser)
|
||||
except Exception as _exc:
|
||||
logging.getLogger(__name__).debug("curator CLI wiring failed: %s", _exc)
|
||||
|
||||
# =========================================================================
|
||||
# memory command
|
||||
# =========================================================================
|
||||
|
||||
436
tests/agent/test_curator.py
Normal file
436
tests/agent/test_curator.py
Normal file
@@ -0,0 +1,436 @@
|
||||
"""Tests for agent/curator.py — orchestrator, idle gating, state transitions.
|
||||
|
||||
LLM spawning is never exercised here — `_run_llm_review` is monkeypatched so
|
||||
tests run fully offline and the curator module doesn't need real credentials.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import importlib
|
||||
import json
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def curator_env(tmp_path, monkeypatch):
|
||||
"""Isolated HERMES_HOME + freshly reloaded curator + skill_usage modules."""
|
||||
home = tmp_path / ".hermes"
|
||||
(home / "skills").mkdir(parents=True)
|
||||
monkeypatch.setattr(Path, "home", lambda: tmp_path)
|
||||
monkeypatch.setenv("HERMES_HOME", str(home))
|
||||
|
||||
import tools.skill_usage as usage
|
||||
importlib.reload(usage)
|
||||
import agent.curator as curator
|
||||
importlib.reload(curator)
|
||||
|
||||
# Neutralize the real LLM pass by default — tests opt in per-case.
|
||||
monkeypatch.setattr(curator, "_run_llm_review", lambda prompt: "llm-stub")
|
||||
|
||||
# Default: no config file → curator defaults. Tests can override.
|
||||
monkeypatch.setattr(curator, "_load_config", lambda: {})
|
||||
|
||||
return {"home": home, "curator": curator, "usage": usage}
|
||||
|
||||
|
||||
def _write_skill(skills_dir: Path, name: str):
|
||||
d = skills_dir / name
|
||||
d.mkdir(parents=True, exist_ok=True)
|
||||
(d / "SKILL.md").write_text(
|
||||
f"---\nname: {name}\ndescription: x\n---\n", encoding="utf-8",
|
||||
)
|
||||
return d
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Config gates
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_curator_enabled_default_true(curator_env):
|
||||
assert curator_env["curator"].is_enabled() is True
|
||||
|
||||
|
||||
def test_curator_disabled_via_config(curator_env, monkeypatch):
|
||||
c = curator_env["curator"]
|
||||
monkeypatch.setattr(c, "_load_config", lambda: {"enabled": False})
|
||||
assert c.is_enabled() is False
|
||||
assert c.should_run_now() is False
|
||||
|
||||
|
||||
def test_curator_defaults(curator_env):
|
||||
c = curator_env["curator"]
|
||||
assert c.get_interval_hours() == 24 * 7 # 7 days
|
||||
assert c.get_min_idle_hours() == 2
|
||||
assert c.get_stale_after_days() == 30
|
||||
assert c.get_archive_after_days() == 90
|
||||
|
||||
|
||||
def test_curator_config_overrides(curator_env, monkeypatch):
|
||||
c = curator_env["curator"]
|
||||
monkeypatch.setattr(c, "_load_config", lambda: {
|
||||
"interval_hours": 12,
|
||||
"min_idle_hours": 0.5,
|
||||
"stale_after_days": 7,
|
||||
"archive_after_days": 60,
|
||||
})
|
||||
assert c.get_interval_hours() == 12
|
||||
assert c.get_min_idle_hours() == 0.5
|
||||
assert c.get_stale_after_days() == 7
|
||||
assert c.get_archive_after_days() == 60
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# should_run_now
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_first_run_always_eligible(curator_env):
|
||||
c = curator_env["curator"]
|
||||
assert c.should_run_now() is True
|
||||
|
||||
|
||||
def test_recent_run_blocks(curator_env):
|
||||
c = curator_env["curator"]
|
||||
c.save_state({
|
||||
"last_run_at": datetime.now(timezone.utc).isoformat(),
|
||||
"paused": False,
|
||||
})
|
||||
assert c.should_run_now() is False
|
||||
|
||||
|
||||
def test_old_run_eligible(curator_env):
|
||||
"""A run older than the configured interval should re-trigger. Use a
|
||||
2x-interval cushion so the test doesn't become coupled to the exact
|
||||
default — bumping DEFAULT_INTERVAL_HOURS shouldn't break it."""
|
||||
c = curator_env["curator"]
|
||||
long_ago = datetime.now(timezone.utc) - timedelta(
|
||||
hours=c.get_interval_hours() * 2
|
||||
)
|
||||
c.save_state({"last_run_at": long_ago.isoformat(), "paused": False})
|
||||
assert c.should_run_now() is True
|
||||
|
||||
|
||||
def test_paused_blocks_even_if_stale(curator_env):
|
||||
c = curator_env["curator"]
|
||||
long_ago = datetime.now(timezone.utc) - timedelta(days=30)
|
||||
c.save_state({"last_run_at": long_ago.isoformat(), "paused": True})
|
||||
assert c.should_run_now() is False
|
||||
|
||||
|
||||
def test_set_paused_roundtrip(curator_env):
|
||||
c = curator_env["curator"]
|
||||
c.set_paused(True)
|
||||
assert c.is_paused() is True
|
||||
c.set_paused(False)
|
||||
assert c.is_paused() is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Automatic state transitions
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_unused_skill_transitions_to_stale(curator_env):
|
||||
c = curator_env["curator"]
|
||||
u = curator_env["usage"]
|
||||
skills_dir = curator_env["home"] / "skills"
|
||||
_write_skill(skills_dir, "old-skill")
|
||||
|
||||
# Record last-use well past stale_after_days (30 default)
|
||||
long_ago = (datetime.now(timezone.utc) - timedelta(days=45)).isoformat()
|
||||
data = u.load_usage()
|
||||
data["old-skill"] = u._empty_record()
|
||||
data["old-skill"]["last_used_at"] = long_ago
|
||||
data["old-skill"]["created_at"] = long_ago
|
||||
u.save_usage(data)
|
||||
|
||||
counts = c.apply_automatic_transitions()
|
||||
assert counts["marked_stale"] == 1
|
||||
assert u.get_record("old-skill")["state"] == "stale"
|
||||
|
||||
|
||||
def test_very_old_skill_gets_archived(curator_env):
|
||||
c = curator_env["curator"]
|
||||
u = curator_env["usage"]
|
||||
skills_dir = curator_env["home"] / "skills"
|
||||
skill_dir = _write_skill(skills_dir, "ancient")
|
||||
|
||||
super_old = (datetime.now(timezone.utc) - timedelta(days=120)).isoformat()
|
||||
data = u.load_usage()
|
||||
data["ancient"] = u._empty_record()
|
||||
data["ancient"]["last_used_at"] = super_old
|
||||
data["ancient"]["created_at"] = super_old
|
||||
u.save_usage(data)
|
||||
|
||||
counts = c.apply_automatic_transitions()
|
||||
assert counts["archived"] == 1
|
||||
assert not skill_dir.exists()
|
||||
assert (skills_dir / ".archive" / "ancient" / "SKILL.md").exists()
|
||||
assert u.get_record("ancient")["state"] == "archived"
|
||||
|
||||
|
||||
def test_pinned_skill_is_never_touched(curator_env):
|
||||
c = curator_env["curator"]
|
||||
u = curator_env["usage"]
|
||||
skills_dir = curator_env["home"] / "skills"
|
||||
_write_skill(skills_dir, "precious")
|
||||
|
||||
super_old = (datetime.now(timezone.utc) - timedelta(days=365)).isoformat()
|
||||
data = u.load_usage()
|
||||
data["precious"] = u._empty_record()
|
||||
data["precious"]["last_used_at"] = super_old
|
||||
data["precious"]["created_at"] = super_old
|
||||
data["precious"]["pinned"] = True
|
||||
u.save_usage(data)
|
||||
|
||||
counts = c.apply_automatic_transitions()
|
||||
assert counts["archived"] == 0
|
||||
assert counts["marked_stale"] == 0
|
||||
rec = u.get_record("precious")
|
||||
assert rec["state"] == "active" # untouched
|
||||
assert rec["pinned"] is True
|
||||
|
||||
|
||||
def test_stale_skill_reactivates_on_recent_use(curator_env):
|
||||
c = curator_env["curator"]
|
||||
u = curator_env["usage"]
|
||||
skills_dir = curator_env["home"] / "skills"
|
||||
_write_skill(skills_dir, "revived")
|
||||
|
||||
recent = datetime.now(timezone.utc).isoformat()
|
||||
data = u.load_usage()
|
||||
data["revived"] = u._empty_record()
|
||||
data["revived"]["state"] = "stale"
|
||||
data["revived"]["last_used_at"] = recent
|
||||
data["revived"]["created_at"] = recent
|
||||
u.save_usage(data)
|
||||
|
||||
counts = c.apply_automatic_transitions()
|
||||
assert counts["reactivated"] == 1
|
||||
assert u.get_record("revived")["state"] == "active"
|
||||
|
||||
|
||||
def test_new_skill_without_last_used_not_immediately_archived(curator_env):
|
||||
"""A freshly-created skill with no use history should not get archived
|
||||
just because last_used_at is None."""
|
||||
c = curator_env["curator"]
|
||||
u = curator_env["usage"]
|
||||
skills_dir = curator_env["home"] / "skills"
|
||||
_write_skill(skills_dir, "fresh")
|
||||
|
||||
# Bump nothing — record doesn't exist yet. Curator should create it
|
||||
# and fall back to created_at which is ~now.
|
||||
counts = c.apply_automatic_transitions()
|
||||
assert counts["archived"] == 0
|
||||
assert counts["marked_stale"] == 0
|
||||
assert (skills_dir / "fresh").exists()
|
||||
|
||||
|
||||
def test_bundled_skill_not_touched_by_transitions(curator_env):
|
||||
c = curator_env["curator"]
|
||||
u = curator_env["usage"]
|
||||
skills_dir = curator_env["home"] / "skills"
|
||||
_write_skill(skills_dir, "bundled")
|
||||
(skills_dir / ".bundled_manifest").write_text(
|
||||
"bundled:abc\n", encoding="utf-8",
|
||||
)
|
||||
|
||||
super_old = (datetime.now(timezone.utc) - timedelta(days=500)).isoformat()
|
||||
data = u.load_usage()
|
||||
data["bundled"] = u._empty_record()
|
||||
data["bundled"]["last_used_at"] = super_old
|
||||
u.save_usage(data)
|
||||
|
||||
counts = c.apply_automatic_transitions()
|
||||
# bundled skills are excluded from the agent-created list entirely
|
||||
assert counts["checked"] == 0
|
||||
assert (skills_dir / "bundled").exists() # never moved
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# run_curator_review orchestration
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_run_review_records_state(curator_env):
|
||||
c = curator_env["curator"]
|
||||
skills_dir = curator_env["home"] / "skills"
|
||||
_write_skill(skills_dir, "a")
|
||||
|
||||
result = c.run_curator_review(synchronous=True)
|
||||
assert "started_at" in result
|
||||
state = c.load_state()
|
||||
assert state["last_run_at"] is not None
|
||||
assert state["run_count"] >= 1
|
||||
assert state["last_run_summary"] is not None
|
||||
|
||||
|
||||
def test_run_review_synchronous_invokes_llm_stub(curator_env, monkeypatch):
|
||||
c = curator_env["curator"]
|
||||
skills_dir = curator_env["home"] / "skills"
|
||||
_write_skill(skills_dir, "a")
|
||||
|
||||
calls = []
|
||||
monkeypatch.setattr(
|
||||
c, "_run_llm_review",
|
||||
lambda prompt: (calls.append(prompt), "stubbed-summary")[1],
|
||||
)
|
||||
|
||||
captured = []
|
||||
c.run_curator_review(on_summary=lambda s: captured.append(s), synchronous=True)
|
||||
|
||||
assert len(calls) == 1
|
||||
assert "skill CURATOR" in calls[0] or "CURATOR" in calls[0]
|
||||
assert captured # on_summary was called
|
||||
assert any("stubbed-summary" in s for s in captured)
|
||||
|
||||
|
||||
def test_run_review_skips_llm_when_no_candidates(curator_env, monkeypatch):
|
||||
c = curator_env["curator"]
|
||||
# No skills in the dir → no candidates
|
||||
calls = []
|
||||
monkeypatch.setattr(
|
||||
c, "_run_llm_review",
|
||||
lambda prompt: (calls.append(prompt), "never-called")[1],
|
||||
)
|
||||
|
||||
captured = []
|
||||
c.run_curator_review(on_summary=lambda s: captured.append(s), synchronous=True)
|
||||
|
||||
assert calls == [] # LLM not invoked
|
||||
assert any("skipped" in s for s in captured)
|
||||
|
||||
|
||||
def test_maybe_run_curator_respects_disabled(curator_env, monkeypatch):
|
||||
c = curator_env["curator"]
|
||||
monkeypatch.setattr(c, "_load_config", lambda: {"enabled": False})
|
||||
result = c.maybe_run_curator()
|
||||
assert result is None
|
||||
|
||||
|
||||
def test_maybe_run_curator_enforces_idle_gate(curator_env, monkeypatch):
|
||||
c = curator_env["curator"]
|
||||
monkeypatch.setattr(c, "_load_config", lambda: {"min_idle_hours": 2})
|
||||
# idle less than the threshold
|
||||
result = c.maybe_run_curator(idle_for_seconds=60.0)
|
||||
assert result is None
|
||||
|
||||
|
||||
def test_maybe_run_curator_runs_when_eligible(curator_env, monkeypatch):
|
||||
c = curator_env["curator"]
|
||||
skills_dir = curator_env["home"] / "skills"
|
||||
_write_skill(skills_dir, "a")
|
||||
# Force idle over threshold
|
||||
result = c.maybe_run_curator(idle_for_seconds=99999.0)
|
||||
assert result is not None
|
||||
assert "started_at" in result
|
||||
|
||||
|
||||
def test_maybe_run_curator_swallows_exceptions(curator_env, monkeypatch):
|
||||
c = curator_env["curator"]
|
||||
|
||||
def explode():
|
||||
raise RuntimeError("boom")
|
||||
|
||||
monkeypatch.setattr(c, "should_run_now", explode)
|
||||
# Must not raise
|
||||
assert c.maybe_run_curator() is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Persistence
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_state_file_survives_corrupt_read(curator_env):
|
||||
c = curator_env["curator"]
|
||||
c._state_file().write_text("not json", encoding="utf-8")
|
||||
# Must fall back to default, not raise
|
||||
assert c.load_state() == c._default_state()
|
||||
|
||||
|
||||
def test_state_atomic_write_no_tmp_leftovers(curator_env):
|
||||
c = curator_env["curator"]
|
||||
c.save_state({"paused": True})
|
||||
parent = c._state_file().parent
|
||||
for p in parent.iterdir():
|
||||
assert not p.name.startswith(".curator_state_"), f"tmp leftover: {p.name}"
|
||||
|
||||
|
||||
def test_curator_review_prompt_has_invariants():
|
||||
"""Core invariants must be in the review prompt text."""
|
||||
from agent.curator import CURATOR_REVIEW_PROMPT
|
||||
assert "MUST NOT" in CURATOR_REVIEW_PROMPT
|
||||
assert "bundled" in CURATOR_REVIEW_PROMPT.lower()
|
||||
assert "delete" in CURATOR_REVIEW_PROMPT.lower()
|
||||
assert "pinned" in CURATOR_REVIEW_PROMPT.lower()
|
||||
# Must mention the decisions the reviewer can make
|
||||
for verb in ("keep", "patch", "archive", "consolidate"):
|
||||
assert verb in CURATOR_REVIEW_PROMPT.lower()
|
||||
|
||||
|
||||
def test_curator_review_prompt_points_at_existing_tools_only():
|
||||
"""The review prompt must rely on existing tools (skill_manage + terminal)
|
||||
and must NOT reference bespoke curator tools that are not registered
|
||||
model tools."""
|
||||
from agent.curator import CURATOR_REVIEW_PROMPT
|
||||
assert "skill_manage" in CURATOR_REVIEW_PROMPT
|
||||
assert "skills_list" in CURATOR_REVIEW_PROMPT
|
||||
assert "skill_view" in CURATOR_REVIEW_PROMPT
|
||||
assert "terminal" in CURATOR_REVIEW_PROMPT.lower()
|
||||
# These would be nice but aren't actually registered as tools — the
|
||||
# curator uses skill_manage + terminal mv instead.
|
||||
assert "archive_skill" not in CURATOR_REVIEW_PROMPT
|
||||
assert "pin_skill" not in CURATOR_REVIEW_PROMPT
|
||||
|
||||
|
||||
def test_curator_does_not_instruct_model_to_pin():
|
||||
"""Pinning is a user opt-out, not a model decision. The prompt should
|
||||
not tell the reviewer to pin skills autonomously."""
|
||||
from agent.curator import CURATOR_REVIEW_PROMPT
|
||||
# "pinned" appears in the invariant ("skip pinned skills"), but "pin"
|
||||
# as a decision verb should not.
|
||||
lines = CURATOR_REVIEW_PROMPT.split("\n")
|
||||
decision_block = "\n".join(
|
||||
l for l in lines
|
||||
if l.strip().startswith(("keep", "patch", "archive", "consolidate", "pin "))
|
||||
)
|
||||
# No standalone "pin" action line
|
||||
assert not any(l.strip().startswith("pin ") for l in lines), (
|
||||
f"Found a pin action line in:\n{decision_block}"
|
||||
)
|
||||
|
||||
|
||||
|
||||
def test_cli_unpin_refuses_bundled_skill(curator_env, capsys):
|
||||
"""hermes curator unpin must refuse bundled/hub skills too (matches pin)."""
|
||||
from hermes_cli import curator as cli
|
||||
skills_dir = curator_env["home"] / "skills"
|
||||
_write_skill(skills_dir, "ship-skill")
|
||||
(skills_dir / ".bundled_manifest").write_text(
|
||||
"ship-skill:abc\n", encoding="utf-8",
|
||||
)
|
||||
|
||||
class _A:
|
||||
skill = "ship-skill"
|
||||
|
||||
rc = cli._cmd_unpin(_A())
|
||||
captured = capsys.readouterr()
|
||||
assert rc == 1
|
||||
assert "bundled" in captured.out.lower() or "hub" in captured.out.lower()
|
||||
|
||||
|
||||
def test_cli_pin_refuses_bundled_skill(curator_env, capsys):
|
||||
from hermes_cli import curator as cli
|
||||
skills_dir = curator_env["home"] / "skills"
|
||||
_write_skill(skills_dir, "ship-skill")
|
||||
(skills_dir / ".bundled_manifest").write_text(
|
||||
"ship-skill:abc\n", encoding="utf-8",
|
||||
)
|
||||
|
||||
class _A:
|
||||
skill = "ship-skill"
|
||||
|
||||
rc = cli._cmd_pin(_A())
|
||||
captured = capsys.readouterr()
|
||||
assert rc == 1
|
||||
assert "bundled" in captured.out.lower() or "hub" in captured.out.lower()
|
||||
487
tests/tools/test_skill_usage.py
Normal file
487
tests/tools/test_skill_usage.py
Normal file
@@ -0,0 +1,487 @@
|
||||
"""Tests for tools/skill_usage.py — sidecar telemetry + provenance filtering."""
|
||||
|
||||
import json
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def skills_home(tmp_path, monkeypatch):
|
||||
"""Isolated HERMES_HOME with a clean skills/ dir for each test."""
|
||||
home = tmp_path / ".hermes"
|
||||
home.mkdir()
|
||||
(home / "skills").mkdir()
|
||||
monkeypatch.setattr(Path, "home", lambda: tmp_path)
|
||||
monkeypatch.setenv("HERMES_HOME", str(home))
|
||||
# Force skill_usage module to re-resolve paths per test
|
||||
import importlib
|
||||
import tools.skill_usage as mod
|
||||
importlib.reload(mod)
|
||||
return home
|
||||
|
||||
|
||||
def _write_skill(skills_dir: Path, name: str, category: str = ""):
|
||||
"""Create a minimal SKILL.md with a name: frontmatter field."""
|
||||
if category:
|
||||
d = skills_dir / category / name
|
||||
else:
|
||||
d = skills_dir / name
|
||||
d.mkdir(parents=True, exist_ok=True)
|
||||
(d / "SKILL.md").write_text(
|
||||
f"""---
|
||||
name: {name}
|
||||
description: test skill
|
||||
---
|
||||
|
||||
# body
|
||||
""",
|
||||
encoding="utf-8",
|
||||
)
|
||||
return d
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Round-trip
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_empty_usage_returns_empty_dict(skills_home):
|
||||
from tools.skill_usage import load_usage
|
||||
assert load_usage() == {}
|
||||
|
||||
|
||||
def test_save_and_load_roundtrip(skills_home):
|
||||
from tools.skill_usage import load_usage, save_usage
|
||||
data = {"skill-a": {"use_count": 3, "state": "active"}}
|
||||
save_usage(data)
|
||||
loaded = load_usage()
|
||||
assert loaded["skill-a"]["use_count"] == 3
|
||||
assert loaded["skill-a"]["state"] == "active"
|
||||
|
||||
|
||||
def test_save_is_atomic_no_partial_tmp_files(skills_home):
|
||||
from tools.skill_usage import save_usage, _usage_file
|
||||
save_usage({"x": {"use_count": 1}})
|
||||
skills_dir = _usage_file().parent
|
||||
# No leftover tempfile
|
||||
for p in skills_dir.iterdir():
|
||||
assert not p.name.startswith(".usage_"), f"leftover tmp: {p.name}"
|
||||
|
||||
|
||||
def test_get_record_missing_returns_empty_record(skills_home):
|
||||
from tools.skill_usage import get_record
|
||||
rec = get_record("nonexistent")
|
||||
assert rec["use_count"] == 0
|
||||
assert rec["view_count"] == 0
|
||||
assert rec["state"] == "active"
|
||||
assert rec["pinned"] is False
|
||||
assert rec["archived_at"] is None
|
||||
|
||||
|
||||
def test_get_record_backfills_missing_keys(skills_home):
|
||||
from tools.skill_usage import get_record, save_usage
|
||||
save_usage({"legacy": {"use_count": 5}}) # old-format record
|
||||
rec = get_record("legacy")
|
||||
assert rec["use_count"] == 5
|
||||
assert "view_count" in rec # backfilled
|
||||
assert "state" in rec
|
||||
|
||||
|
||||
def test_load_usage_handles_corrupt_file(skills_home):
|
||||
from tools.skill_usage import load_usage, _usage_file
|
||||
_usage_file().write_text("{ not json }", encoding="utf-8")
|
||||
assert load_usage() == {}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Counter bumps
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_bump_view_increments_and_timestamps(skills_home):
|
||||
from tools.skill_usage import bump_view, get_record
|
||||
bump_view("my-skill")
|
||||
bump_view("my-skill")
|
||||
rec = get_record("my-skill")
|
||||
assert rec["view_count"] == 2
|
||||
assert rec["last_viewed_at"] is not None
|
||||
|
||||
|
||||
def test_bump_use_increments_and_timestamps(skills_home):
|
||||
from tools.skill_usage import bump_use, get_record
|
||||
bump_use("my-skill")
|
||||
rec = get_record("my-skill")
|
||||
assert rec["use_count"] == 1
|
||||
assert rec["last_used_at"] is not None
|
||||
|
||||
|
||||
def test_bump_patch_increments_and_timestamps(skills_home):
|
||||
from tools.skill_usage import bump_patch, get_record
|
||||
bump_patch("my-skill")
|
||||
rec = get_record("my-skill")
|
||||
assert rec["patch_count"] == 1
|
||||
assert rec["last_patched_at"] is not None
|
||||
|
||||
|
||||
def test_bump_on_empty_name_is_noop(skills_home):
|
||||
from tools.skill_usage import bump_view, load_usage
|
||||
bump_view("")
|
||||
assert load_usage() == {}
|
||||
|
||||
|
||||
def test_bumps_do_not_corrupt_other_skills(skills_home):
|
||||
from tools.skill_usage import bump_view, bump_use, get_record
|
||||
bump_view("skill-a")
|
||||
bump_use("skill-b")
|
||||
bump_view("skill-a")
|
||||
assert get_record("skill-a")["view_count"] == 2
|
||||
assert get_record("skill-a")["use_count"] == 0
|
||||
assert get_record("skill-b")["use_count"] == 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# State transitions
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_set_state_active(skills_home):
|
||||
from tools.skill_usage import set_state, get_record, STATE_ACTIVE
|
||||
set_state("x", STATE_ACTIVE)
|
||||
assert get_record("x")["state"] == "active"
|
||||
|
||||
|
||||
def test_set_state_archived_records_timestamp(skills_home):
|
||||
from tools.skill_usage import set_state, get_record, STATE_ARCHIVED
|
||||
set_state("x", STATE_ARCHIVED)
|
||||
rec = get_record("x")
|
||||
assert rec["state"] == "archived"
|
||||
assert rec["archived_at"] is not None
|
||||
|
||||
|
||||
def test_set_state_invalid_is_noop(skills_home):
|
||||
from tools.skill_usage import set_state, get_record
|
||||
set_state("x", "bogus")
|
||||
# No record created for invalid state
|
||||
rec = get_record("x")
|
||||
assert rec["state"] == "active" # default
|
||||
|
||||
|
||||
def test_restoring_from_archive_clears_timestamp(skills_home):
|
||||
from tools.skill_usage import set_state, get_record, STATE_ARCHIVED, STATE_ACTIVE
|
||||
set_state("x", STATE_ARCHIVED)
|
||||
assert get_record("x")["archived_at"] is not None
|
||||
set_state("x", STATE_ACTIVE)
|
||||
assert get_record("x")["archived_at"] is None
|
||||
|
||||
|
||||
def test_set_pinned(skills_home):
|
||||
from tools.skill_usage import set_pinned, get_record
|
||||
set_pinned("x", True)
|
||||
assert get_record("x")["pinned"] is True
|
||||
set_pinned("x", False)
|
||||
assert get_record("x")["pinned"] is False
|
||||
|
||||
|
||||
def test_forget_removes_record(skills_home):
|
||||
from tools.skill_usage import bump_view, forget, load_usage
|
||||
bump_view("x")
|
||||
assert "x" in load_usage()
|
||||
forget("x")
|
||||
assert "x" not in load_usage()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Provenance filter — the load-bearing safety check
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_agent_created_excludes_bundled(skills_home):
|
||||
from tools.skill_usage import list_agent_created_skill_names
|
||||
skills_dir = skills_home / "skills"
|
||||
_write_skill(skills_dir, "bundled-skill", category="github")
|
||||
_write_skill(skills_dir, "my-skill")
|
||||
# Seed a bundled manifest marking bundled-skill as upstream
|
||||
(skills_dir / ".bundled_manifest").write_text(
|
||||
"bundled-skill:abc123\n", encoding="utf-8",
|
||||
)
|
||||
names = list_agent_created_skill_names()
|
||||
assert "my-skill" in names
|
||||
assert "bundled-skill" not in names
|
||||
|
||||
|
||||
def test_agent_created_excludes_hub_installed(skills_home):
|
||||
from tools.skill_usage import list_agent_created_skill_names
|
||||
skills_dir = skills_home / "skills"
|
||||
_write_skill(skills_dir, "hub-skill")
|
||||
_write_skill(skills_dir, "my-skill")
|
||||
hub_dir = skills_dir / ".hub"
|
||||
hub_dir.mkdir()
|
||||
(hub_dir / "lock.json").write_text(
|
||||
json.dumps({"version": 1, "installed": {"hub-skill": {"source": "taps/main"}}}),
|
||||
encoding="utf-8",
|
||||
)
|
||||
names = list_agent_created_skill_names()
|
||||
assert "my-skill" in names
|
||||
assert "hub-skill" not in names
|
||||
|
||||
|
||||
def test_is_agent_created(skills_home):
|
||||
from tools.skill_usage import is_agent_created
|
||||
skills_dir = skills_home / "skills"
|
||||
(skills_dir / ".bundled_manifest").write_text("bundled:abc\n", encoding="utf-8")
|
||||
hub_dir = skills_dir / ".hub"
|
||||
hub_dir.mkdir()
|
||||
(hub_dir / "lock.json").write_text(
|
||||
json.dumps({"installed": {"hubbed": {}}}), encoding="utf-8",
|
||||
)
|
||||
assert is_agent_created("my-skill") is True
|
||||
assert is_agent_created("bundled") is False
|
||||
assert is_agent_created("hubbed") is False
|
||||
|
||||
|
||||
def test_agent_created_skips_archive_and_hub_dirs(skills_home):
|
||||
from tools.skill_usage import list_agent_created_skill_names
|
||||
skills_dir = skills_home / "skills"
|
||||
_write_skill(skills_dir, "real-skill")
|
||||
# Dot-prefixed dirs must be ignored even if they contain SKILL.md
|
||||
archive = skills_dir / ".archive" / "old-skill"
|
||||
archive.mkdir(parents=True)
|
||||
(archive / "SKILL.md").write_text(
|
||||
"---\nname: old-skill\n---\n", encoding="utf-8",
|
||||
)
|
||||
names = list_agent_created_skill_names()
|
||||
assert "real-skill" in names
|
||||
assert "old-skill" not in names
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Archive / restore
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_archive_skill_moves_directory(skills_home):
|
||||
from tools.skill_usage import archive_skill, get_record, STATE_ARCHIVED
|
||||
skills_dir = skills_home / "skills"
|
||||
skill_dir = _write_skill(skills_dir, "old-skill")
|
||||
assert skill_dir.exists()
|
||||
|
||||
ok, msg = archive_skill("old-skill")
|
||||
assert ok, msg
|
||||
assert not skill_dir.exists()
|
||||
assert (skills_dir / ".archive" / "old-skill" / "SKILL.md").exists()
|
||||
assert get_record("old-skill")["state"] == "archived"
|
||||
assert get_record("old-skill")["archived_at"] is not None
|
||||
|
||||
|
||||
def test_archive_refuses_bundled_skill(skills_home):
|
||||
from tools.skill_usage import archive_skill
|
||||
skills_dir = skills_home / "skills"
|
||||
_write_skill(skills_dir, "bundled")
|
||||
(skills_dir / ".bundled_manifest").write_text("bundled:abc\n", encoding="utf-8")
|
||||
|
||||
ok, msg = archive_skill("bundled")
|
||||
assert not ok
|
||||
assert "bundled" in msg.lower() or "hub" in msg.lower()
|
||||
|
||||
|
||||
def test_archive_refuses_hub_skill(skills_home):
|
||||
from tools.skill_usage import archive_skill
|
||||
skills_dir = skills_home / "skills"
|
||||
_write_skill(skills_dir, "hub-skill")
|
||||
hub_dir = skills_dir / ".hub"
|
||||
hub_dir.mkdir()
|
||||
(hub_dir / "lock.json").write_text(
|
||||
json.dumps({"installed": {"hub-skill": {}}}), encoding="utf-8",
|
||||
)
|
||||
|
||||
ok, msg = archive_skill("hub-skill")
|
||||
assert not ok
|
||||
|
||||
|
||||
def test_archive_missing_skill_returns_error(skills_home):
|
||||
from tools.skill_usage import archive_skill
|
||||
ok, msg = archive_skill("nonexistent")
|
||||
assert not ok
|
||||
assert "not found" in msg.lower()
|
||||
|
||||
|
||||
def test_restore_skill_moves_back(skills_home):
|
||||
from tools.skill_usage import archive_skill, restore_skill, get_record
|
||||
skills_dir = skills_home / "skills"
|
||||
_write_skill(skills_dir, "temp-skill")
|
||||
archive_skill("temp-skill")
|
||||
assert not (skills_dir / "temp-skill").exists()
|
||||
|
||||
ok, msg = restore_skill("temp-skill")
|
||||
assert ok, msg
|
||||
assert (skills_dir / "temp-skill" / "SKILL.md").exists()
|
||||
assert get_record("temp-skill")["state"] == "active"
|
||||
|
||||
|
||||
def test_archive_collision_gets_suffix(skills_home):
|
||||
from tools.skill_usage import archive_skill
|
||||
skills_dir = skills_home / "skills"
|
||||
_write_skill(skills_dir, "dup")
|
||||
archive_skill("dup")
|
||||
_write_skill(skills_dir, "dup") # recreate
|
||||
ok, msg = archive_skill("dup")
|
||||
assert ok
|
||||
# Two entries under .archive/ — second should have a timestamp suffix
|
||||
archived = sorted(p.name for p in (skills_dir / ".archive").iterdir() if p.is_dir())
|
||||
assert "dup" in archived
|
||||
assert any(n.startswith("dup-") and n != "dup" for n in archived)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Reporting
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_agent_created_report_includes_defaults(skills_home):
|
||||
from tools.skill_usage import agent_created_report, bump_view
|
||||
skills_dir = skills_home / "skills"
|
||||
_write_skill(skills_dir, "a")
|
||||
_write_skill(skills_dir, "b")
|
||||
bump_view("a")
|
||||
rows = agent_created_report()
|
||||
by_name = {r["name"]: r for r in rows}
|
||||
assert "a" in by_name and "b" in by_name
|
||||
assert by_name["a"]["view_count"] == 1
|
||||
# b has no usage record yet — must still appear with defaults
|
||||
assert by_name["b"]["view_count"] == 0
|
||||
assert by_name["b"]["state"] == "active"
|
||||
|
||||
|
||||
def test_agent_created_report_excludes_bundled_and_hub(skills_home):
    """Only agent-created skills are reported — bundled and hub-installed
    names are filtered out by the provenance check."""
    from tools.skill_usage import agent_created_report

    skills_dir = skills_home / "skills"
    for skill in ("mine", "bundled", "hubbed"):
        _write_skill(skills_dir, skill)

    (skills_dir / ".bundled_manifest").write_text("bundled:abc\n", encoding="utf-8")
    hub_dir = skills_dir / ".hub"
    hub_dir.mkdir()
    (hub_dir / "lock.json").write_text(
        json.dumps({"installed": {"hubbed": {}}}), encoding="utf-8",
    )

    reported = {row["name"] for row in agent_created_report()}
    assert "mine" in reported
    assert "bundled" not in reported
    assert "hubbed" not in reported
||||
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Provenance guard — telemetry must not leak records for bundled/hub skills
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_bump_view_no_op_for_bundled_skill(skills_home):
    """Telemetry bumps on bundled skills are dropped — the sidecar must stay
    focused on agent-created skills only."""
    from tools.skill_usage import bump_view, load_usage

    skills_dir = skills_home / "skills"
    (skills_dir / ".bundled_manifest").write_text(
        "ship-bundled:abc\n", encoding="utf-8",
    )

    bump_view("ship-bundled")
    usage = load_usage()
    assert "ship-bundled" not in usage, "bundled skill leaked into .usage.json"
||||
|
||||
|
||||
def test_bump_patch_no_op_for_hub_skill(skills_home):
    """patch-count bumps for hub-installed skills must never be persisted."""
    from tools.skill_usage import bump_patch, load_usage

    skills_dir = skills_home / "skills"
    hub_dir = skills_dir / ".hub"
    hub_dir.mkdir()
    (hub_dir / "lock.json").write_text(
        json.dumps({"installed": {"from-hub": {}}}), encoding="utf-8",
    )

    bump_patch("from-hub")
    assert "from-hub" not in load_usage()
||||
|
||||
|
||||
def test_bump_use_no_op_for_hub_skill(skills_home):
    """use-count bumps for hub-installed skills must never be persisted."""
    from tools.skill_usage import bump_use, load_usage

    skills_dir = skills_home / "skills"
    hub_dir = skills_dir / ".hub"
    hub_dir.mkdir()
    (hub_dir / "lock.json").write_text(
        json.dumps({"installed": {"from-hub": {}}}), encoding="utf-8",
    )

    bump_use("from-hub")
    assert "from-hub" not in load_usage()
||||
|
||||
|
||||
def test_set_state_no_op_for_bundled_skill(skills_home):
    """State transitions on bundled skills must not land in the sidecar."""
    from tools.skill_usage import set_state, load_usage, STATE_ARCHIVED

    skills_dir = skills_home / "skills"
    (skills_dir / ".bundled_manifest").write_text(
        "locked:abc\n", encoding="utf-8",
    )

    set_state("locked", STATE_ARCHIVED)
    assert "locked" not in load_usage()
||||
|
||||
|
||||
def test_restore_refuses_to_shadow_bundled_skill(skills_home):
    """If a bundled skill now occupies the name, refuse to restore."""
    from tools.skill_usage import archive_skill, restore_skill

    skills_dir = skills_home / "skills"
    _write_skill(skills_dir, "shared-name")
    archive_skill("shared-name")

    # A bundled skill has since been installed under the same name.
    (skills_dir / ".bundled_manifest").write_text(
        "shared-name:abc\n", encoding="utf-8",
    )
    _write_skill(skills_dir, "shared-name")

    restored, message = restore_skill("shared-name")
    assert not restored
    lowered = message.lower()
    assert "bundled" in lowered or "shadow" in lowered
||||
|
||||
|
||||
def test_end_to_end_no_code_path_mutates_bundled_skill(skills_home):
    """The combined guarantee: no curator code path can archive, mark stale,
    set-state, or persist telemetry for a bundled or hub-installed skill."""
    from tools.skill_usage import (
        bump_view, bump_use, bump_patch, set_state, set_pinned,
        archive_skill, load_usage, STATE_STALE, STATE_ARCHIVED,
    )

    skills_dir = skills_home / "skills"
    for skill in ("bundled-one", "hub-one", "mine"):
        _write_skill(skills_dir, skill)

    (skills_dir / ".bundled_manifest").write_text(
        "bundled-one:abc\n", encoding="utf-8",
    )
    hub_dir = skills_dir / ".hub"
    hub_dir.mkdir()
    (hub_dir / "lock.json").write_text(
        json.dumps({"installed": {"hub-one": {}}}), encoding="utf-8",
    )

    # Hammer every mutator at the off-limits names.
    for protected in ("bundled-one", "hub-one"):
        bump_view(protected)
        bump_use(protected)
        bump_patch(protected)
        set_state(protected, STATE_STALE)
        set_state(protected, STATE_ARCHIVED)
        set_pinned(protected, True)
        archived, _msg = archive_skill(protected)
        assert not archived, f"archive_skill(\"{protected}\") should refuse"

    # The sidecar must contain neither protected skill.
    usage = load_usage()
    assert "bundled-one" not in usage
    assert "hub-one" not in usage

    # Their directories must be untouched on disk.
    assert (skills_dir / "bundled-one" / "SKILL.md").exists()
    assert (skills_dir / "hub-one" / "SKILL.md").exists()

    # The agent-created skill still records telemetry normally.
    bump_view("mine")
    assert load_usage()["mine"]["view_count"] == 1
||||
@@ -698,6 +698,17 @@ def skill_manage(
|
||||
clear_skills_system_prompt_cache(clear_snapshot=True)
|
||||
except Exception:
|
||||
pass
|
||||
# Curator telemetry: bump patch_count on edit/patch/write_file (the actions
|
||||
# that mutate an existing skill's guidance), drop the record on delete.
|
||||
# Best-effort; telemetry failures never break the tool.
|
||||
try:
|
||||
from tools.skill_usage import bump_patch, forget
|
||||
if action in ("patch", "edit", "write_file", "remove_file"):
|
||||
bump_patch(name)
|
||||
elif action == "delete":
|
||||
forget(name)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return json.dumps(result, ensure_ascii=False)
|
||||
|
||||
|
||||
456
tools/skill_usage.py
Normal file
456
tools/skill_usage.py
Normal file
@@ -0,0 +1,456 @@
|
||||
"""Skill usage telemetry + provenance tracking for the Curator feature.
|
||||
|
||||
Tracks per-skill usage metadata in a sidecar JSON file (~/.hermes/skills/.usage.json)
|
||||
keyed by skill name. Counters are bumped by the existing skill tools (skill_view,
|
||||
skill_manage); the curator orchestrator reads them to decide lifecycle transitions.
|
||||
|
||||
Design notes:
|
||||
- Sidecar, not frontmatter. Keeps operational telemetry out of user-authored
|
||||
SKILL.md content and avoids conflict pressure for bundled/hub skills.
|
||||
- Atomic writes via tempfile + os.replace (same pattern as .bundled_manifest).
|
||||
- All counter bumps are best-effort: failures log at DEBUG and return silently.
|
||||
A broken sidecar never breaks the underlying tool call.
|
||||
- Provenance filter: "agent-created" == not in .bundled_manifest AND not in
|
||||
.hub/lock.json. The curator only ever mutates agent-created skills.
|
||||
|
||||
Lifecycle states:
|
||||
active -> default
|
||||
stale -> unused > stale_after_days (config)
|
||||
archived -> unused > archive_after_days (config); moved to .archive/
|
||||
pinned -> opt-out from auto transitions (boolean flag, orthogonal to state)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import tempfile
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple
|
||||
|
||||
from hermes_constants import get_hermes_home
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
STATE_ACTIVE = "active"
|
||||
STATE_STALE = "stale"
|
||||
STATE_ARCHIVED = "archived"
|
||||
_VALID_STATES = {STATE_ACTIVE, STATE_STALE, STATE_ARCHIVED}
|
||||
|
||||
|
||||
def _skills_dir() -> Path:
    """Root of the on-disk skill collection (~/.hermes/skills)."""
    home = get_hermes_home()
    return home / "skills"


def _usage_file() -> Path:
    """Path of the telemetry sidecar (.usage.json) inside the skills dir."""
    return _skills_dir().joinpath(".usage.json")


def _archive_dir() -> Path:
    """Destination directory for archived skills (.archive)."""
    return _skills_dir().joinpath(".archive")


def _now_iso() -> str:
    """Current UTC time as a timezone-aware ISO-8601 string."""
    return datetime.now(tz=timezone.utc).isoformat()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Provenance — which skills are agent-created (and thus eligible for curation)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _read_bundled_manifest_names() -> Set[str]:
    """Return the set of skill names that were seeded from the bundled repo.

    Reads ~/.hermes/skills/.bundled_manifest (one "name:hash" entry per
    line). An absent or unreadable file yields an empty set.
    """
    manifest = _skills_dir() / ".bundled_manifest"
    if not manifest.exists():
        return set()
    found: Set[str] = set()
    try:
        raw = manifest.read_text(encoding="utf-8")
    except OSError as e:
        logger.debug("Failed to read bundled manifest: %s", e)
        return found
    for raw_line in raw.splitlines():
        entry = raw_line.strip()
        if not entry:
            continue
        # Everything before the first ":" is the skill name.
        skill = entry.split(":", 1)[0].strip()
        if skill:
            found.add(skill)
    return found
||||
|
||||
|
||||
def _read_hub_installed_names() -> Set[str]:
    """Return the set of skill names installed via the Skills Hub.

    Reads ~/.hermes/skills/.hub/lock.json (see tools/skills_hub.py ::
    HubLockFile). Missing or malformed lock files yield an empty set.
    """
    lock_path = _skills_dir() / ".hub" / "lock.json"
    if not lock_path.exists():
        return set()
    try:
        payload = json.loads(lock_path.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError) as e:
        logger.debug("Failed to read hub lock file: %s", e)
        return set()
    if isinstance(payload, dict):
        installed = payload.get("installed") or {}
        if isinstance(installed, dict):
            return {str(name) for name in installed}
    return set()
||||
|
||||
|
||||
def list_agent_created_skill_names() -> List[str]:
    """Enumerate skills that were authored by the agent (or user), NOT by a
    bundled or hub-installed source.

    The curator operates exclusively on this set. Bundled / hub skills are
    maintained by their upstream sources and must never be pruned here.
    """
    base = _skills_dir()
    if not base.exists():
        return []
    off_limits = _read_bundled_manifest_names() | _read_hub_installed_names()

    found: Set[str] = set()
    # Scan both flat (<skill>/SKILL.md) and category-nested layouts.
    for skill_md in base.rglob("SKILL.md"):
        try:
            rel = skill_md.relative_to(base)
        except ValueError:
            continue
        top = rel.parts[0] if rel.parts else ""
        # Hidden trees (.archive, .hub, ...) and node_modules are not skills.
        if top.startswith(".") or top == "node_modules":
            continue
        name = _read_skill_name(skill_md, fallback=skill_md.parent.name)
        if name not in off_limits:
            found.add(name)
    return sorted(found)
||||
|
||||
|
||||
def _read_skill_name(skill_md: Path, fallback: str) -> str:
    """Parse the `name:` field out of a SKILL.md YAML frontmatter block.

    Only the first 4000 characters are inspected; *fallback* (normally the
    directory name) is returned when the file is unreadable or the field is
    missing or empty.
    """
    try:
        head = skill_md.read_text(encoding="utf-8", errors="replace")[:4000]
    except OSError:
        return fallback
    inside = False
    for raw in head.split("\n"):
        line = raw.strip()
        if line == "---":
            if inside:
                break  # closing fence — stop scanning
            inside = True
            continue
        if inside and line.startswith("name:"):
            candidate = line.split(":", 1)[1].strip().strip("\"'")
            if candidate:
                return candidate
    return fallback
||||
|
||||
|
||||
def is_agent_created(skill_name: str) -> bool:
    """Whether *skill_name* is neither bundled nor hub-installed."""
    if skill_name in _read_bundled_manifest_names():
        return False
    return skill_name not in _read_hub_installed_names()
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Sidecar I/O
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _empty_record() -> Dict[str, Any]:
    """A fresh telemetry record: zeroed counters, default lifecycle state."""
    record: Dict[str, Any] = {
        "use_count": 0,
        "view_count": 0,
        "last_used_at": None,
        "last_viewed_at": None,
        "patch_count": 0,
        "last_patched_at": None,
        "created_at": _now_iso(),
        "state": STATE_ACTIVE,
        "pinned": False,
        "archived_at": None,
    }
    return record
||||
|
||||
|
||||
def load_usage() -> Dict[str, Dict[str, Any]]:
    """Read the entire .usage.json map. Returns {} on missing/corrupt file."""
    path = _usage_file()
    if not path.exists():
        return {}
    try:
        raw = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError) as e:
        logger.debug("Failed to read %s: %s", path, e)
        return {}
    if not isinstance(raw, dict):
        return {}
    # Defensive: keep only entries whose value is itself a dict.
    return {str(name): rec for name, rec in raw.items() if isinstance(rec, dict)}
||||
|
||||
|
||||
def save_usage(data: Dict[str, Dict[str, Any]]) -> None:
    """Write the usage map atomically. Best-effort — errors are logged, not raised."""
    path = _usage_file()
    try:
        path.parent.mkdir(parents=True, exist_ok=True)
        # Write to a sibling temp file, then os.replace() for an atomic swap —
        # readers never observe a partially-written .usage.json.
        fd, tmp_path = tempfile.mkstemp(
            dir=str(path.parent), prefix=".usage_", suffix=".tmp"
        )
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2, sort_keys=True, ensure_ascii=False)
                f.flush()
                os.fsync(f.fileno())  # make sure bytes hit disk before the rename
            os.replace(tmp_path, path)
        except BaseException:
            # Remove the orphaned temp file on any failure, including
            # KeyboardInterrupt; after a successful replace the unlink just
            # fails with OSError and is ignored.
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
            raise
    except Exception as e:
        # Best-effort contract: a failed telemetry write never breaks the caller.
        logger.debug("Failed to write %s: %s", path, e, exc_info=True)
||||
|
||||
|
||||
def get_record(skill_name: str) -> Dict[str, Any]:
    """Return the record for *skill_name*, creating a fresh one if missing.

    Missing keys in an on-disk record are backfilled with defaults so callers
    never need to guard against older sidecar formats.
    """
    stored = load_usage().get(skill_name)
    if not isinstance(stored, dict):
        return _empty_record()
    for key, default in _empty_record().items():
        stored.setdefault(key, default)
    return stored
||||
|
||||
|
||||
def _mutate(skill_name: str, mutator) -> None:
    """Load, apply *mutator(record)* in place, save. Best-effort.

    Bundled and hub-installed skills are NEVER recorded in the sidecar.
    This keeps .usage.json focused on agent-created skills (the only ones
    the curator considers) and prevents stale counters from hanging around
    for upstream-managed skills.

    *mutator* is a callable taking the record dict and mutating it in place;
    empty skill names are ignored.
    """
    if not skill_name:
        return
    try:
        # Provenance guard: drop the mutation entirely for bundled/hub skills.
        if not is_agent_created(skill_name):
            return
        data = load_usage()
        rec = data.get(skill_name)
        if not isinstance(rec, dict):
            # First sighting of this skill — start from a fresh record.
            rec = _empty_record()
        mutator(rec)
        data[skill_name] = rec
        save_usage(data)
    except Exception as e:
        # Telemetry failures must never break the underlying tool call.
        logger.debug("skill_usage._mutate(%s) failed: %s", skill_name, e, exc_info=True)
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public counter-bump helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def bump_view(skill_name: str) -> None:
    """Bump view_count and last_viewed_at. Called from skill_view()."""
    def _inc(rec: Dict[str, Any]) -> None:
        rec["view_count"] = int(rec.get("view_count") or 0) + 1
        rec["last_viewed_at"] = _now_iso()

    _mutate(skill_name, _inc)
||||
|
||||
|
||||
def bump_use(skill_name: str) -> None:
    """Bump use_count and last_used_at. Called when a skill is actively used
    (e.g. loaded into the prompt path or referenced from an assistant turn)."""
    def _inc(rec: Dict[str, Any]) -> None:
        rec["use_count"] = int(rec.get("use_count") or 0) + 1
        rec["last_used_at"] = _now_iso()

    _mutate(skill_name, _inc)
||||
|
||||
|
||||
def bump_patch(skill_name: str) -> None:
    """Bump patch_count and last_patched_at. Called from skill_manage (patch/edit)."""
    def _inc(rec: Dict[str, Any]) -> None:
        rec["patch_count"] = int(rec.get("patch_count") or 0) + 1
        rec["last_patched_at"] = _now_iso()

    _mutate(skill_name, _inc)
||||
|
||||
|
||||
def set_state(skill_name: str, state: str) -> None:
    """Set lifecycle state. No-op if *state* is invalid."""
    if state not in _VALID_STATES:
        logger.debug("set_state: invalid state %r for %s", state, skill_name)
        return

    def _transition(rec: Dict[str, Any]) -> None:
        rec["state"] = state
        # Record when archival happened; clear the stamp when the skill
        # comes back to the active state.
        if state == STATE_ARCHIVED:
            rec["archived_at"] = _now_iso()
        elif state == STATE_ACTIVE:
            rec["archived_at"] = None

    _mutate(skill_name, _transition)
||||
|
||||
|
||||
def set_pinned(skill_name: str, pinned: bool) -> None:
    """Set or clear the pinned flag (pinned skills bypass auto-transitions)."""
    def _flag(rec: Dict[str, Any]) -> None:
        rec["pinned"] = bool(pinned)

    _mutate(skill_name, _flag)
||||
|
||||
|
||||
def forget(skill_name: str) -> None:
    """Drop a skill's usage entry entirely. Called when the skill is deleted."""
    if not skill_name:
        return
    try:
        data = load_usage()
        # Only rewrite the sidecar when there was actually something to drop.
        if data.pop(skill_name, None) is not None:
            save_usage(data)
    except Exception as e:
        logger.debug("skill_usage.forget(%s) failed: %s", skill_name, e, exc_info=True)
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Archive / restore
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def archive_skill(skill_name: str) -> Tuple[bool, str]:
    """Move an agent-created skill directory to ~/.hermes/skills/.archive/.

    Returns (ok, message). Never archives bundled or hub skills — callers are
    responsible for checking provenance, but we double-check here as a safety net.

    The archive is flat: any category nesting collapses into a single
    ".archive/<skill>/" entry so restores are simple; name collisions get a
    UTC timestamp suffix. The usage record is transitioned to "archived".
    """
    if not is_agent_created(skill_name):
        return False, f"skill '{skill_name}' is bundled or hub-installed; never archive"

    skill_dir = _find_skill_dir(skill_name)
    if skill_dir is None:
        return False, f"skill '{skill_name}' not found"

    archive_root = _archive_dir()
    try:
        archive_root.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        return False, f"failed to create archive dir: {e}"

    # Flatten any category nesting into a single ".archive/<skill>/" so restores
    # are simple. If a collision exists, append a timestamp.
    dest = archive_root / skill_dir.name
    if dest.exists():
        dest = archive_root / f"{skill_dir.name}-{datetime.now(timezone.utc).strftime('%Y%m%d%H%M%S')}"

    try:
        skill_dir.rename(dest)
    except OSError:
        # rename() cannot cross filesystems — fall back to a copying move.
        # (The original code bound the first exception to an unused name.)
        import shutil
        try:
            shutil.move(str(skill_dir), str(dest))
        except Exception as move_err:
            return False, f"failed to archive: {move_err}"

    set_state(skill_name, STATE_ARCHIVED)
    return True, f"archived to {dest}"
||||
|
||||
|
||||
def restore_skill(skill_name: str) -> Tuple[bool, str]:
    """Move an archived skill back to ~/.hermes/skills/. Restores to the flat
    top-level layout; original category nesting is NOT reconstructed.

    Refuses to restore under a name that now collides with a bundled or
    hub-installed skill — that would shadow the upstream version.

    Returns (ok, message); on success the usage record becomes "active".
    """
    # If a bundled or hub skill has since been installed under the same
    # name, refuse to restore rather than shadow it.
    if not is_agent_created(skill_name):
        return False, (
            f"skill '{skill_name}' is now bundled or hub-installed; "
            "restore would shadow the upstream version"
        )
    archive_root = _archive_dir()
    if not archive_root.exists():
        return False, "no archive directory"

    # Try exact name match first, then any prefix match (for timestamped dupes)
    candidates = [p for p in archive_root.iterdir() if p.is_dir() and p.name == skill_name]
    if not candidates:
        # Reverse sort so the lexicographically-latest timestamp suffix wins.
        candidates = sorted(
            [p for p in archive_root.iterdir()
             if p.is_dir() and p.name.startswith(f"{skill_name}-")],
            reverse=True,
        )
    if not candidates:
        return False, f"skill '{skill_name}' not found in archive"

    src = candidates[0]
    dest = _skills_dir() / skill_name
    if dest.exists():
        # A live directory already holds the name — never overwrite it.
        return False, f"destination already exists: {dest}"

    try:
        src.rename(dest)
    except OSError:
        # rename() cannot cross filesystems — fall back to a copying move.
        import shutil
        try:
            shutil.move(str(src), str(dest))
        except Exception as e:
            return False, f"failed to restore: {e}"

    set_state(skill_name, STATE_ACTIVE)
    return True, f"restored to {dest}"
||||
|
||||
|
||||
def _find_skill_dir(skill_name: str) -> Optional[Path]:
    """Locate the directory for a skill by its frontmatter `name:` field.

    Handles both flat (~/.hermes/skills/<skill>/SKILL.md) and category-nested
    (~/.hermes/skills/<category>/<skill>/SKILL.md) layouts.
    """
    base = _skills_dir()
    if not base.exists():
        return None
    for candidate in base.rglob("SKILL.md"):
        try:
            rel = candidate.relative_to(base)
        except ValueError:
            continue
        # Hidden trees (.archive, .hub, ...) never hold live skills.
        if rel.parts and rel.parts[0].startswith("."):
            continue
        if skill_name == _read_skill_name(candidate, fallback=candidate.parent.name):
            return candidate.parent
    return None
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Reporting — for the curator CLI / slash command
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def agent_created_report() -> List[Dict[str, Any]]:
    """Return a list of {name, state, pinned, last_used_at, use_count, ...}
    records for every agent-created skill. Missing usage records are backfilled
    with defaults so callers can always index fields."""
    usage = load_usage()
    report: List[Dict[str, Any]] = []
    for skill in list_agent_created_skill_names():
        record = usage.get(skill)
        if not isinstance(record, dict):
            record = _empty_record()
        else:
            # Backfill fields missing from older sidecar files.
            for key, default in _empty_record().items():
                record.setdefault(key, default)
        report.append({"name": skill, **record})
    return report
||||
@@ -1480,13 +1480,32 @@ registry.register(
|
||||
check_fn=check_skills_requirements,
|
||||
emoji="📚",
|
||||
)
|
||||
def _skill_view_with_bump(args, **kw):
    """Invoke skill_view, then bump view_count on success. Best-effort: a
    telemetry failure never breaks the tool call."""
    requested = args.get("name", "")
    result = skill_view(
        requested, file_path=args.get("file_path"), task_id=kw.get("task_id")
    )
    try:
        payload = json.loads(result)
        if isinstance(payload, dict) and payload.get("success"):
            # Prefer the resolved name from the payload — qualified forms
            # ("plugin:skill") come back under the canonical name.
            canonical = payload.get("name") or requested
            if canonical:
                from tools.skill_usage import bump_view
                bump_view(str(canonical))
    except Exception:
        pass
    return result
||||
|
||||
|
||||
registry.register(
|
||||
name="skill_view",
|
||||
toolset="skills",
|
||||
schema=SKILL_VIEW_SCHEMA,
|
||||
handler=lambda args, **kw: skill_view(
|
||||
args.get("name", ""), file_path=args.get("file_path"), task_id=kw.get("task_id")
|
||||
),
|
||||
handler=_skill_view_with_bump,
|
||||
check_fn=check_skills_requirements,
|
||||
emoji="📚",
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user