perf(ttft): cache skills prompt with shared skill_utils module (salvage #3366) (#3421)

Two-layer caching for build_skills_system_prompt():
  1. In-process LRU (OrderedDict, max 8) — same-process: 546ms → <1ms
  2. Disk snapshot (.skills_prompt_snapshot.json) — cold start: 297ms → 103ms

Key improvements over original PR #3366:
- Extract shared logic into agent/skill_utils.py (parse_frontmatter,
  skill_matches_platform, get_disabled_skill_names, extract_skill_conditions,
  extract_skill_description, iter_skill_index_files)
- tools/skills_tool.py delegates to shared module — zero code duplication
- Proper LRU eviction via OrderedDict.move_to_end + popitem(last=False)
- Cache invalidation on all skill mutation paths:
  - skill_manage tool (in-conversation writes)
  - hermes skills install (CLI hub)
  - hermes skills uninstall (CLI hub)
  - Automatic via mtime/size manifest on cold start

prompt_builder.py no longer imports tools.skills_tool (avoids pulling
in the entire tool registry chain at prompt build time).

6301 tests pass, 0 failures.

Co-authored-by: kshitijk4poor <82637225+kshitijk4poor@users.noreply.github.com>
This commit is contained in:
Teknium
2026-03-27 10:54:02 -07:00
committed by GitHub
parent cc4514076b
commit 5127567d5d
8 changed files with 527 additions and 196 deletions

View File

@@ -4,14 +4,27 @@ All functions are stateless. AIAgent._build_system_prompt() calls these to
assemble pieces, then combines them with memory and ephemeral prompts. assemble pieces, then combines them with memory and ephemeral prompts.
""" """
import json
import logging import logging
import os import os
import re import re
import threading
from collections import OrderedDict
from pathlib import Path from pathlib import Path
from hermes_constants import get_hermes_home from hermes_constants import get_hermes_home
from typing import Optional from typing import Optional
from agent.skill_utils import (
extract_skill_conditions,
extract_skill_description,
get_disabled_skill_names,
iter_skill_index_files,
parse_frontmatter,
skill_matches_platform,
)
from utils import atomic_json_write
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -230,6 +243,111 @@ CONTEXT_TRUNCATE_HEAD_RATIO = 0.7
CONTEXT_TRUNCATE_TAIL_RATIO = 0.2 CONTEXT_TRUNCATE_TAIL_RATIO = 0.2
# =========================================================================
# Skills prompt cache
# =========================================================================
_SKILLS_PROMPT_CACHE_MAX = 8
_SKILLS_PROMPT_CACHE: OrderedDict[tuple, str] = OrderedDict()
_SKILLS_PROMPT_CACHE_LOCK = threading.Lock()
_SKILLS_SNAPSHOT_VERSION = 1
def _skills_prompt_snapshot_path() -> Path:
return get_hermes_home() / ".skills_prompt_snapshot.json"
def clear_skills_system_prompt_cache(*, clear_snapshot: bool = False) -> None:
"""Drop the in-process skills prompt cache (and optionally the disk snapshot)."""
with _SKILLS_PROMPT_CACHE_LOCK:
_SKILLS_PROMPT_CACHE.clear()
if clear_snapshot:
try:
_skills_prompt_snapshot_path().unlink(missing_ok=True)
except OSError as e:
logger.debug("Could not remove skills prompt snapshot: %s", e)
def _build_skills_manifest(skills_dir: Path) -> dict[str, list[int]]:
"""Build an mtime/size manifest of all SKILL.md and DESCRIPTION.md files."""
manifest: dict[str, list[int]] = {}
for filename in ("SKILL.md", "DESCRIPTION.md"):
for path in iter_skill_index_files(skills_dir, filename):
try:
st = path.stat()
except OSError:
continue
manifest[str(path.relative_to(skills_dir))] = [st.st_mtime_ns, st.st_size]
return manifest
def _load_skills_snapshot(skills_dir: Path) -> Optional[dict]:
"""Load the disk snapshot if it exists and its manifest still matches."""
snapshot_path = _skills_prompt_snapshot_path()
if not snapshot_path.exists():
return None
try:
snapshot = json.loads(snapshot_path.read_text(encoding="utf-8"))
except Exception:
return None
if not isinstance(snapshot, dict):
return None
if snapshot.get("version") != _SKILLS_SNAPSHOT_VERSION:
return None
if snapshot.get("manifest") != _build_skills_manifest(skills_dir):
return None
return snapshot
def _write_skills_snapshot(
skills_dir: Path,
manifest: dict[str, list[int]],
skill_entries: list[dict],
category_descriptions: dict[str, str],
) -> None:
"""Persist skill metadata to disk for fast cold-start reuse."""
payload = {
"version": _SKILLS_SNAPSHOT_VERSION,
"manifest": manifest,
"skills": skill_entries,
"category_descriptions": category_descriptions,
}
try:
atomic_json_write(_skills_prompt_snapshot_path(), payload)
except Exception as e:
logger.debug("Could not write skills prompt snapshot: %s", e)
def _build_snapshot_entry(
skill_file: Path,
skills_dir: Path,
frontmatter: dict,
description: str,
) -> dict:
"""Build a serialisable metadata dict for one skill."""
rel_path = skill_file.relative_to(skills_dir)
parts = rel_path.parts
if len(parts) >= 2:
skill_name = parts[-2]
category = "/".join(parts[:-2]) if len(parts) > 2 else parts[0]
else:
category = "general"
skill_name = skill_file.parent.name
platforms = frontmatter.get("platforms") or []
if isinstance(platforms, str):
platforms = [platforms]
return {
"skill_name": skill_name,
"category": category,
"frontmatter_name": str(frontmatter.get("name", skill_name)),
"description": description,
"platforms": [str(p).strip() for p in platforms if str(p).strip()],
"conditions": extract_skill_conditions(frontmatter),
}
# ========================================================================= # =========================================================================
# Skills index # Skills index
# ========================================================================= # =========================================================================
@@ -241,22 +359,13 @@ def _parse_skill_file(skill_file: Path) -> tuple[bool, dict, str]:
(True, {}, "") to err on the side of showing the skill. (True, {}, "") to err on the side of showing the skill.
""" """
try: try:
from tools.skills_tool import _parse_frontmatter, skill_matches_platform
raw = skill_file.read_text(encoding="utf-8")[:2000] raw = skill_file.read_text(encoding="utf-8")[:2000]
frontmatter, _ = _parse_frontmatter(raw) frontmatter, _ = parse_frontmatter(raw)
if not skill_matches_platform(frontmatter): if not skill_matches_platform(frontmatter):
return False, {}, "" return False, frontmatter, ""
desc = "" return True, frontmatter, extract_skill_description(frontmatter)
raw_desc = frontmatter.get("description", "")
if raw_desc:
desc = str(raw_desc).strip().strip("'\"")
if len(desc) > 60:
desc = desc[:57] + "..."
return True, frontmatter, desc
except Exception as e: except Exception as e:
logger.debug("Failed to parse skill file %s: %s", skill_file, e) logger.debug("Failed to parse skill file %s: %s", skill_file, e)
return True, {}, "" return True, {}, ""
@@ -265,16 +374,9 @@ def _parse_skill_file(skill_file: Path) -> tuple[bool, dict, str]:
def _read_skill_conditions(skill_file: Path) -> dict: def _read_skill_conditions(skill_file: Path) -> dict:
"""Extract conditional activation fields from SKILL.md frontmatter.""" """Extract conditional activation fields from SKILL.md frontmatter."""
try: try:
from tools.skills_tool import _parse_frontmatter
raw = skill_file.read_text(encoding="utf-8")[:2000] raw = skill_file.read_text(encoding="utf-8")[:2000]
frontmatter, _ = _parse_frontmatter(raw) frontmatter, _ = parse_frontmatter(raw)
hermes = frontmatter.get("metadata", {}).get("hermes", {}) return extract_skill_conditions(frontmatter)
return {
"fallback_for_toolsets": hermes.get("fallback_for_toolsets", []),
"requires_toolsets": hermes.get("requires_toolsets", []),
"fallback_for_tools": hermes.get("fallback_for_tools", []),
"requires_tools": hermes.get("requires_tools", []),
}
except Exception as e: except Exception as e:
logger.debug("Failed to read skill conditions from %s: %s", skill_file, e) logger.debug("Failed to read skill conditions from %s: %s", skill_file, e)
return {} return {}
@@ -317,10 +419,12 @@ def build_skills_system_prompt(
) -> str: ) -> str:
"""Build a compact skill index for the system prompt. """Build a compact skill index for the system prompt.
Scans ~/.hermes/skills/ for SKILL.md files grouped by category. Two-layer cache:
Includes per-skill descriptions from frontmatter so the model can 1. In-process LRU dict keyed by (skills_dir, tools, toolsets)
match skills by meaning, not just name. 2. Disk snapshot (``.skills_prompt_snapshot.json``) validated by
Filters out skills incompatible with the current OS platform. mtime/size manifest — survives process restarts
Falls back to a full filesystem scan when both layers miss.
""" """
hermes_home = get_hermes_home() hermes_home = get_hermes_home()
skills_dir = hermes_home / "skills" skills_dir = hermes_home / "skills"
@@ -328,98 +432,140 @@ def build_skills_system_prompt(
if not skills_dir.exists(): if not skills_dir.exists():
return "" return ""
# Collect skills with descriptions, grouped by category. # ── Layer 1: in-process LRU cache ─────────────────────────────────
# Each entry: (skill_name, description) cache_key = (
# Supports sub-categories: skills/mlops/training/axolotl/SKILL.md str(skills_dir.resolve()),
# -> category "mlops/training", skill "axolotl" tuple(sorted(str(t) for t in (available_tools or set()))),
# Load disabled skill names once for the entire scan tuple(sorted(str(ts) for ts in (available_toolsets or set()))),
try: )
from tools.skills_tool import _get_disabled_skill_names with _SKILLS_PROMPT_CACHE_LOCK:
disabled = _get_disabled_skill_names() cached = _SKILLS_PROMPT_CACHE.get(cache_key)
except Exception: if cached is not None:
disabled = set() _SKILLS_PROMPT_CACHE.move_to_end(cache_key)
return cached
disabled = get_disabled_skill_names()
# ── Layer 2: disk snapshot ────────────────────────────────────────
snapshot = _load_skills_snapshot(skills_dir)
skills_by_category: dict[str, list[tuple[str, str]]] = {} skills_by_category: dict[str, list[tuple[str, str]]] = {}
for skill_file in skills_dir.rglob("SKILL.md"): category_descriptions: dict[str, str] = {}
is_compatible, frontmatter, desc = _parse_skill_file(skill_file)
if not is_compatible: if snapshot is not None:
continue # Fast path: use pre-parsed metadata from disk
rel_path = skill_file.relative_to(skills_dir) for entry in snapshot.get("skills", []):
parts = rel_path.parts if not isinstance(entry, dict):
if len(parts) >= 2: continue
skill_name = parts[-2] skill_name = entry.get("skill_name") or ""
category = "/".join(parts[:-2]) if len(parts) > 2 else parts[0] category = entry.get("category") or "general"
else: frontmatter_name = entry.get("frontmatter_name") or skill_name
category = "general" platforms = entry.get("platforms") or []
skill_name = skill_file.parent.name if not skill_matches_platform({"platforms": platforms}):
# Respect user's disabled skills config continue
fm_name = frontmatter.get("name", skill_name) if frontmatter_name in disabled or skill_name in disabled:
if fm_name in disabled or skill_name in disabled: continue
continue if not _skill_should_show(
# Extract conditions inline from already-parsed frontmatter entry.get("conditions") or {},
# (avoids redundant file re-read that _read_skill_conditions would do) available_tools,
hermes_meta = (frontmatter.get("metadata") or {}).get("hermes") or {} available_toolsets,
conditions = { ):
"fallback_for_toolsets": hermes_meta.get("fallback_for_toolsets", []), continue
"requires_toolsets": hermes_meta.get("requires_toolsets", []), skills_by_category.setdefault(category, []).append(
"fallback_for_tools": hermes_meta.get("fallback_for_tools", []), (skill_name, entry.get("description", ""))
"requires_tools": hermes_meta.get("requires_tools", []), )
category_descriptions = {
str(k): str(v)
for k, v in (snapshot.get("category_descriptions") or {}).items()
} }
if not _skill_should_show(conditions, available_tools, available_toolsets): else:
continue # Cold path: full filesystem scan + write snapshot for next time
skills_by_category.setdefault(category, []).append((skill_name, desc)) skill_entries: list[dict] = []
for skill_file in iter_skill_index_files(skills_dir, "SKILL.md"):
is_compatible, frontmatter, desc = _parse_skill_file(skill_file)
entry = _build_snapshot_entry(skill_file, skills_dir, frontmatter, desc)
skill_entries.append(entry)
if not is_compatible:
continue
skill_name = entry["skill_name"]
if entry["frontmatter_name"] in disabled or skill_name in disabled:
continue
if not _skill_should_show(
extract_skill_conditions(frontmatter),
available_tools,
available_toolsets,
):
continue
skills_by_category.setdefault(entry["category"], []).append(
(skill_name, entry["description"])
)
if not skills_by_category: # Read category-level DESCRIPTION.md files
return "" for desc_file in iter_skill_index_files(skills_dir, "DESCRIPTION.md"):
# Read category-level descriptions from DESCRIPTION.md
# Checks both the exact category path and parent directories
category_descriptions = {}
for category in skills_by_category:
cat_path = Path(category)
desc_file = skills_dir / cat_path / "DESCRIPTION.md"
if desc_file.exists():
try: try:
content = desc_file.read_text(encoding="utf-8") content = desc_file.read_text(encoding="utf-8")
match = re.search(r"^---\s*\n.*?description:\s*(.+?)\s*\n.*?^---", content, re.MULTILINE | re.DOTALL) fm, _ = parse_frontmatter(content)
if match: cat_desc = fm.get("description")
category_descriptions[category] = match.group(1).strip() if not cat_desc:
continue
rel = desc_file.relative_to(skills_dir)
cat = "/".join(rel.parts[:-1]) if len(rel.parts) > 1 else "general"
category_descriptions[cat] = str(cat_desc).strip().strip("'\"")
except Exception as e: except Exception as e:
logger.debug("Could not read skill description %s: %s", desc_file, e) logger.debug("Could not read skill description %s: %s", desc_file, e)
index_lines = [] _write_skills_snapshot(
for category in sorted(skills_by_category.keys()): skills_dir,
cat_desc = category_descriptions.get(category, "") _build_skills_manifest(skills_dir),
if cat_desc: skill_entries,
index_lines.append(f" {category}: {cat_desc}") category_descriptions,
else: )
index_lines.append(f" {category}:")
# Deduplicate and sort skills within each category
seen = set()
for name, desc in sorted(skills_by_category[category], key=lambda x: x[0]):
if name in seen:
continue
seen.add(name)
if desc:
index_lines.append(f" - {name}: {desc}")
else:
index_lines.append(f" - {name}")
return ( if not skills_by_category:
"## Skills (mandatory)\n" result = ""
"Before replying, scan the skills below. If one clearly matches your task, " else:
"load it with skill_view(name) and follow its instructions. " index_lines = []
"If a skill has issues, fix it with skill_manage(action='patch').\n" for category in sorted(skills_by_category.keys()):
"After difficult/iterative tasks, offer to save as a skill. " cat_desc = category_descriptions.get(category, "")
"If a skill you loaded was missing steps, had wrong commands, or needed " if cat_desc:
"pitfalls you discovered, update it before finishing.\n" index_lines.append(f" {category}: {cat_desc}")
"\n" else:
"<available_skills>\n" index_lines.append(f" {category}:")
+ "\n".join(index_lines) + "\n" # Deduplicate and sort skills within each category
"</available_skills>\n" seen = set()
"\n" for name, desc in sorted(skills_by_category[category], key=lambda x: x[0]):
"If none match, proceed normally without loading a skill." if name in seen:
) continue
seen.add(name)
if desc:
index_lines.append(f" - {name}: {desc}")
else:
index_lines.append(f" - {name}")
result = (
"## Skills (mandatory)\n"
"Before replying, scan the skills below. If one clearly matches your task, "
"load it with skill_view(name) and follow its instructions. "
"If a skill has issues, fix it with skill_manage(action='patch').\n"
"After difficult/iterative tasks, offer to save as a skill. "
"If a skill you loaded was missing steps, had wrong commands, or needed "
"pitfalls you discovered, update it before finishing.\n"
"\n"
"<available_skills>\n"
+ "\n".join(index_lines) + "\n"
"</available_skills>\n"
"\n"
"If none match, proceed normally without loading a skill."
)
# ── Store in LRU cache ────────────────────────────────────────────
with _SKILLS_PROMPT_CACHE_LOCK:
_SKILLS_PROMPT_CACHE[cache_key] = result
_SKILLS_PROMPT_CACHE.move_to_end(cache_key)
while len(_SKILLS_PROMPT_CACHE) > _SKILLS_PROMPT_CACHE_MAX:
_SKILLS_PROMPT_CACHE.popitem(last=False)
return result
# ========================================================================= # =========================================================================

