diff --git a/cron/jobs.py b/cron/jobs.py index 22c04d0c63..214da521fe 100644 --- a/cron/jobs.py +++ b/cron/jobs.py @@ -375,6 +375,7 @@ def create_job( model: Optional[str] = None, provider: Optional[str] = None, base_url: Optional[str] = None, + script: Optional[str] = None, ) -> Dict[str, Any]: """ Create a new cron job. @@ -391,6 +392,9 @@ def create_job( model: Optional per-job model override provider: Optional per-job provider override base_url: Optional per-job base URL override + script: Optional path to a Python script whose stdout is injected into the + prompt each run. The script runs before the agent turn, and its output + is prepended as context. Useful for data collection / change detection. Returns: The created job dict @@ -419,6 +423,8 @@ def create_job( normalized_model = normalized_model or None normalized_provider = normalized_provider or None normalized_base_url = normalized_base_url or None + normalized_script = str(script).strip() if isinstance(script, str) else None + normalized_script = normalized_script or None label_source = (prompt or (normalized_skills[0] if normalized_skills else None)) or "cron job" job = { @@ -430,6 +436,7 @@ def create_job( "model": normalized_model, "provider": normalized_provider, "base_url": normalized_base_url, + "script": normalized_script, "schedule": parsed_schedule, "schedule_display": parsed_schedule.get("display", schedule), "repeat": { diff --git a/cron/scheduler.py b/cron/scheduler.py index 8a54520a14..b014799837 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -13,6 +13,7 @@ import concurrent.futures import json import logging import os +import subprocess import sys import traceback @@ -229,11 +230,89 @@ def _deliver_result(job: dict, content: str) -> None: logger.info("Job '%s': delivered to %s:%s", job["id"], platform_name, chat_id) +_SCRIPT_TIMEOUT = 120 # seconds + + +def _run_job_script(script_path: str) -> tuple[bool, str]: + """Execute a cron job's data-collection script and capture its output. + + Args: + script_path: Path to a Python script (resolved via HERMES_HOME/scripts/ or absolute). + + Returns: + (success, output) — on failure *output* contains the error message so the + LLM can report the problem to the user. + """ + from hermes_constants import get_hermes_home + + path = Path(script_path).expanduser() + if not path.is_absolute(): + # Resolve relative paths against HERMES_HOME/scripts/ + path = get_hermes_home() / "scripts" / path + + if not path.exists(): + return False, f"Script not found: {path}" + if not path.is_file(): + return False, f"Script path is not a file: {path}" + + try: + result = subprocess.run( + [sys.executable, str(path)], + capture_output=True, + text=True, + timeout=_SCRIPT_TIMEOUT, + cwd=str(path.parent), + ) + stdout = (result.stdout or "").strip() + stderr = (result.stderr or "").strip() + + if result.returncode != 0: + parts = [f"Script exited with code {result.returncode}"] + if stderr: + parts.append(f"stderr:\n{stderr}") + if stdout: + parts.append(f"stdout:\n{stdout}") + return False, "\n".join(parts) + + return True, stdout + + except subprocess.TimeoutExpired: + return False, f"Script timed out after {_SCRIPT_TIMEOUT}s: {path}" + except Exception as exc: + return False, f"Script execution failed: {exc}" + + def _build_job_prompt(job: dict) -> str: """Build the effective prompt for a cron job, optionally loading one or more skills first.""" prompt = job.get("prompt", "") skills = job.get("skills") + # Run data-collection script if configured, inject output as context. + script_path = job.get("script") + if script_path: + success, script_output = _run_job_script(script_path) + if success: + if script_output: + prompt = ( + "## Script Output\n" + "The following data was collected by a pre-run script. " + "Use it as context for your analysis.\n\n" + f"```\n{script_output}\n```\n\n" + f"{prompt}" + ) + else: + prompt = ( + "[Script ran successfully but produced no output.]\n\n" + f"{prompt}" + ) + else: + prompt = ( + "## Script Error\n" + "The data-collection script failed. Report this to the user.\n\n" + f"```\n{script_output}\n```\n\n" + f"{prompt}" + ) + # Always prepend [SILENT] guidance so the cron agent can suppress # delivery when it has nothing new or noteworthy to report. silent_hint = ( diff --git a/hermes_cli/cron.py b/hermes_cli/cron.py index f6da8a2d2c..d10513a280 100644 --- a/hermes_cli/cron.py +++ b/hermes_cli/cron.py @@ -90,6 +90,9 @@ def cron_list(show_all: bool = False): print(f" Deliver: {deliver_str}") if skills: print(f" Skills: {', '.join(skills)}") + script = job.get("script") + if script: + print(f" Script: {script}") print() from hermes_cli.gateway import find_gateway_pids @@ -149,6 +152,7 @@ def cron_create(args): repeat=getattr(args, "repeat", None), skill=getattr(args, "skill", None), skills=_normalize_skills(getattr(args, "skill", None), getattr(args, "skills", None)), + script=getattr(args, "script", None), ) if not result.get("success"): print(color(f"Failed to create job: {result.get('error', 'unknown error')}", Colors.RED)) @@ -158,6 +162,9 @@ def cron_create(args): print(f" Schedule: {result['schedule']}") if result.get("skills"): print(f" Skills: {', '.join(result['skills'])}") + job_data = result.get("job", {}) + if job_data.get("script"): + print(f" Script: {job_data['script']}") print(f" Next run: {result['next_run_at']}") return 0 @@ -195,6 +202,7 @@ def cron_edit(args): deliver=getattr(args, "deliver", None), repeat=getattr(args, "repeat", None), skills=final_skills, + script=getattr(args, "script", None), ) if not result.get("success"): print(color(f"Failed to update job: {result.get('error', 'unknown error')}", Colors.RED)) @@ -208,6 +216,8 @@ def cron_edit(args): print(f" Skills: {', '.join(updated['skills'])}") else: print(" Skills: none") + if updated.get("script"): + print(f" Script: {updated['script']}") return 0 diff --git a/hermes_cli/main.py b/hermes_cli/main.py index 0f1f4aa513..5150bfa1a7 100644 --- a/hermes_cli/main.py +++ b/hermes_cli/main.py @@ -4381,6 +4381,7 @@ For more help on a command: cron_create.add_argument("--deliver", help="Delivery target: origin, local, telegram, discord, signal, or platform:chat_id") cron_create.add_argument("--repeat", type=int, help="Optional repeat count") cron_create.add_argument("--skill", dest="skills", action="append", help="Attach a skill. Repeat to add multiple skills.") + cron_create.add_argument("--script", help="Path to a Python script whose stdout is injected into the prompt each run") # cron edit cron_edit = cron_subparsers.add_parser("edit", help="Edit an existing scheduled job") @@ -4394,6 +4395,7 @@ For more help on a command: cron_edit.add_argument("--add-skill", dest="add_skills", action="append", help="Append a skill without replacing the existing list. Repeatable.") cron_edit.add_argument("--remove-skill", dest="remove_skills", action="append", help="Remove a specific attached skill. Repeatable.") cron_edit.add_argument("--clear-skills", action="store_true", help="Remove all attached skills from the job") + cron_edit.add_argument("--script", help="Path to a Python script whose stdout is injected into the prompt each run. Pass empty string to clear.") # lifecycle actions cron_pause = cron_subparsers.add_parser("pause", help="Pause a scheduled job") diff --git a/tests/cron/test_cron_script.py b/tests/cron/test_cron_script.py new file mode 100644 index 0000000000..e833963547 --- /dev/null +++ b/tests/cron/test_cron_script.py @@ -0,0 +1,300 @@ +"""Tests for cron job script injection feature. + +Tests cover: +- Script field in job creation / storage / update +- Script execution and output injection into prompts +- Error handling (missing script, timeout, non-zero exit) +- Path resolution (absolute, relative to HERMES_HOME/scripts/) +""" + +import json +import os +import stat +import sys +import textwrap +from pathlib import Path +from unittest.mock import patch + +import pytest + +# Ensure project root is importable +sys.path.insert(0, str(Path(__file__).parent.parent.parent)) + + +@pytest.fixture +def cron_env(tmp_path, monkeypatch): + """Isolated cron environment with temp HERMES_HOME.""" + hermes_home = tmp_path / ".hermes" + hermes_home.mkdir() + (hermes_home / "cron").mkdir() + (hermes_home / "cron" / "output").mkdir() + (hermes_home / "scripts").mkdir() + monkeypatch.setenv("HERMES_HOME", str(hermes_home)) + + # Clear cached module-level paths + import cron.jobs as jobs_mod + monkeypatch.setattr(jobs_mod, "HERMES_DIR", hermes_home) + monkeypatch.setattr(jobs_mod, "CRON_DIR", hermes_home / "cron") + monkeypatch.setattr(jobs_mod, "JOBS_FILE", hermes_home / "cron" / "jobs.json") + monkeypatch.setattr(jobs_mod, "OUTPUT_DIR", hermes_home / "cron" / "output") + + return hermes_home + + +class TestJobScriptField: + """Test that the script field is stored and retrieved correctly.""" + + def test_create_job_with_script(self, cron_env): + from cron.jobs import create_job, get_job + + job = create_job( + prompt="Analyze the data", + schedule="every 30m", + script="/path/to/monitor.py", + ) + assert job["script"] == "/path/to/monitor.py" + + loaded = get_job(job["id"]) + assert loaded["script"] == "/path/to/monitor.py" + + def test_create_job_without_script(self, cron_env): + from cron.jobs import create_job + + job = create_job(prompt="Hello", schedule="every 1h") + assert job.get("script") is None + + def test_create_job_empty_script_normalized_to_none(self, cron_env): + from cron.jobs import create_job + + job = create_job(prompt="Hello", schedule="every 1h", script=" ") + assert job.get("script") is None + + def test_update_job_add_script(self, cron_env): + from cron.jobs import create_job, update_job + + job = create_job(prompt="Hello", schedule="every 1h") + assert job.get("script") is None + + updated = update_job(job["id"], {"script": "/new/script.py"}) + assert updated["script"] == "/new/script.py" + + def test_update_job_clear_script(self, cron_env): + from cron.jobs import create_job, update_job + + job = create_job(prompt="Hello", schedule="every 1h", script="/some/script.py") + assert job["script"] == "/some/script.py" + + updated = update_job(job["id"], {"script": None}) + assert updated.get("script") is None + + +class TestRunJobScript: + """Test the _run_job_script() function.""" + + def test_successful_script(self, cron_env): + from cron.scheduler import _run_job_script + + script = cron_env / "scripts" / "test.py" + script.write_text('print("hello from script")\n') + + success, output = _run_job_script(str(script)) + assert success is True + assert output == "hello from script" + + def test_script_relative_path(self, cron_env): + from cron.scheduler import _run_job_script + + script = cron_env / "scripts" / "relative.py" + script.write_text('print("relative works")\n') + + success, output = _run_job_script("relative.py") + assert success is True + assert output == "relative works" + + def test_script_not_found(self, cron_env): + from cron.scheduler import _run_job_script + + success, output = _run_job_script("/nonexistent/script.py") + assert success is False + assert "not found" in output.lower() + + def test_script_nonzero_exit(self, cron_env): + from cron.scheduler import _run_job_script + + script = cron_env / "scripts" / "fail.py" + script.write_text(textwrap.dedent("""\ + import sys + print("partial output") + print("error info", file=sys.stderr) + sys.exit(1) + """)) + + success, output = _run_job_script(str(script)) + assert success is False + assert "exited with code 1" in output + assert "error info" in output + + def test_script_empty_output(self, cron_env): + from cron.scheduler import _run_job_script + + script = cron_env / "scripts" / "empty.py" + script.write_text("# no output\n") + + success, output = _run_job_script(str(script)) + assert success is True + assert output == "" + + def test_script_timeout(self, cron_env, monkeypatch): + from cron import scheduler as sched_mod + from cron.scheduler import _run_job_script + + # Use a very short timeout + monkeypatch.setattr(sched_mod, "_SCRIPT_TIMEOUT", 1) + + script = cron_env / "scripts" / "slow.py" + script.write_text("import time; time.sleep(30)\n") + + success, output = _run_job_script(str(script)) + assert success is False + assert "timed out" in output.lower() + + def test_script_json_output(self, cron_env): + """Scripts can output structured JSON for the LLM to parse.""" + from cron.scheduler import _run_job_script + + script = cron_env / "scripts" / "json_out.py" + script.write_text(textwrap.dedent("""\ + import json + data = {"new_prs": [{"number": 42, "title": "Fix bug"}]} + print(json.dumps(data, indent=2)) + """)) + + success, output = _run_job_script(str(script)) + assert success is True + parsed = json.loads(output) + assert parsed["new_prs"][0]["number"] == 42 + + +class TestBuildJobPromptWithScript: + """Test that script output is injected into the prompt.""" + + def test_script_output_injected(self, cron_env): + from cron.scheduler import _build_job_prompt + + script = cron_env / "scripts" / "data.py" + script.write_text('print("new PR: #123 fix typo")\n') + + job = { + "prompt": "Report any notable changes.", + "script": str(script), + } + prompt = _build_job_prompt(job) + assert "## Script Output" in prompt + assert "new PR: #123 fix typo" in prompt + assert "Report any notable changes." in prompt + + def test_script_error_injected(self, cron_env): + from cron.scheduler import _build_job_prompt + + job = { + "prompt": "Report status.", + "script": "/nonexistent/script.py", + } + prompt = _build_job_prompt(job) + assert "## Script Error" in prompt + assert "not found" in prompt.lower() + assert "Report status." in prompt + + def test_no_script_unchanged(self, cron_env): + from cron.scheduler import _build_job_prompt + + job = {"prompt": "Simple job."} + prompt = _build_job_prompt(job) + assert "## Script Output" not in prompt + assert "Simple job." in prompt + + def test_script_empty_output_noted(self, cron_env): + from cron.scheduler import _build_job_prompt + + script = cron_env / "scripts" / "noop.py" + script.write_text("# nothing\n") + + job = { + "prompt": "Check status.", + "script": str(script), + } + prompt = _build_job_prompt(job) + assert "no output" in prompt.lower() + assert "Check status." in prompt + + +class TestCronjobToolScript: + """Test the cronjob tool's script parameter.""" + + def test_create_with_script(self, cron_env, monkeypatch): + monkeypatch.setenv("HERMES_INTERACTIVE", "1") + from tools.cronjob_tools import cronjob + + result = json.loads(cronjob( + action="create", + schedule="every 1h", + prompt="Monitor things", + script="/home/user/monitor.py", + )) + assert result["success"] is True + assert result["job"]["script"] == "/home/user/monitor.py" + + def test_update_script(self, cron_env, monkeypatch): + monkeypatch.setenv("HERMES_INTERACTIVE", "1") + from tools.cronjob_tools import cronjob + + create_result = json.loads(cronjob( + action="create", + schedule="every 1h", + prompt="Monitor things", + )) + job_id = create_result["job_id"] + + update_result = json.loads(cronjob( + action="update", + job_id=job_id, + script="/new/script.py", + )) + assert update_result["success"] is True + assert update_result["job"]["script"] == "/new/script.py" + + def test_clear_script(self, cron_env, monkeypatch): + monkeypatch.setenv("HERMES_INTERACTIVE", "1") + from tools.cronjob_tools import cronjob + + create_result = json.loads(cronjob( + action="create", + schedule="every 1h", + prompt="Monitor things", + script="/some/script.py", + )) + job_id = create_result["job_id"] + + update_result = json.loads(cronjob( + action="update", + job_id=job_id, + script="", + )) + assert update_result["success"] is True + assert "script" not in update_result["job"] + + def test_list_shows_script(self, cron_env, monkeypatch): + monkeypatch.setenv("HERMES_INTERACTIVE", "1") + from tools.cronjob_tools import cronjob + + cronjob( + action="create", + schedule="every 1h", + prompt="Monitor things", + script="/path/to/script.py", + ) + + list_result = json.loads(cronjob(action="list")) + assert list_result["success"] is True + assert len(list_result["jobs"]) == 1 + assert list_result["jobs"][0]["script"] == "/path/to/script.py" diff --git a/tools/cronjob_tools.py b/tools/cronjob_tools.py index 84054c6e24..965cfe1303 100644 --- a/tools/cronjob_tools.py +++ b/tools/cronjob_tools.py @@ -116,7 +116,7 @@ def _normalize_optional_job_value(value: Optional[Any], *, strip_trailing_slash: def _format_job(job: Dict[str, Any]) -> Dict[str, Any]: prompt = job.get("prompt", "") skills = _canonical_skills(job.get("skill"), job.get("skills")) - return { + result = { "job_id": job["id"], "name": job["name"], "skill": skills[0] if skills else None, @@ -136,6 +136,9 @@ def _format_job(job: Dict[str, Any]) -> Dict[str, Any]: "paused_at": job.get("paused_at"), "paused_reason": job.get("paused_reason"), } + if job.get("script"): + result["script"] = job["script"] + return result def cronjob( @@ -153,6 +156,7 @@ def cronjob( provider: Optional[str] = None, base_url: Optional[str] = None, reason: Optional[str] = None, + script: Optional[str] = None, task_id: str = None, ) -> str: """Unified cron job management tool.""" @@ -183,6 +187,7 @@ def cronjob( model=_normalize_optional_job_value(model), provider=_normalize_optional_job_value(provider), base_url=_normalize_optional_job_value(base_url, strip_trailing_slash=True), + script=_normalize_optional_job_value(script), ) return json.dumps( { @@ -265,6 +270,9 @@ def cronjob( updates["provider"] = _normalize_optional_job_value(provider) if base_url is not None: updates["base_url"] = _normalize_optional_job_value(base_url, strip_trailing_slash=True) + if script is not None: + # Pass empty string to clear an existing script + updates["script"] = _normalize_optional_job_value(script) if script else None if repeat is not None: # Normalize: treat 0 or negative as None (infinite) normalized_repeat = None if repeat <= 0 else repeat @@ -338,6 +346,11 @@ Jobs run in a fresh session with no current-chat context, so prompts must be sel If skill or skills are provided on create, the future cron run loads those skills in order, then follows the prompt as the task instruction. On update, passing skills=[] clears attached skills. +If script is provided on create, the referenced Python script runs before each agent turn. +Its stdout is injected into the prompt as context. Use this for data collection and change +detection — the script handles gathering data, the agent analyzes and reports. +On update, pass script="" to clear an attached script. + NOTE: The agent's final response is auto-delivered to the target. Put the primary user-facing content in the final response. Cron jobs run autonomously with no user present — they cannot ask questions or request clarification. @@ -402,6 +415,10 @@ Important safety rule: cron-run sessions should not recursively schedule more cr "reason": { "type": "string", "description": "Optional pause reason" + }, + "script": { + "type": "string", + "description": "Optional path to a Python script that runs before each cron job execution. Its stdout is injected into the prompt as context. Use for data collection and change detection. Relative paths resolve under ~/.hermes/scripts/. On update, pass empty string to clear." } }, "required": ["action"] @@ -451,6 +468,7 @@ registry.register( provider=args.get("provider"), base_url=args.get("base_url"), reason=args.get("reason"), + script=args.get("script"), task_id=kw.get("task_id"), ), check_fn=check_cronjob_requirements,