mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-29 23:41:35 +08:00
Compare commits
1 Commits
fix/plugin
...
hermes/her
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9f361cdca6 |
@@ -1214,6 +1214,135 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
||||
logger.debug("Job '%s': failed to reap stale auxiliary clients: %s", job_id, e)
|
||||
|
||||
|
||||
def _execute_and_record(
    job: dict,
    adapters=None,
    loop=None,
    verbose: bool = False,
) -> bool:
    """Run one due job end-to-end: execute, save output, deliver, mark.

    Shared between the periodic ``tick()`` loop and the on-demand
    ``run_job_now()`` path, so ``cronjob(action="run")`` behaves
    identically to scheduled execution.

    Args:
        job: Job record. ``job["id"]`` is required; ``job["name"]`` is used
            in the failure notice when present.
        adapters: Forwarded unchanged to ``_deliver_result``.
        loop: Forwarded unchanged to ``_deliver_result``.
        verbose: When true, log where the job output was saved.

    Returns:
        True if execution + bookkeeping completed, even when the agent run
        itself failed (that outcome is passed to ``mark_job_run`` as a
        failure). False when an exception escaped any step; the failure is
        then recorded through ``mark_job_run`` on a best-effort basis.
    """
    try:
        success, output, final_response, error = run_job(job)

        output_file = save_job_output(job["id"], output)
        if verbose:
            logger.info("Output saved to: %s", output_file)

        # Deliver the final response to the origin/target chat.
        # If the agent responded with [SILENT], skip delivery (but
        # output is already saved above). Failed jobs always deliver.
        deliver_content = final_response if success else f"⚠️ Cron job '{job.get('name', job['id'])}' failed:\n{error}"
        should_deliver = bool(deliver_content)
        if should_deliver and success and SILENT_MARKER in deliver_content.strip().upper():
            logger.info("Job '%s': agent returned %s — skipping delivery", job["id"], SILENT_MARKER)
            should_deliver = False

        delivery_error = None
        if should_deliver:
            try:
                delivery_error = _deliver_result(job, deliver_content, adapters=adapters, loop=loop)
            except Exception as de:
                # A delivery failure must not abort bookkeeping; it is
                # recorded alongside the run status instead.
                delivery_error = str(de)
                logger.error("Delivery failed for job %s: %s", job["id"], de)

        # Treat empty final_response as a soft failure so last_status
        # is not "ok" — the agent ran but produced nothing useful.
        # (issue #8585)
        if success and not final_response:
            success = False
            error = "Agent completed but produced empty response (model error, timeout, or misconfiguration)"

        mark_job_run(job["id"], success, error, delivery_error=delivery_error)
        return True

    except Exception as e:
        # exc_info keeps the traceback; the message alone rarely identifies
        # which step (run, save, deliver, mark) actually failed.
        logger.error("Error processing job %s: %s", job['id'], e, exc_info=True)
        try:
            mark_job_run(job["id"], False, str(e))
        except Exception as mark_err:
            # The bookkeeping call itself may be what failed above. Do not
            # let a second failure escape: the contract is "return False".
            logger.error("Job %s: could not record failure status: %s", job["id"], mark_err)
        return False