203
agent/skill_utils.py Normal file
View File

@@ -0,0 +1,203 @@
"""Lightweight skill metadata utilities shared by prompt_builder and skills_tool.
This module intentionally avoids importing the tool registry, CLI config, or any
heavy dependency chain. It is safe to import at module level without triggering
tool registration or provider resolution.
"""
import logging
import os
import re
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple
from hermes_constants import get_hermes_home
logger = logging.getLogger(__name__)
# ── Platform mapping ──────────────────────────────────────────────────────
PLATFORM_MAP = {
"macos": "darwin",
"linux": "linux",
"windows": "win32",
}
EXCLUDED_SKILL_DIRS = frozenset((".git", ".github", ".hub"))
# ── Lazy YAML loader ─────────────────────────────────────────────────────
_yaml_load_fn = None
def yaml_load(content: str):
"""Parse YAML with lazy import and CSafeLoader preference."""
global _yaml_load_fn
if _yaml_load_fn is None:
import yaml
loader = getattr(yaml, "CSafeLoader", None) or yaml.SafeLoader
def _load(value: str):
return yaml.load(value, Loader=loader)
_yaml_load_fn = _load
return _yaml_load_fn(content)
# ── Frontmatter parsing ──────────────────────────────────────────────────
def parse_frontmatter(content: str) -> Tuple[Dict[str, Any], str]:
"""Parse YAML frontmatter from a markdown string.
Uses yaml with CSafeLoader for full YAML support (nested metadata, lists)
with a fallback to simple key:value splitting for robustness.
Returns:
(frontmatter_dict, remaining_body)
"""
frontmatter: Dict[str, Any] = {}
body = content
if not content.startswith("---"):
return frontmatter, body
end_match = re.search(r"\n---\s*\n", content[3:])
if not end_match:
return frontmatter, body
yaml_content = content[3 : end_match.start() + 3]
body = content[end_match.end() + 3 :]
try:
parsed = yaml_load(yaml_content)
if isinstance(parsed, dict):
frontmatter = parsed
except Exception:
# Fallback: simple key:value parsing for malformed YAML
for line in yaml_content.strip().split("\n"):
if ":" not in line:
continue
key, value = line.split(":", 1)
frontmatter[key.strip()] = value.strip()
return frontmatter, body
# ── Platform matching ─────────────────────────────────────────────────────
def skill_matches_platform(frontmatter: Dict[str, Any]) -> bool:
"""Return True when the skill is compatible with the current OS.
Skills declare platform requirements via a top-level ``platforms`` list
in their YAML frontmatter::
platforms: [macos] # macOS only
platforms: [macos, linux] # macOS and Linux
If the field is absent or empty the skill is compatible with **all**
platforms (backward-compatible default).
"""
platforms = frontmatter.get("platforms")
if not platforms:
return True
if not isinstance(platforms, list):
platforms = [platforms]
current = sys.platform
for platform in platforms:
normalized = str(platform).lower().strip()
mapped = PLATFORM_MAP.get(normalized, normalized)
if current.startswith(mapped):
return True
return False
# ── Disabled skills ───────────────────────────────────────────────────────
def get_disabled_skill_names() -> Set[str]:
"""Read disabled skill names from config.yaml.
Resolves platform from ``HERMES_PLATFORM`` env var, falls back to
the global disabled list. Reads the config file directly (no CLI
config imports) to stay lightweight.
"""
config_path = get_hermes_home() / "config.yaml"
if not config_path.exists():
return set()
try:
parsed = yaml_load(config_path.read_text(encoding="utf-8"))
except Exception as e:
logger.debug("Could not read skill config %s: %s", config_path, e)
return set()
if not isinstance(parsed, dict):
return set()
skills_cfg = parsed.get("skills")
if not isinstance(skills_cfg, dict):
return set()
resolved_platform = os.getenv("HERMES_PLATFORM")
if resolved_platform:
platform_disabled = (skills_cfg.get("platform_disabled") or {}).get(
resolved_platform
)
if platform_disabled is not None:
return _normalize_string_set(platform_disabled)
return _normalize_string_set(skills_cfg.get("disabled"))
def _normalize_string_set(values) -> Set[str]:
if values is None:
return set()
if isinstance(values, str):
values = [values]
return {str(v).strip() for v in values if str(v).strip()}
# ── Condition extraction ──────────────────────────────────────────────────
def extract_skill_conditions(frontmatter: Dict[str, Any]) -> Dict[str, List]:
"""Extract conditional activation fields from parsed frontmatter."""
hermes = (frontmatter.get("metadata") or {}).get("hermes") or {}
return {
"fallback_for_toolsets": hermes.get("fallback_for_toolsets", []),
"requires_toolsets": hermes.get("requires_toolsets", []),
"fallback_for_tools": hermes.get("fallback_for_tools", []),
"requires_tools": hermes.get("requires_tools", []),
}
# ── Description extraction ────────────────────────────────────────────────
def extract_skill_description(frontmatter: Dict[str, Any]) -> str:
"""Extract a truncated description from parsed frontmatter."""
raw_desc = frontmatter.get("description", "")
if not raw_desc:
return ""
desc = str(raw_desc).strip().strip("'\"")
if len(desc) > 60:
return desc[:57] + "..."
return desc
# ── File iteration ────────────────────────────────────────────────────────
def iter_skill_index_files(skills_dir: Path, filename: str):
"""Walk skills_dir yielding sorted paths matching *filename*.
Excludes ``.git``, ``.github``, ``.hub`` directories.
"""
matches = []
for root, dirs, files in os.walk(skills_dir):
dirs[:] = [d for d in dirs if d not in EXCLUDED_SKILL_DIRS]
if filename in files:
matches.append(Path(root) / filename)
for path in sorted(matches, key=lambda p: str(p.relative_to(skills_dir))):
yield path

