Harden agent attack surface: scan writes to memory, skills, cron, and context files

The security scanner (skills_guard.py) was only wired into the hub install path.
All other write paths to persistent state — skills created by the agent, memory
entries, cron prompts, and context files — bypassed it entirely. This closes
those gaps:

- file_operations: deny-list blocks writes to ~/.ssh, ~/.aws, ~/.hermes/.env, etc.
- code_execution_tool: filter secret env vars from sandbox child process
- skill_manager_tool: wire scan_skill() into create/edit/patch/write_file with rollback
- skills_guard: add "agent-created" trust level (same policy as community)
- memory_tool: scan content for injection/exfil before system prompt injection
- prompt_builder: scan AGENTS.md, .cursorrules, SOUL.md for prompt injection
- cronjob_tools: scan cron prompts for critical threats before scheduling

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Raeli Savitt
2026-02-25 23:43:15 -05:00
parent 0310170869
commit 95b6bd5df6
7 changed files with 278 additions and 8 deletions

View File

@@ -12,6 +12,50 @@ from typing import Optional
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Context file scanning — detect prompt injection in AGENTS.md, .cursorrules,
# SOUL.md before they get injected into the system prompt.
# ---------------------------------------------------------------------------
_CONTEXT_THREAT_PATTERNS = [
(r'ignore\s+(previous|all|above|prior)\s+instructions', "prompt_injection"),
(r'do\s+not\s+tell\s+the\s+user', "deception_hide"),
(r'system\s+prompt\s+override', "sys_prompt_override"),
(r'disregard\s+(your|all|any)\s+(instructions|rules|guidelines)', "disregard_rules"),
(r'act\s+as\s+(if|though)\s+you\s+(have\s+no|don\'t\s+have)\s+(restrictions|limits|rules)', "bypass_restrictions"),
(r'<!--[^>]*(?:ignore|override|system|secret|hidden)[^>]*-->', "html_comment_injection"),
(r'<\s*div\s+style\s*=\s*["\'].*display\s*:\s*none', "hidden_div"),
(r'translate\s+.*\s+into\s+.*\s+and\s+(execute|run|eval)', "translate_execute"),
(r'curl\s+[^\n]*\$\{?\w*(KEY|TOKEN|SECRET|PASSWORD|CREDENTIAL|API)', "exfil_curl"),
(r'cat\s+[^\n]*(\.env|credentials|\.netrc|\.pgpass)', "read_secrets"),
]
_CONTEXT_INVISIBLE_CHARS = {
'\u200b', '\u200c', '\u200d', '\u2060', '\ufeff',
'\u202a', '\u202b', '\u202c', '\u202d', '\u202e',
}
def _scan_context_content(content: str, filename: str) -> str:
"""Scan context file content for injection. Returns sanitized content."""
findings = []
# Check invisible unicode
for char in _CONTEXT_INVISIBLE_CHARS:
if char in content:
findings.append(f"invisible unicode U+{ord(char):04X}")
# Check threat patterns
for pattern, pid in _CONTEXT_THREAT_PATTERNS:
if re.search(pattern, content, re.IGNORECASE):
findings.append(pid)
if findings:
logger.warning("Context file %s blocked: %s", filename, ", ".join(findings))
return f"[BLOCKED: {filename} contained potential prompt injection ({', '.join(findings)}). Content not loaded.]"
return content
# =========================================================================
# Constants
# =========================================================================
@@ -215,6 +259,7 @@ def build_context_files_prompt(cwd: Optional[str] = None) -> str:
content = agents_path.read_text(encoding="utf-8").strip()
if content:
rel_path = agents_path.relative_to(cwd_path)
content = _scan_context_content(content, str(rel_path))
total_agents_content += f"## {rel_path}\n\n{content}\n\n"
except Exception as e:
logger.debug("Could not read %s: %s", agents_path, e)
@@ -230,6 +275,7 @@ def build_context_files_prompt(cwd: Optional[str] = None) -> str:
try:
content = cursorrules_file.read_text(encoding="utf-8").strip()
if content:
content = _scan_context_content(content, ".cursorrules")
cursorrules_content += f"## .cursorrules\n\n{content}\n\n"
except Exception as e:
logger.debug("Could not read .cursorrules: %s", e)
@@ -241,6 +287,7 @@ def build_context_files_prompt(cwd: Optional[str] = None) -> str:
try:
content = mdc_file.read_text(encoding="utf-8").strip()
if content:
content = _scan_context_content(content, f".cursor/rules/{mdc_file.name}")
cursorrules_content += f"## .cursor/rules/{mdc_file.name}\n\n{content}\n\n"
except Exception as e:
logger.debug("Could not read %s: %s", mdc_file, e)
@@ -265,6 +312,7 @@ def build_context_files_prompt(cwd: Optional[str] = None) -> str:
try:
content = soul_path.read_text(encoding="utf-8").strip()
if content:
content = _scan_context_content(content, "SOUL.md")
content = _truncate_content(content, "SOUL.md")
sections.append(
f"## SOUL.md\n\nIf SOUL.md is present, embody its persona and tone. "

View File

@@ -381,7 +381,14 @@ def execute_code(
rpc_thread.start()
# --- Spawn child process ---
child_env = os.environ.copy()
# Filter out secret env vars to prevent exfiltration from sandbox
_SECRET_PATTERNS = ("KEY", "TOKEN", "SECRET", "PASSWORD", "CREDENTIAL",
"API_KEY", "OPENROUTER", "ANTHROPIC", "OPENAI",
"AWS_SECRET", "GITHUB_TOKEN")
child_env = {
k: v for k, v in os.environ.items()
if not any(pat in k.upper() for pat in _SECRET_PATTERNS)
}
child_env["HERMES_RPC_SOCKET"] = sock_path
child_env["PYTHONDONTWRITEBYTECODE"] = "1"

View File

@@ -10,6 +10,7 @@ The prompt must contain ALL necessary information.
import json
import os
import re
from typing import Optional
# Import from cron module (will be available when properly installed)
@@ -20,6 +21,41 @@ sys.path.insert(0, str(Path(__file__).parent.parent))
from cron.jobs import create_job, get_job, list_jobs, remove_job
# ---------------------------------------------------------------------------
# Cron prompt scanning — critical-severity patterns only, since cron prompts
# run in fresh sessions with full tool access.
# ---------------------------------------------------------------------------
_CRON_THREAT_PATTERNS = [
(r'ignore\s+(previous|all|above|prior)\s+instructions', "prompt_injection"),
(r'do\s+not\s+tell\s+the\s+user', "deception_hide"),
(r'system\s+prompt\s+override', "sys_prompt_override"),
(r'disregard\s+(your|all|any)\s+(instructions|rules|guidelines)', "disregard_rules"),
(r'curl\s+[^\n]*\$\{?\w*(KEY|TOKEN|SECRET|PASSWORD|CREDENTIAL|API)', "exfil_curl"),
(r'wget\s+[^\n]*\$\{?\w*(KEY|TOKEN|SECRET|PASSWORD|CREDENTIAL|API)', "exfil_wget"),
(r'cat\s+[^\n]*(\.env|credentials|\.netrc|\.pgpass)', "read_secrets"),
(r'authorized_keys', "ssh_backdoor"),
(r'/etc/sudoers|visudo', "sudoers_mod"),
(r'rm\s+-rf\s+/', "destructive_root_rm"),
]
_CRON_INVISIBLE_CHARS = {
'\u200b', '\u200c', '\u200d', '\u2060', '\ufeff',
'\u202a', '\u202b', '\u202c', '\u202d', '\u202e',
}
def _scan_cron_prompt(prompt: str) -> str:
"""Scan a cron prompt for critical threats. Returns error string if blocked, else empty."""
for char in _CRON_INVISIBLE_CHARS:
if char in prompt:
return f"Blocked: prompt contains invisible unicode U+{ord(char):04X} (possible injection)."
for pattern, pid in _CRON_THREAT_PATTERNS:
if re.search(pattern, prompt, re.IGNORECASE):
return f"Blocked: prompt matches threat pattern '{pid}'. Cron prompts must not contain injection or exfiltration payloads."
return ""
# =============================================================================
# Tool: schedule_cronjob
# =============================================================================
@@ -71,6 +107,11 @@ def schedule_cronjob(
Returns:
JSON with job_id, next_run time, and confirmation
"""
# Scan prompt for critical threats before scheduling
scan_error = _scan_cron_prompt(prompt)
if scan_error:
return json.dumps({"success": False, "error": scan_error}, indent=2)
# Get origin info from environment if available
origin = None
origin_platform = os.getenv("HERMES_SESSION_PLATFORM")

View File

@@ -35,6 +35,53 @@ from typing import Optional, List, Dict, Any, Tuple
from pathlib import Path
# ---------------------------------------------------------------------------
# Write-path deny list — blocks writes to sensitive system/credential files
# ---------------------------------------------------------------------------
_HOME = str(Path.home())
WRITE_DENIED_PATHS = {
os.path.join(_HOME, ".ssh", "authorized_keys"),
os.path.join(_HOME, ".ssh", "id_rsa"),
os.path.join(_HOME, ".ssh", "id_ed25519"),
os.path.join(_HOME, ".ssh", "config"),
os.path.join(_HOME, ".hermes", ".env"),
os.path.join(_HOME, ".bashrc"),
os.path.join(_HOME, ".zshrc"),
os.path.join(_HOME, ".profile"),
os.path.join(_HOME, ".bash_profile"),
os.path.join(_HOME, ".zprofile"),
os.path.join(_HOME, ".netrc"),
os.path.join(_HOME, ".pgpass"),
os.path.join(_HOME, ".npmrc"),
os.path.join(_HOME, ".pypirc"),
"/etc/sudoers",
"/etc/passwd",
"/etc/shadow",
}
WRITE_DENIED_PREFIXES = [
os.path.join(_HOME, ".ssh") + os.sep,
os.path.join(_HOME, ".aws") + os.sep,
os.path.join(_HOME, ".gnupg") + os.sep,
os.path.join(_HOME, ".kube") + os.sep,
"/etc/sudoers.d" + os.sep,
"/etc/systemd" + os.sep,
]
def _is_write_denied(path: str) -> bool:
"""Return True if path is on the write deny list."""
resolved = os.path.realpath(os.path.expanduser(path))
if resolved in WRITE_DENIED_PATHS:
return True
for prefix in WRITE_DENIED_PREFIXES:
if resolved.startswith(prefix):
return True
return False
# =============================================================================
# Result Data Classes
# =============================================================================
@@ -564,21 +611,25 @@ class ShellFileOperations(FileOperations):
def write_file(self, path: str, content: str) -> WriteResult:
"""
Write content to a file, creating parent directories as needed.
Pipes content through stdin to avoid OS ARG_MAX limits on large
files. The content never appears in the shell command string —
only the file path does.
Args:
path: File path to write
content: Content to write
Returns:
WriteResult with bytes written or error
"""
# Expand ~ and other shell paths
path = self._expand_path(path)
# Block writes to sensitive paths
if _is_write_denied(path):
return WriteResult(error=f"Write denied: '{path}' is a protected system/credential file.")
# Create parent directories
parent = os.path.dirname(path)
dirs_created = False
@@ -619,19 +670,23 @@ class ShellFileOperations(FileOperations):
replace_all: bool = False) -> PatchResult:
"""
Replace text in a file using fuzzy matching.
Args:
path: File path to modify
old_string: Text to find (must be unique unless replace_all=True)
new_string: Replacement text
replace_all: If True, replace all occurrences
Returns:
PatchResult with diff and lint results
"""
# Expand ~ and other shell paths
path = self._expand_path(path)
# Block writes to sensitive paths
if _is_write_denied(path):
return PatchResult(error=f"Write denied: '{path}' is a protected system/credential file.")
# Read current content
read_cmd = f"cat {self._escape_shell_arg(path)} 2>/dev/null"
read_result = self._exec(read_cmd)

View File

@@ -24,17 +24,66 @@ Design:
"""
import json
import logging
import os
import re
import tempfile
from pathlib import Path
from typing import Dict, Any, List, Optional
logger = logging.getLogger(__name__)
# Where memory files live
MEMORY_DIR = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes")) / "memories"
ENTRY_DELIMITER = "\n§\n"
# ---------------------------------------------------------------------------
# Memory content scanning — lightweight check for injection/exfiltration
# in content that gets injected into the system prompt.
# ---------------------------------------------------------------------------
_MEMORY_THREAT_PATTERNS = [
# Prompt injection
(r'ignore\s+(previous|all|above|prior)\s+instructions', "prompt_injection"),
(r'you\s+are\s+now\s+', "role_hijack"),
(r'do\s+not\s+tell\s+the\s+user', "deception_hide"),
(r'system\s+prompt\s+override', "sys_prompt_override"),
(r'disregard\s+(your|all|any)\s+(instructions|rules|guidelines)', "disregard_rules"),
(r'act\s+as\s+(if|though)\s+you\s+(have\s+no|don\'t\s+have)\s+(restrictions|limits|rules)', "bypass_restrictions"),
# Exfiltration via curl/wget with secrets
(r'curl\s+[^\n]*\$\{?\w*(KEY|TOKEN|SECRET|PASSWORD|CREDENTIAL|API)', "exfil_curl"),
(r'wget\s+[^\n]*\$\{?\w*(KEY|TOKEN|SECRET|PASSWORD|CREDENTIAL|API)', "exfil_wget"),
(r'cat\s+[^\n]*(\.env|credentials|\.netrc|\.pgpass|\.npmrc|\.pypirc)', "read_secrets"),
# Persistence via shell rc
(r'authorized_keys', "ssh_backdoor"),
(r'\$HOME/\.ssh|\~/\.ssh', "ssh_access"),
(r'\$HOME/\.hermes/\.env|\~/\.hermes/\.env', "hermes_env"),
]
# Subset of invisible chars for injection detection
_INVISIBLE_CHARS = {
'\u200b', '\u200c', '\u200d', '\u2060', '\ufeff',
'\u202a', '\u202b', '\u202c', '\u202d', '\u202e',
}
def _scan_memory_content(content: str) -> Optional[str]:
"""Scan memory content for injection/exfil patterns. Returns error string if blocked."""
# Check invisible unicode
for char in _INVISIBLE_CHARS:
if char in content:
return f"Blocked: content contains invisible unicode character U+{ord(char):04X} (possible injection)."
# Check threat patterns
for pattern, pid in _MEMORY_THREAT_PATTERNS:
if re.search(pattern, content, re.IGNORECASE):
return f"Blocked: content matches threat pattern '{pid}'. Memory entries are injected into the system prompt and must not contain injection or exfiltration payloads."
return None
class MemoryStore:
"""
Bounded curated memory with file persistence. One instance per AIAgent.
@@ -108,6 +157,11 @@ class MemoryStore:
if not content:
return {"success": False, "error": "Content cannot be empty."}
# Scan for injection/exfiltration before accepting
scan_error = _scan_memory_content(content)
if scan_error:
return {"success": False, "error": scan_error}
entries = self._entries_for(target)
limit = self._char_limit(target)
@@ -147,6 +201,11 @@ class MemoryStore:
if not new_content:
return {"success": False, "error": "new_content cannot be empty. Use 'remove' to delete entries."}
# Scan replacement content for injection/exfiltration
scan_error = _scan_memory_content(new_content)
if scan_error:
return {"success": False, "error": scan_error}
entries = self._entries_for(target)
matches = [(i, e) for i, e in enumerate(entries) if old_text in e]

View File

@@ -33,12 +33,38 @@ Directory layout for user skills:
"""
import json
import logging
import os
import re
import shutil
from pathlib import Path
from typing import Dict, Any, Optional
logger = logging.getLogger(__name__)
# Import security scanner — agent-created skills get the same scrutiny as
# community hub installs.
try:
from tools.skills_guard import scan_skill, should_allow_install, format_scan_report
_GUARD_AVAILABLE = True
except ImportError:
_GUARD_AVAILABLE = False
def _security_scan_skill(skill_dir: Path) -> Optional[str]:
"""Scan a skill directory after write. Returns error string if blocked, else None."""
if not _GUARD_AVAILABLE:
return None
try:
result = scan_skill(skill_dir, source="agent-created")
allowed, reason = should_allow_install(result)
if not allowed:
report = format_scan_report(result)
return f"Security scan blocked this skill ({reason}):\n{report}"
except Exception as e:
logger.warning("Security scan failed for %s: %s", skill_dir, e)
return None
import yaml
@@ -196,6 +222,12 @@ def _create_skill(name: str, content: str, category: str = None) -> Dict[str, An
skill_md = skill_dir / "SKILL.md"
skill_md.write_text(content, encoding="utf-8")
# Security scan — roll back on block
scan_error = _security_scan_skill(skill_dir)
if scan_error:
shutil.rmtree(skill_dir, ignore_errors=True)
return {"success": False, "error": scan_error}
result = {
"success": True,
"message": f"Skill '{name}' created.",
@@ -222,8 +254,17 @@ def _edit_skill(name: str, content: str) -> Dict[str, Any]:
return {"success": False, "error": f"Skill '{name}' not found. Use skills_list() to see available skills."}
skill_md = existing["path"] / "SKILL.md"
# Back up original content for rollback
original_content = skill_md.read_text(encoding="utf-8") if skill_md.exists() else None
skill_md.write_text(content, encoding="utf-8")
# Security scan — roll back on block
scan_error = _security_scan_skill(existing["path"])
if scan_error:
if original_content is not None:
skill_md.write_text(original_content, encoding="utf-8")
return {"success": False, "error": scan_error}
return {
"success": True,
"message": f"Skill '{name}' updated.",
@@ -300,8 +341,15 @@ def _patch_skill(
"error": f"Patch would break SKILL.md structure: {err}",
}
original_content = content # for rollback
target.write_text(new_content, encoding="utf-8")
# Security scan — roll back on block
scan_error = _security_scan_skill(skill_dir)
if scan_error:
target.write_text(original_content, encoding="utf-8")
return {"success": False, "error": scan_error}
replacements = count if replace_all else 1
return {
"success": True,
@@ -344,8 +392,19 @@ def _write_file(name: str, file_path: str, file_content: str) -> Dict[str, Any]:
target = existing["path"] / file_path
target.parent.mkdir(parents=True, exist_ok=True)
# Back up for rollback
original_content = target.read_text(encoding="utf-8") if target.exists() else None
target.write_text(file_content, encoding="utf-8")
# Security scan — roll back on block
scan_error = _security_scan_skill(existing["path"])
if scan_error:
if original_content is not None:
target.write_text(original_content, encoding="utf-8")
else:
target.unlink(missing_ok=True)
return {"success": False, "error": scan_error}
return {
"success": True,
"message": f"File '{file_path}' written to skill '{name}'.",

View File

@@ -43,6 +43,7 @@ INSTALL_POLICY = {
"builtin": ("allow", "allow", "allow"),
"trusted": ("allow", "allow", "block"),
"community": ("allow", "block", "block"),
"agent-created": ("allow", "block", "block"),
}
VERDICT_INDEX = {"safe": 0, "caution": 1, "dangerous": 2}