Files
hermes-agent/agent/curator.py
Teknium 0be51452fa fix(curator): default cycle is every 7 days, not 24 hours
Weekly is closer to how skill churn actually works — most agent-created
skills don't change multiple times per day, so a daily review is pure
cost without benefit. Bumping the default to 7 days reduces aux-model
spend while still catching drift and staleness on the timescales that
matter (30d stale, 90d archive).

Changes:
- DEFAULT_INTERVAL_HOURS: 24 -> 168 (7 days)
- config.yaml default: interval_hours: 24 -> 24 * 7
- CLI status line renders as '7d' when interval is a whole-day multiple
- Test `test_old_run_eligible` decoupled from the exact default: it now
  uses 2 * get_interval_hours() so future tweaks don't break it
2026-04-26 06:32:18 -07:00

457 lines
16 KiB
Python

"""Curator — background skill maintenance orchestrator.
The curator is an auxiliary-model task that periodically reviews agent-created
skills and maintains the collection. It runs inactivity-triggered (no cron
daemon): when the agent is idle and the last curator run was longer than
``interval_hours`` ago, ``maybe_run_curator()`` spawns a forked AIAgent to do
the review.
Responsibilities:
- Auto-transition lifecycle states based on last_used_at timestamps
- Spawn a background review agent that can pin / archive / consolidate /
patch agent-created skills via skill_manage
- Persist curator state (last_run_at, paused, etc.) in .curator_state
Strict invariants:
- Only touches agent-created skills (see tools/skill_usage.is_agent_created)
- Never auto-deletes — only archives. Archive is recoverable.
- Pinned skills bypass all auto-transitions
- Uses the auxiliary client; never touches the main session's prompt cache
"""
from __future__ import annotations
import json
import logging
import os
import tempfile
import threading
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Callable, Dict, Optional
from hermes_constants import get_hermes_home
from tools import skill_usage
logger = logging.getLogger(__name__)
DEFAULT_INTERVAL_HOURS = 24 * 7 # 7 days
DEFAULT_MIN_IDLE_HOURS = 2
DEFAULT_STALE_AFTER_DAYS = 30
DEFAULT_ARCHIVE_AFTER_DAYS = 90
# ---------------------------------------------------------------------------
# .curator_state — persistent scheduler + status
# ---------------------------------------------------------------------------
def _state_file() -> Path:
return get_hermes_home() / "skills" / ".curator_state"
def _default_state() -> Dict[str, Any]:
return {
"last_run_at": None,
"last_run_duration_seconds": None,
"last_run_summary": None,
"paused": False,
"run_count": 0,
}
def load_state() -> Dict[str, Any]:
path = _state_file()
if not path.exists():
return _default_state()
try:
data = json.loads(path.read_text(encoding="utf-8"))
if isinstance(data, dict):
base = _default_state()
base.update({k: v for k, v in data.items() if k in base or k.startswith("_")})
return base
except (OSError, json.JSONDecodeError) as e:
logger.debug("Failed to read curator state: %s", e)
return _default_state()
def save_state(data: Dict[str, Any]) -> None:
path = _state_file()
try:
path.parent.mkdir(parents=True, exist_ok=True)
fd, tmp = tempfile.mkstemp(dir=str(path.parent), prefix=".curator_state_", suffix=".tmp")
try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, sort_keys=True, ensure_ascii=False)
f.flush()
os.fsync(f.fileno())
os.replace(tmp, path)
except BaseException:
try:
os.unlink(tmp)
except OSError:
pass
raise
except Exception as e:
logger.debug("Failed to save curator state: %s", e, exc_info=True)
def set_paused(paused: bool) -> None:
state = load_state()
state["paused"] = bool(paused)
save_state(state)
def is_paused() -> bool:
return bool(load_state().get("paused"))
# ---------------------------------------------------------------------------
# Config access
# ---------------------------------------------------------------------------
def _load_config() -> Dict[str, Any]:
"""Read curator.* config from ~/.hermes/config.yaml. Tolerates missing file."""
try:
from hermes_cli.config import load_config
cfg = load_config()
except Exception as e:
logger.debug("Failed to load config for curator: %s", e)
return {}
if not isinstance(cfg, dict):
return {}
cur = cfg.get("curator") or {}
if not isinstance(cur, dict):
return {}
return cur
def is_enabled() -> bool:
"""Default ON when no config says otherwise."""
cfg = _load_config()
return bool(cfg.get("enabled", True))
def get_interval_hours() -> int:
cfg = _load_config()
try:
return int(cfg.get("interval_hours", DEFAULT_INTERVAL_HOURS))
except (TypeError, ValueError):
return DEFAULT_INTERVAL_HOURS
def get_min_idle_hours() -> float:
cfg = _load_config()
try:
return float(cfg.get("min_idle_hours", DEFAULT_MIN_IDLE_HOURS))
except (TypeError, ValueError):
return DEFAULT_MIN_IDLE_HOURS
def get_stale_after_days() -> int:
cfg = _load_config()
try:
return int(cfg.get("stale_after_days", DEFAULT_STALE_AFTER_DAYS))
except (TypeError, ValueError):
return DEFAULT_STALE_AFTER_DAYS
def get_archive_after_days() -> int:
cfg = _load_config()
try:
return int(cfg.get("archive_after_days", DEFAULT_ARCHIVE_AFTER_DAYS))
except (TypeError, ValueError):
return DEFAULT_ARCHIVE_AFTER_DAYS
# ---------------------------------------------------------------------------
# Idle / interval check
# ---------------------------------------------------------------------------
def _parse_iso(ts: Optional[str]) -> Optional[datetime]:
if not ts:
return None
try:
return datetime.fromisoformat(ts)
except (TypeError, ValueError):
return None
def should_run_now(now: Optional[datetime] = None) -> bool:
"""Return True if the curator should run immediately.
Gates:
- curator.enabled == True
- not paused
- last_run_at missing, OR older than interval_hours
The idle check (min_idle_hours) is applied at the call site where we know
whether an agent is actively running — here we only enforce the static
gates.
"""
if not is_enabled():
return False
if is_paused():
return False
state = load_state()
last = _parse_iso(state.get("last_run_at"))
if last is None:
return True
if now is None:
now = datetime.now(timezone.utc)
if last.tzinfo is None:
last = last.replace(tzinfo=timezone.utc)
interval = timedelta(hours=get_interval_hours())
return (now - last) >= interval
# ---------------------------------------------------------------------------
# Automatic state transitions (pure function, no LLM)
# ---------------------------------------------------------------------------
def apply_automatic_transitions(now: Optional[datetime] = None) -> Dict[str, int]:
"""Walk every agent-created skill and move active/stale/archived based on
last_used_at. Pinned skills are never touched. Returns a counter dict
describing what changed."""
from tools import skill_usage as _u
if now is None:
now = datetime.now(timezone.utc)
stale_cutoff = now - timedelta(days=get_stale_after_days())
archive_cutoff = now - timedelta(days=get_archive_after_days())
counts = {"marked_stale": 0, "archived": 0, "reactivated": 0, "checked": 0}
for row in _u.agent_created_report():
counts["checked"] += 1
name = row["name"]
if row.get("pinned"):
continue
last_used = _parse_iso(row.get("last_used_at"))
# If never used, treat as using created_at as the anchor so new skills
# don't immediately archive themselves.
anchor = last_used or _parse_iso(row.get("created_at")) or now
if anchor.tzinfo is None:
anchor = anchor.replace(tzinfo=timezone.utc)
current = row.get("state", _u.STATE_ACTIVE)
if anchor <= archive_cutoff and current != _u.STATE_ARCHIVED:
ok, _msg = _u.archive_skill(name)
if ok:
counts["archived"] += 1
elif anchor <= stale_cutoff and current == _u.STATE_ACTIVE:
_u.set_state(name, _u.STATE_STALE)
counts["marked_stale"] += 1
elif anchor > stale_cutoff and current == _u.STATE_STALE:
# Skill got used again after being marked stale — reactivate.
_u.set_state(name, _u.STATE_ACTIVE)
counts["reactivated"] += 1
return counts
# ---------------------------------------------------------------------------
# Review prompt for the forked agent
# ---------------------------------------------------------------------------
CURATOR_REVIEW_PROMPT = (
"You are running as Hermes' background skill CURATOR.\n\n"
"Your job is to maintain the collection of AGENT-CREATED skills. Review "
"each candidate below and decide what to do.\n\n"
"Rules — all load-bearing, do not violate:\n"
"1. You MUST NOT touch bundled or hub-installed skills. The candidate list "
"below is already filtered to agent-created skills only.\n"
"2. You MUST NOT delete any skill. Archiving (moving the skill's directory "
"into ~/.hermes/skills/.archive/) is the maximum action. Archives are "
"recoverable; deletion is not.\n"
"3. You MUST NOT touch skills shown as pinned=yes. Skip them.\n"
"4. Prefer GENERALIZING overlapping skills by patching the stronger one "
"and archiving the weaker, rather than leaving two narrow skills in the "
"collection.\n\n"
"Your toolset:\n"
" - skills_list, skill_view — read the current landscape\n"
" - skill_manage action=patch — fix stale commands, wrong paths, or "
"merge two overlapping skills by broadening the stronger one\n"
" - terminal — move a skill directory into the archive, "
"e.g. mv ~/.hermes/skills/<skill-dir> ~/.hermes/skills/.archive/\n\n"
"For each candidate, decide one of:\n"
" keep — leave as-is (most common default; don't over-curate)\n"
" patch — skill_manage action=patch to fix stale commands, wrong "
"paths, or env-specific claims that are no longer true\n"
" consolidate — two skills overlap: patch the stronger one to absorb "
"the weaker (skill_manage), then mv the weaker directory to .archive/\n"
" archive — the skill is genuinely obsolete and has not been used "
"recently: mv its directory to ~/.hermes/skills/.archive/\n\n"
"Start by calling skills_list and skill_view on anything you consider "
"patching or consolidating. Be conservative — if in doubt, keep. "
"When you are done, write a one-sentence summary of what you changed."
)
# ---------------------------------------------------------------------------
# Orchestrator — spawn a forked AIAgent for the LLM review pass
# ---------------------------------------------------------------------------
def _render_candidate_list() -> str:
"""Human/agent-readable list of agent-created skills with usage stats."""
rows = skill_usage.agent_created_report()
if not rows:
return "No agent-created skills to review."
lines = [f"Agent-created skills ({len(rows)}):\n"]
for r in rows:
lines.append(
f"- {r['name']} "
f"state={r['state']} "
f"pinned={'yes' if r.get('pinned') else 'no'} "
f"use={r.get('use_count', 0)} "
f"view={r.get('view_count', 0)} "
f"patches={r.get('patch_count', 0)} "
f"last_used={r.get('last_used_at') or 'never'}"
)
return "\n".join(lines)
def run_curator_review(
on_summary: Optional[Callable[[str], None]] = None,
synchronous: bool = False,
) -> Dict[str, Any]:
"""Execute a single curator review pass.
Steps:
1. Apply automatic state transitions (pure, no LLM).
2. If there are agent-created skills, spawn a forked AIAgent that runs
the LLM review prompt against the current candidate list.
3. Update .curator_state with last_run_at and a one-line summary.
4. Invoke *on_summary* with a user-visible description.
If *synchronous* is True, the LLM review runs in the calling thread; the
default is to spawn a daemon thread so the caller returns immediately.
"""
start = datetime.now(timezone.utc)
counts = apply_automatic_transitions(now=start)
auto_summary_parts = []
if counts["marked_stale"]:
auto_summary_parts.append(f"{counts['marked_stale']} marked stale")
if counts["archived"]:
auto_summary_parts.append(f"{counts['archived']} archived")
if counts["reactivated"]:
auto_summary_parts.append(f"{counts['reactivated']} reactivated")
auto_summary = ", ".join(auto_summary_parts) if auto_summary_parts else "no changes"
# Persist state before the LLM pass so a crash mid-review still records
# the run and doesn't immediately re-trigger.
state = load_state()
state["last_run_at"] = start.isoformat()
state["run_count"] = int(state.get("run_count", 0)) + 1
state["last_run_summary"] = f"auto: {auto_summary}"
save_state(state)
def _llm_pass():
nonlocal auto_summary
try:
candidate_list = _render_candidate_list()
if "No agent-created skills" in candidate_list:
final_summary = f"auto: {auto_summary}; llm: skipped (no candidates)"
else:
prompt = f"{CURATOR_REVIEW_PROMPT}\n\n{candidate_list}"
llm_summary = _run_llm_review(prompt)
final_summary = f"auto: {auto_summary}; llm: {llm_summary}"
except Exception as e:
logger.debug("Curator LLM pass failed: %s", e, exc_info=True)
final_summary = f"auto: {auto_summary}; llm: error ({e})"
elapsed = (datetime.now(timezone.utc) - start).total_seconds()
state2 = load_state()
state2["last_run_duration_seconds"] = elapsed
state2["last_run_summary"] = final_summary
save_state(state2)
if on_summary:
try:
on_summary(f"curator: {final_summary}")
except Exception:
pass
if synchronous:
_llm_pass()
else:
t = threading.Thread(target=_llm_pass, daemon=True, name="curator-review")
t.start()
return {
"started_at": start.isoformat(),
"auto_transitions": counts,
"summary_so_far": auto_summary,
}
def _run_llm_review(prompt: str) -> str:
"""Spawn an AIAgent fork to run the curator review prompt. Returns a short
summary of what the model said in its final response."""
import contextlib
try:
from run_agent import AIAgent
except Exception as e:
return f"AIAgent import failed: {e}"
review_agent = None
try:
with open(os.devnull, "w") as _devnull, \
contextlib.redirect_stdout(_devnull), \
contextlib.redirect_stderr(_devnull):
review_agent = AIAgent(
max_iterations=8,
quiet_mode=True,
platform="curator",
skip_context_files=True,
skip_memory=True,
)
# Disable recursive nudges — the curator must never spawn its own review.
review_agent._memory_nudge_interval = 0
review_agent._skill_nudge_interval = 0
result = review_agent.run_conversation(user_message=prompt)
final = ""
if isinstance(result, dict):
final = str(result.get("final_response") or "").strip()
return (final[:240] + "") if len(final) > 240 else (final or "no change")
except Exception as e:
return f"error: {e}"
finally:
if review_agent is not None:
try:
review_agent.close()
except Exception:
pass
# ---------------------------------------------------------------------------
# Public entrypoint for the session-start hook
# ---------------------------------------------------------------------------
def maybe_run_curator(
*,
idle_for_seconds: Optional[float] = None,
on_summary: Optional[Callable[[str], None]] = None,
) -> Optional[Dict[str, Any]]:
"""Best-effort: run a curator pass if all gates pass. Returns the result
dict if a pass was started, else None. Never raises."""
try:
if not should_run_now():
return None
# Idle gating: only enforce when the caller provided a measurement.
if idle_for_seconds is not None:
min_idle_s = get_min_idle_hours() * 3600.0
if idle_for_seconds < min_idle_s:
return None
return run_curator_review(on_summary=on_summary)
except Exception as e:
logger.debug("maybe_run_curator failed: %s", e, exc_info=True)
return None