Compare commits

...

1 Commits

Author SHA1 Message Date
teknium1
9f361cdca6 fix(cron): action='run' executes inline when no gateway ticker is running
Closes #16612.

cronjob(action='run') and POST /api/jobs/{id}/run previously only called
trigger_job(), which just sets next_run_at=now. When no gateway process
was running, there was no ticker to pick the job up, so last_run_at
stayed null forever — the tool returned success=true with nothing
actually executing.

- cron.scheduler: extract the per-job run+save+deliver+mark pipeline
  from tick()'s closure into a module-level _execute_and_record helper,
  and add run_job_now() which acquires the shared tick file-lock and
  runs one job inline (advancing next_run_at first for recurring jobs
  to preserve at-most-once semantics).
- tools/cronjob_tools: branch action='run' on gateway presence via
  find_gateway_pids(). Ticker up → defer to next tick (≤60s) with a
  clear message. Ticker down → execute inline via run_job_now() and
  return the updated job including last_run_at/last_status. Tool
  schema description updated to reflect the two modes.
- gateway/platforms/api_server: same branching on the HTTP endpoint,
  running the inline call off the event loop via run_in_executor.
- hermes_cli/cron: CLI 'hermes cron run' now surfaces the message,
  last_run_at, and last_status from the tool result instead of
  always printing 'It will run on the next scheduler tick.'
- docs: update cli-commands and cron-troubleshooting to describe the
  inline-when-no-gateway behaviour.

Tests: 11 new unit tests (tests/cron/test_run_job_now.py) covering
run_job_now's inline path, _execute_and_record's success/empty/error
paths, and the tool/API gateway-presence branching. Plus a new
TestRunJob::test_run_job_executes_inline_when_no_gateway on the
api_server suite. E2E verified in a temp HERMES_HOME against the
real file-based job store.
2026-04-29 08:18:11 -07:00
8 changed files with 643 additions and 58 deletions

View File