|
||||
|
||||
|
||||
def run_job_now(
    job_id: str,
    adapters=None,
    loop=None,
    verbose: bool = False,
) -> Optional[dict]:
    """Execute one job synchronously and return its refreshed record.

    Intended for ``cronjob(action="run")`` and ``POST /api/jobs/{id}/run``
    when no gateway ticker is available. The caller is blocked until the
    agent run finishes.

    A non-blocking exclusive lock on ``_LOCK_FILE`` is taken first
    (described by the original author as the same lock ``tick()`` uses), so
    a concurrent tick cannot execute the job a second time. If the lock
    cannot be taken, nothing is changed and ``None`` is returned so the
    caller can fall back to deferring the job.

    ``advance_next_run`` is called **before** the job executes, so a crash
    mid-run does not cause the job to fire again on the next ticker pass.

    Args:
        job_id: Identifier of the job to execute.
        adapters: Forwarded to ``_execute_and_record``.
        loop: Forwarded to ``_execute_and_record``.
        verbose: Forwarded to ``_execute_and_record``.

    Returns:
        The job as returned by ``get_job`` after execution, or ``None`` when
        the job does not exist or the lock could not be acquired.
    """
    from cron.jobs import get_job

    current = get_job(job_id)
    if not current:
        return None

    _LOCK_DIR.mkdir(parents=True, exist_ok=True)

    # --- acquire the lock (non-blocking) --------------------------------
    lock_handle = None
    try:
        lock_handle = open(_LOCK_FILE, "w")
        if fcntl:
            fcntl.flock(lock_handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
        elif msvcrt:
            msvcrt.locking(lock_handle.fileno(), msvcrt.LK_NBLCK, 1)
    except (OSError, IOError):
        logger.debug("run_job_now: tick lock held, skipping inline exec for %s", job_id)
        if lock_handle is not None:
            lock_handle.close()
        return None

    # --- run the job while holding the lock -----------------------------
    try:
        # Move the schedule forward first so a crash during the agent run
        # cannot lead to a duplicate execution on the next ticker pass.
        advance_next_run(job_id)

        # Pick up the adjusted next_run_at (and any other live-computed
        # fields); keep the earlier copy if the job vanished meanwhile.
        refreshed = get_job(job_id)
        if refreshed:
            current = refreshed

        _execute_and_record(current, adapters=adapters, loop=loop, verbose=verbose)
        return get_job(job_id)
    finally:
        # Unlock errors are ignored; the handle is closed regardless.
        try:
            if fcntl:
                fcntl.flock(lock_handle, fcntl.LOCK_UN)
            elif msvcrt:
                msvcrt.locking(lock_handle.fileno(), msvcrt.LK_UNLCK, 1)
        except (OSError, IOError):
            pass
        lock_handle.close()
|
||||
|
||||
|
||||
def tick(verbose: bool = True, adapters=None, loop=None) -> int:
|
||||
"""
|
||||
Check and run all due jobs.
|
||||
@@ -1288,45 +1417,7 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int:
|
||||
)
|
||||
|
||||
def _process_job(job: dict) -> bool:
|
||||
"""Run one due job end-to-end: execute, save, deliver, mark."""
|
||||
try:
|
||||
success, output, final_response, error = run_job(job)
|
||||
|
||||
output_file = save_job_output(job["id"], output)
|
||||
if verbose:
|
||||
logger.info("Output saved to: %s", output_file)
|
||||
|
||||
# Deliver the final response to the origin/target chat.
|
||||
# If the agent responded with [SILENT], skip delivery (but
|
||||
# output is already saved above). Failed jobs always deliver.
|
||||
deliver_content = final_response if success else f"⚠️ Cron job '{job.get('name', job['id'])}' failed:\n{error}"
|
||||
should_deliver = bool(deliver_content)
|
||||
if should_deliver and success and SILENT_MARKER in deliver_content.strip().upper():
|
||||
logger.info("Job '%s': agent returned %s — skipping delivery", job["id"], SILENT_MARKER)
|
||||
should_deliver = False
|
||||
|
||||
delivery_error = None
|
||||
if should_deliver:
|
||||
try:
|
||||
delivery_error = _deliver_result(job, deliver_content, adapters=adapters, loop=loop)
|
||||
except Exception as de:
|
||||
delivery_error = str(de)
|
||||
logger.error("Delivery failed for job %s: %s", job["id"], de)
|
||||
|
||||
# Treat empty final_response as a soft failure so last_status
|
||||
# is not "ok" — the agent ran but produced nothing useful.
|
||||
# (issue #8585)
|
||||
if success and not final_response:
|
||||
success = False
|
||||
error = "Agent completed but produced empty response (model error, timeout, or misconfiguration)"
|
||||
|
||||
mark_job_run(job["id"], success, error, delivery_error=delivery_error)
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Error processing job %s: %s", job['id'], e)
|
||||
mark_job_run(job["id"], False, str(e))
|
||||
return False
|
||||
return _execute_and_record(job, adapters=adapters, loop=loop, verbose=verbose)
|
||||
|
||||
# Partition due jobs: those with a per-job workdir mutate
|
||||
# os.environ["TERMINAL_CWD"] inside run_job, which is process-global —
|
||||
|
||||
@@ -2216,7 +2216,17 @@ class APIServerAdapter(BasePlatformAdapter):
|
||||
return web.json_response({"error": str(e)}, status=500)
|
||||
|
||||
async def _handle_run_job(self, request: "web.Request") -> "web.Response":
|
||||
"""POST /api/jobs/{job_id}/run — trigger immediate execution."""
|
||||
"""POST /api/jobs/{job_id}/run — trigger immediate execution.
|
||||
|
||||
When a gateway ticker is up (which it is for any live api_server
|
||||
process hosted inside ``hermes gateway``), this sets ``next_run_at``
|
||||
to now and returns — the ticker will fire the job on its next pass
|
||||
(within ~60s).
|
||||
|
||||
When no gateway ticker is up (rare, e.g. api_server running standalone
|
||||
via a one-off invocation), the job is executed inline off the event
|
||||
loop and the response includes ``last_run_at`` / ``last_status``.
|
||||
"""
|
||||
auth_err = self._check_auth(request)
|
||||
if auth_err:
|
||||
return auth_err
|
||||
@@ -2227,10 +2237,61 @@ class APIServerAdapter(BasePlatformAdapter):
|
||||
if id_err:
|
||||
return id_err
|
||||
try:
|
||||
job = _cron_trigger(job_id)
|
||||
if not job:
|
||||
return web.json_response({"error": "Job not found"}, status=404)
|
||||
return web.json_response({"job": job})
|
||||
ticker_up = True
|
||||
try:
|
||||
from hermes_cli.gateway import find_gateway_pids
|
||||
ticker_up = bool(find_gateway_pids())
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if ticker_up:
|
||||
job = _cron_trigger(job_id)
|
||||
if not job:
|
||||
return web.json_response({"error": "Job not found"}, status=404)
|
||||
return web.json_response(
|
||||
{
|
||||
"job": job,
|
||||
"message": (
|
||||
"Job scheduled for next tick (within ~60s). "
|
||||
"Poll /api/jobs/{job_id} for updated last_run_at/last_status."
|
||||
),
|
||||
}
|
||||
)
|
||||
|
||||
# Inline execution off the event loop.
|
||||
try:
|
||||
from cron.scheduler import run_job_now
|
||||
except Exception:
|
||||
# Fall back to defer with a warning.
|
||||
job = _cron_trigger(job_id)
|
||||
if not job:
|
||||
return web.json_response({"error": "Job not found"}, status=404)
|
||||
return web.json_response(
|
||||
{
|
||||
"job": job,
|
||||
"warning": (
|
||||
"Gateway is not running and inline execution is unavailable. "
|
||||
"Start the gateway with 'hermes gateway' so jobs fire automatically."
|
||||
),
|
||||
}
|
||||
)
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
updated = await loop.run_in_executor(None, run_job_now, job_id)
|
||||
if not updated:
|
||||
if not _cron_get(job_id):
|
||||
return web.json_response({"error": "Job not found"}, status=404)
|
||||
# Lock contention — fall back to defer.
|
||||
job = _cron_trigger(job_id)
|
||||
return web.json_response(
|
||||
{
|
||||
"job": job,
|
||||
"message": "Another cron tick is running; job scheduled for that tick.",
|
||||
}
|
||||
)
|
||||
return web.json_response(
|
||||
{"job": updated, "message": "Job executed inline (no gateway ticker running)."}
|
||||
)
|
||||
except Exception as e:
|
||||
return web.json_response({"error": str(e)}, status=500)
|
||||
|
||||
|
||||
@@ -255,7 +255,18 @@ def _job_action(action: str, job_id: str, success_verb: str) -> int:
|
||||
if action in {"resume", "run"} and result.get("job", {}).get("next_run_at"):
|
||||
print(f" Next run: {result['job']['next_run_at']}")
|
||||
if action == "run":
|
||||
print(" It will run on the next scheduler tick.")
|
||||
msg = result.get("message") or " It will run on the next scheduler tick."
|
||||
last_status = job.get("last_status")
|
||||
if last_status == "ok":
|
||||
print(f" {color('✓ Last run:', Colors.GREEN)} {job.get('last_run_at', '?')} {color('ok', Colors.GREEN)}")
|
||||
elif last_status == "error":
|
||||
last_err = job.get("last_error", "?")
|
||||
print(f" {color('✗ Last run:', Colors.RED)} {job.get('last_run_at', '?')} "
|
||||
f"{color('error: ' + str(last_err), Colors.RED)}")
|
||||
print(f" {msg}")
|
||||
warning = result.get("warning")
|
||||
if warning:
|
||||
print(color(f" ⚠ {warning}", Colors.YELLOW))
|
||||
return 0
|
||||
|
||||
|
||||
|
||||
290
tests/cron/test_run_job_now.py
Normal file
290
tests/cron/test_run_job_now.py
Normal file
@@ -0,0 +1,290 @@
|
||||
"""Tests for on-demand cron execution — cron.scheduler.run_job_now and the
|
||||
tool/API ``action='run'`` branching between inline execution (no gateway)
|
||||
and defer-to-ticker (gateway running).
|
||||
|
||||
Regression target: issue #16612 — ``cronjob run`` previously only called
|
||||
``trigger_job()``, which just sets ``next_run_at=now``. When no gateway
|
||||
ticker was running, the job never actually executed and the user saw
|
||||
``state: scheduled`` with ``last_run_at: null`` forever.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# run_job_now — inline execution unit tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestRunJobNow:
    """cron.scheduler.run_job_now executes a single job inline."""

    def _make_job(self, job_id="inline-job", **overrides):
        """Build a minimal interval job dict; ``overrides`` replace fields."""
        job = {
            "id": job_id,
            "name": "inline test",
            "enabled": True,
            "schedule": {"kind": "interval", "value": "every 1h"},
            "state": "scheduled",
            "prompt": "do a thing",
        }
        job.update(overrides)
        return job

    @staticmethod
    def _fresh_run_job_now(tmp_path, monkeypatch):
        """Point HERMES_HOME at ``tmp_path`` and return a reloaded run_job_now.

        The scheduler is reloaded so ``_LOCK_DIR`` picks up the temporary
        HERMES_HOME instead of the developer's real one.
        """
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        import importlib
        import cron.scheduler
        importlib.reload(cron.scheduler)
        return cron.scheduler.run_job_now

    def test_returns_none_for_missing_job(self, tmp_path, monkeypatch):
        run_job_now = self._fresh_run_job_now(tmp_path, monkeypatch)

        with patch("cron.jobs.get_job", return_value=None):
            assert run_job_now("does-not-exist") is None

    def test_executes_inline_and_returns_updated_job(self, tmp_path, monkeypatch):
        run_job_now = self._fresh_run_job_now(tmp_path, monkeypatch)

        job = self._make_job()
        updated_after_run = {**job, "last_run_at": "2026-04-28T00:00:00", "last_status": "ok"}

        # cron.scheduler.run_job_now imports get_job lazily from cron.jobs,
        # so patch at the source (cron.jobs.get_job). Three calls:
        # (1) existence check, (2) re-load after advance, (3) final return.
        with patch("cron.jobs.get_job", side_effect=[job, job, updated_after_run]), \
             patch("cron.scheduler.advance_next_run", return_value=True) as adv, \
             patch("cron.scheduler.run_job", return_value=(True, "# output", "final reply", None)), \
             patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
             patch("cron.scheduler._deliver_result", return_value=None), \
             patch("cron.scheduler.mark_job_run") as mark:

            result = run_job_now("inline-job")

        # advance_next_run is called BEFORE the agent run to preserve
        # at-most-once semantics (matches tick() behaviour).
        adv.assert_called_once_with("inline-job")
        mark.assert_called_once()
        # mark_job_run is called with (job_id, success=True, error=None, delivery_error=None)
        args, kwargs = mark.call_args
        assert args[0] == "inline-job"
        assert args[1] is True
        assert args[2] is None
        assert kwargs.get("delivery_error") is None
        assert result == updated_after_run

    def test_inline_records_failure_when_agent_errors(self, tmp_path, monkeypatch):
        run_job_now = self._fresh_run_job_now(tmp_path, monkeypatch)

        job = self._make_job()
        updated = {**job, "last_run_at": "2026-04-28T00:00:00", "last_status": "error", "last_error": "boom"}

        with patch("cron.jobs.get_job", side_effect=[job, job, updated]), \
             patch("cron.scheduler.advance_next_run", return_value=True), \
             patch("cron.scheduler.run_job", return_value=(False, "# output", "", "boom")), \
             patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
             patch("cron.scheduler._deliver_result", return_value=None), \
             patch("cron.scheduler.mark_job_run") as mark:

            result = run_job_now("inline-job")

        mark.assert_called_once()
        args, kwargs = mark.call_args
        assert args[1] is False  # success=False
        assert args[2] == "boom"  # error propagated
        assert result["last_status"] == "error"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# cronjob(action="run") tool — gateway-aware branching
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestCronjobRunTool:
    """tools.cronjob_tools._handle_run_action branches on gateway presence."""

    def _fake_job(self, job_id="tool-job", **overrides):
        """Build a minimal locally-delivered job dict; ``overrides`` replace fields."""
        job = {
            "id": job_id,
            "name": "tool test",
            "enabled": True,
            "schedule": {"kind": "interval", "value": "every 1h"},
            "state": "scheduled",
            "prompt": "do a thing",
            "deliver": "local",
        }
        job.update(overrides)
        return job

    def test_gateway_running_defers_to_tick(self, tmp_path, monkeypatch):
        """When gateway PIDs exist, action='run' only calls trigger_job."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        from tools.cronjob_tools import cronjob

        job = self._fake_job()
        deferred = {**job, "next_run_at": "2026-04-28T00:00:00"}

        with patch("tools.cronjob_tools.get_job", return_value=job), \
             patch("tools.cronjob_tools.trigger_job", return_value=deferred) as trig, \
             patch("tools.cronjob_tools._gateway_ticker_running", return_value=True):
            out = cronjob(action="run", job_id="tool-job")

        result = json.loads(out)
        assert result["success"] is True
        trig.assert_called_once_with("tool-job")
        assert "next tick" in result["message"].lower() or "within" in result["message"].lower()
        # The deferred path must NOT claim the job already executed.
        assert result["job"].get("last_run_at") in (None, "")

    def test_gateway_not_running_executes_inline(self, tmp_path, monkeypatch):
        """When no gateway PIDs exist, action='run' invokes run_job_now
        and the returned job carries last_run_at + last_status=ok."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        from tools.cronjob_tools import cronjob

        job = self._fake_job()
        executed = {
            **job,
            "last_run_at": "2026-04-28T00:00:00",
            "last_status": "ok",
        }

        with patch("tools.cronjob_tools.get_job", return_value=job), \
             patch("tools.cronjob_tools._gateway_ticker_running", return_value=False), \
             patch("cron.scheduler.run_job_now", return_value=executed) as run_now, \
             patch("tools.cronjob_tools.trigger_job") as trig:
            out = cronjob(action="run", job_id="tool-job")

        result = json.loads(out)
        assert result["success"] is True
        run_now.assert_called_once_with("tool-job")
        trig.assert_not_called()  # defer path must not be taken
        assert result["job"]["last_run_at"] == "2026-04-28T00:00:00"
        assert result["job"]["last_status"] == "ok"
        assert "inline" in result["message"].lower()

    def test_gateway_not_running_falls_back_to_defer_on_lock_contention(self, tmp_path, monkeypatch):
        """If run_job_now returns None (tick lock held), fall back to defer
        so the caller isn't left with a silent no-op."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        from tools.cronjob_tools import cronjob

        job = self._fake_job()
        deferred = {**job, "next_run_at": "2026-04-28T00:00:00"}

        with patch("tools.cronjob_tools.get_job", return_value=job), \
             patch("tools.cronjob_tools._gateway_ticker_running", return_value=False), \
             patch("cron.scheduler.run_job_now", return_value=None), \
             patch("tools.cronjob_tools.trigger_job", return_value=deferred) as trig:
            out = cronjob(action="run", job_id="tool-job")

        result = json.loads(out)
        assert result["success"] is True
        trig.assert_called_once_with("tool-job")
        # Pin the lock-contention wording specifically. A bare "tick" check
        # would also accept the ordinary gateway-defer message.
        assert "another cron tick" in result["message"].lower()

    def test_unknown_job_returns_error(self, tmp_path, monkeypatch):
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        from tools.cronjob_tools import cronjob

        # get_job at the TOP of cronjob() is what short-circuits missing jobs.
        with patch("tools.cronjob_tools.get_job", return_value=None):
            out = cronjob(action="run", job_id="nope")

        result = json.loads(out)
        assert result["success"] is False
        assert "not found" in result["error"].lower()

    def test_inline_execution_surfaces_failure(self, tmp_path, monkeypatch):
        """Failed inline exec — the message must indicate the failure
        instead of claiming a clean run."""
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        from tools.cronjob_tools import cronjob

        job = self._fake_job()
        executed = {
            **job,
            "last_run_at": "2026-04-28T00:00:00",
            "last_status": "error",
            "last_error": "model timed out",
        }

        with patch("tools.cronjob_tools.get_job", return_value=job), \
             patch("tools.cronjob_tools._gateway_ticker_running", return_value=False), \
             patch("cron.scheduler.run_job_now", return_value=executed):
            out = cronjob(action="run", job_id="tool-job")

        result = json.loads(out)
        assert result["success"] is True  # the tool call itself succeeded
        assert result["job"]["last_status"] == "error"
        assert "model timed out" in result["message"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _execute_and_record — shared helper covering both tick and run_job_now
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestExecuteAndRecord:
    """_execute_and_record is the shared end-to-end helper used by both
    the tick loop and run_job_now."""

    def _make_job(self):
        """Return the smallest job dict the helper needs (id, name, deliver)."""
        return {
            "id": "shared-helper-job",
            "name": "shared helper",
            "deliver": "local",
        }

    def test_success_path_delivers_and_marks_ok(self):
        from cron.scheduler import _execute_and_record

        with patch("cron.scheduler.run_job", return_value=(True, "# out", "final", None)), \
             patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
             patch("cron.scheduler._deliver_result", return_value=None) as deliver, \
             patch("cron.scheduler.mark_job_run") as mark:
            ok = _execute_and_record(self._make_job())

        assert ok is True
        deliver.assert_called_once()
        mark.assert_called_once()
        args, kwargs = mark.call_args
        assert args[1] is True
        # mark_job_run(job_id, success, error, delivery_error=...)
        assert kwargs.get("delivery_error") is None

    def test_empty_response_marked_as_failure(self):
        """Issue #8585 behaviour must be preserved after the refactor."""
        from cron.scheduler import _execute_and_record

        with patch("cron.scheduler.run_job", return_value=(True, "# out", "", None)), \
             patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
             patch("cron.scheduler._deliver_result") as deliver, \
             patch("cron.scheduler.mark_job_run") as mark:
            ok = _execute_and_record(self._make_job())

        # Bookkeeping completed, so the wrapper itself reports True.
        assert ok is True
        # An empty response leaves nothing to deliver.
        deliver.assert_not_called()
        mark.assert_called_once()
        args, _ = mark.call_args
        assert args[1] is False
        assert "empty response" in args[2].lower()

    def test_exception_in_run_job_marks_error(self):
        from cron.scheduler import _execute_and_record

        with patch("cron.scheduler.run_job", side_effect=RuntimeError("boom")), \
             patch("cron.scheduler.mark_job_run") as mark:
            ok = _execute_and_record(self._make_job())

        assert ok is False
        mark.assert_called_once()
        args, _ = mark.call_args
        assert args[1] is False
        assert "boom" in args[2]
|
||||
@@ -447,23 +447,53 @@ class TestResumeJob:
|
||||
|
||||
class TestRunJob:
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_job(self, adapter):
|
||||
"""POST /api/jobs/{id}/run returns triggered job."""
|
||||
async def test_run_job_defers_when_gateway_running(self, adapter):
|
||||
"""POST /api/jobs/{id}/run calls trigger_job when a gateway ticker is up."""
|
||||
app = _create_app(adapter)
|
||||
triggered_job = {**SAMPLE_JOB, "last_run": "2025-01-01T00:00:00Z"}
|
||||
mock_trigger = MagicMock(return_value=triggered_job)
|
||||
async with TestClient(TestServer(app)) as cli:
|
||||
with patch(
|
||||
f"{_MOD}._CRON_AVAILABLE", True
|
||||
), patch(
|
||||
f"{_MOD}._cron_trigger", mock_trigger
|
||||
):
|
||||
with patch(f"{_MOD}._CRON_AVAILABLE", True), \
|
||||
patch(f"{_MOD}._cron_trigger", mock_trigger), \
|
||||
patch("hermes_cli.gateway.find_gateway_pids", return_value=[12345]):
|
||||
resp = await cli.post(f"/api/jobs/{VALID_JOB_ID}/run")
|
||||
assert resp.status == 200
|
||||
data = await resp.json()
|
||||
assert data["job"] == triggered_job
|
||||
assert "tick" in data.get("message", "").lower()
|
||||
mock_trigger.assert_called_once_with(VALID_JOB_ID)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_job_executes_inline_when_no_gateway(self, adapter):
|
||||
"""POST /api/jobs/{id}/run calls run_job_now when no gateway ticker is up.
|
||||
|
||||
Regression for issue #16612: the old handler only called
|
||||
trigger_job(), which just set next_run_at=now without executing
|
||||
anything. Users saw last_run_at=null forever.
|
||||
"""
|
||||
app = _create_app(adapter)
|
||||
executed_job = {
|
||||
**SAMPLE_JOB,
|
||||
"last_run_at": "2026-04-28T00:00:00",
|
||||
"last_status": "ok",
|
||||
}
|
||||
mock_run_now = MagicMock(return_value=executed_job)
|
||||
mock_trigger = MagicMock()
|
||||
|
||||
async with TestClient(TestServer(app)) as cli:
|
||||
with patch(f"{_MOD}._CRON_AVAILABLE", True), \
|
||||
patch(f"{_MOD}._cron_trigger", mock_trigger), \
|
||||
patch("hermes_cli.gateway.find_gateway_pids", return_value=[]), \
|
||||
patch("cron.scheduler.run_job_now", mock_run_now):
|
||||
resp = await cli.post(f"/api/jobs/{VALID_JOB_ID}/run")
|
||||
assert resp.status == 200
|
||||
data = await resp.json()
|
||||
assert data["job"]["last_run_at"] == "2026-04-28T00:00:00"
|
||||
assert data["job"]["last_status"] == "ok"
|
||||
assert "inline" in data.get("message", "").lower()
|
||||
mock_run_now.assert_called_once_with(VALID_JOB_ID)
|
||||
mock_trigger.assert_not_called()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# 17. test_auth_required
|
||||
|
||||
@@ -243,6 +243,109 @@ def _format_job(job: Dict[str, Any]) -> Dict[str, Any]:
|
||||
return result
|
||||
|
||||
|
||||
def _gateway_ticker_running() -> bool:
|
||||
"""Return True if a ``hermes gateway`` process with a live cron ticker
|
||||
is running (either a different process or the current one).
|
||||
|
||||
The ticker fires due jobs every 60 seconds; when it's up, ``action="run"``
|
||||
just needs to set ``next_run_at=now`` and let the ticker pick it up on the
|
||||
next pass. When it isn't, ``action="run"`` would otherwise be a no-op —
|
||||
so we fall back to executing the job inline in the caller's process.
|
||||
"""
|
||||
try:
|
||||
from hermes_cli.gateway import find_gateway_pids
|
||||
except Exception:
|
||||
return False
|
||||
try:
|
||||
return bool(find_gateway_pids())
|
||||
except Exception:
|
||||
# If we can't tell, err on the side of inline execution so
|
||||
# ``action="run"`` never silently drops on the floor.
|
||||
return False
|
||||
|
||||
|
||||
def _handle_run_action(job_id: str) -> str:
    """Serve ``cronjob(action="run")`` and return the JSON tool response.

    Three outcomes are possible:

    * A gateway ticker is running: the job is handed to ``trigger_job`` and
      the response tells the caller the ticker will fire it shortly.
    * No ticker is running: the job is executed in this process through
      ``cron.scheduler.run_job_now``, blocking until the agent run ends, so
      the response carries the run's ``last_status``.
    * Inline execution is not possible (scheduler import failed, or
      ``run_job_now`` returned nothing for an existing job): the job is
      deferred through ``trigger_job`` with an explanatory warning/message.

    Args:
        job_id: Identifier of the job to run.

    Returns:
        A JSON string, or the result of ``tool_error`` when the job cannot
        be found.
    """
    # --- Ticker available: defer to it ----------------------------------
    if _gateway_ticker_running():
        deferred = trigger_job(job_id)
        if not deferred:
            return tool_error(f"Job with ID '{job_id}' not found.", success=False)
        payload = {
            "success": True,
            "job": _format_job(deferred),
            "message": (
                "Job scheduled for immediate execution. The gateway cron "
                "ticker will fire it on its next pass (within ~60s). "
                "Re-inspect with cronjob(action='list') to see last_run_at "
                "and last_status once it completes."
            ),
        }
        return json.dumps(payload, indent=2)

    # --- No ticker: try to run the job right here -----------------------
    try:
        from cron.scheduler import run_job_now
    except Exception as e:
        # Scheduler not importable (circular import, missing deps):
        # re-schedule the job and warn that nothing will execute it yet.
        logger.warning("run_job_now unavailable, falling back to defer: %s", e)
        deferred = trigger_job(job_id)
        if not deferred:
            return tool_error(f"Job with ID '{job_id}' not found.", success=False)
        payload = {
            "success": True,
            "job": _format_job(deferred),
            "warning": (
                "Gateway is not running and inline execution is unavailable. "
                "Job was re-scheduled (next_run_at=now) but will not execute "
                "until you start the gateway with 'hermes gateway'."
            ),
        }
        return json.dumps(payload, indent=2)

    outcome = run_job_now(job_id)
    if not outcome:
        # run_job_now returns nothing both for a missing job and for a busy
        # tick lock; a direct lookup tells the two apart.
        if not get_job(job_id):
            return tool_error(f"Job with ID '{job_id}' not found.", success=False)
        # Lock was busy — hand the job to whichever tick is running.
        deferred = trigger_job(job_id)
        payload = {
            "success": True,
            "job": _format_job(deferred) if deferred else None,
            "message": "Another cron tick is running; job scheduled for that tick.",
        }
        return json.dumps(payload, indent=2)

    # --- Inline run finished: report its status -------------------------
    formatted = _format_job(outcome)
    if outcome.get("last_status") == "error":
        reason = outcome.get("last_error") or "unknown error"
        msg = f"Job executed inline but failed: {reason}"
    else:
        # "ok" and any other status share the neutral message.
        msg = "Job executed inline (no gateway ticker running)."
    return json.dumps({"success": True, "job": formatted, "message": msg}, indent=2)
|
||||
|
||||
|
||||
def cronjob(
|
||||
action: str,
|
||||
job_id: Optional[str] = None,
|
||||
@@ -372,8 +475,7 @@ def cronjob(
|
||||
return json.dumps({"success": True, "job": _format_job(updated)}, indent=2)
|
||||
|
||||
if normalized in {"run", "run_now", "trigger"}:
|
||||
updated = trigger_job(job_id)
|
||||
return json.dumps({"success": True, "job": _format_job(updated)}, indent=2)
|
||||
return _handle_run_action(job_id)
|
||||
|
||||
if normalized == "update":
|
||||
updates: Dict[str, Any] = {}
|
||||
@@ -476,7 +578,7 @@ Important safety rule: cron-run sessions should not recursively schedule more cr
|
||||
"properties": {
|
||||
"action": {
|
||||
"type": "string",
|
||||
"description": "One of: create, list, update, pause, resume, remove, run"
|
||||
"description": "One of: create, list, update, pause, resume, remove, run. 'run' executes the job now when no gateway ticker is up, or schedules it for the next tick (≤60s) when one is."
|
||||
},
|
||||
"job_id": {
|
||||
"type": "string",
|
||||
|
||||
@@ -200,7 +200,7 @@ Scripts that dump megabytes of output will slow down the agent and may hit token
|
||||
|
||||
```bash
|
||||
hermes cron list # Show all jobs, states, next_run times
|
||||
hermes cron run <job_id> # Schedule for next tick (for testing)
|
||||
hermes cron run <job_id> # Run now (inline if no gateway; otherwise next tick)
|
||||
hermes cron edit <job_id> # Fix configuration issues
|
||||
hermes logs # View recent Hermes logs
|
||||
hermes skills list # Verify installed skills
|
||||
@@ -212,7 +212,7 @@ hermes skills list # Verify installed skills
|
||||
|
||||
If you've worked through this guide and the issue persists:
|
||||
|
||||
1. Run the job with `hermes cron run <job_id>` (fires on next gateway tick) and watch for errors in the chat output
|
||||
1. Run the job with `hermes cron run <job_id>` (runs inline when no gateway is up, otherwise fires on next gateway tick) and watch for errors in the chat output
|
||||
2. Check `~/.hermes/logs/agent.log` for scheduler messages and `~/.hermes/logs/errors.log` for warnings
|
||||
3. Open an issue at [github.com/NousResearch/hermes-agent](https://github.com/NousResearch/hermes-agent) with:
|
||||
- The job ID and schedule
|
||||
|
||||
@@ -295,7 +295,7 @@ hermes cron <list|create|edit|pause|resume|run|remove|status|tick>
|
||||
| `edit` | Update a job's schedule, prompt, name, delivery, repeat count, or attached skills. Supports `--clear-skills`, `--add-skill`, and `--remove-skill`. |
|
||||
| `pause` | Pause a job without deleting it. |
|
||||
| `resume` | Resume a paused job and compute its next future run. |
|
||||
| `run` | Trigger a job on the next scheduler tick. |
|
||||
| `run` | Run a job now. Executes inline when no gateway is running; otherwise defers to the next gateway tick (≤60s). |
|
||||
| `remove` | Delete a scheduled job. |
|
||||
| `status` | Check whether the cron scheduler is running. |
|
||||
| `tick` | Run due jobs once and exit. |
|
||||
|
||||
Reference in New Issue
Block a user