diff --git a/cron/scheduler.py b/cron/scheduler.py index d41c7ed860c..595fb4601d1 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -1214,6 +1214,135 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]: logger.debug("Job '%s': failed to reap stale auxiliary clients: %s", job_id, e) +def _execute_and_record( + job: dict, + adapters=None, + loop=None, + verbose: bool = False, +) -> bool: + """Run one due job end-to-end: execute, save output, deliver, mark. + + Returns True if execution + bookkeeping completed (even if the job's + agent run itself failed — in that case last_status is recorded as + ``error``). Returns False only on an unhandled exception in this + wrapper; such failures are also recorded via ``mark_job_run``. + + Shared between the periodic ``tick()`` loop and the on-demand + ``run_job_now()`` path, so ``cronjob(action="run")`` behaves + identically to scheduled execution. + """ + try: + success, output, final_response, error = run_job(job) + + output_file = save_job_output(job["id"], output) + if verbose: + logger.info("Output saved to: %s", output_file) + + # Deliver the final response to the origin/target chat. + # If the agent responded with [SILENT], skip delivery (but + # output is already saved above). Failed jobs always deliver. + deliver_content = final_response if success else f"⚠️ Cron job '{job.get('name', job['id'])}' failed:\n{error}" + should_deliver = bool(deliver_content) + if should_deliver and success and SILENT_MARKER in deliver_content.strip().upper(): + logger.info("Job '%s': agent returned %s — skipping delivery", job["id"], SILENT_MARKER) + should_deliver = False + + delivery_error = None + if should_deliver: + try: + delivery_error = _deliver_result(job, deliver_content, adapters=adapters, loop=loop) + except Exception as de: + delivery_error = str(de) + logger.error("Delivery failed for job %s: %s", job["id"], de) + + # Treat empty final_response as a soft failure so last_status + # is not "ok" — the agent ran but produced nothing useful. + # (issue #8585) + if success and not final_response: + success = False + error = "Agent completed but produced empty response (model error, timeout, or misconfiguration)" + + mark_job_run(job["id"], success, error, delivery_error=delivery_error) + return True + + except Exception as e: + logger.error("Error processing job %s: %s", job['id'], e) + mark_job_run(job["id"], False, str(e)) + return False + + +def run_job_now( + job_id: str, + adapters=None, + loop=None, + verbose: bool = False, +) -> Optional[dict]: + """Execute a single job inline, right now, and return the updated job. + + Used by ``cronjob(action="run")`` and ``POST /api/jobs/{id}/run`` when + no gateway ticker is running. Blocks the caller for the duration of + the agent run. + + Grabs the same file lock ``tick()`` uses so an in-process gateway + ticker firing at the same moment can't double-execute the job. If + the lock is held (another tick is mid-flight), returns None without + touching the job so the caller can fall back to the defer path. + + Also advances ``next_run_at`` for recurring jobs **before** execution + to preserve at-most-once semantics matching the tick loop. + + Returns the updated job dict (post ``mark_job_run``) on successful + inline execution, or None if the job does not exist or the lock + could not be acquired. + """ + from cron.jobs import get_job + + job = get_job(job_id) + if not job: + return None + + _LOCK_DIR.mkdir(parents=True, exist_ok=True) + + lock_fd = None + try: + lock_fd = open(_LOCK_FILE, "w") + if fcntl: + fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) + elif msvcrt: + msvcrt.locking(lock_fd.fileno(), msvcrt.LK_NBLCK, 1) + except (OSError, IOError): + logger.debug("run_job_now: tick lock held, skipping inline exec for %s", job_id) + if lock_fd is not None: + lock_fd.close() + return None + + try: + # Match the tick() semantics: advance recurring jobs' next_run_at + # BEFORE the agent runs, so a crash mid-execution doesn't cause + # a re-fire on the next ticker pass. + advance_next_run(job_id) + + # Re-load the job so the adjusted next_run_at is reflected in + # the working copy passed to _execute_and_record (_apply_skill_fields + # and any other live-computed fields also get refreshed). + job = get_job(job_id) or job + + _execute_and_record(job, adapters=adapters, loop=loop, verbose=verbose) + return get_job(job_id) + finally: + if fcntl: + try: + fcntl.flock(lock_fd, fcntl.LOCK_UN) + except (OSError, IOError): + pass + elif msvcrt: + try: + msvcrt.locking(lock_fd.fileno(), msvcrt.LK_UNLCK, 1) + except (OSError, IOError): + pass + lock_fd.close() + + def tick(verbose: bool = True, adapters=None, loop=None) -> int: """ Check and run all due jobs. @@ -1288,45 +1417,7 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int: ) def _process_job(job: dict) -> bool: - """Run one due job end-to-end: execute, save, deliver, mark.""" - try: - success, output, final_response, error = run_job(job) - - output_file = save_job_output(job["id"], output) - if verbose: - logger.info("Output saved to: %s", output_file) - - # Deliver the final response to the origin/target chat. - # If the agent responded with [SILENT], skip delivery (but - # output is already saved above). Failed jobs always deliver. - deliver_content = final_response if success else f"⚠️ Cron job '{job.get('name', job['id'])}' failed:\n{error}" - should_deliver = bool(deliver_content) - if should_deliver and success and SILENT_MARKER in deliver_content.strip().upper(): - logger.info("Job '%s': agent returned %s — skipping delivery", job["id"], SILENT_MARKER) - should_deliver = False - - delivery_error = None - if should_deliver: - try: - delivery_error = _deliver_result(job, deliver_content, adapters=adapters, loop=loop) - except Exception as de: - delivery_error = str(de) - logger.error("Delivery failed for job %s: %s", job["id"], de) - - # Treat empty final_response as a soft failure so last_status - # is not "ok" — the agent ran but produced nothing useful. - # (issue #8585) - if success and not final_response: - success = False - error = "Agent completed but produced empty response (model error, timeout, or misconfiguration)" - - mark_job_run(job["id"], success, error, delivery_error=delivery_error) - return True - - except Exception as e: - logger.error("Error processing job %s: %s", job['id'], e) - mark_job_run(job["id"], False, str(e)) - return False + return _execute_and_record(job, adapters=adapters, loop=loop, verbose=verbose) # Partition due jobs: those with a per-job workdir mutate # os.environ["TERMINAL_CWD"] inside run_job, which is process-global — diff --git a/gateway/platforms/api_server.py b/gateway/platforms/api_server.py index 1b2e2e156b9..6c7646a1465 100644 --- a/gateway/platforms/api_server.py +++ b/gateway/platforms/api_server.py @@ -2216,7 +2216,17 @@ class APIServerAdapter(BasePlatformAdapter): return web.json_response({"error": str(e)}, status=500) async def _handle_run_job(self, request: "web.Request") -> "web.Response": - """POST /api/jobs/{job_id}/run — trigger immediate execution.""" + """POST /api/jobs/{job_id}/run — trigger immediate execution. + + When a gateway ticker is up (which it is for any live api_server + process hosted inside ``hermes gateway``), this sets ``next_run_at`` + to now and returns — the ticker will fire the job on its next pass + (within ~60s). + + When no gateway ticker is up (rare, e.g. api_server running standalone + via a one-off invocation), the job is executed inline off the event + loop and the response includes ``last_run_at`` / ``last_status``. + """ auth_err = self._check_auth(request) if auth_err: return auth_err @@ -2227,10 +2237,61 @@ class APIServerAdapter(BasePlatformAdapter): if id_err: return id_err try: - job = _cron_trigger(job_id) - if not job: - return web.json_response({"error": "Job not found"}, status=404) - return web.json_response({"job": job}) + ticker_up = True + try: + from hermes_cli.gateway import find_gateway_pids + ticker_up = bool(find_gateway_pids()) + except Exception: + pass + + if ticker_up: + job = _cron_trigger(job_id) + if not job: + return web.json_response({"error": "Job not found"}, status=404) + return web.json_response( + { + "job": job, + "message": ( + "Job scheduled for next tick (within ~60s). " + "Poll /api/jobs/{job_id} for updated last_run_at/last_status." + ), + } + ) + + # Inline execution off the event loop. + try: + from cron.scheduler import run_job_now + except Exception: + # Fall back to defer with a warning. + job = _cron_trigger(job_id) + if not job: + return web.json_response({"error": "Job not found"}, status=404) + return web.json_response( + { + "job": job, + "warning": ( + "Gateway is not running and inline execution is unavailable. " + "Start the gateway with 'hermes gateway' so jobs fire automatically." + ), + } + ) + + loop = asyncio.get_running_loop() + updated = await loop.run_in_executor(None, run_job_now, job_id) + if not updated: + if not _cron_get(job_id): + return web.json_response({"error": "Job not found"}, status=404) + # Lock contention — fall back to defer. + job = _cron_trigger(job_id) + return web.json_response( + { + "job": job, + "message": "Another cron tick is running; job scheduled for that tick.", + } + ) + return web.json_response( + {"job": updated, "message": "Job executed inline (no gateway ticker running)."} + ) except Exception as e: return web.json_response({"error": str(e)}, status=500) diff --git a/hermes_cli/cron.py b/hermes_cli/cron.py index 78639d465a5..f7e349d7e1f 100644 --- a/hermes_cli/cron.py +++ b/hermes_cli/cron.py @@ -255,7 +255,18 @@ def _job_action(action: str, job_id: str, success_verb: str) -> int: if action in {"resume", "run"} and result.get("job", {}).get("next_run_at"): print(f" Next run: {result['job']['next_run_at']}") if action == "run": - print(" It will run on the next scheduler tick.") + msg = result.get("message") or " It will run on the next scheduler tick." + last_status = job.get("last_status") + if last_status == "ok": + print(f" {color('✓ Last run:', Colors.GREEN)} {job.get('last_run_at', '?')} {color('ok', Colors.GREEN)}") + elif last_status == "error": + last_err = job.get("last_error", "?") + print(f" {color('✗ Last run:', Colors.RED)} {job.get('last_run_at', '?')} " + f"{color('error: ' + str(last_err), Colors.RED)}") + print(f" {msg}") + warning = result.get("warning") + if warning: + print(color(f" ⚠ {warning}", Colors.YELLOW)) return 0 diff --git a/tests/cron/test_run_job_now.py b/tests/cron/test_run_job_now.py new file mode 100644 index 00000000000..4a9ee3e223a --- /dev/null +++ b/tests/cron/test_run_job_now.py @@ -0,0 +1,290 @@ +"""Tests for on-demand cron execution — cron.scheduler.run_job_now and the +tool/API ``action='run'`` branching between inline execution (no gateway) +and defer-to-ticker (gateway running). + +Regression target: issue #16612 — ``cronjob run`` previously only called +``trigger_job()``, which just sets ``next_run_at=now``. When no gateway +ticker was running, the job never actually executed and the user saw +``state: scheduled`` with ``last_run_at: null`` forever. +""" + +from __future__ import annotations + +import json +from unittest.mock import patch + +import pytest + + +# --------------------------------------------------------------------------- +# run_job_now — inline execution unit tests +# --------------------------------------------------------------------------- + + +class TestRunJobNow: + """cron.scheduler.run_job_now executes a single job inline.""" + + def _make_job(self, job_id="inline-job", **overrides): + job = { + "id": job_id, + "name": "inline test", + "enabled": True, + "schedule": {"kind": "interval", "value": "every 1h"}, + "state": "scheduled", + "prompt": "do a thing", + } + job.update(overrides) + return job + + def test_returns_none_for_missing_job(self, tmp_path, monkeypatch): + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + # Reload scheduler so _LOCK_DIR picks up the temp HERMES_HOME. + import importlib + import cron.scheduler + importlib.reload(cron.scheduler) + from cron.scheduler import run_job_now + + with patch("cron.jobs.get_job", return_value=None): + assert run_job_now("does-not-exist") is None + + def test_executes_inline_and_returns_updated_job(self, tmp_path, monkeypatch): + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + import importlib + import cron.scheduler + importlib.reload(cron.scheduler) + from cron.scheduler import run_job_now + + job = self._make_job() + updated_after_run = {**job, "last_run_at": "2026-04-28T00:00:00", "last_status": "ok"} + + # cron.scheduler.run_job_now imports get_job lazily from cron.jobs, + # so patch at the source (cron.jobs.get_job). Three calls: + # (1) existence check, (2) re-load after advance, (3) final return. + with patch("cron.jobs.get_job", side_effect=[job, job, updated_after_run]), \ + patch("cron.scheduler.advance_next_run", return_value=True) as adv, \ + patch("cron.scheduler.run_job", return_value=(True, "# output", "final reply", None)), \ + patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \ + patch("cron.scheduler._deliver_result", return_value=None), \ + patch("cron.scheduler.mark_job_run") as mark: + + result = run_job_now("inline-job") + + # advance_next_run is called BEFORE the agent run to preserve + # at-most-once semantics (matches tick() behaviour). + adv.assert_called_once_with("inline-job") + mark.assert_called_once() + # mark_job_run is called with (job_id, success=True, error=None, delivery_error=None) + args, kwargs = mark.call_args + assert args[0] == "inline-job" + assert args[1] is True + assert result == updated_after_run + + def test_inline_records_failure_when_agent_errors(self, tmp_path, monkeypatch): + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + import importlib + import cron.scheduler + importlib.reload(cron.scheduler) + from cron.scheduler import run_job_now + + job = self._make_job() + updated = {**job, "last_run_at": "2026-04-28T00:00:00", "last_status": "error", "last_error": "boom"} + + with patch("cron.jobs.get_job", side_effect=[job, job, updated]), \ + patch("cron.scheduler.advance_next_run", return_value=True), \ + patch("cron.scheduler.run_job", return_value=(False, "# output", "", "boom")), \ + patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \ + patch("cron.scheduler._deliver_result", return_value=None), \ + patch("cron.scheduler.mark_job_run") as mark: + + result = run_job_now("inline-job") + + args, kwargs = mark.call_args + assert args[1] is False # success=False + assert args[2] == "boom" # error propagated + assert result["last_status"] == "error" + + +# --------------------------------------------------------------------------- +# cronjob(action="run") tool — gateway-aware branching +# --------------------------------------------------------------------------- + + +class TestCronjobRunTool: + """tools.cronjob_tools._handle_run_action branches on gateway presence.""" + + def _fake_job(self, job_id="tool-job", **overrides): + job = { + "id": job_id, + "name": "tool test", + "enabled": True, + "schedule": {"kind": "interval", "value": "every 1h"}, + "state": "scheduled", + "prompt": "do a thing", + "deliver": "local", + } + job.update(overrides) + return job + + def test_gateway_running_defers_to_tick(self, tmp_path, monkeypatch): + """When gateway PIDs exist, action='run' only calls trigger_job.""" + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + from tools.cronjob_tools import cronjob + + job = self._fake_job() + deferred = {**job, "next_run_at": "2026-04-28T00:00:00"} + + with patch("tools.cronjob_tools.get_job", return_value=job), \ + patch("tools.cronjob_tools.trigger_job", return_value=deferred) as trig, \ + patch("tools.cronjob_tools._gateway_ticker_running", return_value=True): + out = cronjob(action="run", job_id="tool-job") + + result = json.loads(out) + assert result["success"] is True + trig.assert_called_once_with("tool-job") + assert "next tick" in result["message"].lower() or "within" in result["message"].lower() + # The deferred path must NOT claim the job already executed. + assert result["job"].get("last_run_at") in (None, "") + + def test_gateway_not_running_executes_inline(self, tmp_path, monkeypatch): + """When no gateway PIDs exist, action='run' invokes run_job_now + and the returned job carries last_run_at + last_status=ok.""" + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + from tools.cronjob_tools import cronjob + + job = self._fake_job() + executed = { + **job, + "last_run_at": "2026-04-28T00:00:00", + "last_status": "ok", + } + + with patch("tools.cronjob_tools.get_job", return_value=job), \ + patch("tools.cronjob_tools._gateway_ticker_running", return_value=False), \ + patch("cron.scheduler.run_job_now", return_value=executed) as run_now, \ + patch("tools.cronjob_tools.trigger_job") as trig: + out = cronjob(action="run", job_id="tool-job") + + result = json.loads(out) + assert result["success"] is True + run_now.assert_called_once_with("tool-job") + trig.assert_not_called() # defer path must not be taken + assert result["job"]["last_run_at"] == "2026-04-28T00:00:00" + assert result["job"]["last_status"] == "ok" + assert "inline" in result["message"].lower() + + def test_gateway_not_running_falls_back_to_defer_on_lock_contention(self, tmp_path, monkeypatch): + """If run_job_now returns None (tick lock held), fall back to defer + so the caller isn't left with a silent no-op.""" + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + from tools.cronjob_tools import cronjob + + job = self._fake_job() + deferred = {**job, "next_run_at": "2026-04-28T00:00:00"} + + with patch("tools.cronjob_tools.get_job", return_value=job), \ + patch("tools.cronjob_tools._gateway_ticker_running", return_value=False), \ + patch("cron.scheduler.run_job_now", return_value=None), \ + patch("tools.cronjob_tools.trigger_job", return_value=deferred) as trig: + out = cronjob(action="run", job_id="tool-job") + + result = json.loads(out) + assert result["success"] is True + trig.assert_called_once_with("tool-job") + assert "another cron tick" in result["message"].lower() or "tick" in result["message"].lower() + + def test_unknown_job_returns_error(self, tmp_path, monkeypatch): + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + from tools.cronjob_tools import cronjob + + # get_job at the TOP of cronjob() is what short-circuits missing jobs. + with patch("tools.cronjob_tools.get_job", return_value=None): + out = cronjob(action="run", job_id="nope") + + result = json.loads(out) + assert result["success"] is False + assert "not found" in result["error"].lower() + + def test_inline_execution_surfaces_failure(self, tmp_path, monkeypatch): + """Failed inline exec — the message must indicate the failure + instead of claiming a clean run.""" + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + from tools.cronjob_tools import cronjob + + job = self._fake_job() + executed = { + **job, + "last_run_at": "2026-04-28T00:00:00", + "last_status": "error", + "last_error": "model timed out", + } + + with patch("tools.cronjob_tools.get_job", return_value=job), \ + patch("tools.cronjob_tools._gateway_ticker_running", return_value=False), \ + patch("cron.scheduler.run_job_now", return_value=executed): + out = cronjob(action="run", job_id="tool-job") + + result = json.loads(out) + assert result["success"] is True # the tool call itself succeeded + assert result["job"]["last_status"] == "error" + assert "model timed out" in result["message"] + + +# --------------------------------------------------------------------------- +# _execute_and_record — shared helper covering both tick and run_job_now +# --------------------------------------------------------------------------- + + +class TestExecuteAndRecord: + """_execute_and_record is the shared end-to-end helper used by both + the tick loop and run_job_now.""" + + def _make_job(self): + return { + "id": "shared-helper-job", + "name": "shared helper", + "deliver": "local", + } + + def test_success_path_delivers_and_marks_ok(self): + from cron.scheduler import _execute_and_record + + with patch("cron.scheduler.run_job", return_value=(True, "# out", "final", None)), \ + patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \ + patch("cron.scheduler._deliver_result", return_value=None) as deliver, \ + patch("cron.scheduler.mark_job_run") as mark: + ok = _execute_and_record(self._make_job()) + + assert ok is True + deliver.assert_called_once() + mark.assert_called_once() + args, kwargs = mark.call_args + assert args[1] is True + # mark_job_run(job_id, success, error, delivery_error=...) + assert kwargs.get("delivery_error") is None + + def test_empty_response_marked_as_failure(self): + """Issue #8585 behaviour must be preserved after the refactor.""" + from cron.scheduler import _execute_and_record + + with patch("cron.scheduler.run_job", return_value=(True, "# out", "", None)), \ + patch("cron.scheduler.save_job_output", return_value="/tmp/out.md"), \ + patch("cron.scheduler._deliver_result"), \ + patch("cron.scheduler.mark_job_run") as mark: + _execute_and_record(self._make_job()) + + args, _ = mark.call_args + assert args[1] is False + assert "empty response" in args[2].lower() + + def test_exception_in_run_job_marks_error(self): + from cron.scheduler import _execute_and_record + + with patch("cron.scheduler.run_job", side_effect=RuntimeError("boom")), \ + patch("cron.scheduler.mark_job_run") as mark: + ok = _execute_and_record(self._make_job()) + + assert ok is False + mark.assert_called_once() + args, _ = mark.call_args + assert args[1] is False + assert "boom" in args[2] diff --git a/tests/gateway/test_api_server_jobs.py b/tests/gateway/test_api_server_jobs.py index a1476578386..3927f7b30f6 100644 --- a/tests/gateway/test_api_server_jobs.py +++ b/tests/gateway/test_api_server_jobs.py @@ -447,23 +447,53 @@ class TestResumeJob: class TestRunJob: @pytest.mark.asyncio - async def test_run_job(self, adapter): - """POST /api/jobs/{id}/run returns triggered job.""" + async def test_run_job_defers_when_gateway_running(self, adapter): + """POST /api/jobs/{id}/run calls trigger_job when a gateway ticker is up.""" app = _create_app(adapter) triggered_job = {**SAMPLE_JOB, "last_run": "2025-01-01T00:00:00Z"} mock_trigger = MagicMock(return_value=triggered_job) async with TestClient(TestServer(app)) as cli: - with patch( - f"{_MOD}._CRON_AVAILABLE", True - ), patch( - f"{_MOD}._cron_trigger", mock_trigger - ): + with patch(f"{_MOD}._CRON_AVAILABLE", True), \ + patch(f"{_MOD}._cron_trigger", mock_trigger), \ + patch("hermes_cli.gateway.find_gateway_pids", return_value=[12345]): resp = await cli.post(f"/api/jobs/{VALID_JOB_ID}/run") assert resp.status == 200 data = await resp.json() assert data["job"] == triggered_job + assert "tick" in data.get("message", "").lower() mock_trigger.assert_called_once_with(VALID_JOB_ID) + @pytest.mark.asyncio + async def test_run_job_executes_inline_when_no_gateway(self, adapter): + """POST /api/jobs/{id}/run calls run_job_now when no gateway ticker is up. + + Regression for issue #16612: the old handler only called + trigger_job(), which just set next_run_at=now without executing + anything. Users saw last_run_at=null forever. + """ + app = _create_app(adapter) + executed_job = { + **SAMPLE_JOB, + "last_run_at": "2026-04-28T00:00:00", + "last_status": "ok", + } + mock_run_now = MagicMock(return_value=executed_job) + mock_trigger = MagicMock() + + async with TestClient(TestServer(app)) as cli: + with patch(f"{_MOD}._CRON_AVAILABLE", True), \ + patch(f"{_MOD}._cron_trigger", mock_trigger), \ + patch("hermes_cli.gateway.find_gateway_pids", return_value=[]), \ + patch("cron.scheduler.run_job_now", mock_run_now): + resp = await cli.post(f"/api/jobs/{VALID_JOB_ID}/run") + assert resp.status == 200 + data = await resp.json() + assert data["job"]["last_run_at"] == "2026-04-28T00:00:00" + assert data["job"]["last_status"] == "ok" + assert "inline" in data.get("message", "").lower() + mock_run_now.assert_called_once_with(VALID_JOB_ID) + mock_trigger.assert_not_called() + # --------------------------------------------------------------------------- # 17. test_auth_required diff --git a/tools/cronjob_tools.py b/tools/cronjob_tools.py index 53e778a7dbf..28ae15df608 100644 --- a/tools/cronjob_tools.py +++ b/tools/cronjob_tools.py @@ -243,6 +243,109 @@ def _format_job(job: Dict[str, Any]) -> Dict[str, Any]: return result +def _gateway_ticker_running() -> bool: + """Return True if a ``hermes gateway`` process with a live cron ticker + is running (either a different process or the current one). + + The ticker fires due jobs every 60 seconds; when it's up, ``action="run"`` + just needs to set ``next_run_at=now`` and let the ticker pick it up on the + next pass. When it isn't, ``action="run"`` would otherwise be a no-op — + so we fall back to executing the job inline in the caller's process. + """ + try: + from hermes_cli.gateway import find_gateway_pids + except Exception: + return False + try: + return bool(find_gateway_pids()) + except Exception: + # If we can't tell, err on the side of inline execution so + # ``action="run"`` never silently drops on the floor. + return False + + +def _handle_run_action(job_id: str) -> str: + """Handle ``cronjob(action="run")``. + + When a gateway is running (ticker alive), set ``next_run_at=now`` and + return — the ticker will pick the job up within its interval (≤60s). + + When no gateway is running, execute the job inline in this process so + the caller actually observes ``last_run_at`` / ``last_status`` updates. + Blocks the caller for the duration of the agent run. + """ + if _gateway_ticker_running(): + updated = trigger_job(job_id) + if not updated: + return tool_error(f"Job with ID '{job_id}' not found.", success=False) + return json.dumps( + { + "success": True, + "job": _format_job(updated), + "message": ( + "Job scheduled for immediate execution. The gateway cron " + "ticker will fire it on its next pass (within ~60s). " + "Re-inspect with cronjob(action='list') to see last_run_at " + "and last_status once it completes." + ), + }, + indent=2, + ) + + # No gateway → inline execution so the user actually gets a run. + try: + from cron.scheduler import run_job_now + except Exception as e: + # Extremely unlikely, but if scheduler can't be imported (circular + # import, missing deps) fall back to the defer path with a warning. + logger.warning("run_job_now unavailable, falling back to defer: %s", e) + updated = trigger_job(job_id) + if not updated: + return tool_error(f"Job with ID '{job_id}' not found.", success=False) + return json.dumps( + { + "success": True, + "job": _format_job(updated), + "warning": ( + "Gateway is not running and inline execution is unavailable. " + "Job was re-scheduled (next_run_at=now) but will not execute " + "until you start the gateway with 'hermes gateway'." + ), + }, + indent=2, + ) + + updated = run_job_now(job_id) + if not updated: + # Either the job doesn't exist or the tick lock was held by a + # concurrent ticker that started after we checked _gateway_ticker_running. + # Double-check existence vs. lock contention. + if not get_job(job_id): + return tool_error(f"Job with ID '{job_id}' not found.", success=False) + # Lock was busy — fall back to defer. + updated = trigger_job(job_id) + return json.dumps( + { + "success": True, + "job": _format_job(updated) if updated else None, + "message": "Another cron tick is running; job scheduled for that tick.", + }, + indent=2, + ) + + formatted = _format_job(updated) + last_status = updated.get("last_status") + if last_status == "ok": + msg = "Job executed inline (no gateway ticker running)." + elif last_status == "error": + msg = ( + f"Job executed inline but failed: {updated.get('last_error') or 'unknown error'}" + ) + else: + msg = "Job executed inline (no gateway ticker running)." + return json.dumps({"success": True, "job": formatted, "message": msg}, indent=2) + + def cronjob( action: str, job_id: Optional[str] = None, @@ -372,8 +475,7 @@ def cronjob( return json.dumps({"success": True, "job": _format_job(updated)}, indent=2) if normalized in {"run", "run_now", "trigger"}: - updated = trigger_job(job_id) - return json.dumps({"success": True, "job": _format_job(updated)}, indent=2) + return _handle_run_action(job_id) if normalized == "update": updates: Dict[str, Any] = {} @@ -476,7 +578,7 @@ Important safety rule: cron-run sessions should not recursively schedule more cr "properties": { "action": { "type": "string", - "description": "One of: create, list, update, pause, resume, remove, run" + "description": "One of: create, list, update, pause, resume, remove, run. 'run' executes the job now when no gateway ticker is up, or schedules it for the next tick (≤60s) when one is." }, "job_id": { "type": "string", diff --git a/website/docs/guides/cron-troubleshooting.md b/website/docs/guides/cron-troubleshooting.md index d85a1530909..d114a0c20dd 100644 --- a/website/docs/guides/cron-troubleshooting.md +++ b/website/docs/guides/cron-troubleshooting.md @@ -200,7 +200,7 @@ Scripts that dump megabytes of output will slow down the agent and may hit token ```bash hermes cron list # Show all jobs, states, next_run times -hermes cron run # Schedule for next tick (for testing) +hermes cron run # Run now (inline if no gateway; otherwise next tick) hermes cron edit # Fix configuration issues hermes logs # View recent Hermes logs hermes skills list # Verify installed skills @@ -212,7 +212,7 @@ hermes skills list # Verify installed skills If you've worked through this guide and the issue persists: -1. Run the job with `hermes cron run ` (fires on next gateway tick) and watch for errors in the chat output +1. Run the job with `hermes cron run ` (runs inline when no gateway is up, otherwise fires on next gateway tick) and watch for errors in the chat output 2. Check `~/.hermes/logs/agent.log` for scheduler messages and `~/.hermes/logs/errors.log` for warnings 3. Open an issue at [github.com/NousResearch/hermes-agent](https://github.com/NousResearch/hermes-agent) with: - The job ID and schedule diff --git a/website/docs/reference/cli-commands.md b/website/docs/reference/cli-commands.md index ca1fb0817a9..be48bbc6504 100644 --- a/website/docs/reference/cli-commands.md +++ b/website/docs/reference/cli-commands.md @@ -295,7 +295,7 @@ hermes cron | `edit` | Update a job's schedule, prompt, name, delivery, repeat count, or attached skills. Supports `--clear-skills`, `--add-skill`, and `--remove-skill`. | | `pause` | Pause a job without deleting it. | | `resume` | Resume a paused job and compute its next future run. | -| `run` | Trigger a job on the next scheduler tick. | +| `run` | Run a job now. Executes inline when no gateway is running; otherwise defers to the next gateway tick (≤60s). | | `remove` | Delete a scheduled job. | | `status` | Check whether the cron scheduler is running. | | `tick` | Run due jobs once and exit. |