Compare commits

...

1 Commits

Author SHA1 Message Date
Frederico Ribeiro
bcc1edc3a6 fix(cron): recover recent one-shot jobs 2026-03-18 03:51:23 -07:00
2 changed files with 129 additions and 5 deletions

View File

@@ -34,6 +34,7 @@ HERMES_DIR = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
CRON_DIR = HERMES_DIR / "cron" CRON_DIR = HERMES_DIR / "cron"
JOBS_FILE = CRON_DIR / "jobs.json" JOBS_FILE = CRON_DIR / "jobs.json"
OUTPUT_DIR = CRON_DIR / "output" OUTPUT_DIR = CRON_DIR / "output"
ONESHOT_GRACE_SECONDS = 120
def _normalize_skill_list(skill: Optional[str] = None, skills: Optional[Any] = None) -> List[str]: def _normalize_skill_list(skill: Optional[str] = None, skills: Optional[Any] = None) -> List[str]:
@@ -220,6 +221,33 @@ def _ensure_aware(dt: datetime) -> datetime:
return dt.astimezone(target_tz) return dt.astimezone(target_tz)
def _recoverable_oneshot_run_at(
schedule: Dict[str, Any],
now: datetime,
*,
last_run_at: Optional[str] = None,
) -> Optional[str]:
"""Return a one-shot run time if it is still eligible to fire.
One-shot jobs get a small grace window so jobs created a few seconds after
their requested minute still run on the next tick. Once a one-shot has
already run, it is never eligible again.
"""
if schedule.get("kind") != "once":
return None
if last_run_at:
return None
run_at = schedule.get("run_at")
if not run_at:
return None
run_at_dt = _ensure_aware(datetime.fromisoformat(run_at))
if run_at_dt >= now - timedelta(seconds=ONESHOT_GRACE_SECONDS):
return run_at
return None
def compute_next_run(schedule: Dict[str, Any], last_run_at: Optional[str] = None) -> Optional[str]: def compute_next_run(schedule: Dict[str, Any], last_run_at: Optional[str] = None) -> Optional[str]:
""" """
Compute the next run time for a schedule. Compute the next run time for a schedule.
@@ -229,9 +257,7 @@ def compute_next_run(schedule: Dict[str, Any], last_run_at: Optional[str] = None
now = _hermes_now() now = _hermes_now()
if schedule["kind"] == "once": if schedule["kind"] == "once":
run_at = _ensure_aware(datetime.fromisoformat(schedule["run_at"])) return _recoverable_oneshot_run_at(schedule, now, last_run_at=last_run_at)
# If in the future, return it; if in the past, no more runs
return schedule["run_at"] if run_at > now else None
elif schedule["kind"] == "interval": elif schedule["kind"] == "interval":
minutes = schedule["minutes"] minutes = schedule["minutes"]
@@ -555,8 +581,27 @@ def get_due_jobs() -> List[Dict[str, Any]]:
next_run = job.get("next_run_at") next_run = job.get("next_run_at")
if not next_run: if not next_run:
recovered_next = _recoverable_oneshot_run_at(
job.get("schedule", {}),
now,
last_run_at=job.get("last_run_at"),
)
if not recovered_next:
continue continue
job["next_run_at"] = recovered_next
next_run = recovered_next
logger.info(
"Job '%s' had no next_run_at; recovering one-shot run at %s",
job.get("name", job["id"]),
recovered_next,
)
for rj in raw_jobs:
if rj["id"] == job["id"]:
rj["next_run_at"] = recovered_next
needs_save = True
break
next_run_dt = _ensure_aware(datetime.fromisoformat(next_run)) next_run_dt = _ensure_aware(datetime.fromisoformat(next_run))
if next_run_dt <= now: if next_run_dt <= now:
schedule = job.get("schedule", {}) schedule = job.get("schedule", {})

View File

@@ -2,7 +2,7 @@
import json import json
import pytest import pytest
from datetime import datetime, timedelta from datetime import datetime, timedelta, timezone
from pathlib import Path from pathlib import Path
from unittest.mock import patch from unittest.mock import patch
@@ -122,11 +122,29 @@ class TestComputeNextRun:
schedule = {"kind": "once", "run_at": future} schedule = {"kind": "once", "run_at": future}
assert compute_next_run(schedule) == future assert compute_next_run(schedule) == future
def test_once_recent_past_within_grace_returns_time(self, monkeypatch):
now = datetime(2026, 3, 18, 4, 22, 3, tzinfo=timezone.utc)
run_at = "2026-03-18T04:22:00+00:00"
monkeypatch.setattr("cron.jobs._hermes_now", lambda: now)
schedule = {"kind": "once", "run_at": run_at}
assert compute_next_run(schedule) == run_at
def test_once_past_returns_none(self): def test_once_past_returns_none(self):
past = (datetime.now() - timedelta(hours=1)).isoformat() past = (datetime.now() - timedelta(hours=1)).isoformat()
schedule = {"kind": "once", "run_at": past} schedule = {"kind": "once", "run_at": past}
assert compute_next_run(schedule) is None assert compute_next_run(schedule) is None
def test_once_with_last_run_returns_none_even_within_grace(self, monkeypatch):
now = datetime(2026, 3, 18, 4, 22, 3, tzinfo=timezone.utc)
run_at = "2026-03-18T04:22:00+00:00"
monkeypatch.setattr("cron.jobs._hermes_now", lambda: now)
schedule = {"kind": "once", "run_at": run_at}
assert compute_next_run(schedule, last_run_at=now.isoformat()) is None
def test_interval_first_run(self): def test_interval_first_run(self):
schedule = {"kind": "interval", "minutes": 60} schedule = {"kind": "interval", "minutes": 60}
result = compute_next_run(schedule) result = compute_next_run(schedule)
@@ -347,6 +365,67 @@ class TestGetDueJobs:
due = get_due_jobs() due = get_due_jobs()
assert len(due) == 0 assert len(due) == 0
def test_broken_recent_one_shot_without_next_run_is_recovered(self, tmp_cron_dir, monkeypatch):
now = datetime(2026, 3, 18, 4, 22, 30, tzinfo=timezone.utc)
monkeypatch.setattr("cron.jobs._hermes_now", lambda: now)
run_at = "2026-03-18T04:22:00+00:00"
save_jobs(
[{
"id": "oneshot-recover",
"name": "Recover me",
"prompt": "Word of the day",
"schedule": {"kind": "once", "run_at": run_at, "display": "once at 2026-03-18 04:22"},
"schedule_display": "once at 2026-03-18 04:22",
"repeat": {"times": 1, "completed": 0},
"enabled": True,
"state": "scheduled",
"paused_at": None,
"paused_reason": None,
"created_at": "2026-03-18T04:21:00+00:00",
"next_run_at": None,
"last_run_at": None,
"last_status": None,
"last_error": None,
"deliver": "local",
"origin": None,
}]
)
due = get_due_jobs()
assert [job["id"] for job in due] == ["oneshot-recover"]
assert get_job("oneshot-recover")["next_run_at"] == run_at
def test_broken_stale_one_shot_without_next_run_is_not_recovered(self, tmp_cron_dir, monkeypatch):
now = datetime(2026, 3, 18, 4, 30, 0, tzinfo=timezone.utc)
monkeypatch.setattr("cron.jobs._hermes_now", lambda: now)
save_jobs(
[{
"id": "oneshot-stale",
"name": "Too old",
"prompt": "Word of the day",
"schedule": {"kind": "once", "run_at": "2026-03-18T04:22:00+00:00", "display": "once at 2026-03-18 04:22"},
"schedule_display": "once at 2026-03-18 04:22",
"repeat": {"times": 1, "completed": 0},
"enabled": True,
"state": "scheduled",
"paused_at": None,
"paused_reason": None,
"created_at": "2026-03-18T04:21:00+00:00",
"next_run_at": None,
"last_run_at": None,
"last_status": None,
"last_error": None,
"deliver": "local",
"origin": None,
}]
)
assert get_due_jobs() == []
assert get_job("oneshot-stale")["next_run_at"] is None
class TestSaveJobOutput: class TestSaveJobOutput:
def test_creates_output_file(self, tmp_cron_dir): def test_creates_output_file(self, tmp_cron_dir):