@@ -1214,6 +1214,135 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
logger.debug("Job '%s': failed to reap stale auxiliary clients: %s", job_id, e)
def _execute_and_record(
job: dict,
adapters=None,
loop=None,
verbose: bool = False,
) -> bool:
"""Run one due job end-to-end: execute, save output, deliver, mark.
Returns True if execution + bookkeeping completed (even if the job's
agent run itself failed — in that case last_status is recorded as
``error``). Returns False only on an unhandled exception in this
wrapper; such failures are also recorded via ``mark_job_run``.
Shared between the periodic ``tick()`` loop and the on-demand
``run_job_now()`` path, so ``cronjob(action="run")`` behaves
identically to scheduled execution.
"""
try:
success, output, final_response, error = run_job(job)
output_file = save_job_output(job["id"], output)
if verbose:
logger.info("Output saved to: %s", output_file)
# Deliver the final response to the origin/target chat.
# If the agent responded with [SILENT], skip delivery (but
# output is already saved above). Failed jobs always deliver.
deliver_content = final_response if success else f"⚠️ Cron job '{job.get('name', job['id'])}' failed:\n{error}"
should_deliver = bool(deliver_content)
if should_deliver and success and SILENT_MARKER in deliver_content.strip().upper():
logger.info("Job '%s': agent returned %s — skipping delivery", job["id"], SILENT_MARKER)
should_deliver = False
delivery_error = None
if should_deliver:
try:
delivery_error = _deliver_result(job, deliver_content, adapters=adapters, loop=loop)
except Exception as de:
delivery_error = str(de)
logger.error("Delivery failed for job %s: %s", job["id"], de)
# Treat empty final_response as a soft failure so last_status
# is not "ok" — the agent ran but produced nothing useful.
# (issue #8585)
if success and not final_response:
success = False
error = "Agent completed but produced empty response (model error, timeout, or misconfiguration)"
mark_job_run(job["id"], success, error, delivery_error=delivery_error)
return True
except Exception as e:
logger.error("Error processing job %s: %s", job['id'], e)
mark_job_run(job["id"], False, str(e))
return False
def run_job_now(
job_id: str,
adapters=None,
loop=None,
verbose: bool = False,
) -> Optional[dict]:
"""Execute a single job inline, right now, and return the updated job.
Used by ``cronjob(action="run")`` and ``POST /api/jobs/{id}/run`` when
no gateway ticker is running. Blocks the caller for the duration of
the agent run.
Grabs the same file lock ``tick()`` uses so an in-process gateway
ticker firing at the same moment can't double-execute the job. If
the lock is held (another tick is mid-flight), returns None without
touching the job so the caller can fall back to the defer path.
Also advances ``next_run_at`` for recurring jobs **before** execution
to preserve at-most-once semantics matching the tick loop.
Returns the updated job dict (post ``mark_job_run``) on successful
inline execution, or None if the job does not exist or the lock
could not be acquired.
"""
from cron.jobs import get_job
job = get_job(job_id)
if not job:
return None
_LOCK_DIR.mkdir(parents=True, exist_ok=True)
lock_fd = None
try:
lock_fd = open(_LOCK_FILE, "w")
if fcntl:
fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
elif msvcrt:
msvcrt.locking(lock_fd.fileno(), msvcrt.LK_NBLCK, 1)
except (OSError, IOError):
logger.debug("run_job_now: tick lock held, skipping inline exec for %s", job_id)
if lock_fd is not None:
lock_fd.close()
return None
try:
# Match the tick() semantics: advance recurring jobs' next_run_at
# BEFORE the agent runs, so a crash mid-execution doesn't cause
# a re-fire on the next ticker pass.
advance_next_run(job_id)
# Re-load the job so the adjusted next_run_at is reflected in
# the working copy passed to _execute_and_record (_apply_skill_fields
# and any other live-computed fields also get refreshed).
job = get_job(job_id) or job
_execute_and_record(job, adapters=adapters, loop=loop, verbose=verbose)
return get_job(job_id)
finally:
if fcntl:
try:
fcntl.flock(lock_fd, fcntl.LOCK_UN)
except (OSError, IOError):
pass
elif msvcrt:
try:
msvcrt.locking(lock_fd.fileno(), msvcrt.LK_UNLCK, 1)
except (OSError, IOError):
pass
lock_fd.close()
def tick(verbose: bool = True, adapters=None, loop=None) -> int:
"""
Check and run all due jobs.
@@ -1288,45 +1417,7 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int:
)
def _process_job(job: dict) -> bool:
"""Run one due job end-to-end: execute, save, deliver, mark."""
try:
success, output, final_response, error = run_job(job)
output_file = save_job_output(job["id"], output)
if verbose:
logger.info("Output saved to: %s", output_file)
# Deliver the final response to the origin/target chat.
# If the agent responded with [SILENT], skip delivery (but
# output is already saved above). Failed jobs always deliver.
deliver_content = final_response if success else f"⚠️ Cron job '{job.get('name', job['id'])}' failed:\n{error}"
should_deliver = bool(deliver_content)
if should_deliver and success and SILENT_MARKER in deliver_content.strip().upper():
logger.info("Job '%s': agent returned %s — skipping delivery", job["id"], SILENT_MARKER)
should_deliver = False
delivery_error = None
if should_deliver:
try:
delivery_error = _deliver_result(job, deliver_content, adapters=adapters, loop=loop)
except Exception as de:
delivery_error = str(de)
logger.error("Delivery failed for job %s: %s", job["id"], de)
# Treat empty final_response as a soft failure so last_status
# is not "ok" — the agent ran but produced nothing useful.
# (issue #8585)
if success and not final_response:
success = False
error = "Agent completed but produced empty response (model error, timeout, or misconfiguration)"
mark_job_run(job["id"], success, error, delivery_error=delivery_error)
return True
except Exception as e:
logger.error("Error processing job %s: %s", job['id'], e)
mark_job_run(job["id"], False, str(e))
return False
return _execute_and_record(job, adapters=adapters, loop=loop, verbose=verbose)
# Partition due jobs: those with a per-job workdir mutate
# os.environ["TERMINAL_CWD"] inside run_job, which is process-global —

View File

@@ -2216,7 +2216,17 @@ class APIServerAdapter(BasePlatformAdapter):
return web.json_response({"error": str(e)}, status=500)
async def _handle_run_job(self, request: "web.Request") -> "web.Response":
"""POST /api/jobs/{job_id}/run — trigger immediate execution."""
"""POST /api/jobs/{job_id}/run — trigger immediate execution.
When a gateway ticker is up (which it is for any live api_server
process hosted inside ``hermes gateway``), this sets ``next_run_at``
to now and returns — the ticker will fire the job on its next pass
(within ~60s).
When no gateway ticker is up (rare, e.g. api_server running standalone
via a one-off invocation), the job is executed inline off the event
loop and the response includes ``last_run_at`` / ``last_status``.
"""
auth_err = self._check_auth(request)
if auth_err:
return auth_err
@@ -2227,10 +2237,61 @@ class APIServerAdapter(BasePlatformAdapter):
if id_err:
return id_err
try:
job = _cron_trigger(job_id)
if not job:
return web.json_response({"error": "Job not found"}, status=404)
return web.json_response({"job": job})
ticker_up = True
try:
from hermes_cli.gateway import find_gateway_pids
ticker_up = bool(find_gateway_pids())
except Exception:
pass
if ticker_up:
job = _cron_trigger(job_id)
if not job:
return web.json_response({"error": "Job not found"}, status=404)
return web.json_response(
{
"job": job,
"message": (
"Job scheduled for next tick (within ~60s). "
"Poll /api/jobs/{job_id} for updated last_run_at/last_status."
),
}
)
# Inline execution off the event loop.
try:
from cron.scheduler import run_job_now
except Exception:
# Fall back to defer with a warning.
job = _cron_trigger(job_id)
if not job:
return web.json_response({"error": "Job not found"}, status=404)
return web.json_response(
{
"job": job,
"warning": (
"Gateway is not running and inline execution is unavailable. "
"Start the gateway with 'hermes gateway' so jobs fire automatically."
),
}
)
loop = asyncio.get_running_loop()
updated = await loop.run_in_executor(None, run_job_now, job_id)
if not updated:
if not _cron_get(job_id):
return web.json_response({"error": "Job not found"}, status=404)
# Lock contention — fall back to defer.
job = _cron_trigger(job_id)
return web.json_response(
{
"job": job,
"message": "Another cron tick is running; job scheduled for that tick.",
}
)
return web.json_response(
{"job": updated, "message": "Job executed inline (no gateway ticker running)."}
)
except Exception as e:
return web.json_response({"error": str(e)}, status=500)

View File

@@ -255,7 +255,18 @@ def _job_action(action: str, job_id: str, success_verb: str) -> int:
if action in {"resume", "run"} and result.get("job", {}).get("next_run_at"):
print(f" Next run: {result['job']['next_run_at']}")
if action == "run":
print(" It will run on the next scheduler tick.")
msg = result.get("message") or " It will run on the next scheduler tick."
last_status = job.get("last_status")
if last_status == "ok":
print(f" {color('✓ Last run:', Colors.GREEN)} {job.get('last_run_at', '?')} {color('ok', Colors.GREEN)}")
elif last_status == "error":
last_err = job.get("last_error", "?")
print(f" {color('✗ Last run:', Colors.RED)} {job.get('last_run_at', '?')} "
f"{color('error: ' + str(last_err), Colors.RED)}")
print(f" {msg}")
warning = result.get("warning")
if warning:
print(color(f"{warning}", Colors.YELLOW))
return 0

View File

@@ -0,0 +1,290 @@
"""Tests for on-demand cron execution — cron.scheduler.run_job_now and the
tool/API ``action='run'`` branching between inline execution (no gateway)
and defer-to-ticker (gateway running).
Regression target: issue #16612 — ``cronjob run`` previously only called
``trigger_job()``, which just sets ``next_run_at=now``. When no gateway
ticker was running, the job never actually executed and the user saw
``state: scheduled`` with ``last_run_at: null`` forever.
"""
from __future__ import annotations
import json
from unittest.mock import patch
import pytest
# ---------------------------------------------------------------------------
# run_job_now — inline execution unit tests
# ---------------------------------------------------------------------------
class TestRunJobNow:
"""cron.scheduler.run_job_now executes a single job inline."""
def _make_job(self, job_id="inline-job", **overrides):
job = {
"id": job_id,
"name": "inline test",
"enabled": True,
"schedule": {"kind": "interval", "value": "every 1h"},
"state": "scheduled",
"prompt": "do a thing",
}
job.update(overrides)
return job
def test_returns_none_for_missing_job(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
# Reload scheduler so _LOCK_DIR picks up the temp HERMES_HOME.
import importlib
import cron.scheduler
importlib.reload(cron.scheduler)
from cron.scheduler import run_job_now
with patch("cron.jobs.get_job", return_value=None):
assert run_job_now("does-not-exist") is None
def test_executes_inline_and_returns_updated_job(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
import importlib
import cron.scheduler
importlib.reload(cron.scheduler)
from cron.scheduler import run_job_now
job = self._make_job()
updated_after_run = {**job, "last_run_at": "2026-04-28T00:00:00", "last_status": "ok"}
# cron.scheduler.run_job_now imports get_job lazily from cron.jobs,
# so patch at the source (cron.jobs.get_job). Three calls:
# (1) existence check, (2) re-load after advance, (3) final return.
with patch("cron.jobs.get_job", side_effect=[job, job, updated_after_run]), \
patch("cron.scheduler.advance_next_run", return_value=True) as adv, \
patch("cron.scheduler.run_job", return_value=(True, "# output", "final reply", None)), \
patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
patch("cron.scheduler._deliver_result", return_value=None), \
patch("cron.scheduler.mark_job_run") as mark:
result = run_job_now("inline-job")
# advance_next_run is called BEFORE the agent run to preserve
# at-most-once semantics (matches tick() behaviour).
adv.assert_called_once_with("inline-job")
mark.assert_called_once()
# mark_job_run is called with (job_id, success=True, error=None, delivery_error=None)
args, kwargs = mark.call_args
assert args[0] == "inline-job"
assert args[1] is True
assert result == updated_after_run
def test_inline_records_failure_when_agent_errors(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
import importlib
import cron.scheduler
importlib.reload(cron.scheduler)
from cron.scheduler import run_job_now
job = self._make_job()
updated = {**job, "last_run_at": "2026-04-28T00:00:00", "last_status": "error", "last_error": "boom"}
with patch("cron.jobs.get_job", side_effect=[job, job, updated]), \
patch("cron.scheduler.advance_next_run", return_value=True), \
patch("cron.scheduler.run_job", return_value=(False, "# output", "", "boom")), \
patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
patch("cron.scheduler._deliver_result", return_value=None), \
patch("cron.scheduler.mark_job_run") as mark:
result = run_job_now("inline-job")
args, kwargs = mark.call_args
assert args[1] is False # success=False
assert args[2] == "boom" # error propagated
assert result["last_status"] == "error"
# ---------------------------------------------------------------------------
# cronjob(action="run") tool — gateway-aware branching
# ---------------------------------------------------------------------------
class TestCronjobRunTool:
"""tools.cronjob_tools._handle_run_action branches on gateway presence."""
def _fake_job(self, job_id="tool-job", **overrides):
job = {
"id": job_id,
"name": "tool test",
"enabled": True,
"schedule": {"kind": "interval", "value": "every 1h"},
"state": "scheduled",
"prompt": "do a thing",
"deliver": "local",
}
job.update(overrides)
return job
def test_gateway_running_defers_to_tick(self, tmp_path, monkeypatch):
"""When gateway PIDs exist, action='run' only calls trigger_job."""
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
from tools.cronjob_tools import cronjob
job = self._fake_job()
deferred = {**job, "next_run_at": "2026-04-28T00:00:00"}
with patch("tools.cronjob_tools.get_job", return_value=job), \
patch("tools.cronjob_tools.trigger_job", return_value=deferred) as trig, \
patch("tools.cronjob_tools._gateway_ticker_running", return_value=True):
out = cronjob(action="run", job_id="tool-job")
result = json.loads(out)
assert result["success"] is True
trig.assert_called_once_with("tool-job")
assert "next tick" in result["message"].lower() or "within" in result["message"].lower()
# The deferred path must NOT claim the job already executed.
assert result["job"].get("last_run_at") in (None, "")
def test_gateway_not_running_executes_inline(self, tmp_path, monkeypatch):
"""When no gateway PIDs exist, action='run' invokes run_job_now
and the returned job carries last_run_at + last_status=ok."""
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
from tools.cronjob_tools import cronjob
job = self._fake_job()
executed = {
**job,
"last_run_at": "2026-04-28T00:00:00",
"last_status": "ok",
}
with patch("tools.cronjob_tools.get_job", return_value=job), \
patch("tools.cronjob_tools._gateway_ticker_running", return_value=False), \
patch("cron.scheduler.run_job_now", return_value=executed) as run_now, \
patch("tools.cronjob_tools.trigger_job") as trig:
out = cronjob(action="run", job_id="tool-job")
result = json.loads(out)
assert result["success"] is True
run_now.assert_called_once_with("tool-job")
trig.assert_not_called() # defer path must not be taken
assert result["job"]["last_run_at"] == "2026-04-28T00:00:00"
assert result["job"]["last_status"] == "ok"
assert "inline" in result["message"].lower()
def test_gateway_not_running_falls_back_to_defer_on_lock_contention(self, tmp_path, monkeypatch):
"""If run_job_now returns None (tick lock held), fall back to defer
so the caller isn't left with a silent no-op."""
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
from tools.cronjob_tools import cronjob
job = self._fake_job()
deferred = {**job, "next_run_at": "2026-04-28T00:00:00"}
with patch("tools.cronjob_tools.get_job", return_value=job), \
patch("tools.cronjob_tools._gateway_ticker_running", return_value=False), \
patch("cron.scheduler.run_job_now", return_value=None), \
patch("tools.cronjob_tools.trigger_job", return_value=deferred) as trig:
out = cronjob(action="run", job_id="tool-job")
result = json.loads(out)
assert result["success"] is True
trig.assert_called_once_with("tool-job")
assert "another cron tick" in result["message"].lower() or "tick" in result["message"].lower()
def test_unknown_job_returns_error(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
from tools.cronjob_tools import cronjob
# get_job at the TOP of cronjob() is what short-circuits missing jobs.
with patch("tools.cronjob_tools.get_job", return_value=None):
out = cronjob(action="run", job_id="nope")
result = json.loads(out)
assert result["success"] is False
assert "not found" in result["error"].lower()
def test_inline_execution_surfaces_failure(self, tmp_path, monkeypatch):
"""Failed inline exec — the message must indicate the failure
instead of claiming a clean run."""
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
from tools.cronjob_tools import cronjob
job = self._fake_job()
executed = {
**job,
"last_run_at": "2026-04-28T00:00:00",
"last_status": "error",
"last_error": "model timed out",
}
with patch("tools.cronjob_tools.get_job", return_value=job), \
patch("tools.cronjob_tools._gateway_ticker_running", return_value=False), \
patch("cron.scheduler.run_job_now", return_value=executed):
out = cronjob(action="run", job_id="tool-job")
result = json.loads(out)
assert result["success"] is True # the tool call itself succeeded
assert result["job"]["last_status"] == "error"
assert "model timed out" in result["message"]
# ---------------------------------------------------------------------------
# _execute_and_record — shared helper covering both tick and run_job_now
# ---------------------------------------------------------------------------
class TestExecuteAndRecord:
"""_execute_and_record is the shared end-to-end helper used by both
the tick loop and run_job_now."""
def _make_job(self):
return {
"id": "shared-helper-job",
"name": "shared helper",
"deliver": "local",
}
def test_success_path_delivers_and_marks_ok(self):
from cron.scheduler import _execute_and_record
with patch("cron.scheduler.run_job", return_value=(True, "# out", "final", None)), \
patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
patch("cron.scheduler._deliver_result", return_value=None) as deliver, \
patch("cron.scheduler.mark_job_run") as mark:
ok = _execute_and_record(self._make_job())
assert ok is True
deliver.assert_called_once()
mark.assert_called_once()
args, kwargs = mark.call_args
assert args[1] is True
# mark_job_run(job_id, success, error, delivery_error=...)
assert kwargs.get("delivery_error") is None
def test_empty_response_marked_as_failure(self):
"""Issue #8585 behaviour must be preserved after the refactor."""
from cron.scheduler import _execute_and_record
with patch("cron.scheduler.run_job", return_value=(True, "# out", "", None)), \
patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \
patch("cron.scheduler._deliver_result"), \
patch("cron.scheduler.mark_job_run") as mark:
_execute_and_record(self._make_job())
args, _ = mark.call_args
assert args[1] is False
assert "empty response" in args[2].lower()
def test_exception_in_run_job_marks_error(self):
from cron.scheduler import _execute_and_record
with patch("cron.scheduler.run_job", side_effect=RuntimeError("boom")), \
patch("cron.scheduler.mark_job_run") as mark:
ok = _execute_and_record(self._make_job())
assert ok is False
mark.assert_called_once()
args, _ = mark.call_args
assert args[1] is False
assert "boom" in args[2]

View File

@@ -447,23 +447,53 @@ class TestResumeJob:
class TestRunJob:
@pytest.mark.asyncio
async def test_run_job(self, adapter):
"""POST /api/jobs/{id}/run returns triggered job."""
async def test_run_job_defers_when_gateway_running(self, adapter):
"""POST /api/jobs/{id}/run calls trigger_job when a gateway ticker is up."""
app = _create_app(adapter)
triggered_job = {**SAMPLE_JOB, "last_run": "2025-01-01T00:00:00Z"}
mock_trigger = MagicMock(return_value=triggered_job)
async with TestClient(TestServer(app)) as cli:
with patch(
f"{_MOD}._CRON_AVAILABLE", True
), patch(
f"{_MOD}._cron_trigger", mock_trigger
):
with patch(f"{_MOD}._CRON_AVAILABLE", True), \
patch(f"{_MOD}._cron_trigger", mock_trigger), \
patch("hermes_cli.gateway.find_gateway_pids", return_value=[12345]):
resp = await cli.post(f"/api/jobs/{VALID_JOB_ID}/run")
assert resp.status == 200
data = await resp.json()
assert data["job"] == triggered_job
assert "tick" in data.get("message", "").lower()
mock_trigger.assert_called_once_with(VALID_JOB_ID)
@pytest.mark.asyncio
async def test_run_job_executes_inline_when_no_gateway(self, adapter):
"""POST /api/jobs/{id}/run calls run_job_now when no gateway ticker is up.
Regression for issue #16612: the old handler only called
trigger_job(), which just set next_run_at=now without executing
anything. Users saw last_run_at=null forever.
"""
app = _create_app(adapter)
executed_job = {
**SAMPLE_JOB,
"last_run_at": "2026-04-28T00:00:00",
"last_status": "ok",
}
mock_run_now = MagicMock(return_value=executed_job)
mock_trigger = MagicMock()
async with TestClient(TestServer(app)) as cli:
with patch(f"{_MOD}._CRON_AVAILABLE", True), \
patch(f"{_MOD}._cron_trigger", mock_trigger), \
patch("hermes_cli.gateway.find_gateway_pids", return_value=[]), \
patch("cron.scheduler.run_job_now", mock_run_now):
resp = await cli.post(f"/api/jobs/{VALID_JOB_ID}/run")
assert resp.status == 200
data = await resp.json()
assert data["job"]["last_run_at"] == "2026-04-28T00:00:00"
assert data["job"]["last_status"] == "ok"
assert "inline" in data.get("message", "").lower()
mock_run_now.assert_called_once_with(VALID_JOB_ID)
mock_trigger.assert_not_called()
# ---------------------------------------------------------------------------
# 17. test_auth_required

View File

@@ -243,6 +243,109 @@ def _format_job(job: Dict[str, Any]) -> Dict[str, Any]:
return result
def _gateway_ticker_running() -> bool:
"""Return True if a ``hermes gateway`` process with a live cron ticker
is running (either a different process or the current one).
The ticker fires due jobs every 60 seconds; when it's up, ``action="run"``
just needs to set ``next_run_at=now`` and let the ticker pick it up on the
next pass. When it isn't, ``action="run"`` would otherwise be a no-op —
so we fall back to executing the job inline in the caller's process.
"""
try:
from hermes_cli.gateway import find_gateway_pids
except Exception:
return False
try:
return bool(find_gateway_pids())
except Exception:
# If we can't tell, err on the side of inline execution so
# ``action="run"`` never silently drops on the floor.
return False
def _handle_run_action(job_id: str) -> str:
"""Handle ``cronjob(action="run")``.
When a gateway is running (ticker alive), set ``next_run_at=now`` and
return — the ticker will pick the job up within its interval (≤60s).
When no gateway is running, execute the job inline in this process so
the caller actually observes ``last_run_at`` / ``last_status`` updates.
Blocks the caller for the duration of the agent run.
"""
if _gateway_ticker_running():
updated = trigger_job(job_id)
if not updated:
return tool_error(f"Job with ID '{job_id}' not found.", success=False)
return json.dumps(
{
"success": True,
"job": _format_job(updated),
"message": (
"Job scheduled for immediate execution. The gateway cron "
"ticker will fire it on its next pass (within ~60s). "
"Re-inspect with cronjob(action='list') to see last_run_at "
"and last_status once it completes."
),
},
indent=2,
)
# No gateway → inline execution so the user actually gets a run.
try:
from cron.scheduler import run_job_now
except Exception as e:
# Extremely unlikely, but if scheduler can't be imported (circular
# import, missing deps) fall back to the defer path with a warning.
logger.warning("run_job_now unavailable, falling back to defer: %s", e)
updated = trigger_job(job_id)
if not updated:
return tool_error(f"Job with ID '{job_id}' not found.", success=False)
return json.dumps(
{
"success": True,
"job": _format_job(updated),
"warning": (
"Gateway is not running and inline execution is unavailable. "
"Job was re-scheduled (next_run_at=now) but will not execute "
"until you start the gateway with 'hermes gateway'."
),
},
indent=2,
)
updated = run_job_now(job_id)
if not updated:
# Either the job doesn't exist or the tick lock was held by a
# concurrent ticker that started after we checked _gateway_ticker_running.
# Double-check existence vs. lock contention.
if not get_job(job_id):
return tool_error(f"Job with ID '{job_id}' not found.", success=False)
# Lock was busy — fall back to defer.
updated = trigger_job(job_id)
return json.dumps(
{
"success": True,
"job": _format_job(updated) if updated else None,
"message": "Another cron tick is running; job scheduled for that tick.",
},
indent=2,
)
formatted = _format_job(updated)
last_status = updated.get("last_status")
if last_status == "ok":
msg = "Job executed inline (no gateway ticker running)."
elif last_status == "error":
msg = (
f"Job executed inline but failed: {updated.get('last_error') or 'unknown error'}"
)
else:
msg = "Job executed inline (no gateway ticker running)."
return json.dumps({"success": True, "job": formatted, "message": msg}, indent=2)
def cronjob(
action: str,
job_id: Optional[str] = None,
@@ -372,8 +475,7 @@ def cronjob(
return json.dumps({"success": True, "job": _format_job(updated)}, indent=2)
if normalized in {"run", "run_now", "trigger"}:
updated = trigger_job(job_id)
return json.dumps({"success": True, "job": _format_job(updated)}, indent=2)
return _handle_run_action(job_id)
if normalized == "update":
updates: Dict[str, Any] = {}
@@ -476,7 +578,7 @@ Important safety rule: cron-run sessions should not recursively schedule more cr
"properties": {
"action": {
"type": "string",
"description": "One of: create, list, update, pause, resume, remove, run"
"description": "One of: create, list, update, pause, resume, remove, run. 'run' executes the job now when no gateway ticker is up, or schedules it for the next tick (≤60s) when one is."
},
"job_id": {
"type": "string",

View File

@@ -200,7 +200,7 @@ Scripts that dump megabytes of output will slow down the agent and may hit token
```bash
hermes cron list # Show all jobs, states, next_run times
hermes cron run <job_id> # Schedule for next tick (for testing)
hermes cron run <job_id> # Run now (inline if no gateway; otherwise next tick)
hermes cron edit <job_id> # Fix configuration issues
hermes logs # View recent Hermes logs
hermes skills list # Verify installed skills
@@ -212,7 +212,7 @@ hermes skills list # Verify installed skills
If you've worked through this guide and the issue persists:
1. Run the job with `hermes cron run <job_id>` (fires on next gateway tick) and watch for errors in the chat output
1. Run the job with `hermes cron run <job_id>` (runs inline when no gateway is up, otherwise fires on next gateway tick) and watch for errors in the chat output
2. Check `~/.hermes/logs/agent.log` for scheduler messages and `~/.hermes/logs/errors.log` for warnings
3. Open an issue at [github.com/NousResearch/hermes-agent](https://github.com/NousResearch/hermes-agent) with:
- The job ID and schedule

View File

@@ -295,7 +295,7 @@ hermes cron <list|create|edit|pause|resume|run|remove|status|tick>
| `edit` | Update a job's schedule, prompt, name, delivery, repeat count, or attached skills. Supports `--clear-skills`, `--add-skill`, and `--remove-skill`. |
| `pause` | Pause a job without deleting it. |
| `resume` | Resume a paused job and compute its next future run. |
| `run` | Trigger a job on the next scheduler tick. |
| `run` | Run a job now. Executes inline when no gateway is running; otherwise defers to the next gateway tick (≤60s). |
| `remove` | Delete a scheduled job. |
| `status` | Check whether the cron scheduler is running. |
| `tick` | Run due jobs once and exit. |