View File

@@ -417,6 +417,13 @@ def do_install(identifier: str, category: str = "", force: bool = False,
c.print(f"[bold green]Installed:[/] {install_dir.relative_to(SKILLS_DIR)}") c.print(f"[bold green]Installed:[/] {install_dir.relative_to(SKILLS_DIR)}")
c.print(f"[dim]Files: {', '.join(bundle.files.keys())}[/]\n") c.print(f"[dim]Files: {', '.join(bundle.files.keys())}[/]\n")
# Invalidate the skills prompt cache so the new skill appears immediately
try:
from agent.prompt_builder import clear_skills_system_prompt_cache
clear_skills_system_prompt_cache(clear_snapshot=True)
except Exception:
pass
def do_inspect(identifier: str, console: Optional[Console] = None) -> None: def do_inspect(identifier: str, console: Optional[Console] = None) -> None:
"""Preview a skill's SKILL.md content without installing.""" """Preview a skill's SKILL.md content without installing."""
@@ -623,6 +630,11 @@ def do_uninstall(name: str, console: Optional[Console] = None,
success, msg = uninstall_skill(name) success, msg = uninstall_skill(name)
if success: if success:
c.print(f"[bold green]{msg}[/]\n") c.print(f"[bold green]{msg}[/]\n")
try:
from agent.prompt_builder import clear_skills_system_prompt_cache
clear_skills_system_prompt_cache(clear_snapshot=True)
except Exception:
pass
else: else:
c.print(f"[bold red]Error:[/] {msg}\n") c.print(f"[bold red]Error:[/] {msg}\n")

View File

@@ -232,7 +232,18 @@ class TestPromptBuilderImports:
# ========================================================================= # =========================================================================
import pytest
class TestBuildSkillsSystemPrompt: class TestBuildSkillsSystemPrompt:
@pytest.fixture(autouse=True)
def _clear_skills_cache(self):
"""Ensure the in-process skills prompt cache doesn't leak between tests."""
from agent.prompt_builder import clear_skills_system_prompt_cache
clear_skills_system_prompt_cache(clear_snapshot=True)
yield
clear_skills_system_prompt_cache(clear_snapshot=True)
def test_empty_when_no_skills_dir(self, monkeypatch, tmp_path): def test_empty_when_no_skills_dir(self, monkeypatch, tmp_path):
monkeypatch.setenv("HERMES_HOME", str(tmp_path)) monkeypatch.setenv("HERMES_HOME", str(tmp_path))
result = build_skills_system_prompt() result = build_skills_system_prompt()
@@ -302,7 +313,7 @@ class TestBuildSkillsSystemPrompt:
from unittest.mock import patch from unittest.mock import patch
with patch("tools.skills_tool.sys") as mock_sys: with patch("agent.skill_utils.sys") as mock_sys:
mock_sys.platform = "darwin" mock_sys.platform = "darwin"
result = build_skills_system_prompt() result = build_skills_system_prompt()
@@ -330,7 +341,7 @@ class TestBuildSkillsSystemPrompt:
from unittest.mock import patch from unittest.mock import patch
with patch( with patch(
"tools.skills_tool._get_disabled_skill_names", "agent.prompt_builder.get_disabled_skill_names",
return_value={"old-tool"}, return_value={"old-tool"},
): ):
result = build_skills_system_prompt() result = build_skills_system_prompt()
@@ -804,6 +815,13 @@ class TestSkillShouldShow:
class TestBuildSkillsSystemPromptConditional: class TestBuildSkillsSystemPromptConditional:
@pytest.fixture(autouse=True)
def _clear_skills_cache(self):
from agent.prompt_builder import clear_skills_system_prompt_cache
clear_skills_system_prompt_cache(clear_snapshot=True)
yield
clear_skills_system_prompt_cache(clear_snapshot=True)
def test_fallback_skill_hidden_when_primary_available(self, monkeypatch, tmp_path): def test_fallback_skill_hidden_when_primary_available(self, monkeypatch, tmp_path):
monkeypatch.setenv("HERMES_HOME", str(tmp_path)) monkeypatch.setenv("HERMES_HOME", str(tmp_path))
skill_dir = tmp_path / "skills" / "search" / "duckduckgo" skill_dir = tmp_path / "skills" / "search" / "duckduckgo"

View File

@@ -54,7 +54,7 @@ class TestScanSkillCommands:
"""macOS-only skills should not register slash commands on Linux.""" """macOS-only skills should not register slash commands on Linux."""
with ( with (
patch("tools.skills_tool.SKILLS_DIR", tmp_path), patch("tools.skills_tool.SKILLS_DIR", tmp_path),
patch("tools.skills_tool.sys") as mock_sys, patch("agent.skill_utils.sys") as mock_sys,
): ):
mock_sys.platform = "linux" mock_sys.platform = "linux"
_make_skill(tmp_path, "imessage", frontmatter_extra="platforms: [macos]\n") _make_skill(tmp_path, "imessage", frontmatter_extra="platforms: [macos]\n")
@@ -67,7 +67,7 @@ class TestScanSkillCommands:
"""macOS-only skills should register slash commands on macOS.""" """macOS-only skills should register slash commands on macOS."""
with ( with (
patch("tools.skills_tool.SKILLS_DIR", tmp_path), patch("tools.skills_tool.SKILLS_DIR", tmp_path),
patch("tools.skills_tool.sys") as mock_sys, patch("agent.skill_utils.sys") as mock_sys,
): ):
mock_sys.platform = "darwin" mock_sys.platform = "darwin"
_make_skill(tmp_path, "imessage", frontmatter_extra="platforms: [macos]\n") _make_skill(tmp_path, "imessage", frontmatter_extra="platforms: [macos]\n")
@@ -78,7 +78,7 @@ class TestScanSkillCommands:
"""Skills without platforms field should register on any platform.""" """Skills without platforms field should register on any platform."""
with ( with (
patch("tools.skills_tool.SKILLS_DIR", tmp_path), patch("tools.skills_tool.SKILLS_DIR", tmp_path),
patch("tools.skills_tool.sys") as mock_sys, patch("agent.skill_utils.sys") as mock_sys,
): ):
mock_sys.platform = "win32" mock_sys.platform = "win32"
_make_skill(tmp_path, "generic-tool") _make_skill(tmp_path, "generic-tool")

View File

@@ -589,38 +589,38 @@ class TestSkillMatchesPlatform:
assert skill_matches_platform({"platforms": None}) is True assert skill_matches_platform({"platforms": None}) is True
def test_macos_on_darwin(self): def test_macos_on_darwin(self):
with patch("tools.skills_tool.sys") as mock_sys: with patch("agent.skill_utils.sys") as mock_sys:
mock_sys.platform = "darwin" mock_sys.platform = "darwin"
assert skill_matches_platform({"platforms": ["macos"]}) is True assert skill_matches_platform({"platforms": ["macos"]}) is True
def test_macos_on_linux(self): def test_macos_on_linux(self):
with patch("tools.skills_tool.sys") as mock_sys: with patch("agent.skill_utils.sys") as mock_sys:
mock_sys.platform = "linux" mock_sys.platform = "linux"
assert skill_matches_platform({"platforms": ["macos"]}) is False assert skill_matches_platform({"platforms": ["macos"]}) is False
def test_linux_on_linux(self): def test_linux_on_linux(self):
with patch("tools.skills_tool.sys") as mock_sys: with patch("agent.skill_utils.sys") as mock_sys:
mock_sys.platform = "linux" mock_sys.platform = "linux"
assert skill_matches_platform({"platforms": ["linux"]}) is True assert skill_matches_platform({"platforms": ["linux"]}) is True
def test_linux_on_darwin(self): def test_linux_on_darwin(self):
with patch("tools.skills_tool.sys") as mock_sys: with patch("agent.skill_utils.sys") as mock_sys:
mock_sys.platform = "darwin" mock_sys.platform = "darwin"
assert skill_matches_platform({"platforms": ["linux"]}) is False assert skill_matches_platform({"platforms": ["linux"]}) is False
def test_windows_on_win32(self): def test_windows_on_win32(self):
with patch("tools.skills_tool.sys") as mock_sys: with patch("agent.skill_utils.sys") as mock_sys:
mock_sys.platform = "win32" mock_sys.platform = "win32"
assert skill_matches_platform({"platforms": ["windows"]}) is True assert skill_matches_platform({"platforms": ["windows"]}) is True
def test_windows_on_linux(self): def test_windows_on_linux(self):
with patch("tools.skills_tool.sys") as mock_sys: with patch("agent.skill_utils.sys") as mock_sys:
mock_sys.platform = "linux" mock_sys.platform = "linux"
assert skill_matches_platform({"platforms": ["windows"]}) is False assert skill_matches_platform({"platforms": ["windows"]}) is False
def test_multi_platform_match(self): def test_multi_platform_match(self):
"""Skills listing multiple platforms should match any of them.""" """Skills listing multiple platforms should match any of them."""
with patch("tools.skills_tool.sys") as mock_sys: with patch("agent.skill_utils.sys") as mock_sys:
mock_sys.platform = "darwin" mock_sys.platform = "darwin"
assert skill_matches_platform({"platforms": ["macos", "linux"]}) is True assert skill_matches_platform({"platforms": ["macos", "linux"]}) is True
mock_sys.platform = "linux" mock_sys.platform = "linux"
@@ -630,20 +630,20 @@ class TestSkillMatchesPlatform:
def test_string_instead_of_list(self): def test_string_instead_of_list(self):
"""A single string value should be treated as a one-element list.""" """A single string value should be treated as a one-element list."""
with patch("tools.skills_tool.sys") as mock_sys: with patch("agent.skill_utils.sys") as mock_sys:
mock_sys.platform = "darwin" mock_sys.platform = "darwin"
assert skill_matches_platform({"platforms": "macos"}) is True assert skill_matches_platform({"platforms": "macos"}) is True
mock_sys.platform = "linux" mock_sys.platform = "linux"
assert skill_matches_platform({"platforms": "macos"}) is False assert skill_matches_platform({"platforms": "macos"}) is False
def test_case_insensitive(self): def test_case_insensitive(self):
with patch("tools.skills_tool.sys") as mock_sys: with patch("agent.skill_utils.sys") as mock_sys:
mock_sys.platform = "darwin" mock_sys.platform = "darwin"
assert skill_matches_platform({"platforms": ["MacOS"]}) is True assert skill_matches_platform({"platforms": ["MacOS"]}) is True
assert skill_matches_platform({"platforms": ["MACOS"]}) is True assert skill_matches_platform({"platforms": ["MACOS"]}) is True
def test_unknown_platform_no_match(self): def test_unknown_platform_no_match(self):
with patch("tools.skills_tool.sys") as mock_sys: with patch("agent.skill_utils.sys") as mock_sys:
mock_sys.platform = "linux" mock_sys.platform = "linux"
assert skill_matches_platform({"platforms": ["freebsd"]}) is False assert skill_matches_platform({"platforms": ["freebsd"]}) is False
@@ -659,7 +659,7 @@ class TestFindAllSkillsPlatformFiltering:
def test_excludes_incompatible_platform(self, tmp_path): def test_excludes_incompatible_platform(self, tmp_path):
with ( with (
patch("tools.skills_tool.SKILLS_DIR", tmp_path), patch("tools.skills_tool.SKILLS_DIR", tmp_path),
patch("tools.skills_tool.sys") as mock_sys, patch("agent.skill_utils.sys") as mock_sys,
): ):
mock_sys.platform = "linux" mock_sys.platform = "linux"
_make_skill(tmp_path, "universal-skill") _make_skill(tmp_path, "universal-skill")
@@ -672,7 +672,7 @@ class TestFindAllSkillsPlatformFiltering:
def test_includes_matching_platform(self, tmp_path): def test_includes_matching_platform(self, tmp_path):
with ( with (
patch("tools.skills_tool.SKILLS_DIR", tmp_path), patch("tools.skills_tool.SKILLS_DIR", tmp_path),
patch("tools.skills_tool.sys") as mock_sys, patch("agent.skill_utils.sys") as mock_sys,
): ):
mock_sys.platform = "darwin" mock_sys.platform = "darwin"
_make_skill(tmp_path, "mac-only", frontmatter_extra="platforms: [macos]\n") _make_skill(tmp_path, "mac-only", frontmatter_extra="platforms: [macos]\n")
@@ -684,7 +684,7 @@ class TestFindAllSkillsPlatformFiltering:
"""Skills without platforms field should appear on any platform.""" """Skills without platforms field should appear on any platform."""
with ( with (
patch("tools.skills_tool.SKILLS_DIR", tmp_path), patch("tools.skills_tool.SKILLS_DIR", tmp_path),
patch("tools.skills_tool.sys") as mock_sys, patch("agent.skill_utils.sys") as mock_sys,
): ):
mock_sys.platform = "win32" mock_sys.platform = "win32"
_make_skill(tmp_path, "generic-skill") _make_skill(tmp_path, "generic-skill")
@@ -695,7 +695,7 @@ class TestFindAllSkillsPlatformFiltering:
def test_multi_platform_skill(self, tmp_path): def test_multi_platform_skill(self, tmp_path):
with ( with (
patch("tools.skills_tool.SKILLS_DIR", tmp_path), patch("tools.skills_tool.SKILLS_DIR", tmp_path),
patch("tools.skills_tool.sys") as mock_sys, patch("agent.skill_utils.sys") as mock_sys,
): ):
_make_skill( _make_skill(
tmp_path, "cross-plat", frontmatter_extra="platforms: [macos, linux]\n" tmp_path, "cross-plat", frontmatter_extra="platforms: [macos, linux]\n"

View File

@@ -547,6 +547,13 @@ def skill_manage(
else: else:
result = {"success": False, "error": f"Unknown action '{action}'. Use: create, edit, patch, delete, write_file, remove_file"} result = {"success": False, "error": f"Unknown action '{action}'. Use: create, edit, patch, delete, write_file, remove_file"}
if result.get("success"):
try:
from agent.prompt_builder import clear_skills_system_prompt_cache
clear_skills_system_prompt_cache(clear_snapshot=True)
except Exception:
pass
return json.dumps(result, ensure_ascii=False) return json.dumps(result, ensure_ascii=False)

View File

@@ -120,28 +120,11 @@ def set_secret_capture_callback(callback) -> None:
def skill_matches_platform(frontmatter: Dict[str, Any]) -> bool: def skill_matches_platform(frontmatter: Dict[str, Any]) -> bool:
"""Check if a skill is compatible with the current OS platform. """Check if a skill is compatible with the current OS platform.
Skills declare platform requirements via a top-level ``platforms`` list Delegates to ``agent.skill_utils.skill_matches_platform`` — kept here
in their YAML frontmatter:: as a public re-export so existing callers don't need updating.
platforms: [macos] # macOS only
platforms: [macos, linux] # macOS and Linux
Valid values: ``macos``, ``linux``, ``windows``.
If the field is absent or empty the skill is compatible with **all**
platforms (backward-compatible default).
""" """
platforms = frontmatter.get("platforms") from agent.skill_utils import skill_matches_platform as _impl
if not platforms: return _impl(frontmatter)
return True # No restriction → loads everywhere
if not isinstance(platforms, list):
platforms = [platforms]
current = sys.platform
for p in platforms:
mapped = _PLATFORM_MAP.get(str(p).lower().strip(), str(p).lower().strip())
if current.startswith(mapped):
return True
return False
def _normalize_prerequisite_values(value: Any) -> List[str]: def _normalize_prerequisite_values(value: Any) -> List[str]:
@@ -419,40 +402,13 @@ def check_skills_requirements() -> bool:
def _parse_frontmatter(content: str) -> Tuple[Dict[str, Any], str]: def _parse_frontmatter(content: str) -> Tuple[Dict[str, Any], str]:
"""Parse YAML frontmatter from markdown content.
Delegates to ``agent.skill_utils.parse_frontmatter`` — kept here
as a public re-export so existing callers don't need updating.
""" """
Parse YAML frontmatter from markdown content. from agent.skill_utils import parse_frontmatter
return parse_frontmatter(content)
Uses yaml.safe_load for full YAML support (nested metadata, lists, etc.)
with a fallback to simple key:value splitting for robustness.
Args:
content: Full markdown file content
Returns:
Tuple of (frontmatter dict, remaining content)
"""
frontmatter = {}
body = content
if content.startswith("---"):
end_match = re.search(r"\n---\s*\n", content[3:])
if end_match:
yaml_content = content[3 : end_match.start() + 3]
body = content[end_match.end() + 3 :]
try:
parsed = yaml.safe_load(yaml_content)
if isinstance(parsed, dict):
frontmatter = parsed
# yaml.safe_load returns None for empty frontmatter
except yaml.YAMLError:
# Fallback: simple key:value parsing for malformed YAML
for line in yaml_content.strip().split("\n"):
if ":" in line:
key, value = line.split(":", 1)
frontmatter[key.strip()] = value.strip()
return frontmatter, body
def _get_category_from_path(skill_path: Path) -> Optional[str]: def _get_category_from_path(skill_path: Path) -> Optional[str]:
@@ -516,24 +472,13 @@ def _parse_tags(tags_value) -> List[str]:
def _get_disabled_skill_names() -> Set[str]: def _get_disabled_skill_names() -> Set[str]:
"""Load disabled skill names from config (once per call). """Load disabled skill names from config.
Resolves platform from ``HERMES_PLATFORM`` env var, falls back to Delegates to ``agent.skill_utils.get_disabled_skill_names`` — kept here
the global disabled list. as a public re-export so existing callers don't need updating.
""" """
import os from agent.skill_utils import get_disabled_skill_names
try: return get_disabled_skill_names()
from hermes_cli.config import load_config
config = load_config()
skills_cfg = config.get("skills", {})
resolved_platform = os.getenv("HERMES_PLATFORM")
if resolved_platform:
platform_disabled = skills_cfg.get("platform_disabled", {}).get(resolved_platform)
if platform_disabled is not None:
return set(platform_disabled)
return set(skills_cfg.get("disabled", []))
except Exception:
return set()
def _is_skill_disabled(name: str, platform: str = None) -> bool: def _is_skill_disabled(name: str, platform: str = None) -> bool: