Compare commits

...

1 Commits

Author SHA1 Message Date
Teknium
af4efe741c feat(cron): add script field for pre-run data collection
Add an optional 'script' parameter to cron jobs that references a Python
script. The script runs before each agent turn, and its stdout is injected
into the prompt as context. This enables stateful monitoring — the script
handles data collection and change detection, the LLM analyzes and reports.

- cron/jobs.py: add script field to create_job(), stored in job dict
- cron/scheduler.py: add _run_job_script() executor with timeout handling,
  inject script output/errors into _build_job_prompt()
- tools/cronjob_tools.py: add script to tool schema, create/update handlers,
  _format_job display
- hermes_cli/cron.py: add --script to create/edit, display in list/edit output
- hermes_cli/main.py: add --script argparse for cron create/edit subcommands
- tests/cron/test_cron_script.py: 20 tests covering job CRUD, script
  execution, path resolution, error handling, prompt injection, tool API

Script paths can be absolute or relative (resolved against ~/.hermes/scripts/).
Scripts run with a 120s timeout. Failures are injected as error context so
the LLM can report the problem. Empty string clears an attached script.
2026-04-04 10:28:05 -07:00
6 changed files with 417 additions and 1 deletions

View File

@@ -375,6 +375,7 @@ def create_job(
model: Optional[str] = None,
provider: Optional[str] = None,
base_url: Optional[str] = None,
script: Optional[str] = None,
) -> Dict[str, Any]:
"""
Create a new cron job.
@@ -391,6 +392,9 @@ def create_job(
model: Optional per-job model override
provider: Optional per-job provider override
base_url: Optional per-job base URL override
script: Optional path to a Python script whose stdout is injected into the
prompt each run. The script runs before the agent turn, and its output
is prepended as context. Useful for data collection / change detection.
Returns:
The created job dict
@@ -419,6 +423,8 @@ def create_job(
normalized_model = normalized_model or None
normalized_provider = normalized_provider or None
normalized_base_url = normalized_base_url or None
normalized_script = str(script).strip() if isinstance(script, str) else None
normalized_script = normalized_script or None
label_source = (prompt or (normalized_skills[0] if normalized_skills else None)) or "cron job"
job = {
@@ -430,6 +436,7 @@ def create_job(
"model": normalized_model,
"provider": normalized_provider,
"base_url": normalized_base_url,
"script": normalized_script,
"schedule": parsed_schedule,
"schedule_display": parsed_schedule.get("display", schedule),
"repeat": {

View File

@@ -13,6 +13,7 @@ import concurrent.futures
import json
import logging
import os
import subprocess
import sys
import traceback
@@ -229,11 +230,89 @@ def _deliver_result(job: dict, content: str) -> None:
logger.info("Job '%s': delivered to %s:%s", job["id"], platform_name, chat_id)
_SCRIPT_TIMEOUT = 120 # seconds
def _run_job_script(script_path: str) -> tuple[bool, str]:
"""Execute a cron job's data-collection script and capture its output.
Args:
script_path: Path to a Python script (resolved via HERMES_HOME/scripts/ or absolute).
Returns:
(success, output) — on failure *output* contains the error message so the
LLM can report the problem to the user.
"""
from hermes_constants import get_hermes_home
path = Path(script_path).expanduser()
if not path.is_absolute():
# Resolve relative paths against HERMES_HOME/scripts/
path = get_hermes_home() / "scripts" / path
if not path.exists():
return False, f"Script not found: {path}"
if not path.is_file():
return False, f"Script path is not a file: {path}"
try:
result = subprocess.run(
[sys.executable, str(path)],
capture_output=True,
text=True,
timeout=_SCRIPT_TIMEOUT,
cwd=str(path.parent),
)
stdout = (result.stdout or "").strip()
stderr = (result.stderr or "").strip()
if result.returncode != 0:
parts = [f"Script exited with code {result.returncode}"]
if stderr:
parts.append(f"stderr:\n{stderr}")
if stdout:
parts.append(f"stdout:\n{stdout}")
return False, "\n".join(parts)
return True, stdout
except subprocess.TimeoutExpired:
return False, f"Script timed out after {_SCRIPT_TIMEOUT}s: {path}"
except Exception as exc:
return False, f"Script execution failed: {exc}"
def _build_job_prompt(job: dict) -> str:
"""Build the effective prompt for a cron job, optionally loading one or more skills first."""
prompt = job.get("prompt", "")
skills = job.get("skills")
# Run data-collection script if configured, inject output as context.
script_path = job.get("script")
if script_path:
success, script_output = _run_job_script(script_path)
if success:
if script_output:
prompt = (
"## Script Output\n"
"The following data was collected by a pre-run script. "
"Use it as context for your analysis.\n\n"
f"```\n{script_output}\n```\n\n"
f"{prompt}"
)
else:
prompt = (
"[Script ran successfully but produced no output.]\n\n"
f"{prompt}"
)
else:
prompt = (
"## Script Error\n"
"The data-collection script failed. Report this to the user.\n\n"
f"```\n{script_output}\n```\n\n"
f"{prompt}"
)
# Always prepend [SILENT] guidance so the cron agent can suppress
# delivery when it has nothing new or noteworthy to report.
silent_hint = (

View File

@@ -90,6 +90,9 @@ def cron_list(show_all: bool = False):
print(f" Deliver: {deliver_str}")
if skills:
print(f" Skills: {', '.join(skills)}")
script = job.get("script")
if script:
print(f" Script: {script}")
print()
from hermes_cli.gateway import find_gateway_pids
@@ -149,6 +152,7 @@ def cron_create(args):
repeat=getattr(args, "repeat", None),
skill=getattr(args, "skill", None),
skills=_normalize_skills(getattr(args, "skill", None), getattr(args, "skills", None)),
script=getattr(args, "script", None),
)
if not result.get("success"):
print(color(f"Failed to create job: {result.get('error', 'unknown error')}", Colors.RED))
@@ -158,6 +162,9 @@ def cron_create(args):
print(f" Schedule: {result['schedule']}")
if result.get("skills"):
print(f" Skills: {', '.join(result['skills'])}")
job_data = result.get("job", {})
if job_data.get("script"):
print(f" Script: {job_data['script']}")
print(f" Next run: {result['next_run_at']}")
return 0
@@ -195,6 +202,7 @@ def cron_edit(args):
deliver=getattr(args, "deliver", None),
repeat=getattr(args, "repeat", None),
skills=final_skills,
script=getattr(args, "script", None),
)
if not result.get("success"):
print(color(f"Failed to update job: {result.get('error', 'unknown error')}", Colors.RED))
@@ -208,6 +216,8 @@ def cron_edit(args):
print(f" Skills: {', '.join(updated['skills'])}")
else:
print(" Skills: none")
if updated.get("script"):
print(f" Script: {updated['script']}")
return 0

View File

@@ -4416,6 +4416,7 @@ For more help on a command:
cron_create.add_argument("--deliver", help="Delivery target: origin, local, telegram, discord, signal, or platform:chat_id")
cron_create.add_argument("--repeat", type=int, help="Optional repeat count")
cron_create.add_argument("--skill", dest="skills", action="append", help="Attach a skill. Repeat to add multiple skills.")
cron_create.add_argument("--script", help="Path to a Python script whose stdout is injected into the prompt each run")
# cron edit
cron_edit = cron_subparsers.add_parser("edit", help="Edit an existing scheduled job")
@@ -4429,6 +4430,7 @@ For more help on a command:
cron_edit.add_argument("--add-skill", dest="add_skills", action="append", help="Append a skill without replacing the existing list. Repeatable.")
cron_edit.add_argument("--remove-skill", dest="remove_skills", action="append", help="Remove a specific attached skill. Repeatable.")
cron_edit.add_argument("--clear-skills", action="store_true", help="Remove all attached skills from the job")
cron_edit.add_argument("--script", help="Path to a Python script whose stdout is injected into the prompt each run. Pass empty string to clear.")
# lifecycle actions
cron_pause = cron_subparsers.add_parser("pause", help="Pause a scheduled job")

View File

@@ -0,0 +1,300 @@
"""Tests for cron job script injection feature.
Tests cover:
- Script field in job creation / storage / update
- Script execution and output injection into prompts
- Error handling (missing script, timeout, non-zero exit)
- Path resolution (absolute, relative to HERMES_HOME/scripts/)
"""
import json
import os
import stat
import sys
import textwrap
from pathlib import Path
from unittest.mock import patch
import pytest
# Ensure project root is importable
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
@pytest.fixture
def cron_env(tmp_path, monkeypatch):
"""Isolated cron environment with temp HERMES_HOME."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
(hermes_home / "cron").mkdir()
(hermes_home / "cron" / "output").mkdir()
(hermes_home / "scripts").mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
# Clear cached module-level paths
import cron.jobs as jobs_mod
monkeypatch.setattr(jobs_mod, "HERMES_DIR", hermes_home)
monkeypatch.setattr(jobs_mod, "CRON_DIR", hermes_home / "cron")
monkeypatch.setattr(jobs_mod, "JOBS_FILE", hermes_home / "cron" / "jobs.json")
monkeypatch.setattr(jobs_mod, "OUTPUT_DIR", hermes_home / "cron" / "output")
return hermes_home
class TestJobScriptField:
"""Test that the script field is stored and retrieved correctly."""
def test_create_job_with_script(self, cron_env):
from cron.jobs import create_job, get_job
job = create_job(
prompt="Analyze the data",
schedule="every 30m",
script="/path/to/monitor.py",
)
assert job["script"] == "/path/to/monitor.py"
loaded = get_job(job["id"])
assert loaded["script"] == "/path/to/monitor.py"
def test_create_job_without_script(self, cron_env):
from cron.jobs import create_job
job = create_job(prompt="Hello", schedule="every 1h")
assert job.get("script") is None
def test_create_job_empty_script_normalized_to_none(self, cron_env):
from cron.jobs import create_job
job = create_job(prompt="Hello", schedule="every 1h", script=" ")
assert job.get("script") is None
def test_update_job_add_script(self, cron_env):
from cron.jobs import create_job, update_job
job = create_job(prompt="Hello", schedule="every 1h")
assert job.get("script") is None
updated = update_job(job["id"], {"script": "/new/script.py"})
assert updated["script"] == "/new/script.py"
def test_update_job_clear_script(self, cron_env):
from cron.jobs import create_job, update_job
job = create_job(prompt="Hello", schedule="every 1h", script="/some/script.py")
assert job["script"] == "/some/script.py"
updated = update_job(job["id"], {"script": None})
assert updated.get("script") is None
class TestRunJobScript:
"""Test the _run_job_script() function."""
def test_successful_script(self, cron_env):
from cron.scheduler import _run_job_script
script = cron_env / "scripts" / "test.py"
script.write_text('print("hello from script")\n')
success, output = _run_job_script(str(script))
assert success is True
assert output == "hello from script"
def test_script_relative_path(self, cron_env):
from cron.scheduler import _run_job_script
script = cron_env / "scripts" / "relative.py"
script.write_text('print("relative works")\n')
success, output = _run_job_script("relative.py")
assert success is True
assert output == "relative works"
def test_script_not_found(self, cron_env):
from cron.scheduler import _run_job_script
success, output = _run_job_script("/nonexistent/script.py")
assert success is False
assert "not found" in output.lower()
def test_script_nonzero_exit(self, cron_env):
from cron.scheduler import _run_job_script
script = cron_env / "scripts" / "fail.py"
script.write_text(textwrap.dedent("""\
import sys
print("partial output")
print("error info", file=sys.stderr)
sys.exit(1)
"""))
success, output = _run_job_script(str(script))
assert success is False
assert "exited with code 1" in output
assert "error info" in output
def test_script_empty_output(self, cron_env):
from cron.scheduler import _run_job_script
script = cron_env / "scripts" / "empty.py"
script.write_text("# no output\n")
success, output = _run_job_script(str(script))
assert success is True
assert output == ""
def test_script_timeout(self, cron_env, monkeypatch):
from cron import scheduler as sched_mod
from cron.scheduler import _run_job_script
# Use a very short timeout
monkeypatch.setattr(sched_mod, "_SCRIPT_TIMEOUT", 1)
script = cron_env / "scripts" / "slow.py"
script.write_text("import time; time.sleep(30)\n")
success, output = _run_job_script(str(script))
assert success is False
assert "timed out" in output.lower()
def test_script_json_output(self, cron_env):
"""Scripts can output structured JSON for the LLM to parse."""
from cron.scheduler import _run_job_script
script = cron_env / "scripts" / "json_out.py"
script.write_text(textwrap.dedent("""\
import json
data = {"new_prs": [{"number": 42, "title": "Fix bug"}]}
print(json.dumps(data, indent=2))
"""))
success, output = _run_job_script(str(script))
assert success is True
parsed = json.loads(output)
assert parsed["new_prs"][0]["number"] == 42
class TestBuildJobPromptWithScript:
"""Test that script output is injected into the prompt."""
def test_script_output_injected(self, cron_env):
from cron.scheduler import _build_job_prompt
script = cron_env / "scripts" / "data.py"
script.write_text('print("new PR: #123 fix typo")\n')
job = {
"prompt": "Report any notable changes.",
"script": str(script),
}
prompt = _build_job_prompt(job)
assert "## Script Output" in prompt
assert "new PR: #123 fix typo" in prompt
assert "Report any notable changes." in prompt
def test_script_error_injected(self, cron_env):
from cron.scheduler import _build_job_prompt
job = {
"prompt": "Report status.",
"script": "/nonexistent/script.py",
}
prompt = _build_job_prompt(job)
assert "## Script Error" in prompt
assert "not found" in prompt.lower()
assert "Report status." in prompt
def test_no_script_unchanged(self, cron_env):
from cron.scheduler import _build_job_prompt
job = {"prompt": "Simple job."}
prompt = _build_job_prompt(job)
assert "## Script Output" not in prompt
assert "Simple job." in prompt
def test_script_empty_output_noted(self, cron_env):
from cron.scheduler import _build_job_prompt
script = cron_env / "scripts" / "noop.py"
script.write_text("# nothing\n")
job = {
"prompt": "Check status.",
"script": str(script),
}
prompt = _build_job_prompt(job)
assert "no output" in prompt.lower()
assert "Check status." in prompt
class TestCronjobToolScript:
"""Test the cronjob tool's script parameter."""
def test_create_with_script(self, cron_env, monkeypatch):
monkeypatch.setenv("HERMES_INTERACTIVE", "1")
from tools.cronjob_tools import cronjob
result = json.loads(cronjob(
action="create",
schedule="every 1h",
prompt="Monitor things",
script="/home/user/monitor.py",
))
assert result["success"] is True
assert result["job"]["script"] == "/home/user/monitor.py"
def test_update_script(self, cron_env, monkeypatch):
monkeypatch.setenv("HERMES_INTERACTIVE", "1")
from tools.cronjob_tools import cronjob
create_result = json.loads(cronjob(
action="create",
schedule="every 1h",
prompt="Monitor things",
))
job_id = create_result["job_id"]
update_result = json.loads(cronjob(
action="update",
job_id=job_id,
script="/new/script.py",
))
assert update_result["success"] is True
assert update_result["job"]["script"] == "/new/script.py"
def test_clear_script(self, cron_env, monkeypatch):
monkeypatch.setenv("HERMES_INTERACTIVE", "1")
from tools.cronjob_tools import cronjob
create_result = json.loads(cronjob(
action="create",
schedule="every 1h",
prompt="Monitor things",
script="/some/script.py",
))
job_id = create_result["job_id"]
update_result = json.loads(cronjob(
action="update",
job_id=job_id,
script="",
))
assert update_result["success"] is True
assert "script" not in update_result["job"]
def test_list_shows_script(self, cron_env, monkeypatch):
monkeypatch.setenv("HERMES_INTERACTIVE", "1")
from tools.cronjob_tools import cronjob
cronjob(
action="create",
schedule="every 1h",
prompt="Monitor things",
script="/path/to/script.py",
)
list_result = json.loads(cronjob(action="list"))
assert list_result["success"] is True
assert len(list_result["jobs"]) == 1
assert list_result["jobs"][0]["script"] == "/path/to/script.py"

View File

@@ -116,7 +116,7 @@ def _normalize_optional_job_value(value: Optional[Any], *, strip_trailing_slash:
def _format_job(job: Dict[str, Any]) -> Dict[str, Any]:
prompt = job.get("prompt", "")
skills = _canonical_skills(job.get("skill"), job.get("skills"))
return {
result = {
"job_id": job["id"],
"name": job["name"],
"skill": skills[0] if skills else None,
@@ -136,6 +136,9 @@ def _format_job(job: Dict[str, Any]) -> Dict[str, Any]:
"paused_at": job.get("paused_at"),
"paused_reason": job.get("paused_reason"),
}
if job.get("script"):
result["script"] = job["script"]
return result
def cronjob(
@@ -153,6 +156,7 @@ def cronjob(
provider: Optional[str] = None,
base_url: Optional[str] = None,
reason: Optional[str] = None,
script: Optional[str] = None,
task_id: str = None,
) -> str:
"""Unified cron job management tool."""
@@ -183,6 +187,7 @@ def cronjob(
model=_normalize_optional_job_value(model),
provider=_normalize_optional_job_value(provider),
base_url=_normalize_optional_job_value(base_url, strip_trailing_slash=True),
script=_normalize_optional_job_value(script),
)
return json.dumps(
{
@@ -265,6 +270,9 @@ def cronjob(
updates["provider"] = _normalize_optional_job_value(provider)
if base_url is not None:
updates["base_url"] = _normalize_optional_job_value(base_url, strip_trailing_slash=True)
if script is not None:
# Pass empty string to clear an existing script
updates["script"] = _normalize_optional_job_value(script) if script else None
if repeat is not None:
# Normalize: treat 0 or negative as None (infinite)
normalized_repeat = None if repeat <= 0 else repeat
@@ -338,6 +346,11 @@ Jobs run in a fresh session with no current-chat context, so prompts must be sel
If skill or skills are provided on create, the future cron run loads those skills in order, then follows the prompt as the task instruction.
On update, passing skills=[] clears attached skills.
If script is provided on create, the referenced Python script runs before each agent turn.
Its stdout is injected into the prompt as context. Use this for data collection and change
detection — the script handles gathering data, the agent analyzes and reports.
On update, pass script="" to clear an attached script.
NOTE: The agent's final response is auto-delivered to the target. Put the primary
user-facing content in the final response. Cron jobs run autonomously with no user
present — they cannot ask questions or request clarification.
@@ -402,6 +415,10 @@ Important safety rule: cron-run sessions should not recursively schedule more cr
"reason": {
"type": "string",
"description": "Optional pause reason"
},
"script": {
"type": "string",
"description": "Optional path to a Python script that runs before each cron job execution. Its stdout is injected into the prompt as context. Use for data collection and change detection. Relative paths resolve under ~/.hermes/scripts/. On update, pass empty string to clear."
}
},
"required": ["action"]
@@ -451,6 +468,7 @@ registry.register(
provider=args.get("provider"),
base_url=args.get("base_url"),
reason=args.get("reason"),
script=args.get("script"),
task_id=kw.get("task_id"),
),
check_fn=check_cronjob_requirements,